Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 165 additions & 48 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages
from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager
from anthropic.lib.streaming import (
MessageStreamManager,
MessageStream,
AsyncMessageStreamManager,
)

from anthropic.types import (
MessageStartEvent,
Expand All @@ -56,7 +60,7 @@
raise DidNotEnable("Anthropic not installed")

if TYPE_CHECKING:
from typing import Any, AsyncIterator, Iterator, Optional, Union
from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

Expand All @@ -67,7 +71,7 @@
TextBlockParam,
ToolUnionParam,
)
from anthropic.lib.streaming import MessageStream, AsyncMessageStream
from anthropic.lib.streaming import AsyncMessageStream


class _RecordedUsage:
Expand All @@ -89,14 +93,43 @@ def setup_once() -> None:
version = package_version("anthropic")
_check_minimum_version(AnthropicIntegration, version)

"""
client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol.
The private _iterator variable and the close() method are patched. During iteration over the _iterator generator,
information from intercepted events is accumulated and used to populate output attributes on the AI Client Span.

The span can be finished in two places:
- When the user exits the context manager or directly calls close(), the patched close() finishes the span.
- When iteration ends, the finally block in the _iterator wrapper finishes the span.

Both paths may run. For example, the context manager exit can follow iterator exhaustion.
"""
Messages.create = _wrap_message_create(Messages.create)
Stream.close = _wrap_close(Stream.close)

AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

"""
client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol.
The private _iterator variable and the close() method are patched. During iteration over the _iterator generator,
information from intercepted events is accumulated and used to populate output attributes on the AI Client Span.

The span can be finished in two places:
- When the user exits the context manager or directly calls close(), the patched close() finishes the span.
- When iteration ends, the finally block in the _iterator wrapper finishes the span.

Both paths may run. For example, the context manager exit can follow iterator exhaustion.
"""
Messages.stream = _wrap_message_stream(Messages.stream)
MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter(
MessageStreamManager.__enter__
)

# Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a
# MessageStream inherits from Stream, so patching Stream is sufficient on these versions.
if not issubclass(MessageStream, Stream):
MessageStream.close = _wrap_close(MessageStream.close)

AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream)
AsyncMessageStreamManager.__aenter__ = (
_wrap_async_message_stream_manager_aenter(
Expand Down Expand Up @@ -399,21 +432,13 @@ def _set_create_input_data(


def _wrap_synchronous_message_iterator(
stream: "Union[Stream, MessageStream]",
iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]",
span: "Span",
integration: "AnthropicIntegration",
) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch.
"""

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []
response_id = None
finish_reason = None

try:
for event in iterator:
# Message and content types are aliases for corresponding Raw* types, introduced in
Expand All @@ -432,40 +457,21 @@ def _wrap_synchronous_message_iterator(
yield event
continue

(model, usage, content_blocks, response_id, finish_reason) = (
_collect_ai_data(
event,
model,
usage,
content_blocks,
response_id,
finish_reason,
)
)
_accumulate_event_data(stream, event)
yield event
finally:
with capture_internal_exceptions():
# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
response_id=response_id,
finish_reason=finish_reason,
)
if hasattr(stream, "_span"):
_finish_streaming_span(
span=stream._span,
integration=stream._integration,
model=stream._model,
usage=stream._usage,
content_blocks=stream._content_blocks,
response_id=stream._response_id,
finish_reason=stream._finish_reason,
)
del stream._span


async def _wrap_asynchronous_message_iterator(
Expand Down Expand Up @@ -625,9 +631,15 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
result = yield f, args, kwargs

if isinstance(result, Stream):
result._span = span
result._integration = integration

_initialize_data_accumulation_state(result)
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
result,
result._iterator,
)

return result

if isinstance(result, AsyncStream):
Expand Down Expand Up @@ -712,6 +724,108 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
return _sentry_patched_create_sync


def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None:
"""
Initialize fields for accumulating output on the Stream instance.
"""
if not hasattr(stream, "_model"):
stream._model = None
stream._usage = _RecordedUsage()
stream._content_blocks = []
stream._response_id = None
stream._finish_reason = None


def _accumulate_event_data(
    stream: "Union[Stream, MessageStream]",
    event: "Union[RawMessageStreamEvent, MessageStreamEvent]",
) -> None:
    """
    Fold a single stream event into the accumulator attributes on *stream*.

    Passes the current accumulated values through ``_collect_ai_data`` and
    unpacks the result straight back onto the stream instance.
    """
    (
        stream._model,
        stream._usage,
        stream._content_blocks,
        stream._response_id,
        stream._finish_reason,
    ) = _collect_ai_data(
        event,
        stream._model,
        stream._usage,
        stream._content_blocks,
        stream._response_id,
        stream._finish_reason,
    )


def _finish_streaming_span(
    span: "Span",
    integration: "AnthropicIntegration",
    model: "Optional[str]",
    usage: "_RecordedUsage",
    content_blocks: "list[str]",
    response_id: "Optional[str]",
    finish_reason: "Optional[str]",
) -> None:
    """
    Record the accumulated streaming output on the AI Client Span and end it.
    """
    # Anthropic reports input_tokens without cached / cache-write tokens;
    # fold them back in so downstream cost calculations see the full count.
    cache_read = usage.cache_read_input_tokens or 0
    cache_write = usage.cache_write_input_tokens or 0
    total_input_tokens = usage.input_tokens + cache_read + cache_write

    combined_text = "".join(content_blocks)

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input_tokens,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": combined_text, "type": "text"}],
        finish_span=True,
        response_id=response_id,
        finish_reason=finish_reason,
    )


def _wrap_close(
f: "Callable[..., None]",
) -> "Callable[..., None]":
"""
Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first.
"""

def close(self: "Union[Stream, MessageStream]") -> None:
if not hasattr(self, "_span"):
return f(self)

if not hasattr(self, "_model"):
self._span.__exit__(None, None, None)
del self._span
return f(self)

_finish_streaming_span(
span=self._span,
integration=self._integration,
model=self._model,
usage=self._usage,
content_blocks=self._content_blocks,
response_id=self._response_id,
finish_reason=self._finish_reason,
)
del self._span

return f(self)

return close


def _wrap_message_create_async(f: "Any") -> "Any":
async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)
Expand Down Expand Up @@ -819,10 +933,13 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
tools=self._tools,
)

stream._span = span
stream._integration = integration

_initialize_data_accumulation_state(stream)
stream._iterator = _wrap_synchronous_message_iterator(
iterator=stream._iterator,
span=span,
integration=integration,
stream,
stream._iterator,
)

return stream
Expand Down
Loading
Loading