Skip to content

feat(stream): support XDELEX command#3502

Open
kirito632 wants to merge 3 commits into
apache:unstablefrom
kirito632:feat/stream-xdelex
Open

feat(stream): support XDELEX command#3502
kirito632 wants to merge 3 commits into
apache:unstablefrom
kirito632:feat/stream-xdelex

Conversation

@kirito632
Copy link
Copy Markdown
Contributor

What

This PR introduces the XDELEX stream command with configurable
PEL cleanup behaviors.

It also fixes an existing issue in XINFO CONSUMERS where consumer metadata
from different groups could leak into the same scan result.

XINFO CONSUMERS Fix

Root cause:

GetConsumerInfo() used iterator bounds based only on the stream version range
([version, version+1)).

Since the InternalKey encoding places the version field before the sub-key,
consumer records from other groups could still fall within the same iteration
range.

Fix:

Extract and validate the group name from each internal key during iteration,
filtering out unrelated consumers.

XDELEX

Supported deletion strategies:

KEEPREF (default):
Delete the stream entry only.
DELREF:
Delete the stream entry and remove it from all groups' PELs.
ACKED:
Delete the stream entry only if it has been acknowledged by all consumer groups.

Per-ID return codes:

1: entry deleted
2: entry retained (e.g. ACKED condition not satisfied)
-1: entry not found or duplicate ID within the same request

Implementation Notes

Pending-number decrements are aggregated in memory and flushed through
a single WriteBatch to avoid repeated metadata updates during DELREF operations.
Duplicate IDs are deduplicated to preserve idempotency and prevent
stream metadata corruption.
The originating command name is propagated into WriteBatchExtractor
so replicas replay the exact same deletion semantics.
Internal stream metadata and PEL delete events are filtered out in
WriteBatchExtractor to avoid generating invalid replicated XDEL commands.

Testing

Added Go integration tests covering:

multi-group DELREF cleanup
ACKED blocking semantics
duplicate-ID idempotency
invalid syntax handling
XINFO CONSUMERS group isolation

AI-Assisted Contribution Disclosure

This contribution complies with the ASF AI-assisted contribution guidelines.

AI tools were used to assist with English phrasing and some test scaffolding.
The core C++ implementation, debugging, correctness analysis, and final verification
were independently completed by me.

@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
46.0% Coverage on New Code (required ≥ 50%)

See analysis details on SonarQube Cloud

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a new stream deletion command XDELEX with configurable deletion/PEL-cleanup semantics, and fixes XINFO CONSUMERS so consumer metadata from other groups doesn’t leak into the scan results.

Changes:

  • Implement XDELEX with KEEPREF (default), DELREF, and ACKED strategies, including replica replay support via WriteBatchLogData.
  • Fix XINFO CONSUMERS iteration to filter consumers by group name, preventing cross-group leakage.
  • Add Go integration tests and a C++ unit test for new deletion behaviors and parsing edge cases.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/gocase/unit/type/stream/stream_test.go Adds Go integration coverage for XDELEX behaviors and error handling.
tests/cppunit/types/stream_test.cc Adds a C++ unit test ensuring empty ID list handling for the new deletion API.
src/types/redis_stream.h Exposes DeleteEntriesWithOption and helper declarations for XDELEX implementation.
src/types/redis_stream.cc Implements DeleteEntriesWithOption plus PEL cleanup and ACKED semantics; fixes GetConsumerInfo group isolation.
src/types/redis_stream_base.h Introduces enums for delete options and per-ID deletion result codes.
src/storage/batch_extractor.cc Ensures replication replays XDELEX semantics and filters internal stream/PEL/meta events.
src/commands/cmd_stream.cc Adds the xdelex command parser/executor and registers the command.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/types/redis_stream.cc
Comment on lines +684 to +707
if (first_deleted) {
iter->SeekToFirst();
while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) {
iter->Next();
}
if (iter->Valid()) {
metadata.first_entry_id = entryIDFromInternalKey(iter->key());
metadata.recorded_first_entry_id = metadata.first_entry_id;
} else {
metadata.first_entry_id.Clear();
metadata.recorded_first_entry_id.Clear();
}
}
if (last_deleted) {
iter->SeekToLast();
while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) {
iter->Prev();
}
if (iter->Valid()) {
metadata.last_entry_id = entryIDFromInternalKey(iter->key());
} else {
metadata.last_entry_id.Clear();
}
}
Comment thread src/types/redis_stream.h
Comment on lines +25 to 32
#include <map>
#include <optional>
#include <string>
#include <vector>

#include "common/db_util.h"
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
Comment on lines +304 to +312
for (int64_t i = 0; i < numids; i++) {
auto id_str = GET_OR_RET(parser.TakeStr());
redis::StreamEntryID id;
auto s = ParseStreamEntryID(id_str, &id);
if (!s.IsOK()) return s;
entry_ids_.emplace_back(id);
}

return Status::OK();
@jihuayu
Copy link
Copy Markdown
Member

jihuayu commented May 27, 2026

Hi @kirito632. Thanks for your PR.

I reviewed your PR once using my AI tools (Codex / GPT-5.5 xhigh), and it found quite a few issues (I’m not going to reveal these things. I want you to try and see whether you can find them on your own.).
This is not meant as criticism — I just want to share my workflow with you. This can help the PR get merged faster.

I hope you can also try using an AI tool locally to review your PR once before submitting it. Below is the prompt I use. It is written in Chinese, but you can translate it into the language you use and run it there:

请你审核一下这个PR是否符合redis和kvrocks规范,代码是否清晰,命名是否清晰

If you have any other questions, feel free to @ me directly. Hope this helps you.

@kirito632
Copy link
Copy Markdown
Contributor Author

Thanks a lot for the detailed feedback and for sharing your workflow. I’m going through the current PR again to identify and fix the remaining issues on my own first.Really appreciate the guidance and the time you spent reviewing this PR.

kirito632 added 2 commits May 28, 2026 22:03
Add XDELEX to delete stream entries with KEEPREF/DELREF options.
Includes refactoring for unified entry/PEL deletion.
Also fix a consumer pending count bug when destroying a consumer.
@kirito632 kirito632 force-pushed the feat/stream-xdelex branch from 80fa2b0 to d2839d9 Compare May 29, 2026 11:03
@kirito632 kirito632 force-pushed the feat/stream-xdelex branch from d2839d9 to 649043e Compare May 29, 2026 13:03
@kirito632
Copy link
Copy Markdown
Contributor Author

Sorry for another force-push. I've added two missing checks:
Get key_slot_id from user_key, and if slot_range_.IsValid() and key_slot_id is not in range, return rocksdb::Status::OK() immediately.
Extract namespace ns from ikey.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants