From 5d728ca32e9725c02944645a425def92e78eea88 Mon Sep 17 00:00:00 2001 From: Gaurav Mishra Date: Mon, 27 Apr 2026 20:51:29 +0000 Subject: [PATCH] fix: enhance response body parsing --- .../otlp/proto/http/_common/__init__.py | 63 +++++- .../otlp/proto/http/_log_exporter/__init__.py | 6 +- .../proto/http/metric_exporter/__init__.py | 4 +- .../proto/http/trace_exporter/__init__.py | 6 +- .../tests/test_proto_log_exporter.py | 34 ++++ .../tests/test_proto_span_exporter.py | 48 +++++ .../tests/test_response_body_parsing.py | 180 ++++++++++++++++++ 7 files changed, 337 insertions(+), 4 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-http/tests/test_response_body_parsing.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py index 1bdb7d228c..329c788761 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py @@ -12,16 +12,77 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging from os import environ -from typing import Literal, Optional +from typing import Literal, Optional, Type import requests +from google.protobuf.message import Message from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER, ) from opentelemetry.util._importlib_metadata import entry_points +_logger = logging.getLogger(__name__) + +_CONTENT_TYPE_PROTOBUF = "application/x-protobuf" +_CONTENT_TYPE_JSON = "application/json" + + +def _parse_response_body( + resp: requests.Response, response_class: Type[Message] +) -> str: + """Parse an HTTP response body based on its Content-Type header. + + Args: + resp: The HTTP response from the OTLP endpoint. + response_class: The protobuf message class to use for deserialization + when the response content-type is ``application/x-protobuf``. + + Returns: + A human-readable string describing the response body error details, + or ``resp.reason`` if the body is empty or cannot be parsed. + """ + if not resp.content: + return resp.reason + + content_type = resp.headers.get("Content-Type", "") + + if content_type.startswith(_CONTENT_TYPE_PROTOBUF): + try: + message = response_class() + message.ParseFromString(resp.content) + partial_success = getattr(message, "partial_success", None) + if partial_success is not None: + error_message = getattr(partial_success, "error_message", "") + if error_message: + return error_message + except Exception: # pylint: disable=broad-except + _logger.debug( + "Failed to parse protobuf response body", exc_info=True + ) + return resp.reason + + if content_type.startswith(_CONTENT_TYPE_JSON): + try: + body = resp.json() + if isinstance(body, dict): + # OTLP partial_success uses camelCase in JSON + partial = body.get("partialSuccess", {}) + error_message = partial.get("errorMessage", "") + if error_message: + return error_message + # google.rpc.Status uses "message" + rpc_message = body.get("message", "") + if rpc_message: + return rpc_message + except Exception: # pylint: disable=broad-except + _logger.debug("Failed to parse JSON response body", exc_info=True) + return resp.text or resp.reason + + return resp.text or resp.reason + def _is_retryable(resp: requests.Response) -> bool: if resp.status_code == 408: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 6032433dd1..de64377ec8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -37,6 +37,10 @@ from opentelemetry.exporter.otlp.proto.http._common import ( _is_retryable, _load_session_from_envvar, + _parse_response_body, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceResponse, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord @@ -220,7 +224,7 @@ def export( retryable = isinstance(error, ConnectionError) status_code = None else: - reason = resp.reason + reason = _parse_response_body(resp, ExportLogsServiceResponse) retryable = _is_retryable(resp) status_code = resp.status_code diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index efd63b4543..6179c41e6e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -53,10 +53,12 @@ from opentelemetry.exporter.otlp.proto.http._common import ( _is_retryable, _load_session_from_envvar, + _parse_response_body, ) from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 ExportMetricsServiceRequest, + ExportMetricsServiceResponse, ) from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 AnyValue, @@ -293,7 +295,7 @@ def _export_with_retries( retryable = isinstance(error, ConnectionError) status_code = None else: - reason = resp.reason + reason = _parse_response_body(resp, ExportMetricsServiceResponse) retryable = _is_retryable(resp) status_code = resp.status_code diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 018d89df1e..fce6a14a04 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -39,6 +39,10 @@ from opentelemetry.exporter.otlp.proto.http._common import ( _is_retryable, _load_session_from_envvar, + _parse_response_body, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceResponse, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( @@ -213,7 +217,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: retryable = isinstance(error, ConnectionError) status_code = None else: - reason = resp.reason + reason = _parse_response_body(resp, ExportTraceServiceResponse) retryable = _is_retryable(resp) status_code = resp.status_code diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 7981b0bc82..09a9c6a749 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -14,6 +14,7 @@ # pylint: disable=protected-access +import logging import threading import time import unittest @@ -38,7 +39,9 @@ ) from opentelemetry.exporter.otlp.proto.http.version import __version__ from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsPartialSuccess, ExportLogsServiceRequest, + ExportLogsServiceResponse, ) from opentelemetry.sdk._logs import ReadWriteLogRecord from opentelemetry.sdk._logs.export import LogRecordExportResult @@ -85,6 +88,13 @@ def setUp(self): self.meter_provider = MeterProvider( metric_readers=[self.metric_reader] ) + # Reset DuplicateFilter state between tests so each test can log freely. + log_exporter_logger = logging.getLogger( + "opentelemetry.exporter.otlp.proto.http._log_exporter" + ) + for log_filter in log_exporter_logger.filters: + if hasattr(log_filter, "last_log"): + del log_filter.last_log def test_constructor_default(self): exporter = OTLPLogExporter() @@ -661,6 +671,30 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + @patch.object(Session, "post") + def test_error_response_with_protobuf_body(self, mock_post): + proto_response = ExportLogsServiceResponse( + partial_success=ExportLogsPartialSuccess( + rejected_log_records=2, + error_message="invalid log data", + ) + ) + resp = Response() + resp.status_code = 400 + resp.reason = "Bad Request" + resp._content = proto_response.SerializeToString() # pylint: disable=protected-access + resp.headers["Content-Type"] = "application/x-protobuf" + mock_post.return_value = resp + + exporter = OTLPLogExporter() + with self.assertLogs(level="ERROR") as logs: + result = exporter.export(self._get_sdk_log_data()) + + self.assertEqual(result, LogRecordExportResult.FAILURE) + self.assertTrue( + any("invalid log data" in r.message for r in logs.records) + ) + def assert_standard_metric_attrs(self, attributes): self.assertEqual( attributes["otel.component.type"], "otlp_http_log_exporter" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 0df471aa69..fc27e76734 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import threading import time import unittest @@ -51,6 +52,10 @@ ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTracePartialSuccess, + ExportTraceServiceResponse, +) from opentelemetry.sdk.trace import _Span from opentelemetry.sdk.trace.export import SpanExportResult from opentelemetry.test.mock_test_classes import IterEntryPoint @@ -479,6 +484,49 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + @patch.object(Session, "post") + def test_error_response_with_protobuf_body(self, mock_post): + proto_response = ExportTraceServiceResponse( + partial_success=ExportTracePartialSuccess( + rejected_spans=1, + error_message="invalid span data", + ) + ) + resp = Response() + resp.status_code = 400 + resp.reason = "Bad Request" + resp._content = proto_response.SerializeToString() # pylint: disable=protected-access + resp.headers["Content-Type"] = "application/x-protobuf" + mock_post.return_value = resp + + exporter = OTLPSpanExporter() + with self.assertLogs(level="ERROR") as logs: + result = exporter.export([BASIC_SPAN]) + + self.assertEqual(result, SpanExportResult.FAILURE) + self.assertTrue( + any("invalid span data" in r.message for r in logs.records) + ) + + @patch.object(Session, "post") + def test_error_response_with_json_body(self, mock_post): + body = json.dumps({"message": "quota limit reached"}).encode() + resp = Response() + resp.status_code = 400 + resp.reason = "Bad Request" + resp._content = body # pylint: disable=protected-access + resp.headers["Content-Type"] = "application/json" + mock_post.return_value = resp + + exporter = OTLPSpanExporter() + with self.assertLogs(level="ERROR") as logs: + result = exporter.export([BASIC_SPAN]) + + self.assertEqual(result, SpanExportResult.FAILURE) + self.assertTrue( + any("quota limit reached" in r.message for r in logs.records) + ) + def assert_standard_metric_attrs(self, attributes): self.assertEqual( attributes["otel.component.type"], "otlp_http_span_exporter" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_response_body_parsing.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_response_body_parsing.py new file mode 100644 index 0000000000..e1a5d64c77 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_response_body_parsing.py @@ -0,0 +1,180 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import unittest + +from requests.models import Response + +from opentelemetry.exporter.otlp.proto.http._common import _parse_response_body +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsPartialSuccess, + ExportLogsServiceResponse, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTracePartialSuccess, + ExportTraceServiceResponse, +) + + +def _make_response( + content: bytes, + content_type: str, + reason: str = "Bad Request", + status_code: int = 400, +) -> Response: + resp = Response() + resp.status_code = status_code + resp.reason = reason + resp._content = content # pylint: disable=protected-access + resp.headers["Content-Type"] = content_type + return resp + + +class TestParseResponseBody(unittest.TestCase): + def test_protobuf_content_type_with_error_message(self): + proto_response = ExportTraceServiceResponse( + partial_success=ExportTracePartialSuccess( + rejected_spans=3, + error_message="invalid span data", + ) + ) + resp = _make_response( + content=proto_response.SerializeToString(), + content_type="application/x-protobuf", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "invalid span data", + ) + + def test_protobuf_content_type_without_error_message_falls_back_to_reason( + self, + ): + proto_response = ExportTraceServiceResponse() + resp = _make_response( + content=proto_response.SerializeToString(), + content_type="application/x-protobuf", + reason="Bad Request", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "Bad Request", + ) + + def test_protobuf_content_type_with_charset_parameter(self): + proto_response = ExportTraceServiceResponse( + partial_success=ExportTracePartialSuccess( + error_message="quota exceeded" + ) + ) + resp = _make_response( + content=proto_response.SerializeToString(), + content_type="application/x-protobuf; charset=utf-8", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "quota exceeded", + ) + + def test_json_content_type_with_partial_success_error_message(self): + body = json.dumps( + {"partialSuccess": {"errorMessage": "rate limit exceeded"}} + ).encode() + resp = _make_response(content=body, content_type="application/json") + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "rate limit exceeded", + ) + + def test_json_content_type_with_rpc_status_message(self): + body = json.dumps({"message": "permission denied"}).encode() + resp = _make_response(content=body, content_type="application/json") + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "permission denied", + ) + + def test_json_content_type_with_charset_parameter(self): + body = json.dumps({"message": "not authorized"}).encode() + resp = _make_response( + content=body, content_type="application/json; charset=utf-8" + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "not authorized", + ) + + def test_unknown_content_type_returns_text(self): + resp = _make_response( + content=b"something went wrong", + content_type="text/plain", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "something went wrong", + ) + + def test_empty_body_returns_reason(self): + resp = _make_response( + content=b"", + content_type="application/x-protobuf", + reason="Service Unavailable", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "Service Unavailable", + ) + + def test_malformed_protobuf_body_falls_back_to_reason(self): + resp = _make_response( + content=b"\xff\xfe invalid protobuf", + content_type="application/x-protobuf", + reason="Bad Request", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "Bad Request", + ) + + def test_malformed_json_body_falls_back_to_text(self): + resp = _make_response( + content=b"not valid json {{{", + content_type="application/json", + reason="Bad Request", + ) + self.assertEqual( + _parse_response_body(resp, ExportTraceServiceResponse), + "not valid json {{{", + ) + + def test_works_with_logs_response_class(self): + proto_response = ExportLogsServiceResponse( + partial_success=ExportLogsPartialSuccess( + rejected_log_records=2, + error_message="log quota exceeded", + ) + ) + resp = _make_response( + content=proto_response.SerializeToString(), + content_type="application/x-protobuf", + ) + self.assertEqual( + _parse_response_body(resp, ExportLogsServiceResponse), + "log quota exceeded", + ) + + +if __name__ == "__main__": + unittest.main()