diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 8c4b09e89a..ebd3ff0745 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -39,6 +39,7 @@ from anthropic import Stream, AsyncStream from anthropic.resources import AsyncMessages, Messages + from anthropic.lib.streaming._messages import MessageStreamManager from anthropic.types import ( MessageStartEvent, @@ -59,7 +60,15 @@ from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart - from anthropic.types import RawMessageStreamEvent + from anthropic import AsyncStream + from anthropic.types import ( + RawMessageStreamEvent, + MessageParam, + ModelParam, + TextBlockParam, + ToolUnionParam, + MessageStream, + ) class _RecordedUsage: @@ -84,6 +93,11 @@ def setup_once() -> None: Messages.create = _wrap_message_create(Messages.create) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + Messages.stream = _wrap_message_stream(Messages.stream) + MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( + MessageStreamManager.__enter__ + ) + def _capture_exception(exc: "Any") -> None: set_span_errored() @@ -253,27 +267,32 @@ def _transform_system_instructions( ] -def _set_input_data( - span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration" +def _set_common_input_data( + span: "Span", + integration: "AnthropicIntegration", + max_tokens: "int", + messages: "Iterable[MessageParam]", + model: "ModelParam", + system: "Optional[Union[str, Iterable[TextBlockParam]]]", + temperature: "Optional[float]", + top_k: "Optional[int]", + top_p: "Optional[float]", + tools: "Optional[Iterable[ToolUnionParam]]", ) -> None: """ Set input data for the span based on the provided keyword arguments for the anthropic message creation. """ span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat") - system_instructions: "Union[str, Iterable[TextBlockParam]]" = kwargs.get("system") # type: ignore - messages = kwargs.get("messages") if ( messages is not None - and len(messages) > 0 + and len(messages) > 0 # type: ignore and should_send_default_pii() and integration.include_prompts ): - if isinstance(system_instructions, str) or isinstance( - system_instructions, Iterable - ): + if isinstance(system, str) or isinstance(system, Iterable): span.set_data( SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS, - json.dumps(_transform_system_instructions(system_instructions)), + json.dumps(_transform_system_instructions(system)), ) normalized_messages = [] @@ -329,32 +348,48 @@ def _set_input_data( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False ) + if max_tokens is not None and _is_given(max_tokens): + span.set_data(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens) + if model is not None and _is_given(model): + span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model) + if temperature is not None and _is_given(temperature): + span.set_data(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature) + if top_k is not None and _is_given(top_k): + span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k) + if top_p is not None and _is_given(top_p): + span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p) + + if tools is not None and _is_given(tools) and len(tools) > 0: # type: ignore + span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools)) + + +def _set_create_input_data( + span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration" +) -> None: + """ + Set input data for the span based on the provided keyword arguments for the anthropic message creation. + """ span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False)) - kwargs_keys_to_attributes = { - "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, - "model": SPANDATA.GEN_AI_REQUEST_MODEL, - "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE, - "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K, - "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P, - } - for key, attribute in kwargs_keys_to_attributes.items(): - value = kwargs.get(key) - - if value is not None and _is_given(value): - span.set_data(attribute, value) - - # Input attributes: Tools - tools = kwargs.get("tools") - if tools is not None and _is_given(tools) and len(tools) > 0: - span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools)) + _set_common_input_data( + span=span, + integration=integration, + max_tokens=kwargs.get("max_tokens"), # type: ignore + messages=kwargs.get("messages"), # type: ignore + model=kwargs.get("model"), + system=kwargs.get("system"), + temperature=kwargs.get("temperature"), + top_k=kwargs.get("top_k"), + top_p=kwargs.get("top_p"), + tools=kwargs.get("tools"), + ) def _wrap_synchronous_message_iterator( - iterator: "Iterator[RawMessageStreamEvent]", + iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", span: "Span", integration: "AnthropicIntegration", -) -> "Iterator[RawMessageStreamEvent]": +) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. Responsible for closing the AI Client Span. @@ -546,7 +581,7 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A ) span.__enter__() - _set_input_data(span, kwargs, integration) + _set_create_input_data(span, kwargs, integration) result = yield f, args, kwargs @@ -674,6 +709,90 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_async +def _wrap_message_stream(f: "Any") -> "Any": + """ + Attaches user-provided arguments to the returned context manager. + The attributes are set on AI Client Spans in the patch for the context manager. + """ + + @wraps(f) + def _sentry_patched_stream(*args: "Any", **kwargs: "Any") -> "MessageStreamManager": + stream_manager = f(*args, **kwargs) + + stream_manager._max_tokens = kwargs.get("max_tokens") + stream_manager._messages = kwargs.get("messages") + stream_manager._model = kwargs.get("model") + stream_manager._system = kwargs.get("system") + stream_manager._temperature = kwargs.get("temperature") + stream_manager._top_k = kwargs.get("top_k") + stream_manager._top_p = kwargs.get("top_p") + stream_manager._tools = kwargs.get("tools") + + return stream_manager + + return _sentry_patched_stream + + +def _wrap_message_stream_manager_enter(f: "Any") -> "Any": + """ + Creates and manages AI Client Spans. + """ + + @wraps(f) + def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": + stream = f(self) + if not hasattr(self, "_max_tokens"): + return stream + + integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + + if integration is None: + return stream + + if self._messages is None: + return stream + + try: + iter(self._messages) + except TypeError: + return stream + + model = self._model + if model is None: + model = "" + + span = get_start_span_function()( + op=OP.GEN_AI_CHAT, + name=f"chat {model}".strip(), + origin=AnthropicIntegration.origin, + ) + span.__enter__() + + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + _set_common_input_data( + span=span, + integration=integration, + max_tokens=self._max_tokens, + messages=self._messages, + model=model, + system=self._system, + temperature=self._temperature, + top_k=self._top_k, + top_p=self._top_p, + tools=self._tools, + ) + + stream._iterator = _wrap_synchronous_message_iterator( + iterator=stream._iterator, + span=span, + integration=integration, + ) + + return stream + + return _sentry_patched_enter + + def _is_given(obj: "Any") -> bool: """ Check for givenness safely across different anthropic versions. diff --git a/tests/conftest.py b/tests/conftest.py index 0853013dfd..7f76fc2aee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,13 +57,6 @@ from collections.abc import Iterator try: - from anyio import create_memory_object_stream, create_task_group, EndOfStream - from mcp.types import ( - JSONRPCMessage, - JSONRPCNotification, - JSONRPCRequest, - ) - from mcp.shared.message import SessionMessage from httpx import ( ASGITransport, Request as HttpxRequest, @@ -71,6 +64,22 @@ AsyncByteStream, AsyncClient, ) +except ImportError: + ASGITransport = None + HttpxRequest = None + HttpxResponse = None + AsyncByteStream = None + AsyncClient = None + + +try: + from anyio import create_memory_object_stream, create_task_group, EndOfStream + from mcp.types import ( + JSONRPCMessage, + JSONRPCNotification, + JSONRPCRequest, + ) + from mcp.shared.message import SessionMessage except ImportError: create_memory_object_stream = None create_task_group = None @@ -81,12 +90,6 @@ JSONRPCRequest = None SessionMessage = None - ASGITransport = None - HttpxRequest = None - HttpxResponse = None - AsyncByteStream = None - AsyncClient = None - SENTRY_EVENT_SCHEMA = "./checkouts/data-schemas/relay/event.schema.json" @@ -1013,6 +1016,39 @@ async def inner(values): return inner +@pytest.fixture +def server_side_event_chunks(): + def inner(events): + for event in events: + payload = event.model_dump() + chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" + yield chunk.encode("utf-8") + + return inner + + +@pytest.fixture +def get_model_response(): + def inner(response_content, serialize_pydantic=False): + model_request = HttpxRequest( + "POST", + "/responses", + ) + + if serialize_pydantic: + response_content = json.dumps(response_content.model_dump()).encode("utf-8") + + response = HttpxResponse( + 200, + request=model_request, + content=response_content, + ) + + return response + + return inner + + class MockServerRequestHandler(BaseHTTPRequestHandler): def do_GET(self): # noqa: N802 # Process an HTTP GET request and return a response. diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index ea48f5d4db..0a7900936e 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -308,6 +308,120 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_stream_messages( + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + for event in stream: + pass + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + if send_default_pii and include_prompts: + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -547,6 +661,157 @@ def test_streaming_create_message_with_input_json_delta( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.skipif( + ANTHROPIC_VERSION < (0, 27), + reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", +) +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_stream_messages_with_input_json_delta( + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json='{"location": "', type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="S", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="Francisco, C", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json='A"}', type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "What is the weather like in San Francisco?", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + for event in stream: + pass + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + if send_default_pii and include_prompts: + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "What is the weather like in San Francisco?"}]' + ) + assert ( + span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] + == '{"location": "San Francisco, CA"}' + ) + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 366 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 41 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 407 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + + @pytest.mark.asyncio @pytest.mark.skipif( ANTHROPIC_VERSION < (0, 27), @@ -1345,6 +1610,133 @@ def test_streaming_create_message_with_system_prompt( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_stream_messages_with_system_prompt( + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + server_side_event_chunks, +): + """Test that system prompts are properly captured in streaming mode.""" + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + system="You are a helpful assistant.", + ) as stream: + for event in stream: + pass + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + if send_default_pii and include_prompts: + assert SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS in span["data"] + system_instructions = json.loads( + span["data"][SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS] + ) + assert system_instructions == [ + {"type": "text", "content": "You are a helpful assistant."} + ] + + assert SPANDATA.GEN_AI_REQUEST_MESSAGES in span["data"] + stored_messages = json.loads(span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]) + assert len(stored_messages) == 1 + assert stored_messages[0]["role"] == "user" + assert stored_messages[0]["content"] == "Hello, Claude" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + else: + assert SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -2407,6 +2799,72 @@ def test_input_tokens_include_cache_read_streaming(sentry_init, capture_events): (span,) = events[0]["spans"] + # input_tokens should be total: 19 + 2846 = test_stream_messages_input_tokens_include_cache_read_streaming + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 2865 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 2879 # 2865 + 14 + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHED] == 2846 + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHE_WRITE] == 0 + + +def test_stream_messages_input_tokens_include_cache_read_streaming( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + """ + Test that gen_ai.usage.input_tokens includes cache_read tokens (streaming). + + Same cache-hit scenario as non-streaming, using realistic streaming events. + """ + client = Anthropic(api_key="z") + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + type="message_start", + message=Message( + id="id", + model="claude-sonnet-4-20250514", + role="assistant", + content=[], + type="message", + usage=Usage( + input_tokens=19, + output_tokens=0, + cache_read_input_tokens=2846, + cache_creation_input_tokens=0, + ), + ), + ), + MessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn"), + usage=MessageDeltaUsage(output_tokens=14), + ), + ] + ) + ) + + sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) + events = capture_events() + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=[{"role": "user", "content": "What is 5+5?"}], + model="claude-sonnet-4-20250514", + ) as stream: + for event in stream: + pass + + (span,) = events[0]["spans"] + # input_tokens should be total: 19 + 2846 = 2865 assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 2865 assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 2879 # 2865 + 14 @@ -2500,3 +2958,63 @@ def test_cache_tokens_streaming(sentry_init, capture_events): assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 210 assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHED] == 80 assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHE_WRITE] == 20 + + +def test_stream_messages_cache_tokens( + sentry_init, capture_events, get_model_response, server_side_event_chunks +): + """Test cache tokens are tracked for streaming responses.""" + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + type="message_start", + message=Message( + id="id", + model="claude-3-5-sonnet-20241022", + role="assistant", + content=[], + type="message", + usage=Usage( + input_tokens=100, + output_tokens=0, + cache_read_input_tokens=80, + cache_creation_input_tokens=20, + ), + ), + ), + MessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn"), + usage=MessageDeltaUsage(output_tokens=10), + ), + ] + ) + ) + + sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) + events = capture_events() + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=[{"role": "user", "content": "Hello"}], + model="claude-3-5-sonnet-20241022", + ) as stream: + for event in stream: + pass + + (span,) = events[0]["spans"] + # input_tokens normalized: 100 + 80 (cache_read) + 20 (cache_write) = 200 + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 200 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 210 + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHED] == 80 + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHE_WRITE] == 20