Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/ruby_llm/protocols/converse/streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ def stream_response(payload, additional_headers = {}, &block)
end
else
req.options.on_data = proc do |chunk, _bytes, env|
if env&.status == 200
parse_stream_chunk(decoder, chunk, accumulator, &block)
else
# A nil env means the status is not yet known (Faraday 2 with the
# net_http adapter passes nil during streaming) — process the chunk
# normally. Only a present env reporting a non-200 status is a real
# failure. Gating on `env&.status == 200` would discard every chunk
# whenever env is nil, yielding an empty streamed response.
if env && env.status != 200
handle_failed_stream(chunk, env)
else
parse_stream_chunk(decoder, chunk, accumulator, &block)
end
end
end
Expand Down
12 changes: 9 additions & 3 deletions lib/ruby_llm/streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,16 @@ def v1_on_data(on_chunk)

def v2_on_data(on_chunk, on_failed_response)
proc do |chunk, _bytes, env|
if env&.status == 200
on_chunk.call(chunk, env)
else
# A nil env means the status is not yet known (Faraday 2 with the
# net_http adapter passes nil during streaming) — treat the chunk as a
# normal response chunk. Only a present env reporting a non-200 status
# is a real failure. Gating on `env&.status == 200` would route every
# chunk to the failure handler whenever env is nil, discarding the
# entire streamed response.
if env && env.status != 200
on_failed_response.call(chunk, env)
else
on_chunk.call(chunk, env)
end
end
end
Expand Down
59 changes: 59 additions & 0 deletions spec/ruby_llm/protocols/converse/streaming_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,63 @@
expect(message.thinking.text).to eq('thinking text')
expect(message.thinking.signature).to eq('thinking-signature')
end

describe 'on_data routing in stream_response' do
# Captures the on_data proc that stream_response installs on the Faraday
# request, by faking @connection/@provider/req just enough to run the block.
let(:captured_on_data) do
captured = nil
request_options = Object.new
request_options.define_singleton_method(:on_data=) { |proc| captured = proc }
req = Object.new
req.define_singleton_method(:headers) { {} }
req.define_singleton_method(:options) { request_options }

connection = instance_double(RubyLLM::Connection)
allow(connection).to receive(:post) do |_url, _payload, &block|
block.call(req)
nil
end

provider = double('provider', sign_headers: {}) # rubocop:disable RSpec/VerifiedDoubles

streaming.instance_variable_set(:@connection, connection)
streaming.instance_variable_set(:@provider, provider)
allow(streaming).to receive_messages(event_stream_decoder: :decoder, parse_stream_chunk: nil,
handle_failed_stream: nil)
allow(RubyLLM::StreamAccumulator).to receive(:new).and_return(
instance_double(RubyLLM::StreamAccumulator, to_message: RubyLLM::Message.new(role: :assistant, content: ''))
)

streaming.send(:stream_response, {})
captured
end

before { stub_const('Faraday::VERSION', '2.0.0') }

# Faraday 2 with the net_http adapter passes a nil env during streaming
# (status unknown). Chunks must still be parsed, not dropped as failures.
it 'parses the chunk when env is nil (status not yet known)' do
captured_on_data.call('event-frame', 11, nil)

expect(streaming).to have_received(:parse_stream_chunk).with(:decoder, 'event-frame', anything)
expect(streaming).not_to have_received(:handle_failed_stream)
end

it 'parses the chunk when env reports a 200 status' do
env = Struct.new(:status).new(200)
captured_on_data.call('event-frame', 11, env)

expect(streaming).to have_received(:parse_stream_chunk).with(:decoder, 'event-frame', anything)
expect(streaming).not_to have_received(:handle_failed_stream)
end

it 'routes the chunk to failure handling when env reports a non-200 status' do
env = Struct.new(:status).new(403)
captured_on_data.call('error-frame', 11, env)

expect(streaming).to have_received(:handle_failed_stream).with('error-frame', env)
expect(streaming).not_to have_received(:parse_stream_chunk)
end
end
end
48 changes: 48 additions & 0 deletions spec/ruby_llm/streaming_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,52 @@

expect(yielded_chunks).to eq(['chunk:ok'])
end

# Faraday 2 with the net_http adapter invokes on_data with a nil env (the
# status is not yet known mid-stream). The handler must process such chunks
# normally rather than treating them as a failed response and discarding them.
it 'processes chunks when env is nil (status not yet known)' do
yielded_chunks = []
handler = test_obj.send(:handle_stream) { |chunk| yielded_chunks << chunk }

handler.call("data: {\"x\":\"ok\"}\n\n", 0, nil)

expect(yielded_chunks).to eq(['chunk:ok'])
end

describe RubyLLM::Streaming::FaradayHandlers do
describe '.v2_on_data' do
let(:on_chunk_calls) { [] }
let(:on_failed_calls) { [] }
let(:handler) do
described_class.v2_on_data(
->(chunk, env) { on_chunk_calls << [chunk, env] },
->(chunk, env) { on_failed_calls << [chunk, env] }
)
end

it 'routes the chunk to on_chunk when env is nil (status unknown)' do
handler.call('frame', 5, nil)

expect(on_chunk_calls).to eq([['frame', nil]])
expect(on_failed_calls).to be_empty
end

it 'routes the chunk to on_chunk when env reports a 200 status' do
env = Struct.new(:status).new(200)
handler.call('frame', 5, env)

expect(on_chunk_calls).to eq([['frame', env]])
expect(on_failed_calls).to be_empty
end

it 'routes the chunk to on_failed_response when env reports a non-200 status' do
env = Struct.new(:status).new(403)
handler.call('error-frame', 11, env)

expect(on_failed_calls).to eq([['error-frame', env]])
expect(on_chunk_calls).to be_empty
end
end
end
end
Loading