 from __future__ import annotations
+
+import _queue
 import json
 from concurrent.futures import Future
 from queue import Queue
@@ -27,31 +29,34 @@ class BulkInsertOperation:
     class _BufferExposer:
         def __init__(self):
             self._data = {}
-            self._done = Future()
+            self._ongoing_operation = Future()  # todo: is there any reason to use Futures? (look at error handling)
             self._yield_buffer_semaphore = Semaphore(1)
-            self._flushed_buffers_queue = Queue()
+            self._buffers_to_flush_queue = Queue()
             self.output_stream = Future()
             self.running = True

-        def enqueue_buffer(self, buffer: bytearray):
-            self._flushed_buffers_queue.put(buffer)
+        def enqueue_buffer_for_flush(self, buffer: bytearray):
+            self._buffers_to_flush_queue.put(buffer)

         # todo: blocking semaphore acquired and released on enter and exit from bulk insert operation context manager
         def send_data(self):
             while self.running:
                 try:
-                    yield self._flushed_buffers_queue.get(timeout=2)
+                    buffer_to_flush = self._buffers_to_flush_queue.get(timeout=0.05)  # todo: adjust this polling time
+                    yield buffer_to_flush
                 except Exception as e:
-                    break  # expected Empty exception coming from queue
+                    if not isinstance(e, _queue.Empty) or self.is_operation_finished():
+                        break
+                    continue  # expected Empty exception coming from queue, operation isn't finished yet

-        def is_done(self) -> bool:
-            return self._done.done()
+        def is_operation_finished(self) -> bool:
+            return self._ongoing_operation.done()

-        def done(self):
-            self._done.set_result(None)
+        def finish_operation(self):
+            self._ongoing_operation.set_result(None)

         def error_on_processing_request(self, exception: Exception):
-            self._done.set_exception(exception)
+            self._ongoing_operation.set_exception(exception)

         def error_on_request_start(self, exception: Exception):
             self.output_stream.set_exception(exception)
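
The `_BufferExposer` pattern above works because `requests` can stream any iterator of bytes as a chunked request body: `send_data()` is handed to the HTTP layer as that iterator while the writer thread keeps appending buffers to the queue. A minimal standalone sketch of the same idea (names and the 0.05s timeout are illustrative, not the client's API; `queue.Empty` is the public alias of `_queue.Empty`):

    import queue
    import threading

    buffers: "queue.Queue[bytes]" = queue.Queue()
    finished = threading.Event()

    def send_data():
        # Mirrors _BufferExposer.send_data: poll the queue, and stop only
        # once the producer reports completion and the queue has drained.
        while True:
            try:
                yield buffers.get(timeout=0.05)
            except queue.Empty:
                if finished.is_set():
                    break

    def producer():
        for chunk in (b'[{"Id":"users/1"}', b",", b'{"Id":"users/2"}]'):
            buffers.put(chunk)
        finished.set()

    threading.Thread(target=producer).start()
    print(b"".join(send_data()))  # b'[{"Id":"users/1"},{"Id":"users/2"}]'
    # With requests, the same generator can be passed directly as a chunked
    # body: requests.post(url, data=send_data())
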
@@ -86,7 +91,7 @@ def send(self, session: requests.Session, request: requests.Request) -> requests
     def __init__(self, database: str = None, store: "DocumentStore" = None):
         self.use_compression = False

-        self._bulk_insert_execute_task: Optional[Future] = None
+        self._ongoing_bulk_insert_execute_task: Optional[Future] = None
         self._first = True
         self._in_progress_command: Optional[CommandType] = None
         self._operation_id = -1
@@ -100,8 +105,8 @@ def __init__(self, database: str = None, store: "DocumentStore" = None):
             self._throw_no_database()
         self._request_executor = store.get_request_executor(database)

-        self._async_write = Future()
-        self._async_write.set_result(None)
+        self._enqueue_current_buffer_async = Future()
+        self._enqueue_current_buffer_async.set_result(None)

         self._max_size_in_buffer = 1024 * 1024

@@ -123,28 +128,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):

         flush_ex = None

-        if self._buffer_exposer.is_done():
+        if self._buffer_exposer.is_operation_finished():
             return

+        # process the leftovers and finish the stream
         if self._current_data_buffer:
             try:
-                self._current_data_buffer += bytearray("]", encoding="utf-8")
-                self._async_write.result()
-
+                self._write_misc("]")
+                self._enqueue_current_buffer_async.result()  # wait for enqueue
                 buffer = self._current_data_buffer
-                self._buffer_exposer.enqueue_buffer(buffer)
+                self._buffer_exposer.enqueue_buffer_for_flush(buffer)
             except Exception as e:
                 flush_ex = e

-        self._buffer_exposer.done()
+        self._buffer_exposer.finish_operation()

         if self._operation_id == -1:
             # closing without calling a single store
             return

-        if self._bulk_insert_execute_task is not None:
+        if self._ongoing_bulk_insert_execute_task is not None:
             try:
-                self._bulk_insert_execute_task.result()
+                self._ongoing_bulk_insert_execute_task.result()
             except Exception as e:
                 self._throw_bulk_insert_aborted(e, flush_ex)
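
For context, this `__exit__` path is what a caller relies on when using bulk insert as a context manager. A hedged usage sketch (the `store.bulk_insert()` entry point and the `User` class are assumptions, modeled on the other RavenDB clients):

    # Illustrative only; entry point and entity class are assumptions.
    with store.bulk_insert() as bulk_insert:
        for i in range(1000):
            bulk_insert.store_by_entity(User(name=f"user-{i}"))
    # On exit: the trailing "]" is written, the last buffer is enqueued,
    # finish_operation() lets send_data() terminate, and a failed execute
    # task re-raises via _throw_bulk_insert_aborted.
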
@@ -167,7 +172,7 @@ def _throw_no_database(self):
167172 "default database can be defined using 'DocumentStore.database' property."
168173 )
169174
170- def _wait_for_id (self ):
175+ def _get_bulk_insert_operation_id (self ):
171176 if self ._operation_id != - 1 :
172177 return
173178
@@ -176,56 +181,55 @@ def _wait_for_id(self):
         self._operation_id = bulk_insert_get_id_request.result
         self._node_tag = bulk_insert_get_id_request._node_tag

+    def _fill_metadata_if_needed(self, entity: object, metadata: MetadataAsDictionary):
+        # add collection name to metadata if needed
+        if constants.Documents.Metadata.COLLECTION not in metadata:
+            collection = self._request_executor.conventions.get_collection_name(entity)
+            if collection is not None:
+                metadata[constants.Documents.Metadata.COLLECTION] = collection
+
+        # add type path to metadata if needed
+        if constants.Documents.Metadata.RAVEN_PYTHON_TYPE not in metadata:
+            python_type = self._request_executor.conventions.get_python_class_name(entity.__class__)
+            if python_type is not None:
+                metadata[constants.Documents.Metadata.RAVEN_PYTHON_TYPE] = python_type
+
     def store_by_entity(self, entity: object, metadata: Optional[MetadataAsDictionary] = None) -> str:
         key = (
             self._get_id(entity)
             if metadata is None or constants.Documents.Metadata.ID not in metadata
             else metadata[constants.Documents.Metadata.ID]
         )

-        self.store(entity, key, metadata)
+        self.store(entity, key, metadata or MetadataAsDictionary())
         return key

-    def store(self, entity: object, key: str, metadata: Optional[MetadataAsDictionary] = None) -> None:
+    def store(
+        self, entity: object, key: str, metadata: Optional[MetadataAsDictionary] = MetadataAsDictionary()
+    ) -> None:
         try:
-            check = self._concurrency_check()
-            self._verify_valid_id(key)
-
-            self._execute_before_store()
-            if metadata is None:
-                metadata = MetadataAsDictionary()
+            self._concurrency_check()
+            self._verify_valid_key(key)
+            self._ensure_ongoing_operation()

-            if constants.Documents.Metadata.COLLECTION not in metadata:
-                collection = self._request_executor.conventions.get_collection_name(entity)
-                if collection is not None:
-                    metadata[constants.Documents.Metadata.COLLECTION] = collection
-
-            if constants.Documents.Metadata.RAVEN_PYTHON_TYPE not in metadata:
-                python_type = self._request_executor.conventions.get_python_class_name(entity.__class__)
-                if python_type is not None:
-                    metadata[constants.Documents.Metadata.RAVEN_PYTHON_TYPE] = python_type
-
-            self._end_previous_command_if_needed()
+            self._fill_metadata_if_needed(entity, metadata)
+            self._end_previous_command_if_needed()  # counters & time series commands shall end before pushing docs

             try:
                 if not self._first:
                     self._write_comma()

                 self._first = False
                 self._in_progress_command = CommandType.NONE
-                self._current_data_buffer += bytearray('{"Id":"', encoding="utf-8")
-                self._write_string(key)
-                self._current_data_buffer += bytearray('","Type":"PUT","Document":', encoding="utf-8")

-                # self._flush_if_needed()
+                self._write_misc('{"Id":"')
+                self._write_key(key)
+                self._write_misc('","Type":"PUT","Document":')

-                document_info = DocumentInfo(metadata_instance=metadata)
-                json_dict = EntityToJson.convert_entity_to_json_internal_static(
-                    entity, self._conventions, document_info, True
-                )
+                # self._flush_if_needed()  # why?

-                self._current_data_buffer += bytearray(json.dumps(json_dict), encoding="utf-8")
-                self._current_data_buffer += bytearray("}", encoding="utf-8")
+                self._write_document(entity, metadata)
+                self._write_misc("}")
             except Exception as e:
                 self._handle_errors(key, e)
             finally:
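
One hazard worth flagging in the new `store` signature: a default of `MetadataAsDictionary()` is evaluated once, at function definition time, so every call that omits `metadata` shares the same instance, which `_fill_metadata_if_needed` then mutates. A minimal demonstration with a plain dict standing in for `MetadataAsDictionary`:

    def store(entity, key, metadata={}):  # one dict object, shared by all calls
        metadata.setdefault("@collection", type(entity).__name__)
        return metadata

    class User: ...
    class Order: ...

    print(store(User(), "users/1"))    # {'@collection': 'User'}
    print(store(Order(), "orders/1"))  # {'@collection': 'User'} - stale value

The conventional fix is a `None` default coalesced in the body, which is what the removed lines did and what `store_by_entity` still does with `metadata or MetadataAsDictionary()`.
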
@@ -253,26 +257,26 @@ def __return_func():
         return __return_func

     def _flush_if_needed(self) -> None:
-        if len(self._current_data_buffer) > self._max_size_in_buffer or self._async_write.done():
-            self._async_write.result()
+        if len(self._current_data_buffer) > self._max_size_in_buffer or self._enqueue_current_buffer_async.done():
+            self._enqueue_current_buffer_async.result()

             buffer = self._current_data_buffer
             self._current_data_buffer.clear()

-            # todo: check if it's better to create a new bytearray of max size or clear it (possible dealloc)
+            # todo: check if it's better to create a new bytearray of max size instead of clearing it (possible dealloc)

-            def __async_write():
-                self._buffer_exposer.enqueue_buffer(buffer)
+            def __enqueue_buffer_for_flush():
+                self._buffer_exposer.enqueue_buffer_for_flush(buffer)

-            self._async_write = self._thread_pool_executor.submit(__async_write)
+            self._enqueue_current_buffer_async = self._thread_pool_executor.submit(__enqueue_buffer_for_flush)

     def _end_previous_command_if_needed(self) -> None:
         if self._in_progress_command == CommandType.COUNTERS:
             pass  # todo: counters
         elif self._in_progress_command == CommandType.TIME_SERIES:
             pass  # todo: time series

-    def _write_string(self, input_string: str) -> None:
+    def _write_key(self, input_string: str) -> None:
         for i in range(len(input_string)):
             c = input_string[i]
             if '"' == c:
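
Also worth double-checking in `_flush_if_needed` (currently dead code while the call in `store` stays commented out): `buffer = self._current_data_buffer` only binds a second name to the same `bytearray`, so the `clear()` on the next line empties `buffer` too, before the submitted task can enqueue it. In miniature:

    data = bytearray(b'{"Id":"users/1"}')
    buffer = data       # no copy; both names point at one bytearray
    data.clear()
    print(len(buffer))  # 0 - the enqueued buffer would be empty

Swapping in a fresh buffer (`self._current_data_buffer = bytearray()`) instead of clearing avoids the alias, which is also where the todo about allocating a new max-size bytearray points.
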
@@ -284,21 +288,29 @@ def _write_string(self, input_string: str) -> None:
     def _write_comma(self) -> None:
         self._current_data_buffer += bytearray(",", encoding="utf-8")

-    def _execute_before_store(self) -> None:
-        if self._bulk_insert_execute_task is None:  # todo: check if it's valid way
-            self._wait_for_id()
-            self._ensure_execute_task()
+    def _write_misc(self, data: str) -> None:
+        self._current_data_buffer += bytearray(data, encoding="utf-8")
+
+    def _write_document(self, entity: object, metadata: MetadataAsDictionary):
+        document_info = DocumentInfo(metadata_instance=metadata)
+        json_dict = EntityToJson.convert_entity_to_json_internal_static(entity, self._conventions, document_info, True)
+        self._current_data_buffer += bytearray(json.dumps(json_dict), encoding="utf-8")
+
+    def _ensure_ongoing_operation(self) -> None:
+        if self._ongoing_bulk_insert_execute_task is None:
+            self._get_bulk_insert_operation_id()
+            self._start_executing_bulk_insert_command()

         if (
-            self._bulk_insert_execute_task.done() and self._bulk_insert_execute_task.exception()
+            self._ongoing_bulk_insert_execute_task.done() and self._ongoing_bulk_insert_execute_task.exception()
         ):  # todo: check if isCompletedExceptionally returns false if task isn't finished
             try:
-                self._bulk_insert_execute_task.result()
+                self._ongoing_bulk_insert_execute_task.result()
             except Exception as e:
                 self._throw_bulk_insert_aborted(e, None)

     @staticmethod
-    def _verify_valid_id(key: str) -> None:
+    def _verify_valid_key(key: str) -> None:
         if not key or key.isspace():
             raise ValueError("Document id must have a non empty value")

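
On the todo about `isCompletedExceptionally`: `concurrent.futures.Future.exception()` blocks until the future completes unless given a timeout, so the `done()` guard above is what keeps the check non-blocking, whereas Java's `isCompletedExceptionally()` simply returns false for an unfinished task. Once a future has failed, `result()` re-raises the stored exception, which is the mechanism `_throw_bulk_insert_aborted` relies on. A quick self-contained check:

    from concurrent.futures import ThreadPoolExecutor

    def boom():
        raise RuntimeError("bulk insert failed")

    with ThreadPoolExecutor() as pool:
        task = pool.submit(boom)
        task.exception()  # blocks until done; returns (not raises) the error
        if task.done() and task.exception():
            try:
                task.result()  # re-raises the stored RuntimeError
            except RuntimeError as e:
                print("aborted:", e)
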
@@ -319,18 +331,19 @@ def _get_exception_from_operation(self) -> Optional[BulkInsertAbortedException]:

         return None

-    def _ensure_execute_task(self) -> None:
+    def _start_executing_bulk_insert_command(self) -> None:
         try:
             bulk_command = BulkInsertOperation._BulkInsertCommand(
                 self._operation_id, self._buffer_exposer, self._node_tag
             )
             bulk_command.use_compression = self.use_compression

-            def __core_async():
+            def __execute_bulk_insert_raven_command():
                 self._request_executor.execute_command(bulk_command)
-                return None

-            self._bulk_insert_execute_task = self._thread_pool_executor.submit(__core_async)
+            self._ongoing_bulk_insert_execute_task = self._thread_pool_executor.submit(
+                __execute_bulk_insert_raven_command
+            )
             self._current_data_buffer += bytearray("[", encoding="utf-8")

         except Exception as e:
@@ -345,7 +358,7 @@ def abort(self) -> None:
         if self._operation_id == -1:
             return  # nothing was done, nothing to kill

-        self._wait_for_id()
+        self._get_bulk_insert_operation_id()

         try:
             self._request_executor.execute_command(KillOperationCommand(self._operation_id, self._node_tag))