 import concurrent
 import json
 from concurrent.futures import Future
+from copy import deepcopy
 from queue import Queue
 from threading import Lock, Semaphore
 from typing import Optional, TYPE_CHECKING
@@ -258,19 +259,18 @@ def __return_func():
         return __return_func

     def _flush_if_needed(self) -> None:
-        pass
-        # if len(self._current_data_buffer) > self._max_size_in_buffer or self._enqueue_current_buffer_async.done():
-        #     self._enqueue_current_buffer_async.result()
-        #
-        #     buffer = self._current_data_buffer
-        #     self._current_data_buffer.clear()
-        #
-        #     # todo: check if it's better to create a new bytearray of max size instead of clearing it (possible dealloc)
-        #
-        #     def __enqueue_buffer_for_flush():
-        #         self._buffer_exposer.enqueue_buffer_for_flush(buffer)
-        #
-        #     self._enqueue_current_buffer_async = self._thread_pool_executor.submit(__enqueue_buffer_for_flush)
+        if len(self._current_data_buffer) > self._max_size_in_buffer or self._enqueue_current_buffer_async.done():
+            self._enqueue_current_buffer_async.result()  # wait
+
+            buffer = deepcopy(self._current_data_buffer)
+            self._current_data_buffer.clear()
+
+            # todo: check if it's better to create a new bytearray of max size instead of clearing it (possible dealloc)
+
+            def __enqueue_buffer_for_flush(flushed_buffer: bytearray):
+                self._buffer_exposer.enqueue_buffer_for_flush(flushed_buffer)
+
+            self._enqueue_current_buffer_async = self._thread_pool_executor.submit(__enqueue_buffer_for_flush, buffer)

     def _end_previous_command_if_needed(self) -> None:
         if self._in_progress_command == CommandType.COUNTERS:
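
A minimal, self-contained sketch of the double-buffered flush pattern the new _flush_if_needed implements: writes accumulate in _current_data_buffer, and once the buffer exceeds _max_size_in_buffer (or the previous background flush has already completed) a copy is handed to a single-worker thread pool that forwards it to _buffer_exposer.enqueue_buffer_for_flush, while the live buffer is cleared for reuse. The BufferedWriter and BufferExposer classes, the queue-backed exposer, and the demo at the bottom are illustrative assumptions made for this sketch, not the actual classes in this repository.

from concurrent.futures import Future, ThreadPoolExecutor
from copy import deepcopy
from queue import Queue


class BufferExposer:
    """Illustrative stand-in: records every buffer handed over for flushing."""

    def __init__(self) -> None:
        self.flushed = Queue()

    def enqueue_buffer_for_flush(self, buffer: bytearray) -> None:
        self.flushed.put(buffer)


class BufferedWriter:
    """Illustrative stand-in: shows the double-buffered flush pattern."""

    def __init__(self, max_size_in_buffer: int = 1024) -> None:
        self._max_size_in_buffer = max_size_in_buffer
        self._current_data_buffer = bytearray()
        self._buffer_exposer = BufferExposer()
        self._thread_pool_executor = ThreadPoolExecutor(max_workers=1)
        # Start with an already-completed future so the first flush never blocks.
        self._enqueue_current_buffer_async = Future()
        self._enqueue_current_buffer_async.set_result(None)

    def write(self, data: bytes) -> None:
        self._current_data_buffer.extend(data)
        self._flush_if_needed()

    def _flush_if_needed(self) -> None:
        # Flush when the buffer outgrows the limit, or opportunistically when
        # the previous background flush has already finished.
        if len(self._current_data_buffer) > self._max_size_in_buffer or self._enqueue_current_buffer_async.done():
            self._enqueue_current_buffer_async.result()  # wait for the previous flush

            # Hand a snapshot to the background task, then reuse the live buffer.
            buffer = deepcopy(self._current_data_buffer)
            self._current_data_buffer.clear()

            self._enqueue_current_buffer_async = self._thread_pool_executor.submit(
                self._buffer_exposer.enqueue_buffer_for_flush, buffer
            )


if __name__ == "__main__":
    writer = BufferedWriter(max_size_in_buffer=8)
    writer.write(b"hello world")                   # 11 bytes > 8, so a flush is scheduled
    writer._enqueue_current_buffer_async.result()  # wait for the background flush
    print(writer._buffer_exposer.flushed.get())    # bytearray(b'hello world')

The deepcopy call mirrors the diff; for a plain bytearray, bytearray(self._current_data_buffer) would take the same snapshot more cheaply, and the todo in the diff points at the alternative of allocating a fresh buffer instead of clearing the old one.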
|