From 35b64e2c1f43ecf8eb312e51d9c5d8fcf7ea276b Mon Sep 17 00:00:00 2001 From: Mark Cavage Date: Fri, 5 Jun 2026 06:34:22 -0700 Subject: [PATCH] fix(anthropic): route input_json deltas by content-block index Parallel tool_use blocks streamed from Anthropic share a single streamAdapter.toolID field that is overwritten on every ContentBlockStartEvent. Subsequent InputJSONDelta events for the first block then carry the second block's tool ID, and the runtime accumulator (keyed by ToolCall.ID in pkg/runtime/streaming.go) concatenates both calls' argument fragments into one buffer. The first tool's args end up empty; the second tool's args contain interleaved fragments that fail to parse, surfacing as invalid character 's' looking for beginning of value invalid character '-' after object key:value pair (the exact character depends on where the chunk boundaries fall). Track tool IDs per content-block index via toolIDByBlock map[int64]string and route InputJSONDelta events using eventVariant.Index. Apply the same fix to betaStreamAdapter, which shared the bug. Adds pkg/model/provider/anthropic/parallel_toolcall_test.go covering both the standard and Beta adapters with interleaved input_json_delta events; both tests fail without this fix and pass with it. Note: pkg/model/provider/bedrock/adapter.go carries the same single-currentToolID pattern and is likely vulnerable to the same bug on parallel Converse tool calls; left for a follow-up PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/model/provider/anthropic/adapter.go | 17 +- pkg/model/provider/anthropic/beta_adapter.go | 15 +- .../anthropic/parallel_toolcall_test.go | 282 ++++++++++++++++++ 3 files changed, 306 insertions(+), 8 deletions(-) create mode 100644 pkg/model/provider/anthropic/parallel_toolcall_test.go diff --git a/pkg/model/provider/anthropic/adapter.go b/pkg/model/provider/anthropic/adapter.go index 2565e69ed..6465f77a9 100644 --- a/pkg/model/provider/anthropic/adapter.go +++ b/pkg/model/provider/anthropic/adapter.go @@ -21,13 +21,19 @@ type streamAdapter struct { trackUsage bool toolCall bool - toolID string + // toolIDByBlock maps a content block index to its tool_use block ID. + // Anthropic emits each tool_use in its own content block; subsequent + // input_json_delta events carry the block index (not the tool ID), so + // we must remember the ID per block to route partial JSON correctly + // when multiple tool calls stream in parallel. + toolIDByBlock map[int64]string } func (c *Client) newStreamAdapter(stream *ssestream.Stream[anthropic.MessageStreamEventUnion], trackUsage bool) *streamAdapter { return &streamAdapter{ retryableStream: retryableStream[anthropic.MessageStreamEventUnion]{stream: stream}, trackUsage: trackUsage, + toolIDByBlock: map[int64]string{}, } } @@ -92,10 +98,13 @@ func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) { case anthropic.ContentBlockStartEvent: switch block := eventVariant.ContentBlock.AsAny().(type) { case anthropic.ToolUseBlock: - a.toolID = block.ID + if a.toolIDByBlock == nil { + a.toolIDByBlock = map[int64]string{} + } + a.toolIDByBlock[eventVariant.Index] = block.ID a.toolCall = true toolCall := tools.ToolCall{ - ID: a.toolID, + ID: block.ID, Type: "function", Function: tools.FunctionCall{ Name: block.Name, @@ -122,7 +131,7 @@ func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) { case anthropic.InputJSONDelta: inputBytes := deltaVariant.PartialJSON toolCall := tools.ToolCall{ - ID: a.toolID, + ID: a.toolIDByBlock[eventVariant.Index], Type: "function", Function: tools.FunctionCall{ Arguments: inputBytes, diff --git a/pkg/model/provider/anthropic/beta_adapter.go b/pkg/model/provider/anthropic/beta_adapter.go index 1f550cfff..b0232de88 100644 --- a/pkg/model/provider/anthropic/beta_adapter.go +++ b/pkg/model/provider/anthropic/beta_adapter.go @@ -17,7 +17,10 @@ type betaStreamAdapter struct { trackUsage bool toolCall bool - toolID string + // toolIDByBlock maps a content block index to its tool_use block ID. + // See streamAdapter.toolIDByBlock for the same rationale (parallel + // tool calls require per-block routing of input_json deltas). + toolIDByBlock map[int64]string } // newBetaStreamAdapter creates a new Beta stream adapter @@ -25,6 +28,7 @@ func (c *Client) newBetaStreamAdapter(stream *ssestream.Stream[anthropic.BetaRaw return &betaStreamAdapter{ retryableStream: retryableStream[anthropic.BetaRawMessageStreamEventUnion]{stream: stream}, trackUsage: trackUsage, + toolIDByBlock: map[int64]string{}, } } @@ -56,10 +60,13 @@ func (a *betaStreamAdapter) Recv() (chat.MessageStreamResponse, error) { case anthropic.BetaRawContentBlockStartEvent: switch block := eventVariant.ContentBlock.AsAny().(type) { case anthropic.BetaToolUseBlock: - a.toolID = block.ID + if a.toolIDByBlock == nil { + a.toolIDByBlock = map[int64]string{} + } + a.toolIDByBlock[eventVariant.Index] = block.ID a.toolCall = true toolCall := tools.ToolCall{ - ID: a.toolID, + ID: block.ID, Type: "function", Function: tools.FunctionCall{ Name: block.Name, @@ -84,7 +91,7 @@ func (a *betaStreamAdapter) Recv() (chat.MessageStreamResponse, error) { case anthropic.BetaInputJSONDelta: inputBytes := deltaVariant.PartialJSON toolCall := tools.ToolCall{ - ID: a.toolID, + ID: a.toolIDByBlock[eventVariant.Index], Type: "function", Function: tools.FunctionCall{ Arguments: inputBytes, diff --git a/pkg/model/provider/anthropic/parallel_toolcall_test.go b/pkg/model/provider/anthropic/parallel_toolcall_test.go new file mode 100644 index 000000000..d5635e985 --- /dev/null +++ b/pkg/model/provider/anthropic/parallel_toolcall_test.go @@ -0,0 +1,282 @@ +package anthropic + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/anthropics/anthropic-sdk-go" + "github.com/anthropics/anthropic-sdk-go/packages/ssestream" +) + +// fakeDecoder feeds a fixed sequence of SSE events to ssestream.Stream. +type fakeDecoder struct { + events []ssestream.Event + i int + closed bool +} + +func (d *fakeDecoder) Next() bool { + if d.i >= len(d.events) { + return false + } + d.i++ + return true +} + +func (d *fakeDecoder) Event() ssestream.Event { return d.events[d.i-1] } +func (d *fakeDecoder) Close() error { d.closed = true; return nil } +func (d *fakeDecoder) Err() error { return nil } + +func sseEvent(t string, payload any) ssestream.Event { + data, _ := json.Marshal(payload) + return ssestream.Event{Type: t, Data: data} +} + +// TestParallelToolCallIDsAreNotCrossWired reproduces a bug where the Anthropic +// stream adapter loses track of which tool_use block a given input_json delta +// belongs to when two or more tool calls stream in parallel. +// +// Anthropic's streaming protocol emits a content_block_start for each +// tool_use block (each with its own block index and its own tool ID), then +// emits content_block_delta events of type input_json_delta carrying partial +// JSON for each block. Every event carries the block's index. The adapter +// must use that index to route partial JSON back to the correct tool call. +// +// The current adapter stores the most recently seen tool ID in a single +// streamAdapter.toolID field. When a second tool_use block starts, that +// field is overwritten. Subsequent input_json deltas for the FIRST block +// then carry the SECOND block's ID, and the runtime accumulator +// (keyed by tool call ID in pkg/runtime/streaming.go) concatenates both +// calls' argument fragments into the same buffer, producing malformed JSON +// that surfaces upstream as Go json errors like +// +// "invalid character 's' looking for beginning of value" +// "invalid character '-' after object key:value pair" +// +// This test demonstrates the bug. With the fix in place (route by block +// index, not by a single shared toolID), both tool calls' arguments end up +// in their own buffers and parse cleanly. +func TestParallelToolCallIDsAreNotCrossWired(t *testing.T) { + // Event sequence: two parallel tool_use blocks with interleaved + // input_json_delta events. This mirrors what Anthropic emits when the + // model issues parallel tool calls. + events := []ssestream.Event{ + // message_start (minimal — we only care about content blocks below) + sseEvent("message_start", map[string]any{ + "type": "message_start", + "message": map[string]any{"id": "msg_test", "model": "claude-test", "role": "assistant", "type": "message"}, + }), + + // content_block_start, index 0: tool A (memory_refresh_complete) + sseEvent("content_block_start", map[string]any{ + "type": "content_block_start", + "index": 0, + "content_block": map[string]any{ + "type": "tool_use", + "id": "toolu_AAA", + "name": "memory_refresh_complete", + "input": map[string]any{}, + }, + }), + + // content_block_start, index 1: tool B (memory_learning_add) + sseEvent("content_block_start", map[string]any{ + "type": "content_block_start", + "index": 1, + "content_block": map[string]any{ + "type": "tool_use", + "id": "toolu_BBB", + "name": "memory_learning_add", + "input": map[string]any{}, + }, + }), + + // Interleaved input_json_delta events. Each carries its block index. + // Tool A args: {"refresh_id":"abc-def"} + // Tool B args: {"category":"tool_failure","summary":"x"} + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 0, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `{"refresh_id":"abc-`}, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 1, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `{"category":"tool_failure",`}, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 0, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `def"}`}, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 1, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `"summary":"x"}`}, + }), + + sseEvent("content_block_stop", map[string]any{"type": "content_block_stop", "index": 0}), + sseEvent("content_block_stop", map[string]any{"type": "content_block_stop", "index": 1}), + sseEvent("message_delta", map[string]any{ + "type": "message_delta", + "delta": map[string]any{"stop_reason": "tool_use"}, + }), + sseEvent("message_stop", map[string]any{"type": "message_stop"}), + } + + stream := ssestream.NewStream[anthropic.MessageStreamEventUnion](&fakeDecoder{events: events}, nil) + adapter := &streamAdapter{retryableStream: retryableStream[anthropic.MessageStreamEventUnion]{stream: stream}} + + // Replicate the runtime's tool-call accumulator (pkg/runtime/streaming.go). + // It keys by ToolCall.ID and concatenates Arguments fragments. This is the + // downstream layer that turns malformed concatenation into a JSON parse + // error when the tool is invoked. + argsByID := map[string]*strings.Builder{} + nameByID := map[string]string{} + + for { + resp, err := adapter.Recv() + if err != nil { + break + } + if len(resp.Choices) == 0 { + continue + } + for _, tc := range resp.Choices[0].Delta.ToolCalls { + if _, ok := argsByID[tc.ID]; !ok { + argsByID[tc.ID] = &strings.Builder{} + } + if tc.Function.Name != "" { + nameByID[tc.ID] = tc.Function.Name + } + argsByID[tc.ID].WriteString(tc.Function.Arguments) + } + } + + // Expected behaviour: tool A and tool B each get exactly their own args, + // and both parse as valid JSON. + expectedA := `{"refresh_id":"abc-def"}` + expectedB := `{"category":"tool_failure","summary":"x"}` + + gotA := argsByID["toolu_AAA"].String() + gotB := argsByID["toolu_BBB"].String() + + t.Logf("toolu_AAA name=%q args=%q", nameByID["toolu_AAA"], gotA) + t.Logf("toolu_BBB name=%q args=%q", nameByID["toolu_BBB"], gotB) + + if gotA != expectedA { + t.Errorf("tool A (toolu_AAA, memory_refresh_complete) args wrong\n want: %s\n got: %s", expectedA, gotA) + } + if gotB != expectedB { + t.Errorf("tool B (toolu_BBB, memory_learning_add) args wrong\n want: %s\n got: %s", expectedB, gotB) + } + + // As a sharper assertion: both buffers must individually parse as JSON. + // With the bug, one or both fail with the exact Go errors observed in + // production ("invalid character ... looking for beginning of value" or + // "invalid character ... after object key:value pair"). + var sink any + if err := json.Unmarshal([]byte(gotA), &sink); err != nil { + t.Errorf("tool A args failed to parse as JSON: %v\n buffer was: %s", err, gotA) + } + if err := json.Unmarshal([]byte(gotB), &sink); err != nil { + t.Errorf("tool B args failed to parse as JSON: %v\n buffer was: %s", err, gotB) + } +} + +// TestBetaParallelToolCallIDsAreNotCrossWired is the same scenario but for +// the Beta stream adapter. The bug and fix are identical. +func TestBetaParallelToolCallIDsAreNotCrossWired(t *testing.T) { + events := []ssestream.Event{ + sseEvent("message_start", map[string]any{ + "type": "message_start", + "message": map[string]any{"id": "msg_test", "model": "claude-test", "role": "assistant", "type": "message"}, + }), + sseEvent("content_block_start", map[string]any{ + "type": "content_block_start", + "index": 0, + "content_block": map[string]any{ + "type": "tool_use", + "id": "toolu_AAA", + "name": "memory_refresh_complete", + "input": map[string]any{}, + }, + }), + sseEvent("content_block_start", map[string]any{ + "type": "content_block_start", + "index": 1, + "content_block": map[string]any{ + "type": "tool_use", + "id": "toolu_BBB", + "name": "memory_learning_add", + "input": map[string]any{}, + }, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 0, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `{"refresh_id":"abc-`}, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 1, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `{"category":"tool_failure",`}, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 0, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `def"}`}, + }), + sseEvent("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": 1, + "delta": map[string]any{"type": "input_json_delta", "partial_json": `"summary":"x"}`}, + }), + sseEvent("content_block_stop", map[string]any{"type": "content_block_stop", "index": 0}), + sseEvent("content_block_stop", map[string]any{"type": "content_block_stop", "index": 1}), + sseEvent("message_delta", map[string]any{ + "type": "message_delta", + "delta": map[string]any{"stop_reason": "tool_use"}, + }), + sseEvent("message_stop", map[string]any{"type": "message_stop"}), + } + + stream := ssestream.NewStream[anthropic.BetaRawMessageStreamEventUnion](&fakeDecoder{events: events}, nil) + adapter := &betaStreamAdapter{retryableStream: retryableStream[anthropic.BetaRawMessageStreamEventUnion]{stream: stream}} + + argsByID := map[string]*strings.Builder{} + for { + resp, err := adapter.Recv() + if err != nil { + break + } + if len(resp.Choices) == 0 { + continue + } + for _, tc := range resp.Choices[0].Delta.ToolCalls { + if _, ok := argsByID[tc.ID]; !ok { + argsByID[tc.ID] = &strings.Builder{} + } + argsByID[tc.ID].WriteString(tc.Function.Arguments) + } + } + + gotA := argsByID["toolu_AAA"].String() + gotB := argsByID["toolu_BBB"].String() + expectedA := `{"refresh_id":"abc-def"}` + expectedB := `{"category":"tool_failure","summary":"x"}` + if gotA != expectedA { + t.Errorf("[beta] tool A args wrong\n want: %s\n got: %s", expectedA, gotA) + } + if gotB != expectedB { + t.Errorf("[beta] tool B args wrong\n want: %s\n got: %s", expectedB, gotB) + } + var sink any + if err := json.Unmarshal([]byte(gotA), &sink); err != nil { + t.Errorf("[beta] tool A args failed to parse: %v\n buffer was: %s", err, gotA) + } + if err := json.Unmarshal([]byte(gotB), &sink); err != nil { + t.Errorf("[beta] tool B args failed to parse: %v\n buffer was: %s", err, gotB) + } +}