diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 32522a7234..0592e337de 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -39,7 +39,11 @@ from anthropic import Stream, AsyncStream from anthropic.resources import AsyncMessages, Messages - from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager + from anthropic.lib.streaming import ( + MessageStreamManager, + MessageStream, + AsyncMessageStreamManager, + ) from anthropic.types import ( MessageStartEvent, @@ -56,7 +60,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union + from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -67,7 +71,7 @@ TextBlockParam, ToolUnionParam, ) - from anthropic.lib.streaming import MessageStream, AsyncMessageStream + from anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -89,14 +93,43 @@ def setup_once() -> None: version = package_version("anthropic") _check_minimum_version(AnthropicIntegration, version) + """ + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. + The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, + information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. + + The span can be finished in two places: + - When the user exits the context manager or directly calls close(), the patched close() finishes the span. + - When iteration ends, the finally block in the _iterator wrapper finishes the span. + + Both paths may run. For example, the context manager exit can follow iterator exhaustion. + """ Messages.create = _wrap_message_create(Messages.create) + Stream.close = _wrap_close(Stream.close) + AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + """ + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. + The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, + information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. + + The span can be finished in two places: + - When the user exits the context manager or directly calls close(), the patched close() finishes the span. + - When iteration ends, the finally block in the _iterator wrapper finishes the span. + + Both paths may run. For example, the context manager exit can follow iterator exhaustion. + """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. + if not issubclass(MessageStream, Stream): + MessageStream.close = _wrap_close(MessageStream.close) + AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( _wrap_async_message_stream_manager_aenter( @@ -399,21 +432,13 @@ def _set_create_input_data( def _wrap_synchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. + Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - finish_reason = None - try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -432,40 +457,21 @@ def _wrap_synchronous_message_iterator( yield event continue - (model, usage, content_blocks, response_id, finish_reason) = ( - _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - finish_reason, - ) - ) + _accumulate_event_data(stream, event) yield event finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - finish_reason=finish_reason, - ) + if hasattr(stream, "_span"): + _finish_streaming_span( + span=stream._span, + integration=stream._integration, + model=stream._model, + usage=stream._usage, + content_blocks=stream._content_blocks, + response_id=stream._response_id, + finish_reason=stream._finish_reason, + ) + del stream._span async def _wrap_asynchronous_message_iterator( @@ -625,9 +631,15 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs if isinstance(result, Stream): + result._span = span + result._integration = integration + + _initialize_data_accumulation_state(result) result._iterator = _wrap_synchronous_message_iterator( - result._iterator, span, integration + result, + result._iterator, ) + return result if isinstance(result, AsyncStream): @@ -712,6 +724,108 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None: + """ + Initialize fields for accumulating output on the Stream instance. + """ + if not hasattr(stream, "_model"): + stream._model = None + stream._usage = _RecordedUsage() + stream._content_blocks = [] + stream._response_id = None + stream._finish_reason = None + + +def _accumulate_event_data( + stream: "Union[Stream, MessageStream]", + event: "Union[RawMessageStreamEvent, MessageStreamEvent]", +) -> None: + """ + Update accumulated output from a single stream event. + """ + (model, usage, content_blocks, response_id, finish_reason) = _collect_ai_data( + event, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + stream._finish_reason, + ) + + stream._model = model + stream._usage = usage + stream._content_blocks = content_blocks + stream._response_id = response_id + stream._finish_reason = finish_reason + + +def _finish_streaming_span( + span: "Span", + integration: "AnthropicIntegration", + model: "Optional[str]", + usage: "_RecordedUsage", + content_blocks: "list[str]", + response_id: "Optional[str]", + finish_reason: "Optional[str]", +) -> None: + """ + Set output attributes on the AI Client Span and end the span. + """ + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + response_id=response_id, + finish_reason=finish_reason, + ) + + +def _wrap_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + def close(self: "Union[Stream, MessageStream]") -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + del self._span + return f(self) + + _finish_streaming_span( + span=self._span, + integration=self._integration, + model=self._model, + usage=self._usage, + content_blocks=self._content_blocks, + response_id=self._response_id, + finish_reason=self._finish_reason, + ) + del self._span + + return f(self) + + return close + + def _wrap_message_create_async(f: "Any") -> "Any": async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": gen = _sentry_patched_create_common(f, *args, **kwargs) @@ -819,10 +933,13 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": tools=self._tools, ) + stream._span = span + stream._integration = integration + + _initialize_data_accumulation_state(stream) stream._iterator = _wrap_synchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, + stream, + stream._iterator, ) return stream diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 8b83d2d128..2139d74a1b 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -28,6 +28,11 @@ async def __call__(self, *args, **kwargs): except ImportError: pass +try: + from anthropic.lib.streaming import TextEvent +except ImportError: + TextEvent = None + try: # 0.27+ from anthropic.types.raw_message_delta_event import Delta @@ -328,6 +333,107 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] +def test_streaming_create_message_close( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="max_tokens"), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + for _ in range(4): + next(messages) + + messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.parametrize( "send_default_pii, include_prompts", [ @@ -445,6 +551,112 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] +def test_stream_messages_close( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="max_tokens"), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + for _ in range(4): + next(stream) + + # New versions add TextEvent, so consume one more event. + if TextEvent is not None and isinstance(next(stream), TextEvent): + next(stream) + + stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts",