Skip to content

Commit 4d7afca

Browse files
committed
RDBC-766 Python client - time series 3 - stage #1 + test for roll-ups
1 parent 9fa19e6 commit 4d7afca

File tree

11 files changed

+495
-39
lines changed

11 files changed

+495
-39
lines changed

ravendb/documents/operations/indexes.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22
import enum
33
import json
4-
from typing import List, TYPE_CHECKING
4+
from typing import List, TYPE_CHECKING, Optional
55

66
import requests
77

@@ -556,7 +556,7 @@ def get_raft_unique_request_id(self) -> str:
556556

557557

558558
class GetTermsOperation(MaintenanceOperation[List[str]]):
559-
def __init__(self, index_name: str, field: str, from_value: str, page_size: int = None):
559+
def __init__(self, index_name: str, field: str, from_value: Optional[str], page_size: int = None):
560560
if index_name is None:
561561
raise ValueError("Index name cannot be None")
562562
if field is None:
@@ -584,8 +584,8 @@ def create_request(self, server_node) -> requests.Request:
584584
f"{server_node.url}/databases/{server_node.database}"
585585
f"/indexes/terms?name={Utils.escape(self.__index_name, False, False)}"
586586
f"&field={Utils.escape(self.__field, False, False)}"
587-
f"&fromValue={self.__from_value if self.__from_value is not None else ''}"
588-
f"&pageSize={self.__page_size if self.__page_size is not None else ''}",
587+
f"&fromValue={self.__from_value or ''}"
588+
f"&pageSize={self.__page_size or ''}",
589589
)
590590

591591
def set_response(self, response: str, from_cache: bool) -> None:

ravendb/documents/operations/time_series.py

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,25 @@ def __init__(
3232
if not name or name.isspace():
3333
raise ValueError("Name cannot be None or empty")
3434

35-
if aggregation_time is not None and aggregation_time.compare_to(TimeValue.ZERO()) <= 0:
35+
if aggregation_time and aggregation_time.compare_to(TimeValue.ZERO()) <= 0:
3636
raise ValueError("Aggregation time must be greater than zero")
3737

38-
if retention_time is not None and retention_time.compare_to(TimeValue.ZERO()) <= 0:
38+
if retention_time is None or retention_time.compare_to(TimeValue.ZERO()) <= 0:
3939
raise ValueError("Retention time must be greater than zero")
4040

4141
self.retention_time = retention_time
4242
self.aggregation_time = aggregation_time
4343

4444
self.name = name
4545

46-
@classmethod
47-
def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesPolicy:
48-
return cls(
49-
json_dict["Name"], # todo: Invalid deserialization
50-
Utils.string_to_timedelta(json_dict["AggregationTime"]),
51-
Utils.string_to_timedelta(json_dict["RetentionTime"]),
52-
)
53-
5446
def get_time_series_name(self, raw_name: str) -> str:
5547
return raw_name + TimeSeriesConfiguration.TIME_SERIES_ROLLUP_SEPARATOR + self.name
5648

5749
def to_json(self) -> Dict[str, Any]:
5850
return {
59-
"Name": self.name, # todo: Invalid serialization
60-
"AggregationTime": Utils.timedelta_to_str(self.aggregation_time),
61-
"RetentionTime": Utils.timedelta_to_str(self.retention_time),
51+
"Name": self.name,
52+
"AggregationTime": self.aggregation_time.to_json() if self.aggregation_time else None,
53+
"RetentionTime": self.retention_time.to_json(),
6254
}
6355

6456

@@ -72,30 +64,20 @@ def DEFAULT_POLICY(cls) -> RawTimeSeriesPolicy:
7264
def __init__(self, retention_time: TimeValue = TimeValue.MAX_VALUE()):
7365
if retention_time.compare_to(TimeValue.ZERO()) <= 0:
7466
raise ValueError("Retention time must be greater than zero")
75-
76-
self.name = self.POLICY_STRING
77-
self.retention_time = retention_time
67+
super().__init__(self.POLICY_STRING, retention_time=retention_time)
7868

7969

8070
class TimeSeriesCollectionConfiguration:
8171
def __init__(
8272
self,
83-
disabled: Optional[bool] = None,
73+
disabled: Optional[bool] = False,
8474
policies: Optional[List[TimeSeriesPolicy]] = None,
8575
raw_policy: Optional[RawTimeSeriesPolicy] = RawTimeSeriesPolicy.DEFAULT_POLICY(),
8676
):
8777
self.disabled = disabled
8878
self.policies = policies
8979
self.raw_policy = raw_policy
9080

91-
@classmethod
92-
def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesCollectionConfiguration:
93-
return cls(
94-
json_dict["Disabled"],
95-
[TimeSeriesPolicy.from_json(policy_json) for policy_json in json_dict["Policies"]],
96-
RawTimeSeriesPolicy.from_json(json_dict["RawPolicy"]),
97-
)
98-
9981
def to_json(self) -> Dict[str, Any]:
10082
return {
10183
"Disabled": self.disabled,
@@ -348,12 +330,12 @@ def to_json(self) -> Dict[str, Any]:
348330

349331
def append(self, append_operation: AppendOperation) -> None:
350332
if self._appends is None:
351-
self._appends = []
352-
filtered = list(filter(lambda x: x.timestamp == append_operation.timestamp, self._appends))
333+
self._appends = [] # todo: perf
334+
filtered = self._appends
353335

354-
if len(filtered) != 0:
355-
# element with given timestamp already exists - remove and retry add operation
356-
self._appends.remove(filtered.pop())
336+
# if len(filtered) != 0:
337+
# # element with given timestamp already exists - remove and retry add operation
338+
# self._appends.remove(filtered.pop())
357339

358340
self._appends.append(append_operation)
359341

@@ -699,3 +681,36 @@ def create_request(self, node: ServerNode) -> requests.Request:
699681

700682
def set_response(self, response: Optional[str], from_cache: bool) -> None:
701683
self.result = TimeSeriesStatistics.from_json(json.loads(response))
684+
685+
686+
class ConfigureTimeSeriesOperation(MaintenanceOperation[ConfigureTimeSeriesOperationResult]):
687+
def __init__(self, configuration: TimeSeriesConfiguration):
688+
if not configuration:
689+
raise ValueError("Configuration cannot be None")
690+
691+
self._configuration = configuration
692+
693+
def get_command(self, conventions: "DocumentConventions") -> "RavenCommand[ConfigureTimeSeriesOperationResult]":
694+
return self.ConfigureTimeSeriesCommand(self._configuration)
695+
696+
class ConfigureTimeSeriesCommand(RavenCommand[ConfigureTimeSeriesOperationResult], RaftCommand):
697+
def __init__(self, configuration: TimeSeriesConfiguration):
698+
super().__init__(ConfigureTimeSeriesOperationResult)
699+
self._configuration = configuration
700+
701+
def is_read_request(self) -> bool:
702+
return False
703+
704+
def create_request(self, node: ServerNode) -> requests.Request:
705+
request = requests.Request("POST", f"{node.url}/databases/{node.database}/admin/timeseries/config")
706+
request.data = self._configuration.to_json()
707+
return request
708+
709+
def set_response(self, response: Optional[str], from_cache: bool) -> None:
710+
if not response:
711+
self._throw_invalid_response()
712+
713+
self.result = ConfigureTimeSeriesOperationResult.from_json(json.loads(response))
714+
715+
def get_raft_unique_request_id(self) -> str:
716+
return RaftIdGenerator.new_id()

ravendb/documents/session/document_session.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def execute_all_pending_lazy_operations(self) -> ResponseTimeInformation:
171171
requests = []
172172
for i in range(len(self._pending_lazy_operations)):
173173
# todo: pending lazy operation create request - WIP
174-
req = self._pending_lazy_operations[i].create_arequest()
174+
req = self._pending_lazy_operations[i].create_request()
175175
if req is None:
176176
self._pending_lazy_operations.pop(i)
177177
i -= 1
@@ -1757,6 +1757,13 @@ def _throw_document_already_deleted_in_session(document_id: str, time_series: st
17571757
)
17581758

17591759
def append_single(self, timestamp: datetime, value: float, tag: Optional[str] = None) -> None:
1760+
if isinstance(value, int):
1761+
value = float(value)
1762+
if not isinstance(value, float):
1763+
raise TypeError(
1764+
f"Value passed ('{value}') is not a '{float.__name__}'. " f"It is '{value.__class__.__name__}'."
1765+
)
1766+
17601767
self.append(timestamp, [value], tag)
17611768

17621769
def append(self, timestamp: datetime, values: List[float], tag: Optional[str] = None) -> None:
@@ -2267,7 +2274,7 @@ def append(self, timestamp: datetime, entry: _T_TS_Values_Bindable, tag: Optiona
22672274
super().append(timestamp, values, tag)
22682275

22692276
def append_entry(self, entry: TypedTimeSeriesEntry[_T_TS_Values_Bindable]) -> None:
2270-
self.append_single(entry.timestamp, entry.value, entry.tag)
2277+
self.append(entry.timestamp, entry.value, entry.tag)
22712278

22722279

22732280
class SessionDocumentRollupTypedTimeSeries(SessionTimeSeriesBase, Generic[_T_TS_Values_Bindable]):

ravendb/documents/store/definition.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
)
3333
from ravendb.documents.session.misc import SessionOptions
3434
from ravendb.documents.subscriptions.document_subscriptions import DocumentSubscriptions
35+
from ravendb.documents.time_series import TimeSeriesOperations
3536
from ravendb.http.request_executor import RequestExecutor
3637
from ravendb.documents.identity.hilo import MultiDatabaseHiLoGenerator
3738
from ravendb.http.topology import Topology
@@ -321,6 +322,7 @@ def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] =
321322
self.__database_changes = {}
322323
self.__after_close: List[Callable[[], None]] = []
323324
self.__before_close: List[Callable[[], None]] = []
325+
self.__time_series_operation: Optional[TimeSeriesOperations] = None
324326

325327
def __enter__(self):
326328
return self
@@ -550,3 +552,10 @@ def operations(self) -> OperationExecutor:
550552
self.__operation_executor = OperationExecutor(self)
551553

552554
return self.__operation_executor
555+
556+
@property
557+
def time_series(self) -> TimeSeriesOperations:
558+
if self.__time_series_operation is None:
559+
self.__time_series_operation = TimeSeriesOperations(self)
560+
561+
return self.__time_series_operation

ravendb/http/request_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -890,14 +890,14 @@ def __supply_async(
890890

891891
return preferred_task.result().response
892892

893-
def __create_request(self, node: ServerNode, command: RavenCommand) -> requests.Request:
893+
def __create_request(self, node: ServerNode, command: RavenCommand) -> Optional[requests.Request]:
894894
request = command.create_request(node)
895895
# todo: optimize that if - look for the way to make less ifs each time
896896
if request.data and not isinstance(request.data, str) and not inspect.isgenerator(request.data):
897897
request.data = json.dumps(request.data, default=self.conventions.json_default_method)
898898

899899
# todo: 1117 - 1133
900-
return request if request else None
900+
return request or None
901901

902902
def should_broadcast(self, command: RavenCommand) -> bool:
903903
if not isinstance(command, Broadcast):

ravendb/primitives/time_series.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22
from enum import Enum
3-
from typing import List, Tuple
3+
from typing import List, Tuple, Dict, Any
44

55
from ravendb.primitives.constants import int_max, int_min
66

@@ -22,6 +22,9 @@ def __init__(self, value: int, unit: TimeValueUnit):
2222
self.value = value
2323
self.unit = unit
2424

25+
def to_json(self) -> Dict[str, Any]:
26+
return {"Value": self.value, "Unit": self.unit}
27+
2528
def __str__(self):
2629
if self.value == int_max:
2730
return "MaxValue"

ravendb/misc.py renamed to ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/__init__.py

File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from datetime import datetime
2+
3+
from ravendb import GetTermsOperation
4+
from ravendb.documents.indexes.index_creation import AbstractJavaScriptIndexCreationTask
5+
from ravendb.infrastructure.orders import Company
6+
from ravendb.tests.test_base import TestBase
7+
from ravendb.tools.raven_test_helper import RavenTestHelper
8+
9+
10+
class Companies_ByTimeSeriesNames(AbstractJavaScriptIndexCreationTask):
11+
def __init__(self):
12+
super(Companies_ByTimeSeriesNames, self).__init__()
13+
self.maps = ["map('Companies', function (company) {return ({names: timeSeriesNamesFor(company)})})"]
14+
15+
16+
company_id = "companies/1"
17+
base_line = datetime(2023, 8, 20, 21, 30)
18+
19+
20+
class TestBasicTimeSeriesIndexesJavaScript(TestBase):
21+
def setUp(self):
22+
super(TestBasicTimeSeriesIndexesJavaScript, self).setUp()
23+
24+
def test_time_series_names_for(self):
25+
now = RavenTestHelper.utc_today()
26+
index = Companies_ByTimeSeriesNames()
27+
index.execute(self.store)
28+
29+
with self.store.open_session() as session:
30+
session.store(Company(), company_id)
31+
session.save_changes()
32+
33+
self.wait_for_indexing(self.store)
34+
RavenTestHelper.assert_no_index_errors(self.store)
35+
36+
terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names", None))
37+
self.assertEqual(0, len(terms))
38+
39+
terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names_IsArray", None))
40+
self.assertEqual(1, len(terms))
41+
self.assertIn("true", terms)
42+
43+
with self.store.open_session() as session:
44+
company = session.load(company_id, Company)
45+
session.time_series_for_entity(company, "heartRate").append_single(now, 2.5, "tag1")
46+
session.time_series_for_entity(company, "heartRate2").append_single(now, 3.5, "tag2")
47+
session.save_changes()
48+
49+
self.wait_for_indexing(self.store)
50+
51+
RavenTestHelper.assert_no_index_errors(self.store)
52+
53+
terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names", None))
54+
self.assertEqual(2, len(terms))
55+
self.assertIn("heartrate", terms)
56+
self.assertIn("heartrate2", terms)
57+
58+
terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names_IsArray", None))
59+
self.assertEqual(1, len(terms))
60+
self.assertIn("true", terms)

0 commit comments

Comments
 (0)