perf(redis): stream entry-per-key layout for O(new) XREAD#620
perf(redis): stream entry-per-key layout for O(new) XREAD#620
Conversation
Incident 2026-04-24: one client doing 11 XREAD/s on a large stream
consumed 14 CPU cores on the leader, starving raft and Lua paths.
proto.Unmarshal alone took 59% of those 14 cores because every read
re-parsed the entire stream blob.
Store each stream entry under its own key:
Meta: !stream|meta|<userKeyLen><userKey>
-> Length | LastMs | LastSeq (binary, fixed 24 B)
Entry: !stream|entry|<userKeyLen><userKey><StreamID(16 binary B)>
-> RedisStreamEntry proto
XREAD / XRANGE become bounded range scans that unmarshal only the
selected entries. XADD writes one entry + one meta. XLEN reads meta
only. The StreamID suffix is binary big-endian ms||seq so the lex
order over entry keys matches the (ms, seq) numeric order the client
sees.
Migration is dual-read with write-time rewrite: reads try the new
meta first and fall back to the legacy blob, bumping
elastickv_stream_legacy_format_reads_total; writes that observe the
legacy blob convert it to the new layout in the same transaction and
delete the blob. The legacy blob and the new layout never coexist in
a committed state, so XLEN never double-counts. When the counter
stays at zero the fallback code can be removed in a follow-up.
Commands touched: XADD, XREAD, XRANGE, XREVRANGE, XLEN, XTRIM.
deleteLogicalKeyElems also learns to clean up the new meta + entry
keys so DEL / overwrite paths stay correct.
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 8 minutes and 1 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (9)
📝 WalkthroughWalkthroughThis PR transitions Redis stream commands (XADD/XTRIM/XREAD/XRANGE) from a legacy whole-stream blob format to an entry-per-key layout with stream metadata. It maintains backward compatibility while introducing dual-read migration tracking, comprehensive test coverage, and monitoring for legacy format access. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request migrates Redis stream storage from a single-blob format to an entry-per-key (wide-column) layout to improve performance and scalability. It introduces a StreamMeta structure to track stream length and the last assigned ID, ensuring monotonicity even after trimming. The implementation includes a transparent migration path that converts legacy blobs to the new format during write operations. Review feedback correctly identified several critical issues where the request context and transaction read timestamp were not properly propagated in functions like xaddTxn, xtrimTxn, and buildXTrimHeadElems, which could lead to MVCC consistency violations and ignored request cancellations.
| } | ||
|
|
||
| stream, err := r.loadStreamAt(context.Background(), key, readTS) | ||
| migrationElems, existingEntries, meta, metaFound, err := r.streamWriteBase(context.Background(), key, readTS) |
There was a problem hiding this comment.
The ctx parameter passed to xaddTxn should be used here instead of context.Background() to ensure that the migration read respects the request's deadline and cancellation. Additionally, ensure the readTS is consistent with the transaction's start timestamp to maintain MVCC atomicity.
| migrationElems, existingEntries, meta, metaFound, err := r.streamWriteBase(context.Background(), key, readTS) | |
| migrationElems, existingEntries, meta, metaFound, err := r.streamWriteBase(ctx, key, readTS) |
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
| func (r *RedisServer) buildXTrimHeadElems( | ||
| _ context.Context, | ||
| key []byte, | ||
| _ uint64, | ||
| migrationActive bool, | ||
| existingEntries []redisStreamEntry, | ||
| count int, | ||
| ) ([]*kv.Elem[kv.OP], error) { |
There was a problem hiding this comment.
The ctx and readTS parameters must be used in the underlying ScanAt call to ensure the scan is performed against the consistent snapshot of the transaction and respects the request context.
| func (r *RedisServer) buildXTrimHeadElems( | |
| _ context.Context, | |
| key []byte, | |
| _ uint64, | |
| migrationActive bool, | |
| existingEntries []redisStreamEntry, | |
| count int, | |
| ) ([]*kv.Elem[kv.OP], error) { | |
| func (r *RedisServer) buildXTrimHeadElems( | |
| ctx context.Context, | |
| key []byte, | |
| readTS uint64, | |
| migrationActive bool, | |
| existingEntries []redisStreamEntry, | |
| count int, | |
| ) ([]*kv.Elem[kv.OP], error) { |
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
| // Live layout: fetch the oldest `count` entry keys via a bounded range scan. | ||
| prefix := store.StreamEntryScanPrefix(key) | ||
| end := store.PrefixScanEnd(prefix) | ||
| kvs, err := r.store.ScanAt(context.Background(), prefix, end, count, r.readTS()) |
There was a problem hiding this comment.
This ScanAt call is using a fresh timestamp from r.readTS() and context.Background(), which breaks the atomicity and consistency of the transaction. It must use the readTS and ctx passed to the function.
| kvs, err := r.store.ScanAt(context.Background(), prefix, end, count, r.readTS()) | |
| kvs, err := r.store.ScanAt(ctx, prefix, end, count, readTS) |
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
| } | ||
|
|
||
| stream, err := r.loadStreamAt(context.Background(), key, readTS) | ||
| migrationElems, existingEntries, meta, _, err := r.streamWriteBase(context.Background(), key, readTS) |
There was a problem hiding this comment.
The ctx parameter passed to xtrimTxn should be used here instead of context.Background() to ensure the migration read is properly bounded by the request context and maintains transaction consistency.
| migrationElems, existingEntries, meta, _, err := r.streamWriteBase(context.Background(), key, readTS) | |
| migrationElems, existingEntries, meta, _, err := r.streamWriteBase(ctx, key, readTS) |
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (3)
adapter/redis_compat_commands_stream_test.go (2)
227-230: Minor:<<1to synthesise a future readTS is obscure.
uint64(time.Now().UnixNano()) << 1relies on HLC encoding internals to guarantee "strictly after any commit so far". A plain+ uint64(time.Second)(or a call into whatever helper the store uses) is easier to reason about for future maintainers.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/redis_compat_commands_stream_test.go` around lines 227 - 230, The test's readTS uses a left-shift (readTS := uint64(time.Now().UnixNano()) << 1) which is obscure; change it to produce a clearly-future timestamp (e.g., readTS := uint64(time.Now().UnixNano()) + uint64(time.Second) or use the store's timestamp helper if available) so the subsequent GetAt call (nodes[0].redisServer.store.GetAt(ctx, redisStreamKey(key), readTS)) unambiguously reads "after any commit"; update the readTS assignment in the test accordingly.
183-255: Consider adding a migration + MAXLEN combo test.
TestRedis_StreamMigrationFromLegacyBlobexercises the migration path, andTestRedis_StreamXTrimMaxLenexercises MAXLEN on a post-migration stream. Nothing covers the interleaved case: seed a legacy blob with N entries, thenXADD … MAXLEN MwithM < N+1in one call. That is exactly the transaction shape where the migration Puts and trim Dels target the same keys, so it's worth pinning down as a regression guard.Want me to draft a
TestRedis_StreamMigrationWithMaxLenTrimcase that seeds a 5-entry legacy blob and issuesXADD … MAXLEN 2, asserting the final state is{new_last_2_plus_1}withXLEN == 2?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/redis_compat_commands_stream_test.go` around lines 183 - 255, Add a new test TestRedis_StreamMigrationWithMaxLenTrim that seeds a legacy blob via marshalStreamValue and nodes[0].redisServer.store.PutAt using redisStreamValue with 5 entries, registers the StreamLegacyFormatReadObserver on nodes[0].redisServer (like TestRedis_StreamMigrationFromLegacyBlob), then call rdb.XAdd with MaxLen set to 2 (using redis.XAddArgs{Stream: "legacy-stream", ID: "*", MaxLen: 2, Values: ...}) to exercise migration+trim in one transaction; assert the legacy blob key is removed from store (GetAt returns error), the stream now contains only the expected two newest entries (use rdb.XRead or rdb.XLen/XRange), XLEN returns 2, and gatherLegacyReads(registry) reflects only the expected legacy-read count.adapter/redis_compat_commands.go (1)
3769-3795: Drop the unusedhasLasttracking.
hasLastis set inside the loop and then explicitly discarded with_ = hasLaston Line 3794.resolveXAddIDdecides the "ever-assigned" flag frommeta.LastMs/LastSeqanyway, so this local is entirely dead.♻️ Proposed cleanup
- var ( - lastMs, lastSeq uint64 - hasLast bool - ) + var lastMs, lastSeq uint64 for _, entry := range val.Entries { @@ - lastMs, lastSeq, hasLast = parsed.ms, parsed.seq, true + lastMs, lastSeq = parsed.ms, parsed.seq } @@ - _ = hasLast return elems, val.Entries, migratedMeta, true, nil🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/redis_compat_commands.go` around lines 3769 - 3795, Remove the dead hasLast tracking: delete the hasLast variable declaration, any assignments "hasLast = true" inside the loop, and the redundant "_ = hasLast" discard; rely on lastMs/lastSeq (and resolveXAddID which uses meta.LastMs/meta.LastSeq) to indicate whether any IDs were assigned. Keep the existing parsing (tryParseRedisStreamID), marshaling (marshalStreamEntry), elems append, and construction of migratedMeta unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapter/redis_compat_commands.go`:
- Around line 3837-3870: The buildXTrimHeadElems helper currently ignores its
ctx and readTS parameters and calls context.Background() and r.readTS() inside,
causing snapshot drift and lost cancellation; change buildXTrimHeadElems to use
the passed-in ctx and readTS (instead of context.Background() and r.readTS())
when calling r.store.ScanAt, and better yet avoid the second scan entirely by
deriving deletions from the provided existingEntries[:count] when
existingEntries is populated by streamWriteBase; ensure callers like
xtrimTxn/xaddTxn pass their ctx and readTS through to buildXTrimHeadElems so the
scan is performed at the same snapshot and is cancellable.
- Around line 3702-3716: Add a unit test (e.g.,
TestRedis_StreamMigrationWithMaxLenTrim) that performs an XADD where
migrationElems != nil and MAXLEN causes a trim in the same transaction to assert
the final stream entries match expected (i.e., migrated entries trimmed as
appropriate), and add an inline comment in xaddTxn near the code that appends
migrationElems then trim elems (the block using migrationElems,
buildXTrimHeadElems, and the elems slice) explaining that overlapping
Put(StreamEntryKey) followed by Del(StreamEntryKey) is resolved by the
coordinator because operations are applied sequentially in insertion order (so
Del tombstones the Put at the same commitTS), and that the test documents this
insertion-order dependency of the apply layer.
In `@adapter/redis_compat_helpers.go`:
- Around line 926-951: The MULTI/EXEC path must mirror
deleteStreamWideColumnElems: update txnContext.stageKeyDeletion (and the
MULTI/EXEC delete handling around redisStreamKey(key)) to also tombstone the
stream meta key (store.StreamMetaKey(key)) and all entry keys (scanAllDeltaElems
/ store.StreamEntryScanPrefix(key)) instead of only redisStreamKey(key);
additionally ensure trackTypeReadKeys includes store.StreamMetaKey(key) so
migrated-stream writes are added to the txn read-set validation. Locate and call
the same logic used in deleteStreamWideColumnElems (or extract/reuse it) from
the code that stages deletions in adapter/redis.go (txnContext.stageKeyDeletion
and surrounding MULTI/EXEC handling) and add the meta/entry keys to the staged
deletions and read-key tracking.
In `@adapter/redis_compat_types.go`:
- Line 215: The new "!stream|" namespace was only added to knownInternalPrefixes
but not to redisInternalPrefixes or the internal->logical key mapping code, so
key discovery and reverse-mapping fail for migrated streams; add "!stream|" (and
its subprefixes like "!stream|meta|" and "!stream|entry|") to
redisInternalPrefixes and update the mapping logic in the adapter/redis.go
functions that scan bounded patterns and translate internal keys back to logical
keys (the code that handles redisInternalPrefixes and the internal-to-logical
mapping for meta/entry keys) so that KEYS and bounded scans treat "!stream|..."
as internal and map "!stream|meta|..." / "!stream|entry|..." correctly to the
original logical key.
---
Nitpick comments:
In `@adapter/redis_compat_commands_stream_test.go`:
- Around line 227-230: The test's readTS uses a left-shift (readTS :=
uint64(time.Now().UnixNano()) << 1) which is obscure; change it to produce a
clearly-future timestamp (e.g., readTS := uint64(time.Now().UnixNano()) +
uint64(time.Second) or use the store's timestamp helper if available) so the
subsequent GetAt call (nodes[0].redisServer.store.GetAt(ctx,
redisStreamKey(key), readTS)) unambiguously reads "after any commit"; update the
readTS assignment in the test accordingly.
- Around line 183-255: Add a new test TestRedis_StreamMigrationWithMaxLenTrim
that seeds a legacy blob via marshalStreamValue and
nodes[0].redisServer.store.PutAt using redisStreamValue with 5 entries,
registers the StreamLegacyFormatReadObserver on nodes[0].redisServer (like
TestRedis_StreamMigrationFromLegacyBlob), then call rdb.XAdd with MaxLen set to
2 (using redis.XAddArgs{Stream: "legacy-stream", ID: "*", MaxLen: 2, Values:
...}) to exercise migration+trim in one transaction; assert the legacy blob key
is removed from store (GetAt returns error), the stream now contains only the
expected two newest entries (use rdb.XRead or rdb.XLen/XRange), XLEN returns 2,
and gatherLegacyReads(registry) reflects only the expected legacy-read count.
In `@adapter/redis_compat_commands.go`:
- Around line 3769-3795: Remove the dead hasLast tracking: delete the hasLast
variable declaration, any assignments "hasLast = true" inside the loop, and the
redundant "_ = hasLast" discard; rely on lastMs/lastSeq (and resolveXAddID which
uses meta.LastMs/meta.LastSeq) to indicate whether any IDs were assigned. Keep
the existing parsing (tryParseRedisStreamID), marshaling (marshalStreamEntry),
elems append, and construction of migratedMeta unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0ac5c96a-6985-4141-8848-19a8fbce6c42
📒 Files selected for processing (10)
adapter/redis.goadapter/redis_compat_commands.goadapter/redis_compat_commands_stream_test.goadapter/redis_compat_helpers.goadapter/redis_compat_types.goadapter/redis_storage_codec.gomain.gomonitoring/redis.gomonitoring/registry.gostore/stream_helpers.go
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 70c65c7125
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| return nil, nil, store.StreamMeta{}, false, err | ||
| } | ||
| if metaFound { | ||
| entries, err := r.scanStreamEntriesAt(ctx, key, readTS, meta.Length) |
There was a problem hiding this comment.
Avoid full-stream scan before every XADD
xaddTxn now calls streamWriteBase on every append, and this branch eagerly scans and unmarshals the entire stream whenever meta exists. That makes each XADD O(n) again and, more importantly, routes appends through scanStreamEntriesAt's maxWideColumnItems guard, so once a stream grows past 100,000 entries, further XADDs fail with ErrCollectionTooLarge instead of continuing to append.
Useful? React with 👍 / 👎.
- Layer 1 (line 84): replace "full-range ZRANGE/ZRANGEBYSCORE" with unconditional gating of the whole ZRANGE family. Arg-inspection contradicted the stated "static, byte-level classification"; a bounded ZRANGE 0 10 costs at most one unmarshal and gating it is cheaper than the dispatcher branch that would distinguish the bounded vs unbounded case. - Layer 1 (line 122, container awareness): document that runtime.GOMAXPROCS(0) returns the host CPU count on Linux without cgroup awareness and call out two mitigations (operator-set GOMAXPROCS env at rolling-update level, or wire uber-go/automaxprocs). v1 prefers the operator-set path for auditability; automaxprocs acceptable as follow-up. - Layer 1 (line 139, single-pool starvation): acknowledge the risk that KEYS/SCAN bursts can exhaust pool slots and starve XREAD/Lua. v1 still ships a single pool but requires a per-command submit metric so a tier split is measurable from observability rather than guessed; sub-pools/slot reservation are the named follow-up. - Layer 3 (line 252, reject semantics): change the v1 shape from "close TCP without RESP" to "accept, write -ERR, then close." The protocol-level error is the signal that distinguishes "server overload" from "network blip" on the client side. - Layer 4 (line 315, synchronous migration cost): add a chunked migration section. A single XADD on a 100k-entry legacy blob would rewrite every entry in one Raft commit, reproducing the CPU/commit spike the design is supposed to prevent. Document STREAM_MIGRATION_CHUNK (default 1024) and the rolling drain model; explicitly scope chunked migration as a stacked follow-up to PR #620 (which ships the simple one-txn version).
- Layer 1 classification (line 87, Gemini HIGH): carve out blocking XREAD/BLPOP/BRPOP/BZPOP from pool gating. Blocking variants hold a slot while idle, trivially exhaust the pool; handle them on their own goroutine and re-gate on wake-up. - Layer 1 Lua recursion (line 117, Gemini Medium): document the context-propagation mechanism that makes option A implementable. A package-private sentinel value on ctx distinguishes inner call from new request; external callers cannot fake it. - Layer 1 GOMAXPROCS (line 134, Codex P2): correct stale guidance. Go 1.25+ already derives GOMAXPROCS from cgroup v2 quota, so the automaxprocs recommendation no longer applies to this repo; keep the override env knob for operators who want explicit control. - Layer 3 reject semantics (line 283, Gemini Medium): add a reject-storm mitigation. Rate-limit the reject itself — after R rejects/s to one peer, switch from accept+write+close to RST; recommend client-side backoff in the ops runbook. - Layer 4 migration (line 345, Gemini HIGH + Codex P1): split the migration doc into Mode A (simple, PR #620 ships this) and Mode B (chunked, stacked follow-up). The read rules differ: Mode A has no mixed state so fall-through is correct; Mode B has a legal mixed state and MUST always merge both layouts. Making the distinction explicit prevents the dual-read correctness bug the reviewers flagged — Mode B with Mode A's fall-through rule returns incomplete results during chunking. - Sequencing (line 431, Gemini Medium): resolve the XREAD gated-vs-cheap contradiction. Layer 4 makes XREAD steady-state O(new) but we keep it gated in Layer 1 v1 for three concrete reasons (large XRANGE bounds, legacy-fallback window, data-driven promotion); promotion to ungated is gated on the pool-submit metric added in Layer 1.
…CC fixes
Critical correctness fixes:
- streamWriteBase now takes a fast path (meta-only read) when the stream is
already in the new entry-per-key layout, so post-migration XADD/XTRIM stay
O(1) in the stream size. Only the migration branch (legacy blob present)
loads entries, and only once — matching the original O(new) design intent.
Codex P1: full-stream scan on every XADD after migration was the exact
starvation pattern this PR was meant to fix.
- xaddTxn, xtrimTxn, buildXTrimHeadElems, resolveXAddID-related reads all
use the outer transaction's ctx+readTS now. The previous code mixed
context.Background() and a fresh r.readTS() inside, which broke MVCC
atomicity: the commit landed at readTS but the migration read saw a
later snapshot, so concurrent writes in the gap could be trimmed away
out from under the caller's view. Gemini (4 critical hits), CodeRabbit.
- MULTI/EXEC DEL / EXPIRE 0 on a migrated stream now tombstones
!stream|meta|<key> and every !stream|entry|<key><id> row, not just the
(already-empty) legacy blob key. Added streamDeletions to txnContext,
expanded at commit via deleteStreamWideColumnElems. trackTypeReadKeys
also tracks store.StreamMetaKey so concurrent writers trigger OCC
conflicts. CodeRabbit critical.
- KEYS / SCAN now understand the !stream|meta|<len><userKey> layout:
redisInternalPrefixes includes StreamMetaPrefix, mergeInternalNamespaces
scans it via the length-prefix-aware bound, extractRedisInternalUserKey
and wideColumnVisibleUserKey reverse-map meta→userKey and classify entry
keys as internal-only. Meta is the uniqueness anchor so each migrated
stream appears exactly once, never once per entry. CodeRabbit major.
Linter / style fixes flagged by reviewdog:
- gci on adapter/redis{.go,_compat_commands.go,_compat_commands_stream_test.go,
_compat_helpers.go,_compat_types.go,_storage_codec.go}, store/stream_helpers.go,
monitoring/redis.go.
- gocritic appendAssign at 3 sites (migrationElems destination slice).
- gocritic ifElseChain → switch in streamBoundHigh.
- gosec G115 in stream test: centralised nowNanos helper with an explicit
positive bound so gosec can see the conversion is safe.
- mnd: magic number 63 in store/stream_helpers.go → named
streamMetaLengthSignBit. Magic number +3 in xaddTxn capacity hint →
named xaddFixedElemCount.
- SA9003 empty branch in scanStreamEntriesAt: removed and documented
expectedLen as a reserved hint.
- wrapcheck on store.MarshalStreamMeta / UnmarshalStreamMeta /
cockerrors.Newf — wrapped with errors.WithStack.
- cyclop: xaddTxn split into xaddTxn + xaddTrimIfNeeded.
resolveXReadAfterIDs → resolveXReadAfterIDs + resolveXReadDollarID +
dollarIDFromState. extractRedisInternalUserKey uses a table-driven
trim-prefix pass to keep complexity under the cap.
- goconst: "0-0" → streamZeroID.
New tests:
- TestRedis_StreamMigrationWithMaxLenTrim: exercises XADD MAXLEN on a
legacy blob, documenting the Put-then-Del same-key resolution contract
the apply layer provides.
- TestRedis_StreamMultiExecDelRemovesWideColumnLayout: regression guard
for the MULTI/EXEC DEL wide-column cleanup.
go build / vet / golangci-lint run ./... → clean. Stream tests pass
(TestRedis_StreamXReadLatencyIsConstant still enforces the O(new)
guarantee at 10k entries).
|
Pushed Critical correctness fixes
Linter issues flagged by reviewdog
New tests
Verification
Other review nits addressed: the dead /gemini review |
There was a problem hiding this comment.
Code Review
This pull request migrates Redis streams from a legacy single-blob format to a wide-column layout, improving performance for large streams by allowing entry-level access. The changes include migration logic during write operations, updated command handlers for XADD, XREAD, and XRANGE, and enhanced deletion logic to prevent row leaks. Review feedback suggests optimizing slice allocations for batch operations and improving architectural consistency by propagating context and read timestamps through internal helper functions.
| // Capacity hint covers: migrationElems + one entry Put + variadic trim Dels | ||
| // (typically 0, bounded by maxLen) + one meta Put. | ||
| const xaddFixedElemCount = 2 | ||
| elems := make([]*kv.Elem[kv.OP], 0, len(migrationElems)+xaddFixedElemCount) |
There was a problem hiding this comment.
The capacity hint for elems does not account for potential trim operations. If MAXLEN is specified and the stream is large, this slice will undergo multiple reallocations. Consider estimating the trim count to pre-allocate a more accurate buffer.
trimCount := 0
if req.maxLen > 0 && meta.Length+1 > int64(req.maxLen) {
trimCount = int(meta.Length+1) - req.maxLen
}
elems := make([]*kv.Elem[kv.OP], 0, len(migrationElems)+xaddFixedElemCount+trimCount)References
- Pre-allocating a buffer is acceptable if a strictly enforced maximum size limit is checked before allocation to avoid unnecessary reallocations.
| // asking the store for the highest ID ever assigned. New-layout streams | ||
| // answer from meta in one read; legacy blobs fall back to a full load. | ||
| // Returns streamZeroID for non-existent and empty-never-written streams. | ||
| func (r *RedisServer) resolveXReadDollarID(key []byte) (string, error) { |
There was a problem hiding this comment.
Update this signature to accept ctx and readTS from the caller to avoid redundant snapshot acquisition and ensure consistent context propagation.
| func (r *RedisServer) resolveXReadDollarID(key []byte) (string, error) { | |
| func (r *RedisServer) resolveXReadDollarID(ctx context.Context, key []byte, readTS uint64) (string, error) { |
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
- Gemini Medium (resolveXReadDollarID): thread caller's ctx through resolveXReadAfterIDs / resolveXReadDollarID / dollarIDFromState so cancellation of the outer BLOCK deadline tears down the in-flight dollar-resolve scan. - Gemini Medium (xaddTxn capacity hint): extract estimateXAddTrimCount helper to pre-size the elems slice when MAXLEN will trim entries. Avoids repeated slice grows on large streams hitting MAXLEN; kept the main xaddTxn body within the cyclop budget. - xread: ctx now derived from the BLOCK deadline so dollar resolve and the busy-poll share one cancellation path.
|
Round 3 in 5ea7f5e:
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request migrates Redis streams from a single-blob format to a wide-column entry-per-key layout to improve performance and scalability. Key changes include refactoring XADD, XTRIM, XREAD, and XRANGE to support the new layout, implementing a dual-read migration path with observability, and ensuring consistent deletion of migrated streams. Feedback highlights the need to enforce maxWideColumnItems limits during XADD and migration to prevent unbounded growth, capping the number of deletions in XTRIM to avoid Raft size issues, fixing a boundary condition in stream scanning for the maximum possible ID, and removing redundant ID storage in the entry serialization.
| elems := make([]*kv.Elem[kv.OP], 0, | ||
| len(migrationElems)+xaddFixedElemCount+estimateXAddTrimCount(req.maxLen, meta.Length)) |
There was a problem hiding this comment.
The xaddTxn function lacks a check against maxWideColumnItems for the resulting stream length. If an XADD (without MAXLEN or with a large MAXLEN) would push the stream beyond the 100,000 entry limit, it should be rejected with ErrCollectionTooLarge to maintain consistency with other wide-column types and prevent unbounded growth.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
| val, err := unmarshalStreamValue(legacy) | ||
| if err != nil { | ||
| return nil, nil, store.StreamMeta{}, false, err | ||
| } |
There was a problem hiding this comment.
When migrating a legacy stream blob, streamWriteBase should verify that the number of entries does not exceed maxWideColumnItems. Migrating a blob larger than this limit into individual entry keys in a single transaction could lead to OOM or exceed Raft message size limits.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
| if count <= 0 { | ||
| return nil, nil | ||
| } | ||
| elems := make([]*kv.Elem[kv.OP], 0, count) | ||
| if migrationActive { | ||
| for i := 0; i < count && i < len(existingEntries); i++ { | ||
| parsed, ok := tryParseRedisStreamID(existingEntries[i].ID) | ||
| if !ok { | ||
| return nil, cockerrors.WithStack(cockerrors.Newf("invalid legacy stream ID %q", existingEntries[i].ID)) | ||
| } | ||
| elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: store.StreamEntryKey(key, parsed.ms, parsed.seq)}) | ||
| } | ||
| return elems, nil | ||
| } | ||
| // Live layout: fetch the oldest `count` entry keys via a bounded range scan. | ||
| prefix := store.StreamEntryScanPrefix(key) | ||
| end := store.PrefixScanEnd(prefix) | ||
| kvs, err := r.store.ScanAt(ctx, prefix, end, count, readTS) | ||
| if err != nil { | ||
| return nil, cockerrors.WithStack(err) | ||
| } | ||
| for _, pair := range kvs { | ||
| elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: append([]byte(nil), pair.Key...)}) | ||
| } | ||
| return elems, nil |
There was a problem hiding this comment.
The buildXTrimHeadElems function should enforce a cap on the count of entries to be deleted, ideally using maxWideColumnItems. A very large trim request (e.g., XTRIM key MAXLEN 1 on a stream with 100k entries) will generate a massive number of Del operations in a single transaction, which will likely fail Raft size limits or cause performance degradation.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
| ms, seq := parsed.ms, parsed.seq | ||
| if seq < ^uint64(0) { | ||
| seq++ | ||
| } else if ms < ^uint64(0) { | ||
| ms++ | ||
| seq = 0 | ||
| } |
There was a problem hiding this comment.
There is a logic error in streamScanStartForAfter when afterID is the maximum possible stream ID (MaxUint64-MaxUint64). In this case, the increment logic fails to produce a key strictly after the input, and the resulting scan will incorrectly include the entry with the maximum ID. Since no ID can exist strictly after the maximum, the function should return a sentinel (like store.PrefixScanEnd(prefix)) that results in an empty scan.
| ms, seq := parsed.ms, parsed.seq | |
| if seq < ^uint64(0) { | |
| seq++ | |
| } else if ms < ^uint64(0) { | |
| ms++ | |
| seq = 0 | |
| } | |
| ms, seq := parsed.ms, parsed.seq | |
| if seq < ^uint64(0) { | |
| seq++ | |
| } else if ms < ^uint64(0) { | |
| ms++ | |
| seq = 0 | |
| } else { | |
| return store.PrefixScanEnd(prefix) | |
| } |
| func marshalStreamEntry(entry redisStreamEntry) ([]byte, error) { | ||
| return marshalStoredRedisMessage(storedRedisStreamEntryProtoPrefix, &pb.RedisStreamEntry{ | ||
| Id: entry.ID, | ||
| Fields: cloneStringSlice(entry.Fields), | ||
| }) | ||
| } |
There was a problem hiding this comment.
The implementation of marshalStreamEntry serializes the entry.ID into the value protobuf. However, the PR description states that only Fields need to be serialized because the ID is already encoded in the storage key. Storing the ID in the value is redundant and increases storage overhead per entry.
While unmarshalStreamEntry currently relies on this field, it could be refactored to reconstruct the string ID from the binary suffix of the storage key (which is available to the callers).
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5ea7f5e49f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if err := r.resolveXReadAfterIDs(ctx, &req); err != nil { | ||
| conn.WriteError(err.Error()) | ||
| return |
There was a problem hiding this comment.
Return timeout null when '$' resolution hits BLOCK deadline
When XREAD ... BLOCK ... STREAMS ... $ runs, $ resolution now uses the same timeout context as the block window, but this branch returns any context deadline error directly to the client. If resolving $ is slower than the requested block interval (for example, a large legacy stream that requires a full load), the command responds with an error instead of the expected timeout-null behavior, which changes client-visible protocol semantics for blocked reads.
Useful? React with 👍 / 👎.
| if seq < ^uint64(0) { | ||
| seq++ | ||
| } else if ms < ^uint64(0) { | ||
| ms++ | ||
| seq = 0 | ||
| } |
There was a problem hiding this comment.
Preserve strict-after semantics at max stream ID
streamScanStartForAfter makes scans exclusive by incrementing afterID, but when afterID is the maximal ID (ms and seq both ^uint64(0)), no increment occurs and the start key stays equal to the boundary. Because ScanAt includes the start key, XREAD can return the boundary entry again instead of returning only IDs strictly greater than afterID.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5ea7f5e49f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| } else if exists { | ||
| elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: metaKey}) | ||
| } | ||
| entryElems, err := r.scanAllDeltaElems(ctx, store.StreamEntryScanPrefix(key), readTS) |
There was a problem hiding this comment.
Stop capping stream deletes at 100k entries
deleteStreamWideColumnElems delegates stream-entry cleanup to scanAllDeltaElems, which hard-fails once more than maxWideColumnItems keys are seen. Streams are not write-capped to 100k entries, so migrated streams larger than that will make DEL/overwrite cleanup (including MULTI/EXEC delete paths) return ErrCollectionTooLarge instead of removing the key. This regresses behavior from the legacy blob layout and can leave large streams undeletable.
Useful? React with 👍 / 👎.
| limit := count | ||
| if limit <= 0 { | ||
| limit = maxWideScanLimit | ||
| } |
There was a problem hiding this comment.
Preserve full XRANGE results when COUNT is omitted
For new-layout streams, rangeStreamNewLayout forces unbounded ranges (COUNT not provided) to maxWideScanLimit and returns that single scan directly, so XRANGE key - + / XREVRANGE silently truncate once the stream exceeds 100001 entries. Callers get no error or continuation signal, so replay/backfill logic can miss entries on large streams.
Useful? React with 👍 / 👎.
The prior threshold `max <= 2*first + 10ms` was fooled by two separate sources of variance that have nothing to do with the O(new) property the test is meant to guard: - `first` is a cold sample. Single-ms noise on it turned the whole budget tight. 31.6ms first + 73ms ceiling is what CI showed in 24899093387, and a 97.8ms outlier was enough to flip red even though the algorithmic shape is unchanged. - `max(100 probes)` under `-race` on a shared CI runner is dominated by scheduler tail latency (GC pauses, neighbour-job preemption). The old blob-based XREAD was 10x+ slower on every probe, not just the max, so comparing the median catches the regression without absorbing scheduler noise. Rework: - 8-probe warmup discarded; baseline = median of warmups. - Assert median <= 3*baseline + 20ms (O(new) guard). - Assert p95 <= 6*baseline + 40ms (headroom for scheduler jitter). The old blob implementation made every probe O(stream_size), so even under the relaxed 3x median / 6x p95 ceilings the regression would still show up an order of magnitude wider than the threshold. Verified locally (non-race): test passes in 142s end-to-end.
|
Round 4 in 50def0d (additional, see prior comment for no-reviewer-feedback state): Stabilised Rework:
Under the old blob implementation every probe was O(stream_size), so a 10k-entry stream still produces the original 10x+ regression signal — the widened thresholds do not hide the class of bug this test guards against. Verified locally (non-race): test passes in 142s. /gemini review |
There was a problem hiding this comment.
Code Review
This pull request migrates Redis streams from a single-blob format to a wide-column layout (entry-per-key), introducing a metadata record to track stream length and the last assigned ID for O(1) appends. The implementation includes migration logic for legacy blobs, updated stream commands, and improved transaction handling to prevent row leaks during deletion. Feedback identifies several critical issues: the absence of collection size limits in xaddTxn, scanStreamEntriesAfter, and rangeStreamNewLayout which poses OOM risks; potential performance bottlenecks during migration-path trimming; a logic error where MAXLEN 0 is ignored; and unhandled edge cases for the maximum possible stream ID in range scans.
| Value: entryValue, | ||
| }) | ||
|
|
||
| nextLen, trim, err := r.xaddTrimIfNeeded(ctx, key, readTS, req.maxLen, meta.Length+1, migrationElems != nil, existingEntries) |
There was a problem hiding this comment.
The xaddTxn function does not enforce the maxWideColumnItems limit when appending a new entry. This violates the general rule to apply a fixed bound to collections that can grow from external requests to prevent unbounded memory growth and potential OOM issues. If a stream grows beyond the limit, subsequent operations like DEL or XLEN (on legacy blobs) will fail with ErrCollectionTooLarge.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
| elems = append(elems, migrationElems...) | ||
| elems = append(elems, &kv.Elem[kv.OP]{ | ||
| Op: kv.Put, | ||
| Key: store.StreamEntryKey(key, parsedID.ms, parsedID.seq), | ||
| Value: entryValue, | ||
| }) | ||
|
|
||
| nextLen, trim, err := r.xaddTrimIfNeeded(ctx, key, readTS, req.maxLen, meta.Length+1, migrationElems != nil, existingEntries) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| return id, r.dispatchElems(ctx, true, readTS, []*kv.Elem[kv.OP]{ | ||
| {Op: kv.Put, Key: redisStreamKey(key), Value: payload}, | ||
| elems = append(elems, trim...) |
There was a problem hiding this comment.
When migrationActive is true and a MAXLEN trim is requested, the code currently appends all legacy entries as Put operations in migrationElems and then appends Del operations for the trimmed entries in trim. This results in redundant Raft operations and can lead to extremely large transactions that might exceed the Raft message size limit (etcdMaxSizePerMsg) or cause performance spikes.
Consider optimizing streamWriteBase or xaddTxn to only emit Put operations for the entries that will survive the trim.
| limit = maxWideScanLimit | ||
| } |
There was a problem hiding this comment.
The scanStreamEntriesAfter function uses maxWideScanLimit as a default limit but does not check if the resulting number of entries exceeds maxWideColumnItems. This could lead to OOM if a user performs an XREAD without a COUNT on a very large stream. It should mirror the check found in scanStreamEntriesAt to apply a fixed bound to collections growing from external requests.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
| limit = maxWideScanLimit | ||
| } |
There was a problem hiding this comment.
The rangeStreamNewLayout function uses maxWideScanLimit as a default limit but does not check if the resulting number of entries exceeds maxWideColumnItems. This could lead to OOM if a user performs an XRANGE with a very wide range and no COUNT. It should mirror the check found in scanStreamEntriesAt to prevent unbounded memory growth.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
| migrationActive bool, | ||
| existingEntries []redisStreamEntry, | ||
| ) (int64, []*kv.Elem[kv.OP], error) { | ||
| if maxLen <= 0 || candidateLen <= int64(maxLen) { |
There was a problem hiding this comment.
The condition if maxLen <= 0 causes XADD ... MAXLEN 0 to skip the trim operation entirely. In Redis, MAXLEN 0 is a valid command that should result in an empty stream (or a stream containing only the newly added entry if it's not immediately trimmed). This check should be changed to maxLen < 0 or handled specifically to allow trimming to zero.
| if maxLen <= 0 || candidateLen <= int64(maxLen) { | |
| if maxLen < 0 || (maxLen > 0 && candidateLen <= int64(maxLen)) { |
| ms, seq := parsed.ms, parsed.seq | ||
| if seq < ^uint64(0) { | ||
| seq++ | ||
| } else if ms < ^uint64(0) { | ||
| ms++ | ||
| seq = 0 | ||
| } |
There was a problem hiding this comment.
If afterID is the maximum possible stream ID (^uint64(0)-^uint64(0)), the increment logic will fail to produce a key strictly greater, and streamScanStartForAfter will return a key equal to afterID. Since the scan is inclusive, it will incorrectly include the afterID entry. While this is an extreme edge case, the logic should handle it to ensure XREAD is always strictly exclusive of the afterID.
| if seq < ^uint64(0) { | ||
| seq++ | ||
| } else if ms < ^uint64(0) { | ||
| ms++ | ||
| seq = 0 | ||
| } |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 50def0d7fc
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if seq < ^uint64(0) { | ||
| seq++ | ||
| } else if ms < ^uint64(0) { | ||
| ms++ | ||
| seq = 0 |
There was a problem hiding this comment.
Exclude max-ID bound when computing XREAD start key
When afterID is 18446744073709551615-18446744073709551615, neither increment branch runs, so streamScanStartForAfter returns a start key equal to afterID. Because ScanAt uses an inclusive start bound, XREAD can return the boundary entry again instead of strictly newer entries. This breaks the > semantics for the max-ID edge case (including $ resolved to that ID).
Useful? React with 👍 / 👎.
| if err := r.resolveXReadAfterIDs(ctx, &req); err != nil { | ||
| conn.WriteError(err.Error()) | ||
| return |
There was a problem hiding this comment.
Treat XREAD BLOCK timeout as null, not error
The $ resolution step now runs under context.WithTimeout(blockDuration) and any timeout is written back as an error. For short BLOCK windows (for example BLOCK 1) or a slow store, this can surface context deadline exceeded instead of the expected timeout/null response for XREAD. This is a protocol-level behavior regression introduced by routing deadline errors directly to WriteError.
Useful? React with 👍 / 👎.
| limit := count | ||
| if limit <= 0 { | ||
| limit = maxWideScanLimit | ||
| } |
There was a problem hiding this comment.
Avoid silently truncating unbounded XREAD scans
For XREAD without COUNT, scanStreamEntriesAfter hard-caps the scan to maxWideScanLimit and returns immediately. If more rows exist after afterID, callers get only the first page with no continuation or error signal, which can silently drop data for consumers expecting an unbounded read. The previous path read the full in-memory stream, so this is a regression in observable behavior for large streams.
Useful? React with 👍 / 👎.
Codex P2: round 3 wrapped the whole xread body in a context.WithTimeout(blockDuration). Short BLOCK windows (e.g. 'BLOCK 1') could then produce context.DeadlineExceeded during $ resolution and return that to the client as an error, breaking the XREAD BLOCK contract (timeout = null, not error). Fix: $ resolution uses its own short redisDispatchTimeout context (a single bounded read, not a wait). The busy-poll loop still honours the BLOCK deadline and returns null on expiry. MAXLEN 0 semantics (Gemini Medium) not changed in this PR. The current parser conflates 'MAXLEN absent' and 'MAXLEN 0' both into req.maxLen == 0; fixing MAXLEN 0 to trim-to-empty requires a parser change (sentinel -1 for absent) and audit of every caller of xaddRequest.maxLen. Out of scope for this round; tracked as a follow-up.
|
Addressed in 8bc3eeb: Codex P2 line 4338 (BLOCK timeout → error regression, introduced in round 3): Comments I'm declining with justification:
/gemini review |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 01cdf5eb6e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if len(kvs) > maxWideColumnItems { | ||
| return nil, errors.Wrapf(ErrCollectionTooLarge, "stream %q exceeds %d entries", key, maxWideColumnItems) |
There was a problem hiding this comment.
Remove hard 100k cap from full stream materialization
This scan path now hard-fails when more than maxWideColumnItems entries are present, but loadStreamAt uses scanStreamEntriesAt for migrated streams and Lua stream commands still materialize full stream state through streamState (adapter/redis_lua_context.go, loadStreamAt call). That means EVAL/Lua flows touching large streams can start returning ErrCollectionTooLarge after migration, even though the legacy blob path did not have this failure mode, so this introduces a production regression for large-stream scripting workloads.
Useful? React with 👍 / 👎.
| parsed, ok := tryParseRedisStreamID(raw) | ||
| if !ok { | ||
| return nil, errors.New("ERR Invalid stream ID specified as stream command argument") |
There was a problem hiding this comment.
Accept shorthand stream IDs in new-layout range bounds
The new range-bound parser rejects any bound that is not strict ms-seq, which makes migrated streams error on inputs like XRANGE key 0 + that the legacy in-memory path previously accepted via compareRedisStreamID fallback logic. Because this behavior now depends on whether the stream has migrated, the same client request can start failing with ERR Invalid stream ID specified as stream command argument after a write-triggered migration, creating a protocol regression.
Useful? React with 👍 / 👎.
Two Codex P2 protocol regressions introduced by the entry-per-key migration. (1) Shorthand stream IDs in XRANGE/XREVRANGE bounds streamBoundLow/High rejected any bound that was not strict ms-seq. The legacy in-memory path accepted shorthand like "XRANGE key 0 +" or "XRANGE key 1001 1002" via compareRedisStreamID's string-compare fallback. After a write-triggered migration the same command now errors with "ERR Invalid stream ID specified as stream command argument", which is a user-visible protocol regression that depends on whether the stream has been migrated. Fix: new parseStreamBoundID helper accepts both ms-seq and the shorthand "ms" form. Shorthand expands to either (ms, 0) or (ms, MaxUint64) depending on which side of the plus/minus one exclusive shift produces the "match all entries with this ms" half-open range Redis specifies. Guarded by three new assertions in TestRedis_StreamXRangeBounds covering lower inclusive, upper inclusive, and upper exclusive shorthand cases. (2) Hard 100k cap in scanStreamEntriesAt loadStreamAt is used by the Lua stream bridge (streamState in adapter/redis_lua_context.go) to rebuild the full stream state for XADD-via-Lua / XDEL / etc. The legacy blob load returned the full stream regardless of size; the new-layout scan capped at maxWideColumnItems and errored on any stream with more entries. Migrated large-stream Lua workloads would start returning ErrCollectionTooLarge on the same input that worked pre-migration. Fix: size the scan from the caller-supplied expectedLen (meta.Length) plus a small slack for concurrent writes, and drop the hard cap. User-bounded paths -- scanStreamEntriesAfter and rangeStreamNewLayout -- keep the cap because they serve unbounded user requests (XREAD/XRANGE without COUNT) which could legitimately OOM the server.
|
You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard. |
|
Addressed both Codex P2 regressions in a6b176d: Shorthand stream IDs in XRANGE/XREVRANGE bounds (
Hard 100k cap in /gemini review |
| kvs, err := r.store.ScanAt(ctx, cursor, end, store.MaxDeltaScanLimit, readTS) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| for _, pair := range kvs { |
There was a problem hiding this comment.
scanStreamEntryKeysForDelete accumulates a Del elem for every stream entry key into a single slice. For large streams this can blow up memory and/or exceed the coordinator/Raft request size limits when the resulting elem list is dispatched, making DEL/EXPIRE on big migrated streams unreliable. Consider switching to a prefix-delete primitive (e.g., store-level DeletePrefixAt for the entry prefix plus a single Del for the meta key) or dispatching deletes in bounded batches instead of returning the full list.
| var elems []*kv.Elem[kv.OP] | ||
| for _, k := range keys { | ||
| userKey := t.streamDeletions[k] | ||
| streamElems, err := t.server.deleteStreamWideColumnElems(ctx, userKey, t.startTS) | ||
| if err != nil { |
There was a problem hiding this comment.
buildStreamDeletionElems expands each staged stream deletion into a full scan of entry keys and appends all resulting Del elems to the transaction batch. For large streams this can produce an extremely large Raft/coordinator payload (and high memory usage) during EXEC/commit. It likely needs a bounded/batched deletion strategy (or a prefix-delete operation) rather than materializing every delete as an elem in a single commit.
| elems := make([]*kv.Elem[kv.OP], 0, len(val.Entries)+1) | ||
| elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: redisStreamKey(key)}) | ||
| var lastMs, lastSeq uint64 | ||
| for _, entry := range val.Entries { | ||
| parsed, ok := tryParseRedisStreamID(entry.ID) |
There was a problem hiding this comment.
The migration path in streamWriteBase builds a transaction batch with one Put elem per legacy entry (plus a Del for the blob). For sufficiently large legacy streams this can exceed the Raft/coordinator request size limits (e.g., the 64MiB max marshaled command) and cause XADD/XTRIM to fail, leaving the stream stuck in legacy format. Consider adding a size guard and falling back to a batched/incremental migration strategy when the legacy blob is too large to rewrite in a single commit.
…eteStreamWideColumnElems Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/a4f917f4-3153-4862-a6e3-10c011a202c9 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
…lumnElems Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/a4f917f4-3153-4862-a6e3-10c011a202c9 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Applied both changes from the review thread in commit
Warning Firewall rules blocked me from connecting to one or more addresses (expand for details)I tried to connect to the following addresses, but was blocked by firewall rules:
If you need me to access, download, or install something from one of these locations, you can either:
|
|
@copilot apply changes based on the comments in this thread |
…tries Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/a12556c2-7ef6-467f-ad8d-34102d3d8ef6 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Applied all three comments from the review thread in commits
|
| // All reads use the caller-supplied ctx and readTS, so the scan happens | ||
| // at the exact same MVCC snapshot as the outer transaction and honours | ||
| // request cancellation. | ||
| func (r *RedisServer) streamWriteBase(ctx context.Context, key []byte, readTS uint64) ([]*kv.Elem[kv.OP], []redisStreamEntry, store.StreamMeta, bool, error) { |
There was a problem hiding this comment.
🚫 [golangci] reported by reviewdog 🐶
calculated cyclomatic complexity for function streamWriteBase is 12, max is 10 (cyclop)
| // supplies the raw value bytes loaded from an entry key. | ||
| func unmarshalStreamEntry(raw []byte) (redisStreamEntry, error) { | ||
| if len(raw) == 0 { | ||
| return redisStreamEntry{}, nil |
There was a problem hiding this comment.
unmarshalStreamEntry returns an empty redisStreamEntry when raw is empty. Entry values should never be empty for the entry-per-key layout (writes always include a prefix + protobuf payload), so silently treating this as a valid entry can surface blank IDs/fields and hide on-disk corruption. Consider returning an error on len(raw)==0 (or at least errUnrecognizedStoredRedisFormat) so corrupted rows fail fast instead of producing a malformed stream entry.
| return redisStreamEntry{}, nil | |
| return redisStreamEntry{}, errUnrecognizedStoredRedisFormat |
| prefix := store.StreamEntryScanPrefix(key) | ||
| end := store.PrefixScanEnd(prefix) | ||
| start := streamScanStartForAfter(prefix, afterID) | ||
| limit := count | ||
| unbounded := limit <= 0 | ||
| if unbounded { | ||
| limit = maxWideScanLimit | ||
| } | ||
| kvs, err := r.store.ScanAt(ctx, start, end, limit, readTS) | ||
| if err != nil { | ||
| return nil, cockerrors.WithStack(err) | ||
| } | ||
| if unbounded && len(kvs) > maxWideColumnItems { | ||
| return nil, cockerrors.Wrapf(ErrCollectionTooLarge, "stream %q exceeds %d entries", key, maxWideColumnItems) | ||
| } | ||
| entries := make([]redisStreamEntry, 0, len(kvs)) | ||
| for _, pair := range kvs { | ||
| entry, err := unmarshalStreamEntry(pair.Value) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| entries = append(entries, entry) | ||
| } | ||
| return entries, nil | ||
| } | ||
|
|
||
| // streamScanStartForAfter returns the inclusive start key to use for an | ||
| // XREAD-style "after afterID" range scan. If afterID parses cleanly we | ||
| // start at ID+1 so the scan is exclusive of afterID. If afterID is | ||
| // malformed we fall back to the entry prefix, which is conservatively | ||
| // wider and will be filtered above. | ||
| // | ||
| // Edge case: if afterID is (math.MaxUint64-math.MaxUint64), there is no | ||
| // successor ID inside the entry-prefix keyspace, so the correct start is | ||
| // one past the prefix (empty scan). Returning the afterID key itself | ||
| // would make the inclusive scan include it, which is the opposite of | ||
| // "strictly after." | ||
| func streamScanStartForAfter(prefix []byte, afterID string) []byte { | ||
| parsed, ok := tryParseRedisStreamID(afterID) | ||
| if !ok { | ||
| return prefix | ||
| } | ||
| ms, seq := parsed.ms, parsed.seq | ||
| switch { | ||
| case seq < ^uint64(0): | ||
| seq++ | ||
| case ms < ^uint64(0): | ||
| ms++ | ||
| seq = 0 | ||
| default: | ||
| // afterID is the largest representable stream ID. No entry can be | ||
| // strictly after it; return the scan-end sentinel so the scan is | ||
| // empty instead of silently inclusive. |
There was a problem hiding this comment.
scanStreamEntriesAfter relies on streamScanStartForAfter() to make the scan exclusive. When afterID fails to parse, streamScanStartForAfter returns the entry prefix and the scan returns all entries, but nothing filters them afterward (despite the comment saying it will be filtered). This makes XREAD on migrated streams return incorrect results for malformed/non-standard afterIDs. Consider either validating afterID up-front and returning "ERR Invalid stream ID..." on parse failure, or filtering the scanned results with the same compare logic used in selectXReadEntries before applying COUNT.
| // redisInternalTrimPrefixes is the fixed ordered list of plain-prefix | ||
| // internal namespaces whose logical user key is just the bytes after the | ||
| // prefix. It's used by extractRedisInternalUserKey to keep the cyclomatic | ||
| // complexity below the package cap; the extra table indirection costs | ||
| // nothing because every entry is a string constant. | ||
| var redisInternalTrimPrefixes = []string{ | ||
| redisStrPrefix, | ||
| redisHashPrefix, | ||
| redisSetPrefix, | ||
| redisZSetPrefix, | ||
| redisHLLPrefix, | ||
| redisStreamPrefix, | ||
| } | ||
|
|
||
| func extractRedisInternalUserKey(key []byte) []byte { | ||
| for _, prefix := range redisInternalTrimPrefixes { | ||
| if bytes.HasPrefix(key, []byte(prefix)) { | ||
| return bytes.TrimPrefix(key, []byte(prefix)) | ||
| } | ||
| } |
There was a problem hiding this comment.
extractRedisInternalUserKey converts each string prefix to []byte on every call (bytes.HasPrefix(key, []byte(prefix)) / bytes.TrimPrefix(...)). This allocates repeatedly and can become noticeable in KEYS/SCAN-heavy workloads. Consider storing the prefixes as precomputed [][]byte (or using a small switch on key[0:...] where possible) to avoid per-call allocations.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 88ffdfa004
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if len(entriesToMigrate) > maxWideColumnItems { | ||
| entriesToMigrate = entriesToMigrate[len(entriesToMigrate)-maxWideColumnItems:] |
There was a problem hiding this comment.
Do not silently truncate legacy streams during migration
streamWriteBase unconditionally slices legacy streams down to maxWideColumnItems before returning migration elems, which means commands that should be no-ops can drop data. For example, on a pre-migration stream with 150k entries, XTRIM key MAXLEN 200000 will report removed=0 in xtrimTxn but still migrate only the newest 100k entries because this truncation already happened. That is silent data loss on a client-visible no-op path; this should fail explicitly (or require explicit trim) rather than rewriting a truncated stream.
Useful? React with 👍 / 👎.
|
@copilot apply changes based on the comments in this thread and #620 (comment) |
…pute prefix bytes Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/82fca655-7d34-4fa6-9a7e-0cf948cd0de4 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Applied all three changes in commit
|
|
@copilot adapter/redis_compat_commands.go streamWriteBase unconditionally slices legacy streams down to maxWideColumnItems before returning migration elems, which means commands that should be no-ops can drop data. For example, on a pre-migration stream with 150k entries, XTRIM key MAXLEN 200000 will report removed=0 in xtrimTxn but still migrate only the newest 100k entries because this truncation already happened. That is silent data loss on a client-visible no-op path; this should fail explicitly (or require explicit trim) rather than rewriting a truncated stream.adapter/redis_compat_commands.go streamWriteBase unconditionally slices legacy streams down to maxWideColumnItems before returning migration elems, which means commands that should be no-ops can drop data. For example, on a pre-migration stream with 150k entries, XTRIM key MAXLEN 200000 will report removed=0 in xtrimTxn but still migrate only the newest 100k entries because this truncation already happened. That is silent data loss on a client-visible no-op path; this should fail explicitly (or require explicit trim) rather than rewriting a truncated stream. |
…deColumnItems instead of silently truncating Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/74567bcf-65d6-486a-81b5-3e88a5d107f2 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Fixed in |
| func xaddEnforceMaxWideColumn(key []byte, currentLength int64, maxLen int) error { | ||
| if maxLen > 0 && maxLen <= maxWideColumnItems { | ||
| return nil | ||
| } | ||
| if currentLength < int64(maxWideColumnItems) { | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The MAXLEN handling here treats maxLen <= 0 as “no MAXLEN clause” (unbounded), but the XADD parser accepts MAXLEN 0. With the current representation, an explicit XADD ... MAXLEN 0 ... will be indistinguishable from “no MAXLEN” and will neither trim nor bypass the maxWideColumnItems cap. Consider tracking whether MAXLEN was specified (e.g., hasMaxLen or sentinel -1 for unset) so MAXLEN=0 can be handled deterministically (either trim-to-zero or rejected).
| if requested != "*" { | ||
| requestedID, requestedValid := tryParseRedisStreamID(requested) | ||
| if len(stream.Entries) > 0 && compareParsedRedisStreamID( | ||
| requested, | ||
| requestedID, | ||
| requestedValid, | ||
| stream.Entries[len(stream.Entries)-1].ID, | ||
| stream.Entries[len(stream.Entries)-1].parsedID, | ||
| stream.Entries[len(stream.Entries)-1].parsedIDValid, | ||
| ) <= 0 { | ||
| if !requestedValid { | ||
| return "", errors.New("ERR Invalid stream ID specified as stream command argument") | ||
| } | ||
| if hasLast && compareStreamIDs(requestedID.ms, requestedID.seq, lastMs, lastSeq) <= 0 { | ||
| return "", errors.New("ERR The ID specified in XADD is equal or smaller than the target stream top item") | ||
| } | ||
| return requested, nil | ||
| } |
There was a problem hiding this comment.
nextXAddID allows an explicit requested ID of "0-0" (or any ID <= streamZeroID) when the stream is empty (hasLast=false). Since XREAD uses streamZeroID ("0-0") as the “smallest possible” sentinel and the new-layout scan logic is strictly-after(afterID), an actual entry with ID "0-0" would be unreachable via XREAD ... 0 (and is also invalid in Redis). Consider rejecting requested IDs <= streamZeroID even when the stream has no prior entries.
| // IsStreamMetaKey reports whether the key is a stream metadata key. | ||
| func IsStreamMetaKey(key []byte) bool { | ||
| return bytes.HasPrefix(key, []byte(StreamMetaPrefix)) | ||
| } | ||
|
|
||
| // IsStreamEntryKey reports whether the key is a stream entry key. | ||
| func IsStreamEntryKey(key []byte) bool { | ||
| return bytes.HasPrefix(key, []byte(StreamEntryPrefix)) | ||
| } |
There was a problem hiding this comment.
These helpers convert string constants to []byte on every call (e.g. []byte(StreamMetaPrefix) / []byte(StreamEntryPrefix)), which allocates. Since these predicates are likely used in key-scanning hot paths (KEYS/SCAN filtering, deletion, etc.), consider hoisting the byte-slice prefixes into package-level vars (e.g. var streamMetaPrefixBytes = []byte(StreamMetaPrefix)) and reusing them in HasPrefix/TrimPrefix calls.
| // prefix[len(prefix):]. Delegate to the wide-column scan below | ||
| // which uses StreamMetaScanPrefix(start) to place the user-key | ||
| // lower bound past the length field. |
There was a problem hiding this comment.
This comment refers to StreamMetaScanPrefix(start), but the code below uses streamMetaScanStart(start) (which calls store.StreamMetaKey). Since there is no StreamMetaScanPrefix symbol, the comment is misleading—please update it to match the actual helper name/behavior so future readers don’t go looking for a non-existent function.
| // prefix[len(prefix):]. Delegate to the wide-column scan below | |
| // which uses StreamMetaScanPrefix(start) to place the user-key | |
| // lower bound past the length field. | |
| // prefix[len(prefix):]. Delegate to the wide-column scan below, | |
| // which uses streamMetaScanStart(start) to derive a lower bound | |
| // past the length field using the stream meta key layout. |
Summary
!stream|entry|<userKey><StreamID(16B)>) with a separate meta key (!stream|meta|<userKey>->Length | LastMs | LastSeq); XREAD / XRANGE become bounded range scans that unmarshal only the selected entries, XADD is two small writes, XLEN is a single meta read.elastickv_stream_legacy_format_reads_totalwhen they fall through to the legacy blob; writes convert the legacy blob to the new layout in the same transaction and delete the blob, so XLEN never double-counts.ms || seqso lex order over entry keys matches the numeric(ms, seq)order the client sees; parsing the "ms-seq" string form for range bounds is done once up front to compute the scan bounds.Motivation
Incident 2026-04-24: one client doing 11 XREAD/s on a large stream consumed 14 CPU cores on the leader, starving raft and Lua paths. Per the CPU profile,
proto.Unmarshalalone took 59% of 14 cores becauseloadStreamAtread the entire stream as a single protobuf blob andunmarshalStreamValuere-parsed every entry on every read.Design
Length(8) | LastMs(8) | LastSeq(8)); entry keys embed the StreamID in big-endian binary so a prefix range scan is enough to serve XRANGE / XREAD without loading the whole stream.LastMs/LastSeqtrack the highest ID ever assigned, so XADD*stays monotonic even after XTRIM removes the current tail.countentry keys and emitting Dels in the same txn as the new Put.!stream|entry|…put, emit the new meta, and delete the legacy blob, all in a single coordinator-dispatched transaction. The legacy blob and the new layout never coexist in a committed state.redisStreamKeyis marked Deprecated and kept only for the dual-read fallback.Commands touched
XADD,XREAD,XRANGE,XREVRANGE,XLEN,XTRIM.deleteLogicalKeyElemsalso learns to clean up the new meta + every entry key soDELand overwrite paths stay correct.Test plan
New tests in
adapter/redis_compat_commands_stream_test.go:TestRedis_StreamXAddXReadRoundTrip— XADD then XREAD0returns all entries.TestRedis_StreamXReadLatencyIsConstant— 10 000 XADDs, then 100 XREADs from$must not grow beyond2 * first + 10ms. On the old blob path this failed catastrophically.TestRedis_StreamXTrimMaxLen— XADD 100, XTRIM MAXLEN=10, XLEN==10, XRANGE returns the last 10.TestRedis_StreamXRangeBounds— inclusive / exclusive (() bounds on both sides, plus XREVRANGE.TestRedis_StreamMigrationFromLegacyBlob— seed a legacy blob directly, XREAD bumps the counter, XADD migrates and deletes the blob, subsequent XREAD hits the new layout and does NOT bump the counter, XLEN is original + new (no double count), XADD*is > the pre-migration last ID.TestRedis_StreamAutoIDMonotonicAfterTrim— XTRIM that empties the stream must not rewind XADD*.go build ./... && go vet ./... && go test -short ./adapter/... ./store/...all pass locally.Follow-ups (not in this PR)
redisStreamKeyhelper + legacy blob fallback onceelastickv_stream_legacy_format_reads_totalstays at zero across production nodes for long enough to guarantee no surviving legacy streams.Summary by CodeRabbit
New Features
Tests