[Driver] Stream the auction JSON body to S3 + solvers#4575
Conversation
3377af2 to
ed8210a
Compare
| S: Write + Finalize + Send + 'static, | ||
| { | ||
| let (tx, rx) = mpsc::channel::<std::io::Result<Bytes>>(CHANNEL_CAPACITY); | ||
| let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx)); |
There was a problem hiding this comment.
Can we use the ByteStream that currently lives in the autopilot? That would automatically provide metrics about how long each solver needs to stream the bytes which can be very helpful to check if an issue is on our end or on the solver's end.
There was a problem hiding this comment.
done but this makes the PR bigger and a bit harder to reason about, i'd extract it to reduce burden on you guys
There was a problem hiding this comment.
Okay. Then let's leave this for later to not have too many changes in the same PR.
Review fixes that don't depend on the transmission-timing wrapper: - Reduce CHANNEL_CAPACITY 8 -> 2; double buffering is enough to keep the consumer fed without raising the memory ceiling. - Carry raw `Bytes` over the body channel and wrap them into infallible `Ok`s at the `wrap_stream` boundary instead of channeling `Result`s. - Move the gzip compression policy into the `s3` crate via `GzipWriter`, so the streamed archive copy is compressed with the same settings as the eager upload path. - Use the `mod.rs` layout for the `streaming` module. - Note the UNIX `tee` lineage in `TeeWriter`'s docs. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Extract the request-body transmission timing that lived in autopilot's `ByteStream` into a generic `observe::http_body::Measured` stream wrapper, and apply it to the driver's streamed `/solve` body so the driver->solver hop also reports `to_transmission_start_ms` / `transmission_ms`. autopilot now wraps a plain single-chunk stream with `Measured` and drops the bespoke `ByteStream` type, which was only a `Bytes`-to-stream adapter once its timing moved out. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Claude finished @jmg-duarte's task in 3m 11s —— View job PR Review: Stream the auction JSON body to S3 + solversNice change — the tee + chunked-streaming approach is clean and the memory-spike graphs are convincing. The
Findings (inline):
Nothing blocking from a pure-correctness standpoint; #2 is the one I'd most want a deliberate decision on (is dropping the archive on a failed solver request acceptable?). The |
| if auction.id().is_some() { | ||
| ::observe::metrics::metrics().measure_auction_overhead( | ||
| start, | ||
| "driver", | ||
| "serialize_request", | ||
| ); | ||
| } |
There was a problem hiding this comment.
The serialize_request overhead metric no longer measures serialization. stream_body/stream_body_and_gzip return immediately with a lazy body handle — the actual serde_json::to_writer work now happens inside the spawn_blocking task, driven by network backpressure. So start..now here captures only the DTO construction + channel setup (near-zero), not the serialization cost it used to measure on the eager path.
If the intent is to keep tracking serialization time, the measurement needs to move into the streaming task (e.g. around serde_json::to_writer); otherwise this metric will silently flatline after this change.
There was a problem hiding this comment.
I'm not sure it's possible to measure this without the stream wrapper we discussed
| let body: reqwest::Body = match archive_id { | ||
| // Stream the request body while capturing a gzipped copy for S3, so | ||
| // neither the request nor the archive holds the full JSON at once. | ||
| Some(id) => { | ||
| let (body, compressed) = streaming::stream_body_and_gzip(auction_dto); | ||
| self.persistence.archive_auction_gzipped(id, compressed); | ||
| body | ||
| } | ||
| None => streaming::stream_body(auction_dto), | ||
| }; |
There was a problem hiding this comment.
Behavior change worth a second look: S3 archival is now coupled to the solver request streaming successfully to completion.
On main, the auction was serialized eagerly into Bytes and archive_auction uploaded that buffer independently of the solver request outcome. Here, the gzipped bytes only reach archive_auction_gzipped when finalize() runs, which only happens if serde_json::to_writer completes — and that requires reqwest to pull the entire body. If the solver connection drops or times out mid-upload, ChannelWriter::blocking_send errors → serialization aborts early → GzipCapture is dropped without finalize() → the oneshot sender is dropped → the archive is silently skipped.
Net effect: auctions whose solver request fails partway through transmission no longer get archived — which may be exactly the auctions you'd want to inspect later. Is dropping the archive in that case intended? If not, consider decoupling the archive from the request stream (e.g. tee into an independent buffer rather than gating finalize on full request consumption).
Secondary note: the spawn_blocking thread is now held for the full duration of the body upload (it blocks on blocking_send until reqwest drains each chunk), rather than being released right after producing the buffer. Per-solve that's a blocking-pool thread occupied for the whole solver round-trip.
There was a problem hiding this comment.
I solved this with the TeeWriter and the BestEffortSink, which will allow the TeeWriter to continue uploading to S3 when this scenarion happens
There was a problem hiding this comment.
Code Review
This pull request introduces streaming serialization and compression for auction payloads to optimize memory usage. Instead of buffering the entire uncompressed JSON in memory, the request body is streamed to the solver while a gzipped copy is captured on the fly for S3 archival. Feedback on these changes highlights the need to elevate the log level for serialization and flushing errors from debug to warn/error, and to make the TeeWriter::write implementation more robust by properly handling partial writes instead of assuming full writes.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
MartinquaXD
left a comment
There was a problem hiding this comment.
LGTM overall. When I briefly looked at the metrics I remember this incurring some non-trivial CPU overhead. Could you please provide updated metrics for this?
Ideally from some network with reasonably sized auctions. Given that this was already e2e tested thoroughly one should probably just run it in prod for a little bit to get the more accurate and relevant numbers for this.
Description
Streams the auction JSON through the network, avoiding the large memory spike from generating it upfront and then working with it.
The implementation tee's the JSON to the solvers and S3; in the S3 path it gets gzipped first (if only brotli was properly supported 😮💨)
Big picture (Arb1 shadow, Arb1 Staging, Avalanche Staging)

Avalanche, in yellow, before the patch, in blue after (with a trade)

Longer time horizon — note the size of the spikes in orange VS the purple ones

In blue, orders I placed

Production, Base, avg_over_time CPU
Production, Base, current + avg_over_time Memory
The tests above were carried out without 381aa8a which we've seen reduce the memory spikes
Changes
How to test
Graphs above, S3 auctions are intact