
Commit 9b536a4

justin808 and ihabadham
authored and committed
Improve concurrent streaming error handling and configuration
This commit addresses several critical issues and improvements for concurrent component streaming in React on Rails Pro:

1. CRITICAL: Client Disconnect Handling
   - Added IOError and Errno::EPIPE exception handling in producer tasks
   - Added a stream.closed? check before expensive operations
   - Added exception handling in the writer task so it stops gracefully on disconnect
   - Prevents wasted resources when clients disconnect mid-stream

2. Configuration Validation Improvements
   - Replaced attr_accessor with a custom setter for concurrent_component_streaming_buffer_size
   - Added validation at assignment time, not just at initialization
   - Added comprehensive documentation about memory implications
   - Documented that buffer size * chunk size determines maximum memory usage

3. Documentation Enhancements
   - Added detailed comments explaining the producer-consumer pattern
   - Documented ordering guarantees for concurrent streaming
   - Clarified that chunks from the same component maintain their order
   - Clarified that chunks from different components may interleave
   - Added memory-management documentation to drain_streams_concurrently

4. Testing
   - Added a test for the client-disconnect scenario
   - Verifies the producer stops when the client disconnects
   - Ensures no further processing happens after IOError

Changes:
- react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
- react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
- react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb

Note: Sleep-based test synchronization was deliberately left as is; the existing approach is acceptable for test purposes, and more complex synchronization mechanisms would add unnecessary complexity to the test suite.
1 parent 62e2995 commit 9b536a4
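For context, a minimal initializer sketch showing how the validated setting from this commit would be configured. The initializer path is illustrative, and the configure block follows the gem's standard pattern; the default of 64 comes from this commit's own documentation:

```ruby
# config/initializers/react_on_rails_pro.rb (illustrative path)
ReactOnRailsPro.configure do |config|
  # Bounded queue for concurrent component streaming. Maximum buffered
  # memory is roughly this value times the typical chunk size
  # (e.g. 64 * 1KB chunks ~= 64KB).
  config.concurrent_component_streaming_buffer_size = 64
end
```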

File tree

3 files changed (+101 −1 lines changed)


react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 25 additions & 0 deletions
@@ -47,6 +47,20 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
 
   private
 
+  # Drains all streaming fibers concurrently using a producer-consumer pattern.
+  #
+  # Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue.
+  # Consumer task: Single writer dequeues chunks and writes them to the response stream.
+  #
+  # Ordering guarantees:
+  # - Chunks from the same component maintain their order
+  # - Chunks from different components may interleave based on production timing
+  # - The first component to produce a chunk will have it written first
+  #
+  # Memory management:
+  # - Uses a limited queue (configured via concurrent_component_streaming_buffer_size)
+  # - Producers block when the queue is full, providing backpressure
+  # - This prevents unbounded memory growth from fast producers
   def drain_streams_concurrently
     require "async"
     require "async/limited_queue"
@@ -58,7 +72,9 @@ def drain_streams_concurrently
     buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
     queue = Async::LimitedQueue.new(buffer_size)
 
+    # Consumer task: Single writer dequeues and writes to response stream
     writer = build_writer_task(parent: parent, queue: queue)
+    # Producer tasks: Each fiber drains its stream and enqueues chunks
     tasks = build_producer_tasks(parent: parent, queue: queue)
 
     # This structure ensures that even if a producer task fails, we always
@@ -78,12 +94,18 @@ def build_producer_tasks(parent:, queue:)
     @rorp_rendering_fibers.each_with_index.map do |fiber, idx|
       parent.async do
         loop do
+          # Check if client disconnected before expensive operations
+          break if response.stream.closed?
+
          chunk = fiber.resume
          break unless chunk
 
          # Will be blocked if the queue is full until a chunk is dequeued
          queue.enqueue([idx, chunk])
        end
+      rescue IOError, Errno::EPIPE
+        # Client disconnected - stop producing
+        break
       end
     end
   end
@@ -97,6 +119,9 @@ def build_writer_task(parent:, queue:)
        _idx_from_queue, item = pair
        response.stream.write(item)
      end
+    rescue IOError, Errno::EPIPE
+      # Client disconnected - stop writing
+      nil
    end
  end
end
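The producer-consumer pattern in the diff above can be sketched with Ruby's stdlib alone, using Thread and SizedQueue as stand-ins for the async gem's tasks and Async::LimitedQueue. A full SizedQueue blocks push, which is exactly the backpressure the new comments describe:

```ruby
queue = SizedQueue.new(4) # bounded queue: push blocks when full (backpressure)
written = []

# Producer "tasks": each thread enqueues its chunks tagged with its index
producers = 2.times.map do |idx|
  Thread.new do
    3.times { |n| queue.push([idx, "chunk-#{idx}-#{n}"]) }
  end
end

# Single consumer "writer task": dequeues and writes in arrival order
writer = Thread.new do
  6.times do
    _idx, chunk = queue.pop
    written << chunk
  end
end

producers.each(&:join)
writer.join

# Chunks from the same producer keep their relative order; chunks from
# different producers may interleave.
p written.select { |c| c.start_with?("chunk-0") }
# => ["chunk-0-0", "chunk-0-1", "chunk-0-2"]
```

The bounded queue is the key design choice: a fast producer cannot run ahead of the writer by more than the queue's capacity, so peak buffered memory stays near capacity times chunk size.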

react_on_rails_pro/lib/react_on_rails_pro/configuration.rb

Lines changed: 25 additions & 1 deletion
@@ -70,7 +70,31 @@ class Configuration # rubocop:disable Metrics/ClassLength
                 :renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
                 :profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
                 :rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
-                :react_server_client_manifest_file, :concurrent_component_streaming_buffer_size
+                :react_server_client_manifest_file
+
+  attr_reader :concurrent_component_streaming_buffer_size
+
+  # Sets the buffer size for concurrent component streaming.
+  #
+  # This value controls how many chunks can be buffered in memory during
+  # concurrent streaming operations. When producers generate chunks faster
+  # than they can be written to the client, this buffer prevents unbounded
+  # memory growth by blocking producers when the buffer is full.
+  #
+  # Memory implications:
+  # - Buffer size of 64 (default) with 1KB chunks = ~64KB max memory
+  # - Buffer size of 64 with 1MB chunks = ~64MB max memory
+  # - Consider your typical chunk size when configuring this value
+  #
+  # @param value [Integer] A positive integer specifying the buffer size
+  # @raise [ReactOnRailsPro::Error] if value is not a positive integer
+  def concurrent_component_streaming_buffer_size=(value)
+    unless value.is_a?(Integer) && value.positive?
+      raise ReactOnRailsPro::Error,
+            "config.concurrent_component_streaming_buffer_size must be a positive integer"
+    end
+    @concurrent_component_streaming_buffer_size = value
+  end
 
  def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
                 renderer_use_fallback_exec_js: nil, prerender_caching: nil,
react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb

Lines changed: 51 additions & 0 deletions
@@ -512,6 +512,57 @@ def execute_stream_view_containing_react_components
      expect(chunk).not_to include("<h1>Header Rendered In View</h1>")
    end
  end
+
+  it "stops producing when client disconnects" do
+    queue = Async::Queue.new
+    mock_request_and_response(queue)
+
+    # Simulate client disconnect by making stream.write raise IOError
+    call_count = 0
+    allow(mocked_stream).to receive(:write) do |chunk|
+      call_count += 1
+      raise IOError, "client disconnected" if call_count == 2
+      written_chunks << chunk
+    end
+
+    # Configure render_to_string to call stream_react_component
+    allow(self).to receive(:render_to_string) do
+      render_result = stream_react_component(component_name, props: props, **component_options)
+      <<-HTML
+        <div>
+          <h1>Header Rendered In View</h1>
+          #{render_result}
+        </div>
+      HTML
+    end
+
+    Sync do |parent|
+      parent.async do
+        stream_view_containing_react_components(template: template_path)
+      rescue IOError
+        # Expected - client disconnected
+      end
+
+      # Enqueue first chunk - should be written successfully
+      queue.enqueue(chunks[0])
+      sleep 0.05
+
+      # Enqueue second chunk - should trigger disconnect
+      queue.enqueue(chunks[1])
+      sleep 0.05
+
+      # Enqueue third chunk - should not be written (producer stopped)
+      queue.enqueue(chunks[2])
+      sleep 0.05
+
+      queue.close
+      sleep 0.05
+    end
+
+    # Should only have written the first chunk before disconnect
+    expect(written_chunks.count).to eq(1)
+    expect(mocked_stream).to have_received(:write).at_most(2).times
+  end
 end
 
 describe "#cached_stream_react_component", :caching do # rubocop:disable RSpec/MultipleMemoizedHelpers
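The disconnect behavior this spec exercises can also be sketched without RSpec: a writer loop that rescues IOError/Errno::EPIPE and stops cleanly instead of crashing. FakeStream and drain_to are illustrative names for this sketch, not gem API:

```ruby
# A stand-in stream that "disconnects" on the nth write attempt.
class FakeStream
  attr_reader :written

  def initialize(fail_on:)
    @writes = 0
    @fail_on = fail_on
    @written = []
  end

  def write(chunk)
    @writes += 1
    raise IOError, "client disconnected" if @writes == @fail_on

    @written << chunk
  end
end

# Writer loop: stop gracefully when the client goes away,
# mirroring the rescue IOError, Errno::EPIPE added in this commit.
def drain_to(stream, chunks)
  chunks.each { |c| stream.write(c) }
  :completed
rescue IOError, Errno::EPIPE
  :client_disconnected
end

stream = FakeStream.new(fail_on: 2)
result = drain_to(stream, %w[a b c])
puts result                 # => client_disconnected
p stream.written            # => ["a"]
```

As in the spec, only the chunk written before the disconnect survives, and the third chunk is never attempted because the loop has already stopped.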
