Skip to content

Commit b58fc21

Browse files
committed
RDBC-644 Bulk Insert Operation miscellaneous
1 parent 11209d8 commit b58fc21

File tree

5 files changed

+69
-33
lines changed

5 files changed

+69
-33
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import json
2+
from typing import Optional
3+
4+
import requests
5+
6+
from ravendb.http.server_node import ServerNode
7+
from ravendb.http.raven_command import RavenCommand, VoidRavenCommand
8+
9+
10+
class GetNextOperationIdCommand(RavenCommand[int]):
11+
def __init__(self):
12+
super(GetNextOperationIdCommand, self).__init__(int)
13+
self._node_tag = 0
14+
15+
def is_read_request(self) -> bool:
16+
return False # disable caching
17+
18+
def create_request(self, node: ServerNode) -> requests.Request:
19+
return requests.Request("GET", f"{node.url}/databases/{node.database}/operations/next-operation-id")
20+
21+
def set_response(self, response: Optional[str], from_cache: bool) -> None:
22+
json_node = json.loads(response)
23+
self.result = json_node.get("Id", None)
24+
self._node_tag = json_node.get("NodeTag", None)
25+
26+
27+
class KillOperationCommand(VoidRavenCommand):
28+
def __init__(self, operation_id: int, node_tag: Optional[str] = None):
29+
super(KillOperationCommand, self).__init__()
30+
self._id = operation_id
31+
self._selected_node_tag = node_tag
32+
33+
def create_request(self, node: ServerNode) -> requests.Request:
34+
return requests.Request("POST", f"{node.url}/databases/{node.database}/operations/kill?id={self._id}")

ravendb/documents/conventions.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,10 @@ def json_default(o):
219219
elif isinstance(o, (int, float)):
220220
return str(o)
221221
else:
222-
raise TypeError(repr(o) + " is not JSON serializable (Try add a json default method to convention)")
222+
raise TypeError(
223+
repr(o) + " is not JSON serializable (Try add a json default method to convention"
224+
" or try to add methods - to_json & classmethod from_json - to object class)"
225+
)
223226

224227
@staticmethod
225228
def default_transform_plural(name):

ravendb/documents/store/definition.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from ravendb import constants, exceptions
1010
from ravendb.changes.database_changes import DatabaseChanges
11+
from ravendb.documents.bulk_insert_operation import BulkInsertOperation
1112
from ravendb.documents.operations.executor import MaintenanceOperationExecutor, OperationExecutor
1213
from ravendb.documents.operations.indexes import PutIndexesOperation
1314
from ravendb.documents.session.event_args import (
@@ -163,14 +164,10 @@ def maintenance(self) -> MaintenanceOperationExecutor:
163164
def operations(self) -> OperationExecutor:
164165
pass
165166

166-
# todo: changes
167-
168167
# todo: aggressive_caching
169168

170169
# todo: time_series
171170

172-
# todo: bulk_insert
173-
174171
@abstractmethod
175172
def open_session(self, database: Optional[str] = None, session_options: Optional = None):
176173
pass
@@ -313,7 +310,6 @@ def __init__(self, urls: Optional[Union[str, List[str]]] = None, database: Optio
313310
self.urls = [urls] if isinstance(urls, str) else urls
314311
self.database = database
315312
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
316-
# todo: database changes
317313
# todo: aggressive cache
318314
self.__maintenance_operation_executor: Union[None, MaintenanceOperationExecutor] = None
319315
self.__operation_executor: Union[None, OperationExecutor] = None
@@ -519,6 +515,10 @@ def initialize(self) -> DocumentStore:
519515

520516
# todo: aggressively cache
521517

518+
def bulk_insert(self, database_name: Optional[str] = None) -> BulkInsertOperation:
519+
self.assert_initialized()
520+
return BulkInsertOperation(self.get_effective_database(database_name), self)
521+
522522
def _assert_valid_configuration(self) -> None:
523523
if not self.urls:
524524
raise ValueError("Document URLs cannot be empty.")
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Optional
2+
3+
from ravendb.exceptions.raven_exceptions import RavenException
4+
5+
6+
class BulkInsertAbortedException(RavenException):
7+
def __init__(self, message: str, cause: Optional[Exception] = None):
8+
super(BulkInsertAbortedException, self).__init__(message, cause)
9+
10+
11+
class BulkInsertProtocolViolationException(RavenException):
12+
def __init__(self, message: str, cause: Optional[Exception] = None):
13+
super(BulkInsertProtocolViolationException, self).__init__(message, cause)

ravendb/http/request_executor.py

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import datetime
4+
import inspect
45
import json
56
import logging
67
import os
@@ -579,33 +580,21 @@ def __send_request_to_server(
579580
session_info: SessionInfo,
580581
request: requests.Request,
581582
url: str,
582-
) -> requests.Response:
583+
) -> Optional[requests.Response]:
583584
try:
584585
self.number_of_server_requests += 1
585586
timeout = command.timeout if command.timeout else self.__default_timeout
586-
if timeout:
587+
588+
if not timeout:
589+
return self.__send(chosen_node, command, session_info, request)
590+
591+
else:
587592
try:
588-
# todo: create Task from lines below and call it
589-
# AggressiveCacheOptions callingTheadAggressiveCaching = aggressiveCaching.get();
590-
# CompletableFuture<CloseableHttpResponse> sendTask = CompletableFuture.supplyAsync(() ->
591-
# AggressiveCacheOptions aggressiveCacheOptionsToRestore = aggressiveCaching.get();
592-
try:
593-
return self.__send(chosen_node, command, session_info, request)
594-
except IOError:
595-
# throw ExceptionsUtils.unwrapException(e);
596-
raise
597-
# finally aggressiveCaching.set(aggressiveCacheOptionsToRestore);
593+
return self.__send(chosen_node, command, session_info, request)
598594
except requests.Timeout as t:
599-
# request.abort()
600-
# net.ravendb.client.exceptions.TimeoutException timeoutException =
601-
# new net.ravendb.client.exceptions.TimeoutException(
602-
# "The request for " + request.getURI() + " failed with timeout after " +
603-
# TimeUtils.durationToTimeSpan(timeout), e);
604-
605595
if not should_retry:
606596
if command.failed_nodes is None:
607597
command.failed_nodes = {}
608-
609598
command.failed_nodes[chosen_node] = t
610599
raise t
611600

@@ -615,10 +604,6 @@ def __send_request_to_server(
615604
self.__throw_failed_to_contact_all_nodes(command, request)
616605

617606
return None
618-
except IOError as e:
619-
raise e
620-
else:
621-
return self.__send(chosen_node, command, session_info, request)
622607
except IOError as e:
623608
if not should_retry:
624609
raise
@@ -633,7 +618,7 @@ def __send_request_to_server(
633618
def __send(
634619
self, chosen_node: ServerNode, command: RavenCommand, session_info: SessionInfo, request: requests.Request
635620
) -> requests.Response:
636-
response: requests.Response = None
621+
response: Optional[requests.Response] = None
637622

638623
if self.should_execute_on_all(chosen_node, command):
639624
response = self.__execute_on_all_to_figure_out_the_fastest(chosen_node, command)
@@ -891,7 +876,8 @@ def __supply_async(
891876

892877
def __create_request(self, node: ServerNode, command: RavenCommand) -> requests.Request:
893878
request = command.create_request(node)
894-
if request.data and not isinstance(request.data, str):
879+
# todo: optimize that if - look for the way to make less ifs each time
880+
if request.data and not isinstance(request.data, str) and not inspect.isgenerator(request.data):
895881
request.data = json.dumps(request.data, default=self.conventions.json_default_method)
896882

897883
# todo: 1117 - 1133
@@ -1125,10 +1111,10 @@ def __handle_server_down(
11251111
self,
11261112
url: str,
11271113
chosen_node: ServerNode,
1128-
node_index: int,
1114+
node_index: Optional[int],
11291115
command: RavenCommand,
11301116
request: requests.Request,
1131-
response: requests.Response,
1117+
response: Optional[requests.Response],
11321118
e: Exception,
11331119
session_info: SessionInfo,
11341120
should_retry: bool,

0 commit comments

Comments
 (0)