From 467e23ffbbcb890e00a5b4235bac79d63d4ad312 Mon Sep 17 00:00:00 2001 From: jbfuzier Date: Fri, 23 Jan 2026 12:58:39 +0100 Subject: [PATCH 1/4] Handle timestampVal type in stats.py Add handling for timestampVal, this is used when stats queries return timebuckets, example : events: $e.metadata.log_type="CS_EDR" $log_type = $e.metadata.log_type match: $log_type by 5m outcome: $total = math.round(count($e.metadata.id)/300) --- src/secops/chronicle/stats.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 5b5f4e0..5fe1098 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -13,7 +13,7 @@ # limitations under the License. # """Statistics functionality for Chronicle searches.""" -from datetime import datetime +from datetime import datetime, UTC from typing import Any from secops.exceptions import APIError @@ -127,6 +127,8 @@ def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]: values.append(float(val["doubleVal"])) elif "stringVal" in val: values.append(val["stringVal"]) + elif "timestampVal" in val: + values.append(datetime.fromisoformat(val["timestampVal"].replace('Z', '+00:00')).replace(tzinfo=UTC)) else: values.append(None) # Handle list value cells (like those from array_distinct) From e5a957b5cb70353fa651c460ecb265cb743d5b4c Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 29 Jan 2026 11:49:13 +0530 Subject: [PATCH 2/4] chore: fixed tests --- src/secops/chronicle/stats.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 5fe1098..5618765 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -13,7 +13,7 @@ # limitations under the License. # """Statistics functionality for Chronicle searches.""" -from datetime import datetime, UTC +from datetime import datetime, timezone from typing import Any from secops.exceptions import APIError @@ -128,7 +128,11 @@ def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]: elif "stringVal" in val: values.append(val["stringVal"]) elif "timestampVal" in val: - values.append(datetime.fromisoformat(val["timestampVal"].replace('Z', '+00:00')).replace(tzinfo=UTC)) + values.append( + datetime.fromisoformat( + val["timestampVal"].replace("Z", "+00:00") + ).replace(tzinfo=timezone.utc) + ) else: values.append(None) # Handle list value cells (like those from array_distinct) From 2e393e9183895084d2815be8aad44b2dba418838 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:48:27 +0530 Subject: [PATCH 3/4] feat: add timestampVal support in list values and remove redundant timezone conversion. --- src/secops/chronicle/stats.py | 8 +++- tests/chronicle/test_stats.py | 87 +++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 5618765..18e7487 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -131,7 +131,7 @@ def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]: values.append( datetime.fromisoformat( val["timestampVal"].replace("Z", "+00:00") - ).replace(tzinfo=timezone.utc) + ) ) else: values.append(None) @@ -145,6 +145,12 @@ def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]: list_values.append(float(list_val["doubleVal"])) elif "stringVal" in list_val: list_values.append(list_val["stringVal"]) + elif "timestampVal" in list_val: + list_values.append( + datetime.fromisoformat( + list_val["timestampVal"].replace("Z", "+00:00") + ) + ) values.append(list_values) else: values.append(None) diff --git a/tests/chronicle/test_stats.py b/tests/chronicle/test_stats.py index 08f68e5..b87f1a7 100644 --- a/tests/chronicle/test_stats.py +++ b/tests/chronicle/test_stats.py @@ -104,6 +104,93 @@ def test_get_stats_array_distinct(self) -> None: self.assertEqual(result["rows"][0]["array_col"], ["X1", "X2"]) self.assertEqual(result["rows"][1]["array_col"], ["Y1", "Y2"]) + def test_get_stats_timestamp_values(self) -> None: + """Test get_stats with timestampVal support.""" + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "stats": { + "results": [ + { + "column": "timestamp_col", + "values": [ + {"value": {"timestampVal": "2024-01-15T10:30:00Z"}}, + { + "value": { + "timestampVal": "2024-01-15T11:45:30.123Z" + } + }, + ], + }, + { + "column": "event_count", + "values": [ + {"value": {"int64Val": "100"}}, + {"value": {"int64Val": "200"}}, + ], + }, + ] + } + } + self.mock_session.get.return_value = mock_response + + result = get_stats( + self.mock_client, "test query", self.start_time, self.end_time + ) + + self.assertEqual(result["total_rows"], 2) + self.assertEqual(result["columns"], ["timestamp_col", "event_count"]) + self.assertEqual(len(result["rows"]), 2) + self.assertIsInstance(result["rows"][0]["timestamp_col"], datetime) + self.assertIsInstance(result["rows"][1]["timestamp_col"], datetime) + self.assertEqual(result["rows"][0]["event_count"], 100) + self.assertEqual(result["rows"][1]["event_count"], 200) + + def test_get_stats_timestamp_in_list(self) -> None: + """Test get_stats with timestampVal in list values.""" + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "stats": { + "results": [ + { + "column": "timestamp_array", + "values": [ + { + "list": { + "values": [ + { + "timestampVal": ( + "2024-01-15T10:00:00Z" + ) + }, + { + "timestampVal": ( + "2024-01-15T11:00:00Z" + ) + }, + ] + } + }, + ], + } + ] + } + } + self.mock_session.get.return_value = mock_response + + result = get_stats( + self.mock_client, "test query", self.start_time, self.end_time + ) + + self.assertEqual(result["total_rows"], 1) + self.assertEqual(result["columns"], ["timestamp_array"]) + self.assertEqual(len(result["rows"]), 1) + self.assertIsInstance(result["rows"][0]["timestamp_array"], list) + self.assertEqual(len(result["rows"][0]["timestamp_array"]), 2) + self.assertIsInstance(result["rows"][0]["timestamp_array"][0], datetime) + self.assertIsInstance(result["rows"][0]["timestamp_array"][1], datetime) + def test_process_stats_results_empty(self) -> None: """Test processing empty stats results.""" empty_stats: Dict[str, Any] = {} From 3b11501d94e1c361e3e312c4145603cd6e5161ed Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:50:47 +0530 Subject: [PATCH 4/4] chore: fixed linting --- src/secops/chronicle/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 18e7487..ac5d1e3 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -13,7 +13,7 @@ # limitations under the License. # """Statistics functionality for Chronicle searches.""" -from datetime import datetime, timezone +from datetime import datetime from typing import Any from secops.exceptions import APIError