Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 110 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
except ImportError:
Omit = None

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages

if TYPE_CHECKING:
Expand All @@ -49,6 +50,8 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -338,6 +341,101 @@
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _wrap_synchronous_message_iterator(
    iterator: "Iterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Responsible for closing the AI Client Span.
    """

    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    # The finally block guarantees the span is finished even if iteration
    # raises, the consumer breaks out of the stream early, or the generator
    # is closed/garbage-collected (GeneratorExit) — without it the span
    # would leak unclosed.
    try:
        for event in iterator:
            (
                model,
                usage,
                content_blocks,
            ) = _collect_ai_data(
                event,
                model,
                usage,
                content_blocks,
            )
            yield event
    finally:
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )


async def _wrap_asynchronous_message_iterator(
    iterator: "AsyncIterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "AsyncIterator[RawMessageStreamEvent]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Responsible for closing the AI Client Span.
    """
    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    # The finally block guarantees the span is finished even if iteration
    # raises, the consumer abandons the stream early, or the async generator
    # is closed (GeneratorExit) — without it the span would leak unclosed.
    try:
        async for event in iterator:
            (
                model,
                usage,
                content_blocks,
            ) = _collect_ai_data(
                event,
                model,
                usage,
                content_blocks,
            )
            yield event
    finally:
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )


def _set_output_data(
span: "Span",
integration: "AnthropicIntegration",
Expand Down Expand Up @@ -415,6 +513,18 @@

result = yield f, args, kwargs

if isinstance(result, Stream):
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
)
return result

if isinstance(result, AsyncStream):
result._iterator = _wrap_asynchronous_message_iterator(
result._iterator, span, integration
)
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +554,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading