From bc84f8aa65b3af00ee39b764285afe827ef9ef0c Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Fri, 30 Jan 2026 14:04:59 +0530 Subject: [PATCH] fix: optimize force_log_type validation to skip API call when enabled --- src/secops/chronicle/log_ingest.py | 2 +- tests/chronicle/test_log_ingest.py | 65 +++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/secops/chronicle/log_ingest.py b/src/secops/chronicle/log_ingest.py index cb0e79f3..b86ca537 100644 --- a/src/secops/chronicle/log_ingest.py +++ b/src/secops/chronicle/log_ingest.py @@ -822,7 +822,7 @@ def ingest_log( APIError: If the API request fails """ # Validate log type - if not is_valid_log_type(client, log_type) and not force_log_type: + if not force_log_type and not is_valid_log_type(client, log_type): raise ValueError( f"Invalid log type: {log_type}. " "Use force_log_type=True to override." diff --git a/tests/chronicle/test_log_ingest.py b/tests/chronicle/test_log_ingest.py index ba366293..f8041082 100644 --- a/tests/chronicle/test_log_ingest.py +++ b/tests/chronicle/test_log_ingest.py @@ -435,7 +435,70 @@ def test_ingest_log_force_log_type( assert "operation" in result -def test_ingest_log_with_custom_forwarder(chronicle_client, mock_ingest_response): +def test_ingest_log_force_log_type_skips_validation( + chronicle_client, mock_forwarders_list_response, mock_ingest_response +): + """Verify force_log_type=True skips is_valid_log_type API call.""" + test_log = {"test": "log", "message": "Test message"} + + with ( + patch.object( + chronicle_client.session, + "get", + return_value=mock_forwarders_list_response, + ), + patch.object( + chronicle_client.session, "post", return_value=mock_ingest_response + ), + patch("secops.chronicle.log_ingest.is_valid_log_type") as mock_validate, + ): + result = ingest_log( + client=chronicle_client, + log_type="CUSTOM_LOG_TYPE", + log_message=json.dumps(test_log), + force_log_type=True, + ) + + assert "operation" in result + mock_validate.assert_not_called() + + +def test_ingest_log_force_log_type_with_api_error_simulation( + chronicle_client, mock_forwarders_list_response, mock_ingest_response +): + """Verify force_log_type bypasses validation that would raise APIError.""" + test_log = {"test": "log", "message": "Test message"} + + def validation_raises_error(*args, **kwargs): + raise APIError("Permission denied: requires Admin access") + + with ( + patch.object( + chronicle_client.session, + "get", + return_value=mock_forwarders_list_response, + ), + patch.object( + chronicle_client.session, "post", return_value=mock_ingest_response + ), + patch( + "secops.chronicle.log_ingest.is_valid_log_type", + side_effect=validation_raises_error, + ), + ): + result = ingest_log( + client=chronicle_client, + log_type="CUSTOM_LOG_TYPE", + log_message=json.dumps(test_log), + force_log_type=True, + ) + + assert "operation" in result + + +def test_ingest_log_with_custom_forwarder( + chronicle_client, mock_ingest_response +): """Test log ingestion with a custom forwarder ID.""" test_log = {"test": "log", "message": "Test message"}