Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
70c65c7
perf(redis): stream entry-per-key layout for O(new) XREAD
bootjp Apr 24, 2026
66a073a
fix(redis): address PR #620 review — restore XADD O(1) + migration MV…
bootjp Apr 24, 2026
5ea7f5e
fix(redis,stream): address round-3 review
bootjp Apr 24, 2026
50def0d
test(redis,stream): stabilise TestRedis_StreamXReadLatencyIsConstant
bootjp Apr 24, 2026
8bc3eeb
fix(redis,stream): XREAD BLOCK timeout no longer surfaces as error
bootjp Apr 24, 2026
01cdf5e
Merge branch 'main' into ops/xread-range-scan
bootjp Apr 24, 2026
a6b176d
fix(redis,stream): accept shorthand IDs + uncap full-stream scan
bootjp Apr 24, 2026
96ca38d
fix(redis,stream): uncap DEL path for migrated streams
bootjp Apr 24, 2026
8cf12e2
fix(redis,stream): address Copilot review
bootjp Apr 24, 2026
28b5aa4
fix: apply Copilot review suggestions for scanStreamEntriesAt and del…
Copilot Apr 24, 2026
4c5d48a
docs: clarify comments for scanStreamEntriesAt and deleteStreamWideCo…
Copilot Apr 24, 2026
b10df2c
fix(redis,stream): cap legacy blob migration at maxWideColumnItems en…
Copilot Apr 24, 2026
88ffdfa
Merge branch 'main' into ops/xread-range-scan
bootjp Apr 24, 2026
b867709
fix(redis,stream): fail-fast on empty entry, validate afterID, precom…
Copilot Apr 24, 2026
2fa9b80
fix(redis,stream): reject migration of legacy streams exceeding maxWi…
Copilot Apr 24, 2026
87ac40e
Merge branch 'main' into ops/xread-range-scan
bootjp Apr 24, 2026
e675e4d
fix(redis,stream): MAXLEN sentinel, reject ID 0-0, hoist prefix bytes…
Copilot Apr 24, 2026
45f4c1e
test(redis,stream): add TestNextXAddID_RejectsZeroID for 0-0 rejection
Copilot Apr 24, 2026
b1bbe09
fix: XADD MAXLEN 0 deletes own entry, parseStreamBoundID shorthand fi…
Copilot Apr 24, 2026
fd9281a
docs: clarify parseStreamBoundID lower-exclusive expansion comment
Copilot Apr 24, 2026
3dd2f79
fix: stream entry key routing and XREAD deadline-aware context
Copilot Apr 24, 2026
bedb3de
refactor(redis,stream): drop migration path — legacy data is cleared,…
bootjp Apr 24, 2026
7d38b20
Merge branch 'main' into ops/xread-range-scan
bootjp Apr 24, 2026
428888a
fix(redis,stream): address Copilot round-3 with matching tests
bootjp Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 112 additions & 11 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,15 @@ func (r *RedisServer) mergeInternalNamespaces(start []byte, pattern []byte, merg
return err
}
for _, prefix := range redisInternalPrefixes {
// !stream|meta| keys are length-prefixed (see store.StreamMetaKey):
// a pattern-bound scan over the raw prefix would mask out every
// migrated stream because the user-key bytes do not start at
// prefix[len(prefix):]. Delegate to the wide-column scan below,
// which uses streamMetaScanStart(start) to place the user-key
// lower bound past the length field.
if prefix == store.StreamMetaPrefix {
continue
}
internalStart, internalEnd := listPatternScanBounds(prefix, pattern)
if err := mergeScannedKeys(internalStart, internalEnd); err != nil {
return err
Expand All @@ -1518,7 +1527,27 @@ func (r *RedisServer) mergeInternalNamespaces(start []byte, pattern []byte, merg
}
zsetMemberStart := store.ZSetMemberScanPrefix(start)
zsetMemberEnd := prefixScanEnd([]byte(store.ZSetMemberPrefix))
return mergeScannedKeys(zsetMemberStart, zsetMemberEnd)
if err := mergeScannedKeys(zsetMemberStart, zsetMemberEnd); err != nil {
return err
}
// Post-migration streams live under !stream|meta|<len><userKey>.
// The meta record is enough to expose the logical key via KEYS;
// entry rows are filtered out by redisVisibleUserKey / collectUserKeys
// so the result stays one-line-per-stream regardless of entry count.
streamMetaStart := streamMetaScanStart(start)
streamMetaEnd := prefixScanEnd([]byte(store.StreamMetaPrefix))
return mergeScannedKeys(streamMetaStart, streamMetaEnd)
}

// streamMetaScanStart computes the lower bound for a scan over stream meta
// keys whose user key begins with userPrefix. With an empty prefix the scan
// starts at the raw StreamMetaPrefix; otherwise store.StreamMetaKey already
// yields StreamMetaPrefix + len(userKey) + userKey, so no extra assembly is
// needed here.
func streamMetaScanStart(userPrefix []byte) []byte {
	if len(userPrefix) > 0 {
		return store.StreamMetaKey(userPrefix)
	}
	return []byte(store.StreamMetaPrefix)
}

func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
Expand Down Expand Up @@ -1664,9 +1693,25 @@ func wideColumnVisibleUserKey(key []byte) (userKey []byte, isWide bool) {
if store.IsSetMemberKey(key) {
return store.ExtractSetUserKeyFromMember(key), true
}
if userKey, ok := streamWideColumnVisibleUserKey(key); ok {
return userKey, true
}
return zsetWideColumnVisibleUserKey(key)
}

// streamWideColumnVisibleUserKey resolves a wide-column stream key to the
// user key that KEYS / SCAN should surface. A meta key maps to its stream's
// user key, so each stream shows up exactly once; entry keys are recognised
// but yield no visible key, keeping per-entry rows out of scan results; any
// other key is reported as not stream-related at all.
func streamWideColumnVisibleUserKey(key []byte) ([]byte, bool) {
	switch {
	case store.IsStreamMetaKey(key):
		return store.ExtractStreamUserKeyFromMeta(key), true
	case store.IsStreamEntryKey(key):
		return nil, true
	default:
		return nil, false
	}
}

func redisVisibleUserKey(key []byte) []byte {
if bytes.HasPrefix(key, redisTxnKeyPrefix) || isRedisTTLKey(key) {
return nil
Expand Down Expand Up @@ -1863,7 +1908,13 @@ type txnContext struct {
zsetStates map[string]*zsetTxnState
ttlStates map[string]*ttlTxnState
readKeys map[string][]byte
startTS uint64
// streamDeletions tracks user keys whose stream wide-column layout must
// be tombstoned on commit: the !stream|meta|<key> record plus every
// !stream|entry|<key><ID> row. stageKeyDeletion seeds this (MULTI/EXEC
// DEL / EXPIRE 0) so migrated streams are properly removed rather than
// leaking entry keys past the DEL's apparent success.
streamDeletions map[string][]byte
startTS uint64
}

type listTxnState struct {
Expand Down Expand Up @@ -1918,7 +1969,8 @@ func (t *txnContext) trackTypeReadKeys(key []byte) {
redisHashKey(key),
redisSetKey(key),
redisZSetKey(key),
redisStreamKey(key),
redisStreamKey(key), // legacy single-blob stream key
store.StreamMetaKey(key), // post-migration wide-column stream meta
redisHLLKey(key),
redisStrKey(key),
key, // legacy bare key for fallback reads
Expand Down Expand Up @@ -2459,7 +2511,7 @@ func (t *txnContext) stageKeyDeletion(key []byte) (redisResult, error) {
zs.members = map[string]float64{}
zs.exists = false
zs.dirty = true
// Mark hash, set, stream, and HLL internal keys for deletion.
// Mark hash, set, stream (legacy blob), and HLL internal keys for deletion.
for _, internalKey := range [][]byte{
redisHashKey(key),
redisSetKey(key),
Expand All @@ -2473,6 +2525,19 @@ func (t *txnContext) stageKeyDeletion(key []byte) (redisResult, error) {
iv.deleted = true
iv.dirty = true
}
// Stage the wide-column stream cleanup: the !stream|meta| record and
// every !stream|entry| row must also be tombstoned when the user deletes
// a migrated stream via MULTI/EXEC DEL or EXPIRE 0. Without this step
// the command would report success but leave rows behind, and a later
// XLEN / XREAD would "resurrect" the stream. commit() expands this
// entry into concrete Del elems by scanning the entry-key prefix.
// The map is lazy-initialised so test fixtures that build a minimal
// txnContext literal without this field still work.
if t.streamDeletions == nil {
t.streamDeletions = map[string][]byte{}
}
t.streamDeletions[string(key)] = bytes.Clone(key)
t.trackReadKey(store.StreamMetaKey(key))
// Mark legacy bare string key for deletion. We bypass load() here
// because load() auto-prefixes bare keys to !redis|str|.
// Track the bare key in the read set for conflict detection.
Expand Down Expand Up @@ -2553,9 +2618,15 @@ func (t *txnContext) commit() error {
// non-string keys get a !redis|ttl| element written in the same transaction.
ttlElems := t.buildTTLElems()

streamElems, err := t.buildStreamDeletionElems()
if err != nil {
return err
}

elems = append(elems, listElems...)
elems = append(elems, zsetElems...)
elems = append(elems, ttlElems...)
elems = append(elems, streamElems...)
if len(elems) == 0 {
return nil
}
Expand Down Expand Up @@ -2795,6 +2866,35 @@ func buildZSetWideElems(key []byte, st *zsetTxnState) ([]*kv.Elem[kv.OP], int64)
return elems, lenDelta
}

// buildStreamDeletionElems expands every user key queued in streamDeletions
// into the Del operations that actually tombstone a migrated stream:
// !stream|meta|<key> and every !stream|entry|<key><ID> row. Called from
// commit() so that MULTI/EXEC DEL / EXPIRE 0 on a migrated stream leaves
// the store in a consistent state instead of only dropping the legacy blob.
// Each scan runs at t.startTS so the delete honours the transaction's
// snapshot view.
func (t *txnContext) buildStreamDeletionElems() ([]*kv.Elem[kv.OP], error) {
if len(t.streamDeletions) == 0 {
return nil, nil
}
keys := make([]string, 0, len(t.streamDeletions))
for k := range t.streamDeletions {
keys = append(keys, k)
}
sort.Strings(keys)
ctx := t.server.handlerContext()
var elems []*kv.Elem[kv.OP]
for _, k := range keys {
userKey := t.streamDeletions[k]
streamElems, err := t.server.deleteStreamWideColumnElems(ctx, userKey, t.startTS)
if err != nil {
Comment on lines +2886 to +2890
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildStreamDeletionElems expands each staged stream deletion into a full scan of entry keys and appends all resulting Del elems to the transaction batch. For large streams this can produce an extremely large Raft/coordinator payload (and high memory usage) during EXEC/commit. It likely needs a bounded/batched deletion strategy (or a prefix-delete operation) rather than materializing every delete as an elem in a single commit.

Copilot uses AI. Check for mistakes.
return nil, err
}
elems = append(elems, streamElems...)
}
return elems, nil
}

// buildTTLElems returns !redis|ttl| Raft elements for non-string keys with dirty TTL state.
// String keys have TTL embedded in the value; they are handled by buildKeyElems.
func (t *txnContext) buildTTLElems() []*kv.Elem[kv.OP] {
Expand Down Expand Up @@ -2827,13 +2927,14 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
defer readPin.Release()

txn := &txnContext{
server: r,
working: map[string]*txnValue{},
listStates: map[string]*listTxnState{},
zsetStates: map[string]*zsetTxnState{},
ttlStates: map[string]*ttlTxnState{},
readKeys: map[string][]byte{},
startTS: startTS,
server: r,
working: map[string]*txnValue{},
listStates: map[string]*listTxnState{},
zsetStates: map[string]*zsetTxnState{},
ttlStates: map[string]*ttlTxnState{},
readKeys: map[string][]byte{},
streamDeletions: map[string][]byte{},
startTS: startTS,
}

nextResults := make([]redisResult, 0, len(queue))
Expand Down
Loading
Loading