Skip to content

Commit 4ffba19

Browse files
ihabadhamAbanoubGhadbangithub-actions[bot]
authored
Concurrently drain streaming fibers (#2015)
Implements #550 - Concurrent draining of streamed React components (Async tasks + single writer) - Default backpressure via Async::Semaphore; handle client disconnects - Specs for sequential vs concurrent, per-component ordering, edge cases, and backpressure Example: - With two stream components fibers, chunks interleave (first-ready wins) while each component's own chunk order is preserved. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Improvements** * Concurrent streaming for React components replaces sequential processing, reducing latency and adding memory‑bounded buffering for steadier throughput. * **Added** * New configuration: concurrent_component_streaming_buffer_size (default: 64) to tune memory vs. performance. * Runtime dependency on the async gem (>= 2.6) to enable concurrent streaming. * **Documentation** * Updated inline references to Shakapacker terminology. * **Tests** * Expanded coverage for concurrent streaming, ordering, backpressure, and disconnect scenarios. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --- **Migrated from**: shakacode/react_on_rails_pro#552 <!-- Reviewable:start --> - - - This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/shakacode/react_on_rails/2015) <!-- Reviewable:end --> --------- Co-authored-by: Abanoub Ghadban <abanoub@shakacode.com> Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Ihab Adham <ihabadham@users.noreply.github.com>
1 parent e612f66 commit 4ffba19

File tree

8 files changed

+288
-20
lines changed

8 files changed

+288
-20
lines changed

react_on_rails_pro/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ You can find the **package** version numbers from this repo's tags and below in
1919

2020
_Add changes in master not yet tagged._
2121

22+
### Improved
23+
- Significantly improved streaming performance by processing React components concurrently instead of sequentially. This reduces latency and improves responsiveness when using `stream_view_containing_react_components`.
24+
25+
### Added
26+
- Added `config.concurrent_component_streaming_buffer_size` configuration option to control the memory buffer size for concurrent component streaming (defaults to 64). This allows fine-tuning of memory usage vs. performance for streaming applications.
27+
2228
### Added
2329

2430
- Added `cached_stream_react_component` helper method, similar to `cached_react_component` but for streamed components.
@@ -48,6 +54,7 @@ _Add changes in master not yet tagged._
4854

4955
- `config.prerender_caching`, which controls caching for non-streaming components, now also controls caching for streamed components. To disable caching for an individual render, pass `internal_option(:skip_prerender_cache)`.
5056
- **Configuration Migration Required**: If you are using RSC features, you must move the RSC-related configurations from `ReactOnRails.configure` to `ReactOnRailsPro.configure` in your initializers. See the migration example in the [React on Rails CHANGELOG](https://github.com/shakacode/react_on_rails/blob/master/CHANGELOG.md#unreleased).
57+
- Added `async` gem dependency (>= 2.6) to support concurrent streaming functionality.
5158

5259
## [4.0.0-rc.15] - 2025-08-11
5360

react_on_rails_pro/Gemfile.lock

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ PATH
2222
specs:
2323
react_on_rails_pro (16.2.0.beta.4)
2424
addressable
25+
async (>= 2.6)
2526
connection_pool
2627
execjs (~> 2.9)
2728
httpx (~> 1.5)
@@ -107,6 +108,12 @@ GEM
107108
public_suffix (>= 2.0.2, < 7.0)
108109
amazing_print (1.6.0)
109110
ast (2.4.2)
111+
async (2.27.4)
112+
console (~> 1.29)
113+
fiber-annotation
114+
io-event (~> 1.11)
115+
metrics (~> 0.12)
116+
traces (~> 0.15)
110117
base64 (0.2.0)
111118
benchmark (0.4.0)
112119
bigdecimal (3.1.9)
@@ -134,6 +141,10 @@ GEM
134141
commonmarker (1.1.4-x86_64-linux)
135142
concurrent-ruby (1.3.5)
136143
connection_pool (2.5.0)
144+
console (1.33.0)
145+
fiber-annotation
146+
fiber-local (~> 1.1)
147+
json
137148
coveralls (0.8.23)
138149
json (>= 1.8, < 3)
139150
simplecov (~> 0.16.1)
@@ -158,6 +169,10 @@ GEM
158169
ffi (1.17.0-arm64-darwin)
159170
ffi (1.17.0-x86_64-darwin)
160171
ffi (1.17.0-x86_64-linux-gnu)
172+
fiber-annotation (0.2.0)
173+
fiber-local (1.1.0)
174+
fiber-storage
175+
fiber-storage (1.0.1)
161176
gem-release (2.2.2)
162177
generator_spec (0.10.0)
163178
activesupport (>= 3.0.0)
@@ -173,6 +188,7 @@ GEM
173188
i18n (1.14.7)
174189
concurrent-ruby (~> 1.0)
175190
io-console (0.8.0)
191+
io-event (1.12.1)
176192
irb (1.15.1)
177193
pp (>= 0.6.0)
178194
rdoc (>= 4.0.0)
@@ -205,6 +221,7 @@ GEM
205221
marcel (1.0.4)
206222
matrix (0.4.2)
207223
method_source (1.1.0)
224+
metrics (0.14.0)
208225
mini_mime (1.1.5)
209226
minitest (5.25.4)
210227
mize (0.4.1)
@@ -411,6 +428,7 @@ GEM
411428
tins (1.33.0)
412429
bigdecimal
413430
sync
431+
traces (0.18.1)
414432
turbolinks (5.2.1)
415433
turbolinks-source (~> 5.2)
416434
turbolinks-source (5.2.0)

react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,66 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
3838
# So we strip extra newlines from the template string and add a single newline
3939
response.stream.write(template_string)
4040

41-
@rorp_rendering_fibers.each do |fiber|
42-
while (chunk = fiber.resume)
43-
response.stream.write(chunk)
41+
begin
42+
drain_streams_concurrently
43+
ensure
44+
response.stream.close if close_stream_at_end
45+
end
46+
end
47+
48+
private
49+
50+
def drain_streams_concurrently
51+
require "async"
52+
require "async/limited_queue"
53+
54+
return if @rorp_rendering_fibers.empty?
55+
56+
Sync do |parent|
57+
# To avoid memory bloat, we use a limited queue to buffer chunks in memory.
58+
buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
59+
queue = Async::LimitedQueue.new(buffer_size)
60+
61+
writer = build_writer_task(parent: parent, queue: queue)
62+
tasks = build_producer_tasks(parent: parent, queue: queue)
63+
64+
# This structure ensures that even if a producer task fails, we always
65+
# signal the writer to stop and then wait for it to finish draining
66+
# any remaining items from the queue before propagating the error.
67+
begin
68+
tasks.each(&:wait)
69+
ensure
70+
# `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
71+
queue.close
72+
writer.wait
73+
end
74+
end
75+
end
76+
77+
def build_producer_tasks(parent:, queue:)
78+
@rorp_rendering_fibers.each_with_index.map do |fiber, idx|
79+
parent.async do
80+
loop do
81+
chunk = fiber.resume
82+
break unless chunk
83+
84+
# Will be blocked if the queue is full until a chunk is dequeued
85+
queue.enqueue([idx, chunk])
86+
end
87+
end
88+
end
89+
end
90+
91+
def build_writer_task(parent:, queue:)
92+
parent.async do
93+
loop do
94+
pair = queue.dequeue
95+
break if pair.nil?
96+
97+
_idx_from_queue, item = pair
98+
response.stream.write(item)
4499
end
45100
end
46-
response.stream.close if close_stream_at_end
47101
end
48102
end
49103
end

react_on_rails_pro/lib/react_on_rails_pro/configuration.rb

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ def self.configuration
3232
rsc_payload_generation_url_path: Configuration::DEFAULT_RSC_PAYLOAD_GENERATION_URL_PATH,
3333
rsc_bundle_js_file: Configuration::DEFAULT_RSC_BUNDLE_JS_FILE,
3434
react_client_manifest_file: Configuration::DEFAULT_REACT_CLIENT_MANIFEST_FILE,
35-
react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE
35+
react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE,
36+
concurrent_component_streaming_buffer_size: Configuration::DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE
3637
)
3738
end
3839

@@ -59,6 +60,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
5960
DEFAULT_RSC_BUNDLE_JS_FILE = "rsc-bundle.js"
6061
DEFAULT_REACT_CLIENT_MANIFEST_FILE = "react-client-manifest.json"
6162
DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE = "react-server-client-manifest.json"
63+
DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE = 64
6264

6365
attr_accessor :renderer_url, :renderer_password, :tracing,
6466
:server_renderer, :renderer_use_fallback_exec_js, :prerender_caching,
@@ -68,7 +70,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
6870
:renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
6971
:profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
7072
:rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
71-
:react_server_client_manifest_file
73+
:react_server_client_manifest_file, :concurrent_component_streaming_buffer_size
7274

7375
def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
7476
renderer_use_fallback_exec_js: nil, prerender_caching: nil,
@@ -79,7 +81,9 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
7981
renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil,
8082
profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil,
8183
enable_rsc_support: nil, rsc_payload_generation_url_path: nil,
82-
rsc_bundle_js_file: nil, react_client_manifest_file: nil, react_server_client_manifest_file: nil)
84+
rsc_bundle_js_file: nil, react_client_manifest_file: nil,
85+
react_server_client_manifest_file: nil,
86+
concurrent_component_streaming_buffer_size: DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE)
8387
self.renderer_url = renderer_url
8488
self.renderer_password = renderer_password
8589
self.server_renderer = server_renderer
@@ -105,6 +109,7 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
105109
self.rsc_bundle_js_file = rsc_bundle_js_file
106110
self.react_client_manifest_file = react_client_manifest_file
107111
self.react_server_client_manifest_file = react_server_client_manifest_file
112+
self.concurrent_component_streaming_buffer_size = concurrent_component_streaming_buffer_size
108113
end
109114

110115
def setup_config_values
@@ -113,6 +118,7 @@ def setup_config_values
113118
validate_remote_bundle_cache_adapter
114119
setup_renderer_password
115120
setup_assets_to_copy
121+
validate_concurrent_component_streaming_buffer_size
116122
setup_execjs_profiler_if_needed
117123
check_react_on_rails_support_for_rsc
118124
end
@@ -204,6 +210,14 @@ def validate_remote_bundle_cache_adapter
204210
end
205211
end
206212

213+
def validate_concurrent_component_streaming_buffer_size
214+
return if concurrent_component_streaming_buffer_size.is_a?(Integer) &&
215+
concurrent_component_streaming_buffer_size.positive?
216+
217+
raise ReactOnRailsPro::Error,
218+
"config.concurrent_component_streaming_buffer_size must be a positive integer"
219+
end
220+
207221
def setup_renderer_password
208222
return if renderer_password.present?
209223

react_on_rails_pro/lib/react_on_rails_pro/utils.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def self.rsc_bundle_hash
108108
@rsc_bundle_hash = calc_bundle_hash(server_rsc_bundle_js_file_path)
109109
end
110110

111-
# Returns the hashed file name when using webpacker. Useful for creating cache keys.
111+
# Returns the hashed file name when using Shakapacker. Useful for creating cache keys.
112112
def self.bundle_file_name(bundle_name)
113113
# bundle_js_uri_from_packer can return a file path or a HTTP URL (for files served from the dev server)
114114
# Pathname can handle both cases
@@ -117,8 +117,8 @@ def self.bundle_file_name(bundle_name)
117117
pathname.basename.to_s
118118
end
119119

120-
# Returns the hashed file name of the server bundle when using webpacker.
121-
# Necessary fragment-caching keys.
120+
# Returns the hashed file name of the server bundle when using Shakapacker.
121+
# Necessary for fragment-caching keys.
122122
def self.server_bundle_file_name
123123
return @server_bundle_hash if @server_bundle_hash && !Rails.env.development?
124124

react_on_rails_pro/react_on_rails_pro.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ Gem::Specification.new do |s|
3737
s.add_runtime_dependency "execjs", "~> 2.9"
3838
s.add_runtime_dependency "httpx", "~> 1.5"
3939
s.add_runtime_dependency "jwt", "~> 2.7"
40+
s.add_runtime_dependency "async", ">= 2.6"
4041
s.add_runtime_dependency "rainbow"
4142
s.add_runtime_dependency "react_on_rails", ReactOnRails::VERSION
4243
s.add_development_dependency "bundler"

react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# frozen_string_literal: true
22

3+
require "async"
4+
require "async/queue"
35
require "rails_helper"
46
require "support/script_tag_utils"
57

@@ -327,6 +329,7 @@ def response; end
327329
HTML
328330
end
329331

332+
# mock_chunks can be an Async::Queue or an Array
330333
def mock_request_and_response(mock_chunks = chunks, count: 1)
331334
# Reset connection instance variables to ensure clean state for tests
332335
ReactOnRailsPro::Request.instance_variable_set(:@connection, nil)
@@ -339,9 +342,19 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
339342
chunks_read.clear
340343
mock_streaming_response(%r{http://localhost:3800/bundles/[a-f0-9]{32}-test/render/[a-f0-9]{32}}, 200,
341344
count: count) do |yielder|
342-
mock_chunks.each do |chunk|
343-
chunks_read << chunk
344-
yielder.call("#{chunk.to_json}\n")
345+
if mock_chunks.is_a?(Async::Queue)
346+
loop do
347+
chunk = mock_chunks.dequeue
348+
break if chunk.nil?
349+
350+
chunks_read << chunk
351+
yielder.call("#{chunk.to_json}\n")
352+
end
353+
else
354+
mock_chunks.each do |chunk|
355+
chunks_read << chunk
356+
yielder.call("#{chunk.to_json}\n")
357+
end
345358
end
346359
end
347360
end
@@ -428,18 +441,35 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
428441

429442
allow(mocked_stream).to receive(:write) do |chunk|
430443
written_chunks << chunk
431-
# Ensures that any chunk received is written immediately to the stream
432-
expect(written_chunks.count).to eq(chunks_read.count) # rubocop:disable RSpec/ExpectInHook
433444
end
434445
allow(mocked_stream).to receive(:close)
435446
mocked_response = instance_double(ActionDispatch::Response)
436447
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
437448
allow(self).to receive(:response).and_return(mocked_response)
438-
mock_request_and_response
449+
end
450+
451+
def execute_stream_view_containing_react_components
452+
queue = Async::Queue.new
453+
mock_request_and_response(queue)
454+
455+
Sync do |parent|
456+
parent.async { stream_view_containing_react_components(template: template_path) }
457+
458+
chunks_to_write = chunks.dup
459+
while (chunk = chunks_to_write.shift)
460+
queue.enqueue(chunk)
461+
sleep 0.05
462+
463+
# Ensures that any chunk received is written immediately to the stream
464+
expect(written_chunks.count).to eq(chunks_read.count)
465+
end
466+
queue.close
467+
sleep 0.05
468+
end
439469
end
440470

441471
it "writes the chunk to stream as soon as it is received" do
442-
stream_view_containing_react_components(template: template_path)
472+
execute_stream_view_containing_react_components
443473
expect(self).to have_received(:render_to_string).once.with(template: template_path)
444474
expect(chunks_read.count).to eq(chunks.count)
445475
expect(written_chunks.count).to eq(chunks.count)
@@ -448,7 +478,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
448478
end
449479

450480
it "prepends the rails context to the first chunk only" do
451-
stream_view_containing_react_components(template: template_path)
481+
execute_stream_view_containing_react_components
452482
initial_result = written_chunks.first
453483
expect(initial_result).to script_tag_be_included(rails_context_tag)
454484

@@ -464,7 +494,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
464494
end
465495

466496
it "prepends the component specification tag to the first chunk only" do
467-
stream_view_containing_react_components(template: template_path)
497+
execute_stream_view_containing_react_components
468498
initial_result = written_chunks.first
469499
expect(initial_result).to script_tag_be_included(react_component_specification_tag)
470500

@@ -475,7 +505,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
475505
end
476506

477507
it "renders the rails view content in the first chunk" do
478-
stream_view_containing_react_components(template: template_path)
508+
execute_stream_view_containing_react_components
479509
initial_result = written_chunks.first
480510
expect(initial_result).to include("<h1>Header Rendered In View</h1>")
481511
written_chunks[1..].each do |chunk|

0 commit comments

Comments
 (0)