fix(kafka source): track contiguous offsets before committing to Kafka #25130
rohitmanohar wants to merge 1 commit into vectordotdev:master
Conversation
All contributors have signed the CLA ✍️ ✅
I have read the CLA Document and I hereby sign the CLA
Force-pushed: 3f28186 to 4da6160
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3f28186138
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review"
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Force-pushed: 4da6160 to e2733e6
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e2733e6bf9
Force-pushed: e2733e6 to f816418
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f816418e14
Summary
The Kafka source previously committed each successfully delivered offset directly, without considering whether earlier offsets in the partition had also been delivered. If message N failed downstream but message N+1 succeeded, offset N+1 would be committed, causing message N to be skipped on consumer restart.
This change introduces a per-partition offset tracker that maintains a high watermark representing the last contiguously delivered offset. The watermark only advances when the next sequential offset is delivered; any gap (caused by a failed batch) holds it back. Only the watermark value is passed to store_offset, ensuring Kafka replays from the correct position on restart.
This provides at-least-once delivery semantics when end-to-end acknowledgements are enabled. When acknowledgements are disabled, every batch resolves as Delivered immediately, so the watermark always advances and behavior is unchanged from before.
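The contiguous-watermark logic described above can be sketched as follows. This is a minimal illustration, not Vector's actual implementation; the `OffsetTracker` type and its method names are hypothetical:

```rust
use std::collections::BTreeSet;

/// Illustrative per-partition tracker: records delivered offsets and
/// exposes a high watermark that only advances over a contiguous run.
struct OffsetTracker {
    /// First offset this consumer was assigned for the partition.
    start: i64,
    /// Next offset that must be delivered before the watermark can advance.
    next_expected: i64,
    /// Delivered offsets above `next_expected` (out-of-order arrivals).
    pending: BTreeSet<i64>,
}

impl OffsetTracker {
    fn new(start: i64) -> Self {
        Self { start, next_expected: start, pending: BTreeSet::new() }
    }

    /// Record a successfully delivered offset.
    fn mark_delivered(&mut self, offset: i64) {
        self.pending.insert(offset);
        // Sweep forward while the next sequential offset has been delivered.
        while self.pending.remove(&self.next_expected) {
            self.next_expected += 1;
        }
    }

    /// Last contiguously delivered offset, or None if nothing committed yet.
    /// Only this value would be handed to `store_offset`.
    fn watermark(&self) -> Option<i64> {
        (self.next_expected > self.start).then(|| self.next_expected - 1)
    }
}

fn main() {
    let mut t = OffsetTracker::new(0);
    t.mark_delivered(0);
    t.mark_delivered(2); // offset 1 failed or is still in flight
    assert_eq!(t.watermark(), Some(0)); // gap at 1 holds the watermark back
    t.mark_delivered(1); // gap fills; watermark sweeps past 2
    assert_eq!(t.watermark(), Some(2));
}
```

A `BTreeSet` keeps out-of-order deliveries around so the watermark can sweep forward in one pass once a gap fills; per the description above, only the watermark value is ever passed to `store_offset`.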
How did you test this PR?
Manually tested against a Kafka source feeding a non-local sink (e.g. S3):
- T0: Everything is functional; Vector reads from Kafka and writes to the sink. Kafka offsets increment as expected.
- T1: Break connectivity to the sink. Vector retries and eventually gives up writing to the sink. The offset doesn't increment.
- T2: Recover connectivity to the sink. Vector moves past the failed events and delivers new events to the sink, but the offset isn't incremented.
The offset remains stuck at the event that wasn't delivered to the sink, ensuring at-least-once semantics are honored.
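The T0–T2 behaviour can be modelled with a tiny simulation. This is illustrative only; `Ack` and `commit_position` are hypothetical names, and in the real source batches resolve through Vector's acknowledgement machinery:

```rust
/// Hypothetical per-batch acknowledgement outcome.
#[derive(Clone, Copy, PartialEq)]
enum Ack {
    Delivered,
    Failed,
}

/// Committed position = last offset in the contiguous prefix of
/// delivered batches; a single failure pins the commit point.
fn commit_position(acks: &[Ack]) -> Option<usize> {
    acks.iter()
        .take_while(|a| **a == Ack::Delivered)
        .count()
        .checked_sub(1)
}

fn main() {
    use Ack::*;
    // T0: everything delivered; the commit position tracks the tail.
    assert_eq!(commit_position(&[Delivered, Delivered]), Some(1));
    // T1: offset 2 fails; the commit position stays at offset 1.
    assert_eq!(commit_position(&[Delivered, Delivered, Failed]), Some(1));
    // T2: later offsets deliver, but the failed offset still pins the commit,
    // so a restart replays from offset 2 (at-least-once).
    assert_eq!(commit_position(&[Delivered, Delivered, Failed, Delivered]), Some(1));
}
```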
Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, add the no-changelog label to this PR.
References
Notes
- Use @vectordotdev/vector to reach out to us regarding this PR.
- We recommend adding a pre-push hook; please see this template.
- Alternatively, run the following locally before pushing:
  - make fmt
  - make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
  - make test
- To pull in the latest master: git merge origin master and git push.
- If this PR includes dependency changes (Cargo.lock), please run make build-licenses to regenerate the license inventory and commit the changes (if any). More details on the dd-rust-license-tool.