diff --git a/tests/conftest.py b/tests/conftest.py index 0853013dfd..7f76fc2aee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,13 +57,6 @@ from collections.abc import Iterator try: - from anyio import create_memory_object_stream, create_task_group, EndOfStream - from mcp.types import ( - JSONRPCMessage, - JSONRPCNotification, - JSONRPCRequest, - ) - from mcp.shared.message import SessionMessage from httpx import ( ASGITransport, Request as HttpxRequest, @@ -71,6 +64,22 @@ AsyncByteStream, AsyncClient, ) +except ImportError: + ASGITransport = None + HttpxRequest = None + HttpxResponse = None + AsyncByteStream = None + AsyncClient = None + + +try: + from anyio import create_memory_object_stream, create_task_group, EndOfStream + from mcp.types import ( + JSONRPCMessage, + JSONRPCNotification, + JSONRPCRequest, + ) + from mcp.shared.message import SessionMessage except ImportError: create_memory_object_stream = None create_task_group = None @@ -81,12 +90,6 @@ JSONRPCRequest = None SessionMessage = None - ASGITransport = None - HttpxRequest = None - HttpxResponse = None - AsyncByteStream = None - AsyncClient = None - SENTRY_EVENT_SCHEMA = "./checkouts/data-schemas/relay/event.schema.json" @@ -1013,6 +1016,39 @@ async def inner(values): return inner +@pytest.fixture +def server_side_event_chunks(): + def inner(events): + for event in events: + payload = event.model_dump() + chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" + yield chunk.encode("utf-8") + + return inner + + +@pytest.fixture +def get_model_response(): + def inner(response_content, serialize_pydantic=False): + model_request = HttpxRequest( + "POST", + "/responses", + ) + + if serialize_pydantic: + response_content = json.dumps(response_content.model_dump()).encode("utf-8") + + response = HttpxResponse( + 200, + request=model_request, + content=response_content, + ) + + return response + + return inner + + class MockServerRequestHandler(BaseHTTPRequestHandler): def do_GET(self): # noqa: N802 # Process an HTTP GET request and return a response. diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index ea48f5d4db..c6a071b90f 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -216,42 +216,51 @@ async def test_nonstreaming_create_message_async( ], ) def test_streaming_create_message( - sentry_init, capture_events, send_default_pii, include_prompts + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + server_side_event_chunks, ): client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) sentry_init( integrations=[AnthropicIntegration(include_prompts=include_prompts)], @@ -259,7 +268,6 @@ def test_streaming_create_message( send_default_pii=send_default_pii, ) events = capture_events() - client.messages._post = mock.Mock(return_value=returned_stream) messages = [ { @@ -268,23 +276,26 @@ def test_streaming_create_message( } ] - with start_transaction(name="anthropic"): - message = client.messages.create( - max_tokens=1024, messages=messages, model="model", stream=True - ) + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + message = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) - for _ in message: - pass + for _ in message: + pass - assert message == returned_stream assert len(events) == 1 (event,) = events assert event["type"] == "transaction" assert event["transaction"] == "anthropic" - assert len(event["spans"]) == 1 - (span,) = event["spans"] + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) assert span["op"] == OP.GEN_AI_CHAT assert span["description"] == "chat model" @@ -319,43 +330,53 @@ def test_streaming_create_message( ], ) async def test_streaming_create_message_async( - sentry_init, capture_events, send_default_pii, include_prompts, async_iterator + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + async_iterator, + server_side_event_chunks, ): client = AsyncAnthropic(api_key="z") - returned_stream = AsyncStream(cast_to=None, response=None, client=client) - returned_stream._iterator = async_iterator( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ), ) sentry_init( @@ -364,7 +385,6 @@ async def test_streaming_create_message_async( send_default_pii=send_default_pii, ) events = capture_events() - client.messages._post = AsyncMock(return_value=returned_stream) messages = [ { @@ -373,15 +393,19 @@ async def test_streaming_create_message_async( } ] - with start_transaction(name="anthropic"): - message = await client.messages.create( - max_tokens=1024, messages=messages, model="model", stream=True - ) + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + message = await client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) - async for _ in message: - pass + async for _ in message: + pass - assert message == returned_stream assert len(events) == 1 (event,) = events @@ -427,68 +451,81 @@ async def test_streaming_create_message_async( ], ) def test_streaming_create_message_with_input_json_delta( - sentry_init, capture_events, send_default_pii, include_prompts + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + server_side_event_chunks, ): client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - message=Message( - id="msg_0", - content=[], - model="claude-3-5-sonnet-20240620", - role="assistant", - stop_reason=None, - stop_sequence=None, - type="message", - usage=Usage(input_tokens=366, output_tokens=10), - ), - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=ToolUseBlock( - id="toolu_0", input={}, name="get_weather", type="tool_use" - ), - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="{'location':", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="Francisco, C", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="tool_use", stop_sequence=None), - usage=MessageDeltaUsage(output_tokens=41), - type="message_delta", - ), - ] + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="{'location':", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="Francisco, C", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + ) + ) sentry_init( integrations=[AnthropicIntegration(include_prompts=include_prompts)], @@ -496,7 +533,6 @@ def test_streaming_create_message_with_input_json_delta( send_default_pii=send_default_pii, ) events = capture_events() - client.messages._post = mock.Mock(return_value=returned_stream) messages = [ { @@ -505,15 +541,19 @@ def test_streaming_create_message_with_input_json_delta( } ] - with start_transaction(name="anthropic"): - message = client.messages.create( - max_tokens=1024, messages=messages, model="model", stream=True - ) + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + message = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) - for _ in message: - pass + for _ in message: + pass - assert message == returned_stream assert len(events) == 1 (event,) = events @@ -562,73 +602,88 @@ def test_streaming_create_message_with_input_json_delta( ], ) async def test_streaming_create_message_with_input_json_delta_async( - sentry_init, capture_events, send_default_pii, include_prompts, async_iterator + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + async_iterator, + server_side_event_chunks, ): client = AsyncAnthropic(api_key="z") - returned_stream = AsyncStream(cast_to=None, response=None, client=client) - returned_stream._iterator = async_iterator( - [ - MessageStartEvent( - message=Message( - id="msg_0", - content=[], - model="claude-3-5-sonnet-20240620", - role="assistant", - stop_reason=None, - stop_sequence=None, - type="message", - usage=Usage(input_tokens=366, output_tokens=10), - ), - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=ToolUseBlock( - id="toolu_0", input={}, name="get_weather", type="tool_use" - ), - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta( - partial_json="{'location':", type="input_json_delta" - ), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta( - partial_json="Francisco, C", type="input_json_delta" - ), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="tool_use", stop_sequence=None), - usage=MessageDeltaUsage(output_tokens=41), - type="message_delta", - ), - ] + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="{'location':", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json=" 'S", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="an ", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="Francisco, C", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="A'}", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + ) + ) ) sentry_init( @@ -637,7 +692,6 @@ async def test_streaming_create_message_with_input_json_delta_async( send_default_pii=send_default_pii, ) events = capture_events() - client.messages._post = AsyncMock(return_value=returned_stream) messages = [ { @@ -646,15 +700,19 @@ async def test_streaming_create_message_with_input_json_delta_async( } ] - with start_transaction(name="anthropic"): - message = await client.messages.create( - max_tokens=1024, messages=messages, model="model", stream=True - ) + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + message = await client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) - async for _ in message: - pass + async for _ in message: + pass - assert message == returned_stream assert len(events) == 1 (event,) = events @@ -1238,43 +1296,52 @@ async def test_nonstreaming_create_message_with_system_prompt_async( ], ) def test_streaming_create_message_with_system_prompt( - sentry_init, capture_events, send_default_pii, include_prompts + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + server_side_event_chunks, ): """Test that system prompts are properly captured in streaming mode.""" client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) sentry_init( integrations=[AnthropicIntegration(include_prompts=include_prompts)], @@ -1282,7 +1349,6 @@ def test_streaming_create_message_with_system_prompt( send_default_pii=send_default_pii, ) events = capture_events() - client.messages._post = mock.Mock(return_value=returned_stream) messages = [ { @@ -1291,19 +1357,23 @@ def test_streaming_create_message_with_system_prompt( } ] - with start_transaction(name="anthropic"): - message = client.messages.create( - max_tokens=1024, - messages=messages, - model="model", - stream=True, - system="You are a helpful assistant.", - ) + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + message = client.messages.create( + max_tokens=1024, + messages=messages, + model="model", + stream=True, + system="You are a helpful assistant.", + ) - for _ in message: - pass + for _ in message: + pass - assert message == returned_stream assert len(events) == 1 (event,) = events @@ -1356,44 +1426,54 @@ def test_streaming_create_message_with_system_prompt( ], ) async def test_streaming_create_message_with_system_prompt_async( - sentry_init, capture_events, send_default_pii, include_prompts, async_iterator + sentry_init, + capture_events, + send_default_pii, + include_prompts, + get_model_response, + async_iterator, + server_side_event_chunks, ): """Test that system prompts are properly captured in streaming mode (async).""" client = AsyncAnthropic(api_key="z") - returned_stream = AsyncStream(cast_to=None, response=None, client=client) - returned_stream._iterator = async_iterator( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) ) sentry_init( @@ -1402,7 +1482,6 @@ async def test_streaming_create_message_with_system_prompt_async( send_default_pii=send_default_pii, ) events = capture_events() - client.messages._post = AsyncMock(return_value=returned_stream) messages = [ { @@ -1411,19 +1490,23 @@ async def test_streaming_create_message_with_system_prompt_async( } ] - with start_transaction(name="anthropic"): - message = await client.messages.create( - max_tokens=1024, - messages=messages, - model="model", - stream=True, - system="You are a helpful assistant.", - ) + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + message = await client.messages.create( + max_tokens=1024, + messages=messages, + model="model", + stream=True, + system="You are a helpful assistant.", + ) - async for _ in message: - pass + async for _ in message: + pass - assert message == returned_stream assert len(events) == 1 (event,) = events @@ -2360,50 +2443,63 @@ def test_input_tokens_include_cache_read_nonstreaming(sentry_init, capture_event assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHE_WRITE] == 0 -def test_input_tokens_include_cache_read_streaming(sentry_init, capture_events): +def test_input_tokens_include_cache_read_streaming( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): """ Test that gen_ai.usage.input_tokens includes cache_read tokens (streaming). Same cache-hit scenario as non-streaming, using realistic streaming events. """ client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - type="message_start", - message=Message( - id="id", - model="claude-sonnet-4-20250514", - role="assistant", - content=[], - type="message", - usage=Usage( - input_tokens=19, - output_tokens=0, - cache_read_input_tokens=2846, - cache_creation_input_tokens=0, + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + type="message_start", + message=Message( + id="id", + model="claude-sonnet-4-20250514", + role="assistant", + content=[], + type="message", + usage=Usage( + input_tokens=19, + output_tokens=0, + cache_read_input_tokens=2846, + cache_creation_input_tokens=0, + ), + ), ), - ), - ), - MessageDeltaEvent( - type="message_delta", - delta=Delta(stop_reason="end_turn"), - usage=MessageDeltaUsage(output_tokens=14), - ), - ] + MessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn"), + usage=MessageDeltaUsage(output_tokens=14), + ), + ] + ) + ) sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) events = capture_events() - client.messages._post = mock.Mock(return_value=returned_stream) - with start_transaction(name="anthropic"): - for _ in client.messages.create( - max_tokens=1024, - messages=[{"role": "user", "content": "What is 5+5?"}], - model="claude-sonnet-4-20250514", - stream=True, - ): - pass + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + for _ in client.messages.create( + max_tokens=1024, + messages=[{"role": "user", "content": "What is 5+5?"}], + model="claude-sonnet-4-20250514", + stream=True, + ): + pass (span,) = events[0]["spans"] @@ -2452,46 +2548,59 @@ def test_input_tokens_unchanged_without_caching(sentry_init, capture_events): assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 32 # 20 + 12 -def test_cache_tokens_streaming(sentry_init, capture_events): +def test_cache_tokens_streaming( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): """Test cache tokens are tracked for streaming responses.""" client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - type="message_start", - message=Message( - id="id", - model="claude-3-5-sonnet-20241022", - role="assistant", - content=[], - type="message", - usage=Usage( - input_tokens=100, - output_tokens=0, - cache_read_input_tokens=80, - cache_creation_input_tokens=20, + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + type="message_start", + message=Message( + id="id", + model="claude-3-5-sonnet-20241022", + role="assistant", + content=[], + type="message", + usage=Usage( + input_tokens=100, + output_tokens=0, + cache_read_input_tokens=80, + cache_creation_input_tokens=20, + ), + ), ), - ), - ), - MessageDeltaEvent( - type="message_delta", - delta=Delta(stop_reason="end_turn"), - usage=MessageDeltaUsage(output_tokens=10), - ), - ] + MessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn"), + usage=MessageDeltaUsage(output_tokens=10), + ), + ] + ) + ) sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) events = capture_events() - client.messages._post = mock.Mock(return_value=returned_stream) - with start_transaction(name="anthropic"): - for _ in client.messages.create( - max_tokens=1024, - messages=[{"role": "user", "content": "Hello"}], - model="claude-3-5-sonnet-20241022", - stream=True, - ): - pass + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + for _ in client.messages.create( + max_tokens=1024, + messages=[{"role": "user", "content": "Hello"}], + model="claude-3-5-sonnet-20241022", + stream=True, + ): + pass (span,) = events[0]["spans"] # input_tokens normalized: 100 + 80 (cache_read) + 20 (cache_write) = 200 diff --git a/tests/integrations/openai_agents/test_openai_agents.py b/tests/integrations/openai_agents/test_openai_agents.py index 117755b963..12736f6229 100644 --- a/tests/integrations/openai_agents/test_openai_agents.py +++ b/tests/integrations/openai_agents/test_openai_agents.py @@ -201,25 +201,6 @@ def test_agent_custom_model(): ) -@pytest.fixture -def get_model_response(): - def inner(response_content): - model_request = httpx.Request( - "POST", - "/responses", - ) - - response = httpx.Response( - 200, - request=model_request, - content=json.dumps(response_content.model_dump()).encode("utf-8"), - ) - - return response - - return inner - - @pytest.mark.asyncio async def test_agent_invocation_span_no_pii( sentry_init, capture_events, test_agent, mock_model_response, get_model_response @@ -228,7 +209,7 @@ async def test_agent_invocation_span_no_pii( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent.clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -377,7 +358,7 @@ async def test_agent_invocation_span( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent_with_instructions(instructions).clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -538,7 +519,7 @@ async def test_client_span_custom_model( model = OpenAIResponsesModel(model="my-custom-model", openai_client=client) agent = test_agent_custom_model.clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -581,7 +562,7 @@ def test_agent_invocation_span_sync_no_pii( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent.clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -724,7 +705,7 @@ def test_agent_invocation_span_sync( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent_with_instructions(instructions).clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -906,7 +887,8 @@ async def test_handoff_span(sentry_init, capture_events, get_model_response): ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -944,7 +926,8 @@ async def test_handoff_span(sentry_init, capture_events, get_model_response): ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -1032,7 +1015,8 @@ async def test_max_turns_before_handoff_span( ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -1070,7 +1054,8 @@ async def test_max_turns_before_handoff_span( ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -1152,7 +1137,8 @@ def simple_test_tool(message: str) -> str: ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -1190,7 +1176,8 @@ def simple_test_tool(message: str) -> str: ), total_tokens=25, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -1386,16 +1373,9 @@ def simple_test_tool(message: str) -> str: assert ai_client_span2["data"]["gen_ai.usage.total_tokens"] == 25 -def server_side_event_chunks(events): - for event in events: - payload = event.model_dump() - chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" - yield chunk.encode("utf-8") - - @pytest.mark.asyncio async def test_hosted_mcp_tool_propagation_header_streamed( - sentry_init, test_agent, async_iterator + sentry_init, test_agent, async_iterator, server_side_event_chunks ): """ Test responses API is given trace propagation headers with HostedMCPTool. @@ -1594,7 +1574,7 @@ async def test_hosted_mcp_tool_propagation_headers( release="d08ebdb9309e1b004c6f52202de58a09c2268e42", ) - response = get_model_response(EXAMPLE_RESPONSE) + response = get_model_response(EXAMPLE_RESPONSE, serialize_pydantic=True) with patch.object( agent_with_tool.model._client._client, @@ -1905,7 +1885,8 @@ async def test_mcp_tool_execution_spans( ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -1943,7 +1924,8 @@ async def test_mcp_tool_execution_spans( ), total_tokens=25, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2035,7 +2017,8 @@ async def test_mcp_tool_execution_with_error( ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -2073,7 +2056,8 @@ async def test_mcp_tool_execution_with_error( ), total_tokens=25, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2163,7 +2147,8 @@ async def test_mcp_tool_execution_without_pii( ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -2201,7 +2186,8 @@ async def test_mcp_tool_execution_without_pii( ), total_tokens=25, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2259,7 +2245,7 @@ async def test_multiple_agents_asyncio( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent.clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -2389,7 +2375,8 @@ def failing_tool(message: str) -> str: ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -2427,7 +2414,8 @@ def failing_tool(message: str) -> str: ), total_tokens=25, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2522,7 +2510,8 @@ async def test_invoke_agent_span_includes_usage_data( ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2613,7 +2602,8 @@ async def test_ai_client_span_includes_response_model( ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2700,7 +2690,8 @@ async def test_ai_client_span_response_model_with_chat_completions( ), total_tokens=40, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2777,7 +2768,8 @@ def calculator(a: int, b: int) -> int: ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -2815,7 +2807,8 @@ def calculator(a: int, b: int) -> int: ), total_tokens=35, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2903,7 +2896,8 @@ async def test_invoke_agent_span_includes_response_model( ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -2992,7 +2986,8 @@ def calculator(a: int, b: int) -> int: ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) second_response = get_model_response( @@ -3030,7 +3025,8 @@ def calculator(a: int, b: int) -> int: ), total_tokens=35, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -3162,7 +3158,9 @@ async def test_streaming_span_update_captures_response_data( @pytest.mark.asyncio -async def test_streaming_ttft_on_chat_span(sentry_init, test_agent, async_iterator): +async def test_streaming_ttft_on_chat_span( + sentry_init, test_agent, async_iterator, server_side_event_chunks +): """ Test that time-to-first-token (TTFT) is recorded on chat spans during streaming. @@ -3330,7 +3328,7 @@ async def test_conversation_id_on_all_spans( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent.clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client, @@ -3423,7 +3421,8 @@ def simple_tool(message: str) -> str: ), total_tokens=15, ), - ) + ), + serialize_pydantic=True, ) final_response = get_model_response( @@ -3461,7 +3460,8 @@ def simple_tool(message: str) -> str: ), total_tokens=30, ), - ) + ), + serialize_pydantic=True, ) with patch.object( @@ -3524,7 +3524,7 @@ async def test_no_conversation_id_when_not_provided( model = OpenAIResponsesModel(model="gpt-4", openai_client=client) agent = test_agent.clone(model=model) - response = get_model_response(mock_model_response) + response = get_model_response(mock_model_response, serialize_pydantic=True) with patch.object( agent.model._client._client,