diff --git a/src/secops/chronicle/log_ingest.py b/src/secops/chronicle/log_ingest.py index cb0e79f..b86ca53 100644 --- a/src/secops/chronicle/log_ingest.py +++ b/src/secops/chronicle/log_ingest.py @@ -822,7 +822,7 @@ def ingest_log( APIError: If the API request fails """ # Validate log type - if not is_valid_log_type(client, log_type) and not force_log_type: + if not force_log_type and not is_valid_log_type(client, log_type): raise ValueError( f"Invalid log type: {log_type}. " "Use force_log_type=True to override." diff --git a/tests/chronicle/test_log_ingest.py b/tests/chronicle/test_log_ingest.py index ba36629..f804108 100644 --- a/tests/chronicle/test_log_ingest.py +++ b/tests/chronicle/test_log_ingest.py @@ -435,7 +435,70 @@ def test_ingest_log_force_log_type( assert "operation" in result -def test_ingest_log_with_custom_forwarder(chronicle_client, mock_ingest_response): +def test_ingest_log_force_log_type_skips_validation( + chronicle_client, mock_forwarders_list_response, mock_ingest_response +): + """Verify force_log_type=True skips is_valid_log_type API call.""" + test_log = {"test": "log", "message": "Test message"} + + with ( + patch.object( + chronicle_client.session, + "get", + return_value=mock_forwarders_list_response, + ), + patch.object( + chronicle_client.session, "post", return_value=mock_ingest_response + ), + patch("secops.chronicle.log_ingest.is_valid_log_type") as mock_validate, + ): + result = ingest_log( + client=chronicle_client, + log_type="CUSTOM_LOG_TYPE", + log_message=json.dumps(test_log), + force_log_type=True, + ) + + assert "operation" in result + mock_validate.assert_not_called() + + +def test_ingest_log_force_log_type_with_api_error_simulation( + chronicle_client, mock_forwarders_list_response, mock_ingest_response +): + """Verify force_log_type bypasses validation that would raise APIError.""" + test_log = {"test": "log", "message": "Test message"} + + def validation_raises_error(*args, **kwargs): + raise APIError("Permission denied: requires Admin access") + + with ( + patch.object( + chronicle_client.session, + "get", + return_value=mock_forwarders_list_response, + ), + patch.object( + chronicle_client.session, "post", return_value=mock_ingest_response + ), + patch( + "secops.chronicle.log_ingest.is_valid_log_type", + side_effect=validation_raises_error, + ), + ): + result = ingest_log( + client=chronicle_client, + log_type="CUSTOM_LOG_TYPE", + log_message=json.dumps(test_log), + force_log_type=True, + ) + + assert "operation" in result + + +def test_ingest_log_with_custom_forwarder( + chronicle_client, mock_ingest_response +): """Test log ingestion with a custom forwarder ID.""" test_log = {"test": "log", "message": "Test message"}