From 9eff33ea5505741e6edbb105419923ea0939c868 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 12 Mar 2026 09:11:25 +0100 Subject: [PATCH 1/2] fix(anthropic): Close span on GeneratorExit --- sentry_sdk/integrations/anthropic.py | 170 ++++++++++++++------------- tests/conftest.py | 62 ++++++++-- 2 files changed, 135 insertions(+), 97 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 8c4b09e89a..cf573f3bdb 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -364,52 +364,53 @@ def _wrap_synchronous_message_iterator( usage = _RecordedUsage() content_blocks: "list[str]" = [] - for event in iterator: - if not isinstance( - event, + try: + for event in iterator: + if not isinstance( + event, + ( + MessageStartEvent, + MessageDeltaEvent, + MessageStopEvent, + ContentBlockStartEvent, + ContentBlockDeltaEvent, + ContentBlockStopEvent, + ), + ): + yield event + continue + ( - MessageStartEvent, - MessageDeltaEvent, - MessageStopEvent, - ContentBlockStartEvent, - ContentBlockDeltaEvent, - ContentBlockStopEvent, - ), - ): + model, + usage, + content_blocks, + ) = _collect_ai_data( + event, + model, + usage, + content_blocks, + ) yield event - continue - - ( - model, - usage, - content_blocks, - ) = _collect_ai_data( - event, - model, - usage, - content_blocks, + finally: + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) ) - yield event - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - ) + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + ) async def _wrap_asynchronous_message_iterator( @@ -425,52 +426,53 @@ async def _wrap_asynchronous_message_iterator( usage = _RecordedUsage() content_blocks: "list[str]" = [] - async for event in iterator: - if not isinstance( - event, + try: + async for event in iterator: + if not isinstance( + event, + ( + MessageStartEvent, + MessageDeltaEvent, + MessageStopEvent, + ContentBlockStartEvent, + ContentBlockDeltaEvent, + ContentBlockStopEvent, + ), + ): + yield event + continue + ( - MessageStartEvent, - MessageDeltaEvent, - MessageStopEvent, - ContentBlockStartEvent, - ContentBlockDeltaEvent, - ContentBlockStopEvent, - ), - ): + model, + usage, + content_blocks, + ) = _collect_ai_data( + event, + model, + usage, + content_blocks, + ) yield event - continue - - ( - model, - usage, - content_blocks, - ) = _collect_ai_data( - event, - model, - usage, - content_blocks, + finally: + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) ) - yield event - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - ) + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + ) def _set_output_data( diff --git a/tests/conftest.py b/tests/conftest.py index 0853013dfd..7f76fc2aee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,13 +57,6 @@ from collections.abc import Iterator try: - from anyio import create_memory_object_stream, create_task_group, EndOfStream - from mcp.types import ( - JSONRPCMessage, - JSONRPCNotification, - JSONRPCRequest, - ) - from mcp.shared.message import SessionMessage from httpx import ( ASGITransport, Request as HttpxRequest, @@ -71,6 +64,22 @@ AsyncByteStream, AsyncClient, ) +except ImportError: + ASGITransport = None + HttpxRequest = None + HttpxResponse = None + AsyncByteStream = None + AsyncClient = None + + +try: + from anyio import create_memory_object_stream, create_task_group, EndOfStream + from mcp.types import ( + JSONRPCMessage, + JSONRPCNotification, + JSONRPCRequest, + ) + from mcp.shared.message import SessionMessage except ImportError: create_memory_object_stream = None create_task_group = None @@ -81,12 +90,6 @@ JSONRPCRequest = None SessionMessage = None - ASGITransport = None - HttpxRequest = None - HttpxResponse = None - AsyncByteStream = None - AsyncClient = None - SENTRY_EVENT_SCHEMA = "./checkouts/data-schemas/relay/event.schema.json" @@ -1013,6 +1016,39 @@ async def inner(values): return inner +@pytest.fixture +def server_side_event_chunks(): + def inner(events): + for event in events: + payload = event.model_dump() + chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" + yield chunk.encode("utf-8") + + return inner + + +@pytest.fixture +def get_model_response(): + def inner(response_content, serialize_pydantic=False): + model_request = HttpxRequest( + "POST", + "/responses", + ) + + if serialize_pydantic: + response_content = json.dumps(response_content.model_dump()).encode("utf-8") + + response = HttpxResponse( + 200, + request=model_request, + content=response_content, + ) + + return response + + return inner + + class MockServerRequestHandler(BaseHTTPRequestHandler): def do_GET(self): # noqa: N802 # Process an HTTP GET request and return a response. From 5e3cb5f28730a88739910432bd5d50072049ec97 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 12 Mar 2026 09:13:43 +0100 Subject: [PATCH 2/2] revert conftest --- tests/conftest.py | 62 ++++++++++------------------------------------- 1 file changed, 13 insertions(+), 49 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 7f76fc2aee..0853013dfd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -56,22 +56,6 @@ from typing import Any, Callable, MutableMapping, Optional from collections.abc import Iterator -try: - from httpx import ( - ASGITransport, - Request as HttpxRequest, - Response as HttpxResponse, - AsyncByteStream, - AsyncClient, - ) -except ImportError: - ASGITransport = None - HttpxRequest = None - HttpxResponse = None - AsyncByteStream = None - AsyncClient = None - - try: from anyio import create_memory_object_stream, create_task_group, EndOfStream from mcp.types import ( @@ -80,6 +64,13 @@ JSONRPCRequest, ) from mcp.shared.message import SessionMessage + from httpx import ( + ASGITransport, + Request as HttpxRequest, + Response as HttpxResponse, + AsyncByteStream, + AsyncClient, + ) except ImportError: create_memory_object_stream = None create_task_group = None @@ -90,6 +81,12 @@ JSONRPCRequest = None SessionMessage = None + ASGITransport = None + HttpxRequest = None + HttpxResponse = None + AsyncByteStream = None + AsyncClient = None + SENTRY_EVENT_SCHEMA = "./checkouts/data-schemas/relay/event.schema.json" @@ -1016,39 +1013,6 @@ async def inner(values): return inner -@pytest.fixture -def server_side_event_chunks(): - def inner(events): - for event in events: - payload = event.model_dump() - chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" - yield chunk.encode("utf-8") - - return inner - - -@pytest.fixture -def get_model_response(): - def inner(response_content, serialize_pydantic=False): - model_request = HttpxRequest( - "POST", - "/responses", - ) - - if serialize_pydantic: - response_content = json.dumps(response_content.model_dump()).encode("utf-8") - - response = HttpxResponse( - 200, - request=model_request, - content=response_content, - ) - - return response - - return inner - - class MockServerRequestHandler(BaseHTTPRequestHandler): def do_GET(self): # noqa: N802 # Process an HTTP GET request and return a response.