From ace24aa05d5d3666e08a9c350d7bd8051174b0ae Mon Sep 17 00:00:00 2001 From: PaperMtn Date: Wed, 4 Feb 2026 20:18:28 +0000 Subject: [PATCH 1/2] feat: migrate stats.py to use request helper --- src/secops/chronicle/stats.py | 48 ++++++++++++++--------------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index ac5d1e3..4defdfb 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -14,13 +14,19 @@ # """Statistics functionality for Chronicle searches.""" from datetime import datetime -from typing import Any +from typing import Any, TYPE_CHECKING -from secops.exceptions import APIError +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import ( + chronicle_request, +) + +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient def get_stats( - client, + client: "ChronicleClient", query: str, start_time: datetime, end_time: datetime, @@ -54,39 +60,23 @@ def get_stats( APIError: If the API request fails """ # Unused parameters, kept for backward compatibility - _ = (max_events, case_insensitive, max_attempts) - - # Format the instance ID for the API call - instance = client.instance_id - - # Endpoint for UDM search - url = f"{client.base_url}/{instance}:udmSearch" - - # Format times for the API - start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + _ = (max_events, case_insensitive, max_attempts, timeout) # Query parameters for the API call params = { "query": query, - "timeRange.start_time": start_time_str, - "timeRange.end_time": end_time_str, + "timeRange.start_time": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "timeRange.end_time": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), "limit": max_values, # Limit to specified number of results } - # Make the API request - response = client.session.get(url, params=params, timeout=timeout) - if response.status_code != 200: - raise APIError( - f"Error executing stats search: Status {response.status_code}, " - f"Response: {response.text}" - ) - - results = response.json() - - # Check if stats data is available in the response - if "stats" not in results: - raise APIError("No stats found in response") + results = chronicle_request( + client, + method="GET", + endpoint_path=":udmSearch", + api_version=APIVersion.V1ALPHA, + params=params, + ) # Process the stats results return process_stats_results(results["stats"]) From 25f8f7dbbefaa261b02ca293459bd8860904b32b Mon Sep 17 00:00:00 2001 From: PaperMtn Date: Wed, 4 Feb 2026 20:40:10 +0000 Subject: [PATCH 2/2] feat: Update tests for stats.py --- tests/chronicle/test_client.py | 38 ---------- tests/chronicle/test_stats.py | 131 ++++++++++++--------------------- 2 files changed, 47 insertions(+), 122 deletions(-) diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index 68cd402..e9cb648 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -90,44 +90,6 @@ def test_chronicle_client_custom_session_user_agent(): assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk" -def test_get_stats(chronicle_client): - """Test stats search functionality.""" - # Mock the search request - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "stats": { - "results": [ - {"column": "count", "values": [{"value": {"int64Val": "42"}}]}, - { - "column": "hostname", - "values": [{"value": {"stringVal": "test-host"}}], - }, - ] - } - } - - with patch.object(chronicle_client.session, "get", return_value=mock_response): - result = chronicle_client.get_stats( - query="""target.ip != "" -match: - target.ip, principal.hostname -outcome: - $count = count(metadata.id) -order: - principal.hostname asc""", - start_time=datetime(2024, 1, 1, tzinfo=timezone.utc), - end_time=datetime(2024, 1, 2, tzinfo=timezone.utc), - max_events=10, - max_values=10, - ) - - assert result["total_rows"] == 1 - assert result["columns"] == ["count", "hostname"] - assert result["rows"][0]["count"] == 42 - assert result["rows"][0]["hostname"] == "test-host" - - def test_search_udm(chronicle_client): """Test UDM search functionality.""" # Mock the search request diff --git a/tests/chronicle/test_stats.py b/tests/chronicle/test_stats.py index b87f1a7..eb54f55 100644 --- a/tests/chronicle/test_stats.py +++ b/tests/chronicle/test_stats.py @@ -2,31 +2,23 @@ import unittest from unittest import mock -from datetime import datetime, timedelta -from typing import Dict, Any +from datetime import datetime, timedelta, timezone from secops.chronicle.stats import get_stats, process_stats_results +from secops.chronicle.models import APIVersion class TestChronicleStats(unittest.TestCase): """Tests for Chronicle stats functionality.""" def setUp(self) -> None: - """Set up test fixtures.""" self.mock_client = mock.MagicMock() - self.mock_client.instance_id = "test-instance" - self.mock_client.base_url = "https://test-url.com" - self.mock_session = mock.MagicMock() - self.mock_client.session = self.mock_session - self.start_time = datetime.now() - timedelta(days=7) - self.end_time = datetime.now() - - def test_get_stats_regular_values(self) -> None: - """Test get_stats with regular single value results.""" - # Mock response data with simple values - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { + self.start_time = datetime.now(tz=timezone.utc) - timedelta(days=7) + self.end_time = datetime.now(tz=timezone.utc) + + @mock.patch("secops.chronicle.stats.chronicle_request") + def test_get_stats_regular_values(self, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.return_value = { "stats": { "results": [ { @@ -46,49 +38,50 @@ def test_get_stats_regular_values(self) -> None: ] } } - self.mock_session.get.return_value = mock_response - # Execute the function - result = get_stats( - self.mock_client, "test query", self.start_time, self.end_time - ) + result = get_stats(self.mock_client, "test query", self.start_time, self.end_time) - # Assertions self.assertEqual(result["total_rows"], 2) self.assertEqual(result["columns"], ["col1", "col2"]) - self.assertEqual(len(result["rows"]), 2) self.assertEqual(result["rows"][0], {"col1": "value1", "col2": 10}) self.assertEqual(result["rows"][1], {"col1": "value2", "col2": 20}) - def test_get_stats_array_distinct(self) -> None: - """Test get_stats with array_distinct returning list values.""" - # Mock response with array_distinct list structure - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { + # Verify get_stats called chronicle_request with expected args/params + mock_chronicle_request.assert_called_once() + _, kwargs = mock_chronicle_request.call_args + + self.assertEqual(kwargs["method"], "GET") + self.assertEqual(kwargs["endpoint_path"], ":udmSearch") + self.assertEqual(kwargs["api_version"], APIVersion.V1ALPHA) + + params = kwargs["params"] + self.assertEqual(params["query"], "test query") + self.assertEqual(params["limit"], 60) + self.assertEqual( + params["timeRange.start_time"], + self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + self.assertEqual( + params["timeRange.end_time"], + self.end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + + @mock.patch("secops.chronicle.stats.chronicle_request") + def test_get_stats_array_distinct(self, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.return_value = { "stats": { "results": [ { "column": "array_col", "values": [ - { - "list": { - "values": [{"stringVal": "X1"}, {"stringVal": "X2"}] - } - }, - { - "list": { - "values": [{"stringVal": "Y1"}, {"stringVal": "Y2"}] - } - }, + {"list": {"values": [{"stringVal": "X1"}, {"stringVal": "X2"}]}}, + {"list": {"values": [{"stringVal": "Y1"}, {"stringVal": "Y2"}]}}, ], } ] } } - self.mock_session.get.return_value = mock_response - # Execute the function result = get_stats( self.mock_client, "test query with array_distinct", @@ -96,30 +89,21 @@ def test_get_stats_array_distinct(self) -> None: self.end_time, ) - # This will fail with the current implementation, but after our fix - # it should handle array_distinct properly self.assertEqual(result["total_rows"], 2) self.assertEqual(result["columns"], ["array_col"]) - self.assertEqual(len(result["rows"]), 2) self.assertEqual(result["rows"][0]["array_col"], ["X1", "X2"]) self.assertEqual(result["rows"][1]["array_col"], ["Y1", "Y2"]) - def test_get_stats_timestamp_values(self) -> None: - """Test get_stats with timestampVal support.""" - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { + @mock.patch("secops.chronicle.stats.chronicle_request") + def test_get_stats_timestamp_values(self, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.return_value = { "stats": { "results": [ { "column": "timestamp_col", "values": [ {"value": {"timestampVal": "2024-01-15T10:30:00Z"}}, - { - "value": { - "timestampVal": "2024-01-15T11:45:30.123Z" - } - }, + {"value": {"timestampVal": "2024-01-15T11:45:30.123Z"}}, ], }, { @@ -132,25 +116,19 @@ def test_get_stats_timestamp_values(self) -> None: ] } } - self.mock_session.get.return_value = mock_response - result = get_stats( - self.mock_client, "test query", self.start_time, self.end_time - ) + result = get_stats(self.mock_client, "test query", self.start_time, self.end_time) self.assertEqual(result["total_rows"], 2) self.assertEqual(result["columns"], ["timestamp_col", "event_count"]) - self.assertEqual(len(result["rows"]), 2) self.assertIsInstance(result["rows"][0]["timestamp_col"], datetime) self.assertIsInstance(result["rows"][1]["timestamp_col"], datetime) self.assertEqual(result["rows"][0]["event_count"], 100) self.assertEqual(result["rows"][1]["event_count"], 200) - def test_get_stats_timestamp_in_list(self) -> None: - """Test get_stats with timestampVal in list values.""" - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { + @mock.patch("secops.chronicle.stats.chronicle_request") + def test_get_stats_timestamp_in_list(self, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.return_value = { "stats": { "results": [ { @@ -159,43 +137,28 @@ def test_get_stats_timestamp_in_list(self) -> None: { "list": { "values": [ - { - "timestampVal": ( - "2024-01-15T10:00:00Z" - ) - }, - { - "timestampVal": ( - "2024-01-15T11:00:00Z" - ) - }, + {"timestampVal": "2024-01-15T10:00:00Z"}, + {"timestampVal": "2024-01-15T11:00:00Z"}, ] } - }, + } ], } ] } } - self.mock_session.get.return_value = mock_response - result = get_stats( - self.mock_client, "test query", self.start_time, self.end_time - ) + result = get_stats(self.mock_client, "test query", self.start_time, self.end_time) self.assertEqual(result["total_rows"], 1) self.assertEqual(result["columns"], ["timestamp_array"]) - self.assertEqual(len(result["rows"]), 1) self.assertIsInstance(result["rows"][0]["timestamp_array"], list) self.assertEqual(len(result["rows"][0]["timestamp_array"]), 2) self.assertIsInstance(result["rows"][0]["timestamp_array"][0], datetime) self.assertIsInstance(result["rows"][0]["timestamp_array"][1], datetime) def test_process_stats_results_empty(self) -> None: - """Test processing empty stats results.""" - empty_stats: Dict[str, Any] = {} - result = process_stats_results(empty_stats) - + result = process_stats_results({}) self.assertEqual(result["total_rows"], 0) self.assertEqual(result["columns"], []) self.assertEqual(result["rows"], [])