@@ -119,6 +119,8 @@ def __init__(self, database: str = None, store: "DocumentStore" = None):
             lambda entity: self._request_executor.conventions.generate_document_id(database, entity),
         )

+        self._attachments_operation = BulkInsertOperation.AttachmentsBulkInsertOperation(self)
+
     def __enter__(self):
         return self

@@ -133,7 +135,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         # process the leftovers and finish the stream
         if self._current_data_buffer:
             try:
-                self._write_misc("]")
+                self._write_string_no_escape("]")
                 self._enqueue_current_buffer_async.result()  # wait for enqueue
                 buffer = self._current_data_buffer
                 self._buffer_exposer.enqueue_buffer_for_flush(buffer)
@@ -221,14 +223,14 @@ def store_as(
             self._first = False
             self._in_progress_command = CommandType.NONE

-            self._write_misc('{"Id":"')
-            self._write_key(key)
-            self._write_misc('","Type":"PUT","Document":')
+            self._write_string_no_escape('{"Id":"')
+            self._write_string(key)
+            self._write_string_no_escape('","Type":"PUT","Document":')

-            # self._flush_if_needed()  # why?
+            self._flush_if_needed()

             self._write_document(entity, metadata)
-            self._write_misc("}")
+            self._write_string_no_escape("}")
         except Exception as e:
             self._handle_errors(key, e)
         finally:
@@ -256,26 +258,27 @@ def __return_func():
         return __return_func

     def _flush_if_needed(self) -> None:
-        if len(self._current_data_buffer) > self._max_size_in_buffer or self._enqueue_current_buffer_async.done():
-            self._enqueue_current_buffer_async.result()
-
-            buffer = self._current_data_buffer
-            self._current_data_buffer.clear()
-
-            # todo: check if it's better to create a new bytearray of max size instead of clearing it (possible dealloc)
-
-            def __enqueue_buffer_for_flush():
-                self._buffer_exposer.enqueue_buffer_for_flush(buffer)
-
-            self._enqueue_current_buffer_async = self._thread_pool_executor.submit(__enqueue_buffer_for_flush)
+        pass
+        # if len(self._current_data_buffer) > self._max_size_in_buffer or self._enqueue_current_buffer_async.done():
+        #     self._enqueue_current_buffer_async.result()
+        #
+        #     buffer = self._current_data_buffer
+        #     self._current_data_buffer.clear()
+        #
+        #     # todo: check if it's better to create a new bytearray of max size instead of clearing it (possible dealloc)
+        #
+        #     def __enqueue_buffer_for_flush():
+        #         self._buffer_exposer.enqueue_buffer_for_flush(buffer)
+        #
+        #     self._enqueue_current_buffer_async = self._thread_pool_executor.submit(__enqueue_buffer_for_flush)

     def _end_previous_command_if_needed(self) -> None:
         if self._in_progress_command == CommandType.COUNTERS:
             pass  # todo: counters
         elif self._in_progress_command == CommandType.TIME_SERIES:
             pass  # todo: time series

-    def _write_key(self, input_string: str) -> None:
+    def _write_string(self, input_string: str) -> None:
         for i in range(len(input_string)):
             c = input_string[i]
             if '"' == c:
@@ -287,7 +290,7 @@ def _write_key(self, input_string: str) -> None:
     def _write_comma(self) -> None:
         self._current_data_buffer += bytearray(",", encoding="utf-8")

-    def _write_misc(self, data: str) -> None:
+    def _write_string_no_escape(self, data: str) -> None:
         self._current_data_buffer += bytearray(data, encoding="utf-8")

     def _write_document(self, entity: object, metadata: MetadataAsDictionary):
@@ -383,12 +386,58 @@ def _get_id(self, entity: object) -> str:
         self._generate_entity_id_on_the_client.try_set_identity(entity, key)
         return key

-    # todo: attachments_for
     # todo: time_series_for
     # todo: CountersBulkInsert
     # todo: CountersBulkInsertOperation
     # todo: TimeSeriesBulkInsertBase
     # todo: TimeSeriesBulkInsert
     # todo: TypedTimeSeriesBulkInsert
-    # todo: AttachmentsBulkInsert
-    # todo: AttachmentsBulkInsertOperation
+
+    class AttachmentsBulkInsert:
+        def __init__(self, operation: BulkInsertOperation, key: str):
+            self.operation = operation
+            self.key = key
+
+        def store(self, name: str, attachment_bytes: bytes, content_type: Optional[str] = None) -> None:
+            self.operation._attachments_operation.store(self.key, name, attachment_bytes, content_type)
+
+    class AttachmentsBulkInsertOperation:
+        def __init__(self, operation: BulkInsertOperation):
+            self.operation = operation
+
+        def store(self, key: str, name: str, attachment_bytes: bytes, content_type: Optional[str] = None):
+            self.operation._concurrency_check()
+            self.operation._end_previous_command_if_needed()
+            self.operation._ensure_ongoing_operation()
+
+            try:
+                if not self.operation._first:
+                    self.operation._write_comma()
+
+                self.operation._write_string_no_escape('{"Id":"')
+                self.operation._write_string(key)
+                self.operation._write_string_no_escape('","Type":"AttachmentPUT","Name":"')
+                self.operation._write_string(name)
+
+                if content_type:
+                    self.operation._write_string_no_escape('","ContentType":"')
+                    self.operation._write_string(content_type)
+
+                self.operation._write_string_no_escape('","ContentLength":')
+                self.operation._write_string_no_escape(str(len(attachment_bytes)))
+                self.operation._write_string_no_escape("}")
+
+                self.operation._flush_if_needed()
+
+                self.operation._current_data_buffer += bytearray(attachment_bytes)
+
+                self.operation._flush_if_needed()
+
+            except Exception as e:
+                self.operation._handle_errors(key, e)
+
+    def attachments_for(self, key: str) -> BulkInsertOperation.AttachmentsBulkInsert:
+        if not key or key.isspace():
+            raise ValueError("Document id cannot be None or empty.")
+
+        return BulkInsertOperation.AttachmentsBulkInsert(self, key)
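
For context, a minimal usage sketch of the attachments API introduced by this diff. Everything outside the diff itself is an assumption for illustration: the top-level `ravendb` import, the `store.bulk_insert()` entry point, the `bulk_insert.store(entity, key)` call wrapping `store_as`, and the `User` class, server URL, database name, and attachment bytes are all placeholders.

from ravendb import DocumentStore  # assumed top-level import for this client


class User:  # placeholder document class for the example
    def __init__(self, name: str = None, Id: str = None):
        self.name = name
        self.Id = Id


store = DocumentStore(urls=["http://localhost:8080"], database="Demo")  # placeholder URL/database
store.initialize()

# bulk_insert() is assumed to return the BulkInsertOperation shown in this diff,
# used as a context manager via the __enter__/__exit__ methods above.
with store.bulk_insert() as bulk_insert:
    user = User(name="Alice")
    bulk_insert.store(user, "users/1")  # assumed store(entity, key) wrapper over store_as

    # New in this commit: attachments_for() returns an AttachmentsBulkInsert bound to the
    # document id, and store() emits an AttachmentPUT command followed by the raw bytes.
    attachments = bulk_insert.attachments_for("users/1")
    attachments.store("photo.png", b"placeholder attachment bytes", content_type="image/png")

The attachment payload is written into the same command stream as the documents, which is why AttachmentsBulkInsertOperation.store() calls _flush_if_needed() both before and after appending the raw bytes.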