Commit 7316d03
Merge pull request #202 from poissoncorp/RDBC-769
RDBC-769 Python Client - bulk insert
2 parents 0c7c327 + 17d7468 commit 7316d03

File tree: 14 files changed, +1364 −52 lines


ravendb/documents/bulk_insert_operation.py

Lines changed: 247 additions & 10 deletions
@@ -1,15 +1,25 @@
 from __future__ import annotations
 
+from datetime import datetime
+from abc import ABC
+
 import _queue
 import concurrent
 import json
 from concurrent.futures import Future
 from copy import deepcopy
 from queue import Queue
 from threading import Lock, Semaphore
-from typing import Optional, TYPE_CHECKING
+from typing import Optional, TYPE_CHECKING, List, TypeVar, Type, Generic, Callable
 
 import requests
+
+from ravendb.documents.session.time_series import (
+    ITimeSeriesValuesBindable,
+    TimeSeriesValuesHelper,
+    TypedTimeSeriesEntry,
+)
+from ravendb.documents.time_series import TimeSeriesOperations
 from ravendb.primitives import constants
 from ravendb.exceptions.raven_exceptions import RavenException
 from ravendb.http.server_node import ServerNode
@@ -22,11 +32,15 @@
 from ravendb.documents.commands.bulkinsert import GetNextOperationIdCommand, KillOperationCommand
 from ravendb.exceptions.documents.bulkinsert import BulkInsertAbortedException
 from ravendb.documents.identity.hilo import GenerateEntityIdOnTheClient
+from ravendb.tools.utils import Utils
 
 if TYPE_CHECKING:
     from ravendb.documents.store.definition import DocumentStore
 
 
+_T_TS_Bindable = TypeVar("_T_TS_Bindable", bound=ITimeSeriesValuesBindable)
+
+
 class BulkInsertOperation:
     class _BufferExposer:
         def __init__(self):
@@ -123,7 +137,7 @@ def __init__(self, database: str = None, store: "DocumentStore" = None, options:
 
         self._current_data_buffer = bytearray()
 
-        # todo: self._time_series_batch_size = self._conventions.time_series_batch_size
+        self._time_series_batch_size = self._conventions.time_series_batch_size
         self._buffer_exposer = BulkInsertOperation._BufferExposer()
 
         self._generate_entity_id_on_the_client = GenerateEntityIdOnTheClient(
@@ -132,6 +146,7 @@ def __init__(self, database: str = None, store: "DocumentStore" = None, options:
         )
 
         self._attachments_operation = BulkInsertOperation.AttachmentsBulkInsertOperation(self)
+        self._counters_operation = BulkInsertOperation.CountersBulkInsertOperation(self)
 
     def __enter__(self):
         return self
@@ -192,7 +207,7 @@ def _get_bulk_insert_operation_id(self):
         bulk_insert_get_id_request = GetNextOperationIdCommand()
         self._request_executor.execute_command(bulk_insert_get_id_request)
         self._operation_id = bulk_insert_get_id_request.result
-        self._node_tag = bulk_insert_get_id_request._node_tag
+        self._node_tag = bulk_insert_get_id_request.node_tag
 
     def _fill_metadata_if_needed(self, entity: object, metadata: MetadataAsDictionary):
         # add collection name to metadata if needed
@@ -257,7 +272,7 @@ def _handle_errors(self, document_id: str, e: Exception) -> None:
 
         self._throw_on_unavailable_stream(document_id, e)
 
-    def _concurrency_check(self):
+    def _concurrency_check(self) -> Callable[[], None]:
         with self._concurrent_check_lock:
             if not self._concurrent_check_flag == 0:
                 raise RuntimeError("Bulk Insert store methods cannot be executed concurrently.")
@@ -286,9 +301,9 @@ def __enqueue_buffer_for_flush(flushed_buffer: bytearray):
 
     def _end_previous_command_if_needed(self) -> None:
         if self._in_progress_command == CommandType.COUNTERS:
-            pass  # todo: counters
+            self._counters_operation.end_previous_command_if_needed()
         elif self._in_progress_command == CommandType.TIME_SERIES:
-            pass  # todo: time series
+            self.TimeSeriesBulkInsert._throw_already_running_time_series()
 
     def _write_string(self, input_string: str) -> None:
         for i in range(len(input_string)):
@@ -398,12 +413,204 @@ def _get_id(self, entity: object) -> str:
             self._generate_entity_id_on_the_client.try_set_identity(entity, key)
         return key
 
-    # todo: time_series_for
     # todo: CountersBulkInsert
     # todo: CountersBulkInsertOperation
-    # todo: TimeSeriesBulkInsertBase
-    # todo: TimeSeriesBulkInsert
-    # todo: TypedTimeSeriesBulkInsert
+
+    class TimeSeriesBulkInsertBase(ABC):
+        def __init__(self, operation: Optional[BulkInsertOperation], id_: Optional[str], name: Optional[str]):
+            operation._end_previous_command_if_needed()
+
+            self._operation = operation
+            self._id = id_
+            self._name = name
+
+            self._operation._in_progress_command = CommandType.TIME_SERIES
+            self._first: bool = True
+            self._time_series_in_batch: int = 0
+
+        def _append_internal(self, timestamp: datetime, values: List[float], tag: str) -> None:
+            check_exit_callback = self._operation._concurrency_check()
+            try:
+                self._operation._ensure_ongoing_operation()
+
+                try:
+                    if self._first:
+                        if not self._operation._first:
+                            self._operation._write_comma()
+                        self._write_prefix_for_new_command()
+                    elif self._time_series_in_batch >= self._operation._time_series_batch_size:
+                        self._operation._write_string_no_escape("]}},")
+                        self._write_prefix_for_new_command()
+
+                    self._time_series_in_batch += 1
+
+                    if not self._first:
+                        self._operation._write_comma()
+
+                    self._first = False
+
+                    self._operation._write_string_no_escape("[")
+
+                    self._operation._write_string_no_escape(str(Utils.get_unix_time_in_ms(timestamp)))
+                    self._operation._write_comma()
+
+                    self._operation._write_string_no_escape(str(len(values)))
+                    self._operation._write_comma()
+
+                    first_value = True
+
+                    for value in values:
+                        if not first_value:
+                            self._operation._write_comma()
+
+                        first_value = False
+                        self._operation._write_string_no_escape(str(value) if value is not None else "null")
+
+                    if tag is not None:
+                        self._operation._write_string_no_escape(',"')
+                        self._operation._write_string(tag)
+                        self._operation._write_string_no_escape('"')
+
+                    self._operation._write_string_no_escape("]")
+
+                    self._operation._flush_if_needed()
+                except Exception as e:
+                    self._operation._handle_errors(self._id, e)
+            finally:
+                check_exit_callback()
+
+        def _write_prefix_for_new_command(self):
+            self._first = True
+            self._time_series_in_batch = 0
+
+            self._operation._write_string_no_escape('{"Id":"')
+            self._operation._write_string(self._id)
+            self._operation._write_string_no_escape('","Type":"TimeSeriesBulkInsert","TimeSeries":{"Name":"')
+            self._operation._write_string(self._name)
+            self._operation._write_string_no_escape('","TimeFormat":"UnixTimeInMs","Appends":[')
+
+        @staticmethod
+        def _throw_already_running_time_series():
+            raise RuntimeError("There is an already running time series operation, did you forget to close it?")
+
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            self._operation._in_progress_command = CommandType.NONE
+
+            if not self._first:
+                self._operation._write_string_no_escape("]}}")
+
+    class CountersBulkInsert:
+        def __init__(self, operation: BulkInsertOperation, id_: str):
+            self._operation = operation
+            self._id = id_
+
+        def increment(self, name: str, delta: int = 1) -> None:
+            self._operation._counters_operation.increment(self._id, name, delta)
+
+    class CountersBulkInsertOperation:
+        _MAX_COUNTERS_IN_BATCH = 1024
+
+        def __init__(self, bulk_insert_operation: BulkInsertOperation):
+            self._operation = bulk_insert_operation
+
+            self._id: Optional[str] = None
+            self._first: bool = True
+            self._counters_in_batch: int = 0
+
+        def increment(self, id_: str, name: str, delta: int = 1) -> None:
+            check_callback = self._operation._concurrency_check()
+            try:
+                self._operation._ensure_ongoing_operation()
+
+                if self._operation._in_progress_command == CommandType.TIME_SERIES:
+                    BulkInsertOperation.TimeSeriesBulkInsert._throw_already_running_time_series()
+
+                try:
+                    is_first = self._id is None
+
+                    if is_first or self._id.lower() != id_.lower():
+                        if not is_first:
+                            # we need to end the command for the previous document id
+                            self._operation._write_string_no_escape("]}},")
+                        elif not self._operation._first:
+                            self._operation._write_comma()
+
+                        self._operation._first = False
+
+                        self._id = id_
+                        self._operation._in_progress_command = CommandType.COUNTERS
+
+                        self._write_prefix_for_new_command()
+
+                    if self._counters_in_batch >= self._MAX_COUNTERS_IN_BATCH:
+                        self._operation._write_string_no_escape("]}},")
+
+                        self._write_prefix_for_new_command()
+
+                    self._counters_in_batch += 1
+
+                    if not self._first:
+                        self._operation._write_comma()
+
+                    self._first = False
+
+                    self._operation._write_string_no_escape('{"Type":"Increment","CounterName":"')
+                    self._operation._write_string(name)
+                    self._operation._write_string_no_escape('","Delta":')
+                    self._operation._write_string_no_escape(str(delta))
+                    self._operation._write_string_no_escape("}")
+
+                    self._operation._flush_if_needed()
+
+                except Exception as e:
+                    self._operation._handle_errors(self._id, e)
+            finally:
+                check_callback()
+
+        def end_previous_command_if_needed(self) -> None:
+            if self._id is None:
+                return
+
+            try:
+                self._operation._write_string_no_escape("]}}")
+                self._id = None
+            except Exception as e:
+                raise RavenException("Unable to write to stream", e)
+
+        def _write_prefix_for_new_command(self):
+            self._first = True
+            self._counters_in_batch = 0
+
+            self._operation._write_string_no_escape('{"Id":"')
+            self._operation._write_string(str(self._id))
+            self._operation._write_string_no_escape('","Type":"Counters","Counters":{"DocumentId":"')
+            self._operation._write_string(str(self._id))
+            self._operation._write_string_no_escape('","Operations":[')
+
+    class TimeSeriesBulkInsert(TimeSeriesBulkInsertBase):
+        def __init__(self, operation: BulkInsertOperation, id_: str, name: str):
+            super().__init__(operation, id_, name)
+
+        def append_single(self, timestamp: datetime, value: float, tag: Optional[str] = None) -> None:
+            self._append_internal(timestamp, [value], tag)
+
+        def append(self, timestamp: datetime, values: List[float], tag: str = None) -> None:
+            self._append_internal(timestamp, values, tag)
+
+    class TypedTimeSeriesBulkInsert(TimeSeriesBulkInsertBase, Generic[_T_TS_Bindable]):
+        def __init__(self, operation: BulkInsertOperation, object_type: Type[_T_TS_Bindable], id_: str, name: str):
+            super().__init__(operation, id_, name)
+            self._object_type = object_type
+
+        def append_single(self, timestamp: datetime, value: _T_TS_Bindable, tag: str = None) -> None:
+            values = TimeSeriesValuesHelper.get_values(self._object_type, value)
+            self._append_internal(timestamp, values, tag)
+
+        def append_entry(self, entry: TypedTimeSeriesEntry[_T_TS_Bindable]) -> None:
+            self.append_single(entry.timestamp, entry.value, entry.tag)
 
     class AttachmentsBulkInsert:
         def __init__(self, operation: BulkInsertOperation, key: str):
@@ -454,6 +661,36 @@ def attachments_for(self, key: str) -> BulkInsertOperation.AttachmentsBulkInsert
 
         return BulkInsertOperation.AttachmentsBulkInsert(self, key)
 
+    def counters_for(self, id_: str) -> CountersBulkInsert:
+        if not id_:
+            raise ValueError("Document id cannot be None or empty.")
+
+        return self.CountersBulkInsert(self, id_)
+
+    def typed_time_series_for(
+        self, object_type: Type[_T_TS_Bindable], id_: str, name: str = None
+    ) -> BulkInsertOperation.TypedTimeSeriesBulkInsert[_T_TS_Bindable]:
+        if not id_:
+            raise ValueError("Document id cannot be None or empty")
+
+        ts_name = name
+        if ts_name is None:
+            ts_name = TimeSeriesOperations.get_time_series_name(object_type, self._conventions)
+
+        if not ts_name:
+            raise ValueError("Time series name cannot be None or empty")
+
+        return self.TypedTimeSeriesBulkInsert(self, object_type, id_, ts_name)
+
+    def time_series_for(self, id_: str, name: str) -> BulkInsertOperation.TimeSeriesBulkInsert:
+        if not id_:
+            raise ValueError("Document id cannot be None or empty")
+
+        if not name:
+            raise ValueError("Time series name cannot be None or empty")
+
+        return self.TimeSeriesBulkInsert(self, id_, name)
+
 
 class BulkInsertOptions:
     def __init__(self, use_compression: bool = None, skip_overwrite_if_unchanged: bool = None):
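
Taken together, the new public surface on BulkInsertOperation is counters_for(), time_series_for(), and typed_time_series_for(). A minimal usage sketch follows, assuming an initialized DocumentStore whose bulk_insert() returns a BulkInsertOperation; the User and HeartRate classes are illustrative placeholders, not part of this commit:

from datetime import datetime

with store.bulk_insert() as bulk_insert:
    bulk_insert.store(User(name="Alice"), "users/1")

    # Counter increments for a document are grouped into one Counters
    # command, flushed in batches of _MAX_COUNTERS_IN_BATCH (1024).
    counters = bulk_insert.counters_for("users/1")
    counters.increment("downloads")  # delta defaults to 1
    counters.increment("logins", 5)

    # Raw time series appends; exiting the context manager closes the
    # in-progress TimeSeriesBulkInsert command.
    with bulk_insert.time_series_for("users/1", "HeartRates") as ts:
        ts.append_single(datetime.utcnow(), 72.0, "watches/fitbit")

    # Typed appends resolve the series name via
    # TimeSeriesOperations.get_time_series_name() when it is omitted.
    with bulk_insert.typed_time_series_for(HeartRate, "users/1") as typed_ts:
        typed_ts.append_single(datetime.utcnow(), HeartRate(64.0))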

ravendb/documents/commands/bulkinsert.py

Lines changed: 4 additions & 0 deletions
@@ -12,6 +12,10 @@ def __init__(self):
         super(GetNextOperationIdCommand, self).__init__(int)
         self._node_tag = 0
 
+    @property
+    def node_tag(self):
+        return self._node_tag
+
     def is_read_request(self) -> bool:
         return False  # disable caching
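The new read-only node_tag property is what _get_bulk_insert_operation_id now reads instead of reaching into the private _node_tag field. A short sketch, assuming an already-configured request executor:

command = GetNextOperationIdCommand()
request_executor.execute_command(command)
operation_id = command.result  # id later used to kill the bulk insert on abort
node_tag = command.node_tag    # served through the property, not _node_tag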
ravendb/documents/conventions.py

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ def __init__(self):
         self.max_number_of_requests_per_session = 30
         self._max_http_cache_size = 128 * 1024 * 1024
         self.max_length_of_query_using_get_url = 1024 + 512
+        self.time_series_batch_size = 1024
 
         # Flags
         self.disable_topology_updates = False
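The new time_series_batch_size convention (default 1024) caps how many appends _append_internal packs into one TimeSeriesBulkInsert command before closing it and writing a fresh prefix. A sketch of overriding it, assuming the usual flow where conventions are tweaked before initialize():

from ravendb import DocumentStore

store = DocumentStore(urls=["http://localhost:8080"], database="demo")
store.conventions.time_series_batch_size = 256  # default is 1024
store.initialize()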

ravendb/documents/queries/time_series.py

Lines changed: 7 additions & 2 deletions
@@ -3,10 +3,15 @@
 import datetime
 from typing import Optional, Dict, Any, List, Type, TypeVar, Generic
 
-from ravendb.documents.session.time_series import TimeSeriesEntry, TypedTimeSeriesEntry, TimeSeriesValuesHelper
+from ravendb.documents.session.time_series import (
+    TimeSeriesEntry,
+    TypedTimeSeriesEntry,
+    TimeSeriesValuesHelper,
+    ITimeSeriesValuesBindable,
+)
 from ravendb.tools.utils import Utils
 
-_T_TS_Bindable = TypeVar("_T_TS_Bindable")
+_T_TS_Bindable = TypeVar("_T_TS_Bindable", bound=ITimeSeriesValuesBindable)
 
 
 class TimeSeriesQueryResult:
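
With _T_TS_Bindable now bounded to ITimeSeriesValuesBindable, typed time series queries (and the new typed bulk inserts) only accept classes that can bind their attributes to time series values. An illustrative bindable class follows; the mapping shape (slot index to attribute name plus an optional label) is an assumption drawn from how TimeSeriesValuesHelper is used, not something this diff spells out:

from typing import Dict, Optional, Tuple

from ravendb.documents.session.time_series import ITimeSeriesValuesBindable


class HeartRate(ITimeSeriesValuesBindable):
    def __init__(self, value: float = 0.0):
        self.value = value

    def get_time_series_mapping(self) -> Dict[int, Tuple[str, Optional[str]]]:
        # entry slot 0 binds to self.value, labeled "HeartRate"
        return {0: ("value", "HeartRate")}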
