Skip to content

Commit d15e278

Browse files
committed
RDBC-700 Bulk Insert Options
1 parent 5631b2a commit d15e278

File tree

3 files changed

+40
-16
lines changed

3 files changed

+40
-16
lines changed

ravendb/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class Metadata:
1616
LAST_MODIFIED = "@last-modified"
1717
CHANGE_VECTOR = "@change-vector"
1818
EXPIRES = "@expires"
19+
REFRESH = "@refresh"
1920
ALL_DOCUMENTS_COLLECTION = "@all_docs"
2021
EMPTY_COLLECTION = "@empty"
2122
NESTED_OBJECT_TYPES = "@nested-object-types"

ravendb/documents/bulk_insert_operation.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,25 @@ def error_on_request_start(self, exception: Exception):
6262
self.output_stream_mock.set_exception(exception)
6363

6464
class _BulkInsertCommand(RavenCommand[requests.Response]):
65-
def __init__(self, key: int, buffer_exposer: BulkInsertOperation._BufferExposer, node_tag: str):
65+
def __init__(
66+
self,
67+
key: int,
68+
buffer_exposer: BulkInsertOperation._BufferExposer,
69+
node_tag: str,
70+
skip_overwrite_if_unchanged: bool,
71+
):
6672
super().__init__(requests.Response)
6773
self._buffer_exposer = buffer_exposer
6874
self._key = key
6975
self._selected_node_tag = node_tag
7076
self.use_compression = False
77+
self._skip_overwrite_if_unchanged = skip_overwrite_if_unchanged
7178

7279
def create_request(self, node: ServerNode) -> requests.Request:
7380
return requests.Request(
7481
"POST",
75-
f"{node.url}/databases/{node.database}/bulk_insert?id={self._key}",
82+
f"{node.url}/databases/{node.database}/bulk_insert?id={self._key}"
83+
f"&skipOverwriteIfUnchanged={'true' if self._skip_overwrite_if_unchanged else 'false'}",
7684
data=self._buffer_exposer.send_data(),
7785
)
7886

@@ -88,7 +96,7 @@ def send(self, session: requests.Session, request: requests.Request) -> requests
8896
except Exception as e:
8997
self._buffer_exposer.error_on_request_start(e)
9098

91-
def __init__(self, database: str = None, store: "DocumentStore" = None):
99+
def __init__(self, database: str = None, store: "DocumentStore" = None, options: BulkInsertOptions = None):
92100
self.use_compression = False
93101

94102
self._ongoing_bulk_insert_execute_task: Optional[Future] = None
@@ -103,6 +111,9 @@ def __init__(self, database: str = None, store: "DocumentStore" = None):
103111
self._conventions = store.conventions
104112
if not database or database.isspace():
105113
self._throw_no_database()
114+
115+
self._use_compression = options.use_compression if options else False
116+
self._options = options or BulkInsertOptions()
106117
self._request_executor = store.get_request_executor(database)
107118

108119
self._enqueue_current_buffer_async = Future()
@@ -232,6 +243,7 @@ def store_as(
232243

233244
self._write_document(entity, metadata)
234245
self._write_string_no_escape("}")
246+
# todo: self._flush_if_needed() - causes error - https://issues.hibernatingrhinos.com/issue/RDBC-701
235247
except Exception as e:
236248
self._handle_errors(key, e)
237249
finally:
@@ -336,7 +348,7 @@ def _get_exception_from_operation(self) -> Optional[BulkInsertAbortedException]:
336348
def _start_executing_bulk_insert_command(self) -> None:
337349
try:
338350
bulk_command = BulkInsertOperation._BulkInsertCommand(
339-
self._operation_id, self._buffer_exposer, self._node_tag
351+
self._operation_id, self._buffer_exposer, self._node_tag, self._options.skip_overwrite_if_unchanged
340352
)
341353
bulk_command.use_compression = self.use_compression
342354

@@ -441,3 +453,9 @@ def attachments_for(self, key: str) -> BulkInsertOperation.AttachmentsBulkInsert
441453
raise ValueError("Document id cannot be None or empty.")
442454

443455
return BulkInsertOperation.AttachmentsBulkInsert(self, key)
456+
457+
458+
class BulkInsertOptions:
459+
def __init__(self, use_compression: bool = None, skip_overwrite_if_unchanged: bool = None):
460+
self.use_compression = use_compression
461+
self.skip_overwrite_if_unchanged = skip_overwrite_if_unchanged

ravendb/documents/store/definition.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from ravendb import constants, exceptions
1010
from ravendb.changes.database_changes import DatabaseChanges
11-
from ravendb.documents.bulk_insert_operation import BulkInsertOperation
11+
from ravendb.documents.bulk_insert_operation import BulkInsertOperation, BulkInsertOptions
1212
from ravendb.documents.operations.executor import MaintenanceOperationExecutor, OperationExecutor
1313
from ravendb.documents.operations.indexes import PutIndexesOperation
1414
from ravendb.documents.session.event_args import (
@@ -48,12 +48,12 @@ def __init__(self):
4848
self.__conventions = None
4949
self._initialized: bool = False
5050

51-
self.__certificate_pem_path: Union[None, str] = None
52-
self.__trust_store_path: Union[None, str] = None
51+
self.__certificate_pem_path: Optional[str] = None
52+
self.__trust_store_path: Optional[str] = None
5353

5454
self._urls: List[str] = []
55-
self._database: Union[None, str] = None
56-
self._disposed: Union[None, bool] = None
55+
self._database: Optional[str] = None
56+
self._disposed: Optional[bool] = None
5757

5858
self.__before_store: List[Callable[[BeforeStoreEventArgs], None]] = []
5959
self.__after_save_changes: List[Callable[[AfterSaveChangesEventArgs], None]] = []
@@ -311,10 +311,11 @@ def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] =
311311
self.database = database
312312
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
313313
# todo: aggressive cache
314-
self.__maintenance_operation_executor: Union[None, MaintenanceOperationExecutor] = None
315-
self.__operation_executor: Union[None, OperationExecutor] = None
314+
self.__maintenance_operation_executor: Optional[MaintenanceOperationExecutor] = None
315+
self.__operation_executor: Optional[OperationExecutor] = None
316316
# todo: database smuggler
317-
self.__identifier: Union[None, str] = None
317+
self.__multi_db_hilo: Optional[MultiDatabaseHiLoGenerator] = None
318+
self.__identifier: Optional[str] = None
318319
self.__add_change_lock = threading.Lock()
319320
self.__database_changes = {}
320321
self.__after_close: List[Callable[[], None]] = []
@@ -327,15 +328,19 @@ def __exit__(self, exc_type, exc_val, exc_tb):
327328
self.close()
328329

329330
@property
330-
def thread_pool_executor(self):
331+
def hilo_id_generator(self) -> Optional[MultiDatabaseHiLoGenerator]:
332+
return self.__multi_db_hilo
333+
334+
@property
335+
def thread_pool_executor(self) -> ThreadPoolExecutor:
331336
return self.__thread_pool_executor
332337

333338
@property
334339
def subscriptions(self) -> DocumentSubscriptions:
335340
return self.__subscriptions
336341

337342
@property
338-
def identifier(self) -> Union[None, str]:
343+
def identifier(self) -> Optional[str]:
339344
if self.__identifier is not None:
340345
return self.__identifier
341346

@@ -520,9 +525,9 @@ def initialize(self) -> DocumentStore:
520525

521526
# todo: aggressively cache
522527

523-
def bulk_insert(self, database_name: Optional[str] = None) -> BulkInsertOperation:
528+
def bulk_insert(self, database_name: str = None, options: BulkInsertOptions = None) -> BulkInsertOperation:
524529
self.assert_initialized()
525-
return BulkInsertOperation(self.get_effective_database(database_name), self)
530+
return BulkInsertOperation(self.get_effective_database(database_name), self, options)
526531

527532
def _assert_valid_configuration(self) -> None:
528533
if not self.urls:

0 commit comments

Comments
 (0)