From 530bd31cabb5b506e93cf11d6d692f43f2949b25 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Tue, 16 Jun 2026 03:49:13 +0000 Subject: [PATCH 1/2] Fix Bedrock streaming when Faraday env is nil The Faraday 2 on_data callback gates chunk parsing on `env&.status == 200`. With the net_http adapter, env is nil during streaming (the status is not yet known), so this is false for every chunk. Each valid AWS event-stream frame is then routed to the failed-response handler, which tries to JSON-parse binary eventstream bytes, fails, logs a "failed stream error chunk" debug line, and drops it. The result: ConverseStream responses come back completely empty (zero chunks, blank content, nil token counts) even though the HTTP request itself succeeds with status 200. Invert the guard so a nil env (status unknown) falls through to normal chunk parsing, and only a present env reporting a non-200 status is treated as a failure. Applied in both the Converse streaming module and the shared FaradayHandlers.v2_on_data helper. --- lib/ruby_llm/protocols/converse/streaming.rb | 11 ++++++++--- lib/ruby_llm/streaming.rb | 12 +++++++++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/lib/ruby_llm/protocols/converse/streaming.rb b/lib/ruby_llm/protocols/converse/streaming.rb index 1d0abf9be..a57ec23c8 100644 --- a/lib/ruby_llm/protocols/converse/streaming.rb +++ b/lib/ruby_llm/protocols/converse/streaming.rb @@ -33,10 +33,15 @@ def stream_response(payload, additional_headers = {}, &block) end else req.options.on_data = proc do |chunk, _bytes, env| - if env&.status == 200 - parse_stream_chunk(decoder, chunk, accumulator, &block) - else + # A nil env means the status is not yet known (Faraday 2 with the + # net_http adapter passes nil during streaming) — process the chunk + # normally. Only a present env reporting a non-200 status is a real + # failure. Gating on `env&.status == 200` would discard every chunk + # whenever env is nil, yielding an empty streamed response. + if env && env.status != 200 handle_failed_stream(chunk, env) + else + parse_stream_chunk(decoder, chunk, accumulator, &block) end end end diff --git a/lib/ruby_llm/streaming.rb b/lib/ruby_llm/streaming.rb index 081379093..88916de15 100644 --- a/lib/ruby_llm/streaming.rb +++ b/lib/ruby_llm/streaming.rb @@ -165,10 +165,16 @@ def v1_on_data(on_chunk) def v2_on_data(on_chunk, on_failed_response) proc do |chunk, _bytes, env| - if env&.status == 200 - on_chunk.call(chunk, env) - else + # A nil env means the status is not yet known (Faraday 2 with the + # net_http adapter passes nil during streaming) — treat the chunk as a + # normal response chunk. Only a present env reporting a non-200 status + # is a real failure. Gating on `env&.status == 200` would route every + # chunk to the failure handler whenever env is nil, discarding the + # entire streamed response. + if env && env.status != 200 on_failed_response.call(chunk, env) + else + on_chunk.call(chunk, env) end end end From a1093e9bf7d3acf8b68e4fc646aa732bca3f6676 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Tue, 16 Jun 2026 03:49:44 +0000 Subject: [PATCH 2/2] Add tests for nil-env streaming chunk routing Cover the regression where a nil Faraday env (Faraday 2 + net_http during streaming) caused every chunk to be routed to the failed-response handler and discarded. Tests exercise both the shared FaradayHandlers.v2_on_data helper and the Converse stream_response on_data proc directly, asserting that a nil env and a 200 env both parse the chunk while a present non-200 env triggers failure handling. --- .../protocols/converse/streaming_spec.rb | 59 +++++++++++++++++++ spec/ruby_llm/streaming_spec.rb | 48 +++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/spec/ruby_llm/protocols/converse/streaming_spec.rb b/spec/ruby_llm/protocols/converse/streaming_spec.rb index db3fe3e9c..c2e5b999a 100644 --- a/spec/ruby_llm/protocols/converse/streaming_spec.rb +++ b/spec/ruby_llm/protocols/converse/streaming_spec.rb @@ -70,4 +70,63 @@ expect(message.thinking.text).to eq('thinking text') expect(message.thinking.signature).to eq('thinking-signature') end + + describe 'on_data routing in stream_response' do + # Captures the on_data proc that stream_response installs on the Faraday + # request, by faking @connection/@provider/req just enough to run the block. + let(:captured_on_data) do + captured = nil + request_options = Object.new + request_options.define_singleton_method(:on_data=) { |proc| captured = proc } + req = Object.new + req.define_singleton_method(:headers) { {} } + req.define_singleton_method(:options) { request_options } + + connection = instance_double(RubyLLM::Connection) + allow(connection).to receive(:post) do |_url, _payload, &block| + block.call(req) + nil + end + + provider = double('provider', sign_headers: {}) # rubocop:disable RSpec/VerifiedDoubles + + streaming.instance_variable_set(:@connection, connection) + streaming.instance_variable_set(:@provider, provider) + allow(streaming).to receive_messages(event_stream_decoder: :decoder, parse_stream_chunk: nil, + handle_failed_stream: nil) + allow(RubyLLM::StreamAccumulator).to receive(:new).and_return( + instance_double(RubyLLM::StreamAccumulator, to_message: RubyLLM::Message.new(role: :assistant, content: '')) + ) + + streaming.send(:stream_response, {}) + captured + end + + before { stub_const('Faraday::VERSION', '2.0.0') } + + # Faraday 2 with the net_http adapter passes a nil env during streaming + # (status unknown). Chunks must still be parsed, not dropped as failures. + it 'parses the chunk when env is nil (status not yet known)' do + captured_on_data.call('event-frame', 11, nil) + + expect(streaming).to have_received(:parse_stream_chunk).with(:decoder, 'event-frame', anything) + expect(streaming).not_to have_received(:handle_failed_stream) + end + + it 'parses the chunk when env reports a 200 status' do + env = Struct.new(:status).new(200) + captured_on_data.call('event-frame', 11, env) + + expect(streaming).to have_received(:parse_stream_chunk).with(:decoder, 'event-frame', anything) + expect(streaming).not_to have_received(:handle_failed_stream) + end + + it 'routes the chunk to failure handling when env reports a non-200 status' do + env = Struct.new(:status).new(403) + captured_on_data.call('error-frame', 11, env) + + expect(streaming).to have_received(:handle_failed_stream).with('error-frame', env) + expect(streaming).not_to have_received(:parse_stream_chunk) + end + end end diff --git a/spec/ruby_llm/streaming_spec.rb b/spec/ruby_llm/streaming_spec.rb index 2252474e8..d4f25f87a 100644 --- a/spec/ruby_llm/streaming_spec.rb +++ b/spec/ruby_llm/streaming_spec.rb @@ -32,4 +32,52 @@ expect(yielded_chunks).to eq(['chunk:ok']) end + + # Faraday 2 with the net_http adapter invokes on_data with a nil env (the + # status is not yet known mid-stream). The handler must process such chunks + # normally rather than treating them as a failed response and discarding them. + it 'processes chunks when env is nil (status not yet known)' do + yielded_chunks = [] + handler = test_obj.send(:handle_stream) { |chunk| yielded_chunks << chunk } + + handler.call("data: {\"x\":\"ok\"}\n\n", 0, nil) + + expect(yielded_chunks).to eq(['chunk:ok']) + end + + describe RubyLLM::Streaming::FaradayHandlers do + describe '.v2_on_data' do + let(:on_chunk_calls) { [] } + let(:on_failed_calls) { [] } + let(:handler) do + described_class.v2_on_data( + ->(chunk, env) { on_chunk_calls << [chunk, env] }, + ->(chunk, env) { on_failed_calls << [chunk, env] } + ) + end + + it 'routes the chunk to on_chunk when env is nil (status unknown)' do + handler.call('frame', 5, nil) + + expect(on_chunk_calls).to eq([['frame', nil]]) + expect(on_failed_calls).to be_empty + end + + it 'routes the chunk to on_chunk when env reports a 200 status' do + env = Struct.new(:status).new(200) + handler.call('frame', 5, env) + + expect(on_chunk_calls).to eq([['frame', env]]) + expect(on_failed_calls).to be_empty + end + + it 'routes the chunk to on_failed_response when env reports a non-200 status' do + env = Struct.new(:status).new(403) + handler.call('error-frame', 11, env) + + expect(on_failed_calls).to eq([['error-frame', env]]) + expect(on_chunk_calls).to be_empty + end + end + end end