feat(solana-indexer) PR 5.1: ingester drain loop#4549
Conversation
Fills in the ingester run loop for the solana-indexer.
There was a problem hiding this comment.
Code Review
This pull request implements the Ingester component to drain the Yellowstone gRPC stream and forward transaction and account updates to the decoder. It also refactors the Store trait to support thread-safe async operations. Feedback on the changes suggests using fetch_max instead of store on the atomic slot counter to prevent regression from out-of-order updates, and rate-limiting the backpressure warning log to avoid log flooding when the decoder channel is full.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
Patches remote memory exhaustion DoS in quinn-proto via unbounded out-of-order stream reassembly. Transitive dep via solana-client.
| } | ||
| } | ||
| } | ||
| tracing::info!("yellowstone stream ended; ingester stopping"); |
There was a problem hiding this comment.
Given that the architecture heavily leans towards actors did you already think about graceful shutdowns?
There was a problem hiding this comment.
Yes, and it is covered in the Notion page plan as one of the later steps.
| latest_chain_slot: &AtomicU64, | ||
| update: SubscribeUpdate, | ||
| ) -> ControlFlow<()> { | ||
| use UpdateOneof::*; |
There was a problem hiding this comment.
Let's keep all imports on the top of the file.
| /// Associated function taking the channel and chain-tip counter by | ||
| /// reference rather than `&self`, so the future borrows only those (both | ||
| /// `Sync`) fields across awaits. That keeps `run`'s future `Send` without | ||
| /// requiring `Ingester: Sync` — the `GeyserStream` field is `Send` but not | ||
| /// `Sync`. |
There was a problem hiding this comment.
IMO this is not a doc comment. Doc comments should explain the effects of a function without overwhelming the reader with implementation details. If impl details are important they should be added as regular code comments in the code sections they relate to.
| Transaction(tx_msg) => Self::handle_transaction(tx, tx_msg).await, | ||
| Account(account) => Self::handle_account(tx, account).await, | ||
| Slot(slot) => Self::handle_slot(latest_chain_slot, slot).await, |
There was a problem hiding this comment.
This serializes the handling of each packet which can be slow depending on the futures. Is this intended? Do we not have to be worried about creating unnecessary back pressure here?
There was a problem hiding this comment.
I guess, intended. The per-message work here is just a channel send, which is fast. All the slow work (decode, DB writes) lives behind the channel in the decoder, so this loop stays quick.
cc @tilacog
| let Some(inner) = tx_msg.transaction else { | ||
| tracing::warn!(slot = tx_msg.slot, "transaction update without a body"); | ||
| return ControlFlow::Continue(()); | ||
| }; |
There was a problem hiding this comment.
Is this something that can actually happen or is transaction unnecessarily optional?
There was a problem hiding this comment.
The yellowstone proto types transaction as optional, so it can be absent on an empty or malformed frame. We skip that case defensively rather than assume it is always present.
| }; | ||
| let Ok(signature) = Signature::try_from(inner.signature.as_slice()) else { | ||
| tracing::warn!( | ||
| slot = tx_msg.slot, |
There was a problem hiding this comment.
It's not too bad yet in this code but ideally slot should be passed via a tracing span and .instrument(). That way you don't have to remember and manually add the slot to every related log.
There was a problem hiding this comment.
Done. handle_transaction and handle_account now carry a slot span via #[instrument], so the warns inside no longer thread slot through manually.
| /// The persisted watermark could not be read. | ||
| #[error("failed to read the resume watermark: {0}")] | ||
| Store(#[from] StoreError), |
There was a problem hiding this comment.
The names of the error variants are too generic. Why not incorporate the additional context of the log messages into the names? Otherwise the reader will always have to jump to the error definition to understand anything about the error.
There was a problem hiding this comment.
Each variant wraps a distinct source error via #[from] (e.g. StoreError, GeyserGrpcClientError, Status), and the #[error("…")] message spells out the context, so the name plus the message read clearly where it surfaces. Did I miss anything?
| let request = SubscribeRequest { | ||
| from_slot, | ||
| ..request | ||
| }; |
There was a problem hiding this comment.
What's the reason to not move this and the from_slot logic into subscribe_request()?
| ingester.run().await?; | ||
| Ok(()) |
There was a problem hiding this comment.
nit: you can just return ingester.run().await, no?
| //! Re-exports of the `yellowstone-grpc-proto` message types the indexer | ||
| //! consumes as its wire-format surface. | ||
| pub use yellowstone_grpc_proto::{ |
There was a problem hiding this comment.
What's the reasoning for re-exporting those types btw?
There was a problem hiding this comment.
Probably, two reasons: shorter imports, and one place to fix if the upstream crate moves those types.
@tilacog ?
| /// Consume a slot message: advance the in-memory chain-tip counter. Slot | ||
| /// messages never enter the channel, so this always continues. | ||
| async fn handle_slot( | ||
| latest_chain_slot: &AtomicU64, | ||
| slot: SubscribeUpdateSlot, | ||
| ) -> ControlFlow<()> { | ||
| latest_chain_slot.fetch_max(slot.slot, Ordering::Relaxed); | ||
| ControlFlow::Continue(()) | ||
| } |
There was a problem hiding this comment.
Do we also have to update the last_chain_slot when we encounter the other message types? They also contain a slot number after all.
There was a problem hiding this comment.
Probably, no? IIUC, the slot filter already sends one message per slot, so the tip advances on every slot. Tx and account updates only fire for the two programs we subscribe to (settlement and SolFlow), so their slots are always ones the slot filter already covered - they'd add nothing. (It's fetch_max, so bumping from them wouldn't be wrong, just a wasted atomic on the hot path) Added a doc line on handle_slot explaining this.
cc @tilacog
squadgazzz
left a comment
There was a problem hiding this comment.
Will need to do another round.
| tracing::warn!("decoder channel full; ingester blocked on backpressure"); | ||
| match tx.send(update).await { | ||
| Ok(()) => ControlFlow::Continue(()), | ||
| Err(_) => ControlFlow::Break(()), | ||
| } |
There was a problem hiding this comment.
Will the failed update itself be logged somewhere? I don't really see it.
| // Ping/Pong frames carry no data the ingester needs; the library passes them through, | ||
| // and we drop them here. | ||
| Ping(_) | Pong(_) => ControlFlow::Continue(()), |
There was a problem hiding this comment.
As I understand yellowstone, the server sends periodic Ping frames and expects a Pong back on the request stream, or it can drop an idle connection. Here Ping/Pong are ignored and serve drops, so nothing answers. The PR says AutoReconnect handles keepalive, but the linked reconnect.rs looks like it forwards pings to us without answering them. If that's right, the connection only stays up on HTTP/2 keepalive set on the GeyserGrpcClient.
There was a problem hiding this comment.
Replying to myself 😂
We can't set it in this PR: keepalive is a builder option on the tonic endpoint, set before .connect(), and serve receives an already-built GeyserGrpcClient. There's also no live connection here - serve has no spawn site yet (the is_send helper never runs) and the tests use mock streams, so there's no idle socket to drop. The keepalive has to go where the client is built and spawned, which is the wiring PR. I documented the contract on serve. If we'd rather not rely on a doc note, the wiring PR can add a build_client helper that bakes in keepalive + reconnect config so the call site can't forget.
…ootstrap Resolve conflicts from PR4's async-trait + Arc<dyn> base: - store.rs: keep async_trait Store, drop PR5.1's impl-Future refactor (async_trait already yields Send-boxed futures) - ingester.rs: keep PR5.1's drain-loop impl; make Ingester and Error pub(crate) to match the pub(crate) domain types; wrap wire u64 slots in Slot for StreamUpdate - Cargo.toml: keep futures (PR5.1) plus async-trait/dashmap/derive_more (PR4), drop observe/prometheus - errors.rs: keep the detailed ReplayWindowExceeded message
- hoist the UpdateOneof import out of handle_update and qualify the match arms - demote the handle_update Send-workaround note from a doc comment to a regular comment - warn on a present-but-malformed account txn_signature instead of dropping it silently - log the latest chain slot on payload-less updates - simplify serve to return ingester.run().await directly
- instrument handle_transaction/handle_account with a slot span instead of threading slot into each log - subscribe_request takes from_slot as a param, dropping the post-build patch in serve - document that serve's client needs HTTP/2 keepalive since the ingester does not answer pings, and drop the wrong claim that the wrapper handles keepalive
…-bootstrap # Conflicts: # crates/solana-indexer/src/indexer/ingester.rs
…-bootstrap # Conflicts: # crates/solana-indexer/src/indexer/ingester.rs
Description
This PR fills in
Ingester::run, which was previously justunimplemented!().The ingester now pulls updates from an
AutoReconnect-backedGeyserStreamand pushes taggedStreamUpdates into the decoder channel. Slot filter messages advance the latest chain slot counter in memory.Mind that the ingester does not decode messages and does not write the watermark; the decoder handles persistence.
Ingester::serveis the production entrypoint. It builds the subscription request, resumes from the stored watermark (from_slot = watermark + 1, or the live tip on a cold start), opens the stream throughGeyserGrpcClient::subscribe_with_request, and runs the drain loop. The caller passes in aGeyserGrpcClientbuilt withset_reconnect_config; otherwise the auto-reconnect wrapper will not actually reconnect.Reconnects, backoff, keepalive, and resume from checkpoint are handled by the
AutoReconnectstream wrapper. That wrapper also injects its ownBlockMeta+slotfilter under the__autoreconnectkey so it can checkpoint and resume; those messages stay inside the wrapper and never reach the ingester. Recoverable errors are swallowed there too.runonly returns when the stream ends for good or the decoder receiver drops.Ping/Pongframes are dropped. The library passes them through, but the ingester has no use for them.Unit tests for the run loop are defined in this follow-up PR: #4550.
Changes
Ingester::runas a plain drain loop. It reads updates from the stream, dispatches them, and stops when the stream ends or the decoder channel closes. No reconnect or backoff logic inside the ingester.S: Stream<Item = Result<SubscribeUpdate, Status>> + Unpin + Sendinstead of tying it to aGrpcConnector. Production uses anAutoReconnect-backedGeyserStream; tests can pass any stream.newtakes the stream, the decoder sender, and a sharedArc<AtomicU64>for the latest chain slot.Ingester::serve, a production entrypoint generic overSt: Store. It builds theSubscribeRequest, reads the watermark to setfrom_slot, opens the stream, and runs the drain loop. Added anErrorenum for setup failures, terminal stream errors, and clean stream end.subscribe_request, which defines the four program filters (settlement and solflow transactions and accounts, failed transactions included) plus achain_tipslot filter atconfirmedcommitment.from_slotis left empty soservecan fill it from the watermark.handle_updateand its helpershandle_transaction,handle_account, andhandle_slot. Transactions and accounts are forwarded asStreamUpdate::Tx/StreamUpdate::Account. Frames without a body or with malformed signatures are skipped with a warning. Slot messages only updatelatest_chain_slotin memory.forward, which tries a non-blocking send into the decoder channel, falls back to a blocking send with a warning when the channel is full, and stops when the receiver is gone.LATEST_CHAIN_SLOT: AtomicU64static with anArc<AtomicU64>owned by theIngester. The watchdog and finalization worker will take read clones. Updated related doc comments inwatchdog.rs,commitment.rs, anderrors.rs.Storeto requireSend + Syncand returnimpl Future<Output = Result<..., StoreError>> + Sendfrom each method. This letsIngester::servebetokio::spawned while still allowing implementors to writeasync fnbodies.serveintypes/wire.rs.futuresas a dependency forStreamExt.How to test
Implementation only; run-loop unit tests follow in the next PR.
cargo check -p solana-indexercargo clippy -p solana-indexer --all-targetscargo +nightly fmt --all -- --checkThis is a follow-up PR to #4514