11from __future__ import annotations
22
33import _queue
4+ import concurrent
45import json
56from concurrent .futures import Future
67from queue import Queue
2829class BulkInsertOperation :
2930 class _BufferExposer :
3031 def __init__ (self ):
31- self ._data = {}
3232 self ._ongoing_operation = Future () # todo: is there any reason to use Futures? (look at error handling)
3333 self ._yield_buffer_semaphore = Semaphore (1 )
3434 self ._buffers_to_flush_queue = Queue ()
35- self .output_stream = Future ()
36- self .running = True
35+ self .output_stream_mock = Future ()
3736
3837 def enqueue_buffer_for_flush (self , buffer : bytearray ):
3938 self ._buffers_to_flush_queue .put (buffer )
4039
4140 # todo: blocking semaphore acquired and released on enter and exit from bulk insert operation context manager
4241 def send_data (self ):
43- while self . running :
42+ while True :
4443 try :
4544 buffer_to_flush = self ._buffers_to_flush_queue .get (timeout = 0.05 ) # todo: adjust this pooling time
4645 yield buffer_to_flush
@@ -59,7 +58,7 @@ def error_on_processing_request(self, exception: Exception):
5958 self ._ongoing_operation .set_exception (exception )
6059
6160 def error_on_request_start (self , exception : Exception ):
62- self .output_stream .set_exception (exception )
61+ self .output_stream_mock .set_exception (exception )
6362
6463 class _BulkInsertCommand (RavenCommand [requests .Response ]):
6564 def __init__ (self , key : int , buffer_exposer : BulkInsertOperation ._BufferExposer , node_tag : str ):
@@ -96,7 +95,7 @@ def __init__(self, database: str = None, store: "DocumentStore" = None):
9695 self ._in_progress_command : Optional [CommandType ] = None
9796 self ._operation_id = - 1
9897 self ._node_tag = None
99- self ._concurrent_check = 0
98+ self ._concurrent_check_flag = 0
10099 self ._concurrent_check_lock = Lock ()
101100
102101 self ._thread_pool_executor = store .thread_pool_executor
@@ -234,7 +233,7 @@ def store_as(
234233 self ._handle_errors (key , e )
235234 finally :
236235 with self ._concurrent_check_lock :
237- self ._concurrent_check = 0
236+ self ._concurrent_check_flag = 0
238237
239238 def _handle_errors (self , document_id : str , e : Exception ) -> None :
240239 error = self ._get_exception_from_operation ()
@@ -245,14 +244,14 @@ def _handle_errors(self, document_id: str, e: Exception) -> None:
245244
246245 def _concurrency_check (self ):
247246 with self ._concurrent_check_lock :
248- if not self ._concurrent_check == 0 :
247+ if not self ._concurrent_check_flag == 0 :
249248 raise RuntimeError ("Bulk Insert store methods cannot be executed concurrently." )
250- self ._concurrent_check = 1
249+ self ._concurrent_check_flag = 1
251250
252251 def __return_func ():
253252 with self ._concurrent_check_lock :
254- if self ._concurrent_check == 1 :
255- self ._concurrent_check = 0
253+ if self ._concurrent_check_flag == 1 :
254+ self ._concurrent_check_flag = 0
256255
257256 return __return_func
258257
@@ -344,6 +343,13 @@ def __execute_bulk_insert_raven_command():
344343 self ._ongoing_bulk_insert_execute_task = self ._thread_pool_executor .submit (
345344 __execute_bulk_insert_raven_command
346345 )
346+
347+ try :
348+ self ._buffer_exposer .output_stream_mock .result (timeout = 0.01 )
349+ except Exception as e :
350+ if not isinstance (e , concurrent .futures ._base .TimeoutError ):
351+ raise e
352+
347353 self ._current_data_buffer += bytearray ("[" , encoding = "utf-8" )
348354
349355 except Exception as e :
0 commit comments