feat(stream): support XACKDEL and XDELEX commands#3499
Closed
kirito632 wants to merge 2 commits into
Closed
Conversation
|
Contributor
Author
|
I noticed the SonarCloud Quality Gate failed. I am currently working on a quick update to resolve this. |
Contributor
|
@torwig please, review this pr, thanks |
Member
|
Hi @kirito632 Thanks for your contribution. However, I feel this PR is a bit too large. If possible, I’d suggest splitting it into two PRs, with each PR adding one command. |
Contributor
Author
|
Got it, thanks. I’ll split it into two PRs as suggested. |
Contributor
Author
Member
|
@kirito632 Thanks. You can close this PR directly |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.


What
This PR introduces the
XACKDELandXDELEXstream commands with configurablePEL cleanup behaviors.
It also fixes an existing issue in
XINFO CONSUMERSwhere consumer metadatafrom different groups could leak into the same scan result.
XINFO CONSUMERS Fix
Root cause:
GetConsumerInfo()setsiterate_upper_boundby incrementing only the versionwhile keeping the sub-key prefix (which includes the group name) unchanged.
Since the version field precedes the sub-key in the InternalKey encoding
(
[version][sub_key]), consumer keys from other groups can fall within the[version, version+1)range when their group name sorts lexicographicallyat or after the target group's name prefix.
Fix:
Extract and validate the group name from each internal key during iteration,
filtering out consumers belonging to other groups.
XACKDEL / XDELEX
Supported deletion strategies:
KEEPREF(default):Delete the stream entry only.
DELREF:Delete the stream entry and remove it from all groups' PELs.
ACKED:Delete the stream entry only if it has been acknowledged by all consumer groups.
Both commands return per-ID status codes:
1: entry deleted2: entry retained (e.g. ACKED condition not satisfied)-1: entry not found or duplicate ID within the same requestImplementation Notes
Pending-number decrements are aggregated in memory and flushed once
through a single
WriteBatch, avoiding repeated metadata updates duringDELREF operations.
Duplicate IDs within the same request are deduplicated to preserve
idempotency and prevent stream metadata corruption.
The originating command name (
XACKDELorXDELEX) is propagated intoWriteBatchExtractorso replicas replay the exact same semantics.Internal stream metadata / PEL delete events are filtered out in
WriteBatchExtractorby skipping synthetic subkeys whose entry-id prefixstarts with
UINT64_MAX, preventing invalid replicatedXDELcommands.Testing
Added Go integration tests covering:
AI-Assisted Contribution Disclosure
This contribution complies with the ASF AI-assisted contribution guidelines.
AI tools were used to assist with English phrasing and some test scaffolding.
The core C++ implementation, debugging, correctness analysis, and final verification
were independently completed by me.