feat(stream): support XACKDEL command#3504
Conversation
…vent cross-group leakage
|
Hi @kirito632. If the core C++ implementation part was also completed by AI, please state that directly as well — no need to worry about it. |
|
Thanks for the clarification. |
There was a problem hiding this comment.
Pull request overview
This PR adds a new Stream command XACKDEL (with multiple PEL cleanup strategies) and fixes XINFO CONSUMERS to avoid leaking consumer metadata across groups when scanning internal keys.
Changes:
- Add
XACKDELcommand parsing/registration and implement entry deletion withKEEPREF|DELREF|ACKEDsemantics. - Fix
XINFO CONSUMERSscanning by explicitly validating the group name extracted from each internal key. - Update replication/migration write-batch extraction to replay
XACKDELsemantics and filter internal stream metadata/PEL events; add Go integration tests for the new behaviors.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/gocase/unit/type/stream/stream_test.go | Adds integration tests for XACKDEL behaviors and XINFO CONSUMERS group isolation. |
| src/types/redis_stream.h | Declares new helper methods for XACKDEL implementation. |
| src/types/redis_stream.cc | Implements XACKDEL deletion/PEL cleanup logic and fixes XINFO CONSUMERS group filtering. |
| src/types/redis_stream_base.h | Introduces enums for delete options and per-ID delete results. |
| src/storage/batch_extractor.cc | Adjusts stream write-batch extraction to skip internal stream metadata/PEL keys and replay XACKDEL deletes. |
| src/commands/cmd_stream.cc | Adds XACKDEL command parsing, execution, and registration. |
Comments suppressed due to low confidence (1)
src/storage/batch_extractor.cc:282
- In
PutCFfor the Stream column family, the extracted command is appended toresp_commands_[ns], butnsis never set from theInternalKey. This will bucket stream commands under the empty namespace and can break namespace-aware outputs (e.g., slot migration or command dump). Also, the Stream CF path does not apply theslot_range_filtering used by other CF handlers, so out-of-range stream updates may be emitted during slot migration.
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
InternalKey ikey(key, is_slot_id_encoded_);
Slice entry_id_check = ikey.GetSubKey();
uint64_t delimiter = 0;
GetFixed64(&entry_id_check, &delimiter);
if (delimiter == UINT64_MAX) {
return rocksdb::Status::OK();
}
auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args);
if (!s.IsOK()) {
ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
return rocksdb::Status::OK();
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (last_deleted) { | ||
| iter->SeekToLast(); | ||
| while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { | ||
| iter->Prev(); | ||
| } | ||
| if (iter->Valid()) { | ||
| metadata.last_entry_id = entryIDFromInternalKey(iter->key()); | ||
| } else { | ||
| metadata.last_entry_id.Clear(); | ||
| } |
| bool pel_cleaned = false; | ||
| if (pel_entry_exists) { | ||
| s = batch->Delete(stream_cf_handle_, pel_key); | ||
| if (!s.ok()) return s; | ||
| pel_cleaned = true; | ||
| acknowledged_cnt++; | ||
| batch_modified = true; | ||
|
|
||
| auto pel_entry = decodeStreamPelEntryValue(pel_value); | ||
| consumer_acknowledges[pel_entry.consumer_name]++; | ||
| } |
| } else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) { | ||
| InternalKey ikey(key, is_slot_id_encoded_); | ||
| Slice encoded_id = ikey.GetSubKey(); | ||
| redis::StreamEntryID entry_id; | ||
| GetFixed64(&encoded_id, &entry_id.ms); | ||
|
|
||
| if (entry_id.ms == UINT64_MAX) { | ||
| return rocksdb::Status::OK(); | ||
| } | ||
|
|
||
| GetFixed64(&encoded_id, &entry_id.seq); | ||
| command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()}; | ||
| std::string entry_id_str = entry_id.ToString(); | ||
| std::string user_key = ikey.GetKey().ToString(); | ||
|
|
||
| auto args = log_data_.GetArguments(); | ||
| if (!args->empty()) { | ||
| if ((*args)[0] == "XACKDEL" && args->size() >= 3) { | ||
| command_args = {(*args)[0], user_key, (*args)[1], (*args)[2], "IDS", "1", entry_id_str}; | ||
| } else { | ||
| command_args = {"XDEL", user_key, entry_id_str}; | ||
| } | ||
| } else { | ||
| command_args = {"XDEL", user_key, entry_id_str}; | ||
| } | ||
| } |
fa65eda to
259cd7c
Compare
259cd7c to
18e8c86
Compare
What
This PR introduces the XACKDEL stream command with configurable
PEL cleanup behaviors.
It also fixes an existing issue in XINFO CONSUMERS where consumer metadata
from different groups could leak into the same scan result.
XINFO CONSUMERS Fix
Root cause:
GetConsumerInfo()setsiterate_upper_boundby incrementing only thestream version while keeping the sub-key prefix unchanged.
Since
InternalKeyencoding places the version field before the sub-key,consumer records belonging to other groups can still fall within the
same
[version, version+1)iteration range.Fix:
Extract and validate the group name from each internal key during iteration,
filtering out unrelated consumers.
XACKDEL
XACKDEL key group [KEEPREF|DELREF|ACKED] IDS numids id [id ...]Deletes stream entries with configurable PEL cleanup behavior.
Supported deletion strategies:
KEEPREF (default):
Delete the stream entry only, without touching existing PEL references.
DELREF:
Delete the stream entry and remove it from all consumer groups' PELs.
ACKED:
Delete the stream entry only if it has been acknowledged by all consumer groups.
Per-ID return codes:
1: entry deleted2: entry retained (e.g. ACKED condition not satisfied)-1: entry not found or duplicate ID within the same requestImplementation Notes
a single WriteBatch to avoid repeated metadata updates during DELREF operations.
stream metadata corruption.
WriteBatchExtractorso replicas replay the exactsame deletion semantics.
WriteBatchExtractorto avoid generating invalidreplicated XDEL commands.
Testing
Added Go integration tests covering:
AI-Assisted Contribution Disclosure
This contribution complies with the ASF AI-assisted contribution guidelines.
AI tools were used to assist with English phrasing and some test scaffolding.
The core C++ implementation, debugging, correctness analysis, and final verification
were independently completed by me.