fix(sinks): correct event finalization in socket sinks #25132
simdugas wants to merge 14 commits into vectordotdev:master
Conversation
Events were advanced and finalized before delivery confirmation. Replace send_all_peekable with a peek-then-send loop: peek the next event, send it with empty finalizers, and only advance the stream marking EventStatus::Delivered on success. A disconnect mid-send now leaves the event in-flight for retry rather than silently dropping it. Also move the shutdown_check into poll_ready (before start_send) so a peer disconnect is detected without consuming the next stream item. Remove the now-unused sink_ext module (VecSinkExt / SendAll).
All contributors have signed the CLA ✍️ ✅
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9d7f945ec8
I have read the CLA Document and I hereby sign the CLA
The previous per-event peek-send-advance loop dropped in-flight events when a peer reset occurred between items. Replace it with a collect-then-flush approach: drain available input into a pending batch, feed+flush the whole batch atomically, and on error reconnect and retry the same batch — no events are lost on peer reset or TCP RST. Applies identically to both the TCP and Unix stream sinks.
Cap pending batch at MAX_PENDING_BATCH_ITEMS (1024) to bound memory when the peer is slow or disconnected. Bundle sink and open_token into an Option so the connection is established lazily (only when data is ready) and torn down cleanly via RAII when the loop exits. Applies to both TCP and Unix stream sinks.
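The collect-then-flush loop with a bounded pending batch can be sketched as below. This is a simplified, synchronous model: `FlakySink` and the queue shapes are hypothetical stand-ins for illustration, while the real sinks are async and drive a `futures::Sink`. The key property shown is that a failed flush keeps the same batch pending, so a peer reset loses nothing.

```rust
use std::collections::VecDeque;

const MAX_PENDING_BATCH_ITEMS: usize = 1_024;

/// A sink that fails the first `failures` flushes, simulating peer resets.
struct FlakySink {
    failures: usize,
    delivered: Vec<String>,
}

impl FlakySink {
    /// Flush is all-or-nothing: on failure, nothing from `batch` is delivered.
    fn flush(&mut self, batch: &[String]) -> Result<(), &'static str> {
        if self.failures > 0 {
            self.failures -= 1;
            return Err("connection reset by peer");
        }
        self.delivered.extend(batch.iter().cloned());
        Ok(())
    }
}

fn run(input: &mut VecDeque<String>, sink: &mut FlakySink) {
    let mut pending: Vec<String> = Vec::new();
    loop {
        // Drain available input into the pending batch, bounded by the cap
        // so memory stays bounded while the peer is slow or disconnected.
        while pending.len() < MAX_PENDING_BATCH_ITEMS {
            match input.pop_front() {
                Some(event) => pending.push(event),
                None => break,
            }
        }
        if pending.is_empty() {
            break; // input exhausted and everything flushed
        }
        // Feed+flush the whole batch; on error, retry the SAME batch so no
        // event is lost. The real code reconnects (with backoff) here.
        match sink.flush(&pending) {
            Ok(()) => pending.clear(),
            Err(_) => continue,
        }
    }
}

fn main() {
    let mut input: VecDeque<String> = (0..2_000).map(|i| format!("event-{i}")).collect();
    let mut sink = FlakySink { failures: 3, delivered: Vec::new() };
    run(&mut input, &mut sink);
    assert_eq!(sink.delivered.len(), 2_000); // nothing lost across resets
    println!("delivered {} events", sink.delivered.len());
}
```

Because the whole batch is resent on failure, a real sink whose flush is not all-or-nothing gives at-least-once (not exactly-once) delivery, which matches the guarantee described in the commits.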
Bump event count from 1000 to 2000 so the stream exceeds the 1024-item pending batch cap, exercising the split-batch retry path through a server reset.
Cast named functions to fn(usize) and annotate the connection Option with an explicit type so the compiler can resolve the OpenToken generic parameter without ambiguity.
Change 1024 to 1_000 to follow Rust convention for large numeric literals, and remove the now-redundant hard-coded value from the test comment.
Move the pending-batch cap to socket_bytes_sink where the sink lives and re-export it as pub(crate), removing the duplicate local constants in tcp and unix sinks.
Mirror the existing TCP reconnect test for the Unix stream sink: bind a first listener, drain a small number of lines, drop it hard, then assert the sink reconnects to a second listener and delivers all remaining events without loss.
Restrict the shutdown check in poll_ready to the moment when events_total is zero (start of a new batch).
Replace the string comparison used to detect peer shutdown errors (error.kind() == Other && to_string() == "ShutdownCheck::Close") with a typed PeerShutdownError struct and is_peer_shutdown_error() helper. This is more robust and removes the magic string dependency.
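A typed marker error can be detected by downcasting the `io::Error` payload instead of comparing strings. The sketch below uses the names from the commit message (`PeerShutdownError`, `peer_shutdown_io_error`, `is_peer_shutdown_error`); the exact fields and messages in Vector's code may differ.

```rust
use std::error::Error;
use std::fmt;
use std::io;

#[derive(Debug)]
struct PeerShutdownError;

impl fmt::Display for PeerShutdownError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "peer closed the connection")
    }
}

impl Error for PeerShutdownError {}

/// Build an io::Error that carries the typed marker as its inner error.
fn peer_shutdown_io_error() -> io::Error {
    io::Error::new(io::ErrorKind::Other, PeerShutdownError)
}

/// Detect the marker by downcasting the inner error, not by string matching.
fn is_peer_shutdown_error(error: &io::Error) -> bool {
    error
        .get_ref()
        .map_or(false, |inner| inner.downcast_ref::<PeerShutdownError>().is_some())
}

fn main() {
    assert!(is_peer_shutdown_error(&peer_shutdown_io_error()));
    assert!(!is_peer_shutdown_error(&io::Error::new(io::ErrorKind::Other, "boom")));
    println!("typed peer-shutdown detection works");
}
```

The downcast survives unrelated errors that happen to share the same `ErrorKind` or message text, which is exactly what the string comparison could not guarantee.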
Add inline comments to the TCP and Unix stream sink batch loops and the reconnect test to make the at-least-once delivery guarantee (whole-batch resend on reconnect) explicit for future readers.
Verify that is_peer_shutdown_error correctly identifies errors created by peer_shutdown_io_error and rejects unrelated io::Error values.
Without a backoff the TCP/Unix stream sinks spin tightly on reconnect when a remote endpoint is consistently refusing connections. Introduce an exponential backoff (capped at 5 s) with full jitter on each send failure and reset it after a successful flush, preventing thundering-herd reconnection storms.
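Exponential backoff with full jitter picks a delay uniformly in `[0, min(cap, base * 2^attempt)]` and resets the attempt counter after a success. The sketch below is illustrative only: the constants and the tiny xorshift PRNG are assumptions, not Vector's actual backoff utilities.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const BASE_DELAY: Duration = Duration::from_millis(100);
const MAX_DELAY: Duration = Duration::from_secs(5); // cap from the commit

struct Backoff {
    attempt: u32,
    rng_state: u64,
}

impl Backoff {
    fn new() -> Self {
        // Seed a tiny PRNG from the clock; jitter needs no cryptographic quality.
        let seed = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0x9e37_79b9_7f4a_7c15);
        Backoff { attempt: 0, rng_state: seed | 1 }
    }

    /// xorshift64 pseudo-random step.
    fn next_u64(&mut self) -> u64 {
        let mut x = self.rng_state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.rng_state = x;
        x
    }

    /// Full jitter: uniform in [0, min(cap, base * 2^attempt)].
    fn next_delay(&mut self) -> Duration {
        let exp = BASE_DELAY
            .saturating_mul(1u32.checked_shl(self.attempt).unwrap_or(u32::MAX))
            .min(MAX_DELAY);
        self.attempt = self.attempt.saturating_add(1);
        let upper = exp.as_millis() as u64;
        Duration::from_millis(if upper == 0 { 0 } else { self.next_u64() % (upper + 1) })
    }

    /// Reset after a successful flush so the next failure starts small again.
    fn reset(&mut self) {
        self.attempt = 0;
    }
}

fn main() {
    let mut backoff = Backoff::new();
    for _ in 0..10 {
        assert!(backoff.next_delay() <= MAX_DELAY); // never exceeds the 5 s cap
    }
    backoff.reset();
    assert!(backoff.next_delay() <= BASE_DELAY); // post-reset retry jitters within base
    println!("backoff stays within bounds");
}
```

Full jitter (rather than jittering around the exponential value) spreads reconnect attempts across the whole window, which is what breaks up the thundering-herd pattern when many clients lose the same peer at once.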
Collapse nested if-let blocks into a single let-chain expression in the TCP and Unix stream sink shutdown paths for clarity.
@codex review
Codex Review: Didn't find any major issues. Hooray!
Check Spelling seems to be failing in other PRs as well. |
@vectordotdev/vector this PR should be ready for feedback. |
Summary
This should fix an issue with the Vector TCP sink where data was lost when the server side of the TCP connection was reset or timed out.
The original code used a peek-one/send-one/advance loop: if the connection was torn down between peek and send (or between poll_ready calls during a batch), the in-flight item was discarded and its finalizers were never marked Delivered.
This PR fixes the event loss by switching to a batch-collect-then-flush model with retry on reconnect.
Vector configuration
See simdugas/vector-tcp-reset-issue for a full demonstration of the issue and fix, including Vector configurations in the vector-before and vector-after folders.
How did you test this PR?
I have detailed the full testing steps with a demonstration of the issue before and after the fix in the repository simdugas/vector-tcp-reset-issue.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, add the no-changelog label to this PR.
References
Related: #9040
Notes
Use @vectordotdev/vector to reach out to us regarding this PR.
If you do not use the pre-push hook, please see this template and run:
- make fmt
- make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
- make test
To update your branch, git merge origin master and git push.
If this PR changes dependencies (Cargo.lock), please run make build-licenses to regenerate the license inventory and commit the changes (if any). More details on the dd-rust-license-tool.