Skip to content

Commit ecd13ab

Browse files
committed
RDBC-766 Python client - time series 3 - roll-ups
1 parent bcc8593 commit ecd13ab

File tree

7 files changed

+168
-15
lines changed

7 files changed

+168
-15
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from __future__ import annotations
2+
from typing import Optional, Dict, Any, List, Type, TypeVar, Generic
3+
4+
from ravendb.documents.session.time_series import TimeSeriesEntry, TypedTimeSeriesEntry
5+
6+
_T_TS_Bindable = TypeVar("_T_TS_Bindable")
7+
8+
9+
class TimeSeriesQueryResult:
10+
def __init__(self, count: Optional[int] = None):
11+
self.count = count
12+
13+
@classmethod
14+
def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult:
15+
return cls(json_dict["Count"])
16+
17+
18+
class TimeSeriesRawResult(TimeSeriesQueryResult):
19+
def __init__(self, count: Optional[int] = None, results: Optional[List[TimeSeriesEntry]] = None):
20+
super().__init__(count)
21+
self.results = results
22+
23+
@classmethod
24+
def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult:
25+
return cls(
26+
json_dict["Count"],
27+
[TimeSeriesEntry.from_json(time_series_entry_json) for time_series_entry_json in json_dict["Results"]],
28+
)
29+
30+
def as_typed_result(self, object_type: Type[_T_TS_Bindable]) -> TypedTimeSeriesRawResult[_T_TS_Bindable]:
31+
result = TypedTimeSeriesRawResult()
32+
result.count = self.count
33+
result.results = [time_series_entry.as_typed_entry(object_type) for time_series_entry in self.results]
34+
return result
35+
36+
37+
class TypedTimeSeriesRawResult(TimeSeriesQueryResult, Generic[_T_TS_Bindable]):
38+
def __init__(
39+
self, count: Optional[int] = None, results: Optional[List[TypedTimeSeriesEntry[_T_TS_Bindable]]] = None
40+
):
41+
super().__init__(count)
42+
self.results = results
43+
44+
@classmethod
45+
def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult:
46+
return cls(
47+
json_dict["Count"],
48+
[TypedTimeSeriesEntry.from_json(typed_ts_entry_json) for typed_ts_entry_json in json_dict["Results"]],
49+
)

ravendb/documents/session/document_session.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2324,4 +2324,8 @@ def get(
23242324

23252325
def append_entry(self, entry: TypedTimeSeriesRollupEntry) -> None:
23262326
values = entry.get_values_from_members()
2327-
self.append(entry.timestamp, values, entry.tag)
2327+
super().append(entry.timestamp, values, entry.tag)
2328+
2329+
def append(self, entry: TypedTimeSeriesRollupEntry[_T_TS_Values_Bindable]) -> None: # todo: investigate warning
2330+
values = entry.get_values_from_members()
2331+
super().append(entry.timestamp, values, entry.tag)

ravendb/documents/session/operations/query.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22
import enum
3+
import inspect
34
import logging
45
from typing import Union, Optional, TypeVar, List, Type, Callable, Dict, TYPE_CHECKING
56

@@ -199,7 +200,12 @@ def deserialize(
199200
BeforeConversionToEntityEventArgs(session, key, object_type, document)
200201
)
201202

202-
result = Utils.initialize_object(document, object_type)
203+
# By custom defined 'from_json' serializer class method
204+
# todo: make separate interface to do from_json
205+
if "from_json" in object_type.__dict__ and inspect.ismethod(object_type.from_json):
206+
result = object_type.from_json(document)
207+
else:
208+
result = Utils.initialize_object(document, object_type)
203209
session.after_conversion_to_entity_invoke(AfterConversionToEntityEventArgs(session, key, document, result))
204210

205211
return result

ravendb/documents/session/query.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,12 @@ def get_query_result(self):
15851585

15861586
return self._query_operation.current_query_results.create_snapshot()
15871587

1588+
def single(self) -> _T:
1589+
result = list(self.__execute_query_operation(2))
1590+
if len(result) != 1:
1591+
raise ValueError(f"Expected single result, got: {len(result)} ")
1592+
return result[0]
1593+
15881594
def _aggregate_by(self, facet: FacetBase) -> None:
15891595
for token in self._select_tokens:
15901596
if isinstance(token, FacetToken):
@@ -1872,7 +1878,7 @@ def not_(self) -> DocumentQuery[_T]:
18721878
self.negate_next()
18731879
return self
18741880

1875-
def take(self, count: int) -> DocumentQuery[_T]:
1881+
def take(self: DocumentQuery[_T], count: int) -> DocumentQuery[_T]:
18761882
self._take(count)
18771883
return self
18781884

@@ -1888,12 +1894,6 @@ def count(self) -> int:
18881894
query_result = self.get_query_result()
18891895
return query_result.total_results
18901896

1891-
def single(self) -> _T:
1892-
result = list(self.take(2))
1893-
if len(result) != 1:
1894-
raise ValueError(f"Expected signle result, got: {len(result)} ")
1895-
return result[0]
1896-
18971897
def where_lucene(self, field_name: str, where_clause: str, exact: bool = False) -> DocumentQuery[_T]:
18981898
self._where_lucene(field_name, where_clause, exact)
18991899
return self

ravendb/documents/session/time_series.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __init__(self, object_type: Type[_T_Values], timestamp: datetime.datetime):
6464

6565
def _create_instance(self) -> _T_Values:
6666
try:
67-
raise NotImplementedError() # create object type instance
67+
return Utils.try_get_new_instance(self._object_type) # create object type instance
6868
except Exception as e:
6969
raise RavenException(f"Unable to create instance of class: {self._object_type.__name__}", e)
7070

@@ -183,13 +183,13 @@ def __init__(
183183
timestamp: datetime.datetime = None,
184184
tag: str = None,
185185
values: List[int] = None,
186-
rollup: bool = None,
186+
is_rollup: bool = None,
187187
value: _T_TSBindable = None,
188188
):
189189
self.timestamp = timestamp
190190
self.tag = tag
191191
self.values = values
192-
self.rollup = rollup
192+
self.is_rollup = is_rollup
193193
self.value = value
194194

195195
@classmethod

ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_typed_session.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@
99
ConfigureTimeSeriesOperation,
1010
RawTimeSeriesPolicy,
1111
)
12-
from ravendb.documents.session.time_series import ITimeSeriesValuesBindable, TypedTimeSeriesEntry
12+
from ravendb.documents.queries.time_series import TimeSeriesRawResult
13+
from ravendb.documents.session.time_series import (
14+
ITimeSeriesValuesBindable,
15+
TypedTimeSeriesEntry,
16+
TypedTimeSeriesRollupEntry,
17+
)
1318
from ravendb.infrastructure.entities import User
1419
from ravendb.primitives.time_series import TimeValue
1520
from ravendb.tests.test_base import TestBase
@@ -278,7 +283,7 @@ def test_can_work_with_rollup_time_series_2(self):
278283

279284
config = TimeSeriesConfiguration()
280285
config.collections = {"users": time_series_collection_configuration}
281-
config.policy_check_frequency = timedelta(seconds=1)
286+
config.policy_check_frequency = timedelta(milliseconds=100)
282287

283288
self.store.maintenance.send(ConfigureTimeSeriesOperation(config))
284289
self.store.time_series.register_type(User, StockPrice)
@@ -313,3 +318,92 @@ def test_can_work_with_rollup_time_series_2(self):
313318
self.assertIsNotNone(r.max)
314319
self.assertIsNotNone(r.count)
315320
self.assertIsNotNone(r.average)
321+
322+
def test_can_work_with_rollup_time_series(self):
323+
raw_hours = 24
324+
raw = RawTimeSeriesPolicy(TimeValue.of_hours(raw_hours))
325+
326+
p1 = TimeSeriesPolicy("By6Hours", TimeValue.of_hours(6), TimeValue.of_hours(raw_hours * 4))
327+
p2 = TimeSeriesPolicy("By1Day", TimeValue.of_days(1), TimeValue.of_hours(raw_hours * 5))
328+
p3 = TimeSeriesPolicy("By30Minutes", TimeValue.of_minutes(30), TimeValue.of_hours(raw_hours * 2))
329+
p4 = TimeSeriesPolicy("By1Hour", TimeValue.of_hours(1), TimeValue.of_hours(raw_hours * 3))
330+
331+
users_config = TimeSeriesCollectionConfiguration()
332+
users_config.raw_policy = raw
333+
users_config.policies = [p1, p2, p3, p4]
334+
335+
config = TimeSeriesConfiguration()
336+
config.collections = {"Users": users_config}
337+
config.policy_check_frequency = timedelta(milliseconds=100)
338+
339+
self.store.maintenance.send(ConfigureTimeSeriesOperation(config))
340+
self.store.time_series.register_type(User, StockPrice)
341+
342+
# please notice we don't modify server time here!
343+
344+
now = datetime.utcnow()
345+
base_line = RavenTestHelper.utc_today() - timedelta(days=12)
346+
347+
total = TimeValue.of_days(12).value // 60
348+
349+
with self.store.open_session() as session:
350+
session.store(User(name="Karmel"), "users/karmel")
351+
352+
ts = session.typed_time_series_for(StockPrice, "users/karmel")
353+
for i in range(total + 1):
354+
open = i
355+
close = i + 100_000
356+
high = i + 200_000
357+
low = i + 300_000
358+
volume = i + 400_000
359+
ts.append(
360+
base_line + timedelta(minutes=i), StockPrice(open, close, high, low, volume), "watches/fitbit"
361+
)
362+
363+
session.save_changes()
364+
365+
time.sleep(1.5) # wait for rollups
366+
367+
with self.store.open_session() as session:
368+
query = (
369+
session.advanced.raw_query(
370+
"declare timeseries out()\n"
371+
"{\n"
372+
" from StockPrices\n"
373+
" between $start and $end\n"
374+
"}\n"
375+
"from Users as u\n"
376+
"select out()",
377+
TimeSeriesRawResult,
378+
)
379+
.add_parameter("start", base_line - timedelta(days=1))
380+
.add_parameter("end", now + timedelta(days=1))
381+
)
382+
result_raw = query.single()
383+
result = result_raw.as_typed_result(StockPrice)
384+
385+
self.assertGreater(len(result.results), 0)
386+
387+
for res in result.results:
388+
if res.is_rollup:
389+
self.assertGreater(len(res.values), 0)
390+
self.assertGreater(len(res.value.low), 0)
391+
self.assertGreater(len(res.value.high), 0)
392+
else:
393+
self.assertEqual(5, len(res.values))
394+
395+
now = datetime.utcnow()
396+
397+
with self.store.open_session() as session:
398+
ts = session.time_series_rollup_for(StockPrice, "users/karmel", p1.name)
399+
a = TypedTimeSeriesRollupEntry(StockPrice, datetime.utcnow())
400+
a.max.close = 1
401+
ts.append(a)
402+
session.save_changes()
403+
404+
with self.store.open_session() as session:
405+
ts = session.time_series_rollup_for(StockPrice, "users/karmel", p1.name)
406+
407+
res = ts.get(now - timedelta(milliseconds=1), now + timedelta(days=1))
408+
self.assertEqual(1, len(res))
409+
self.assertEqual(1, res[0].max.close)

ravendb/tests/jvm_migrated_tests/spatial_tests/test_spatial.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(self, Id: str = None, date: datetime.datetime = None, latitude: flo
4545
def from_json(cls, json_dict: Dict) -> MyProjection:
4646
return cls(
4747
json_dict["Id"],
48-
datetime.datetime.fromisoformat(json_dict["date"]),
48+
datetime.datetime.fromisoformat(json_dict["date"]) if "date" in json_dict else None,
4949
json_dict["latitude"],
5050
json_dict["longitude"],
5151
)

0 commit comments

Comments
 (0)