Skip to content

fix(storage): serve the write buffer on read so acked records are consumable (read-after-ack)#149

Open
kamir wants to merge 4 commits into
KafScale:mainfrom
kamir:fix/broker-ack-but-lost-v1.6.0
Open

fix(storage): serve the write buffer on read so acked records are consumable (read-after-ack)#149
kamir wants to merge 4 commits into
KafScale:mainfrom
kamir:fix/broker-ack-but-lost-v1.6.0

Conversation

@kamir

@kamir kamir commented Jun 4, 2026

Copy link
Copy Markdown
Collaborator

Summary

Make an acknowledged-but-not-yet-flushed record consumable through the real
fetch path when the per-acknowledgement flush is disabled
(KAFSCALE_PRODUCE_SYNC_FLUSH=false) or for acks=0 produces. This is
read-after-ack for that non-default configuration.

The change has two parts:

  1. Storage layer: PartitionLog.Read serves the in-memory write buffer in
    addition to flushed segments, so a buffered offset is readable instead of
    returning ErrOffsetOutOfRange.
  2. Broker layer: the fetch handler advertises a high-watermark that includes the
    buffered tail when flush-on-ack is disabled, so a consumer actually requests
    those offsets and the storage fallback is reachable end to end.

Without part 2, part 1 alone is not reachable through a real consumer. See "Why
the storage fix alone is not enough" below.

Scope and the default path

In the default configuration (KAFSCALE_PRODUCE_SYNC_FLUSH=true) every
acknowledged produce is flushed before the ack, the write buffer is empty at
fetch time, and the high-watermark already covers every acknowledged offset.
Both parts of this change are a no-op there. This is a correctness hardening for
the flush-disabled / acks=0 path, and defense-in-depth for that non-default
configuration.

Why the storage fix alone is not enough

A Kafka consumer reads up to the high-watermark the broker advertises. The fetch
handler bounds the read by that watermark: FetchOffset == watermark returns
empty, FetchOffset > watermark returns OFFSET_OUT_OF_RANGE, and only
FetchOffset < watermark reaches PartitionLog.Read. The watermark comes from
the metadata store and advances only on flush. With flush-on-ack disabled and no
threshold tripped, no flush fires, the watermark stays behind the acknowledged
tail, and a consumer never requests the buffered offsets. So serving the buffer
in Read is correct but unreachable through the real fetch path until the
broker also raises the watermark.

The end-to-end broker test TestHandleFetchReadAfterAckFlushDisabled drives the
real fetch path (handleFetch -> wait-for-data -> watermark bound ->
PartitionLog.Read), not a direct Read call. It fails with only the storage
change (watermark stays at 0) and passes once the broker raises the effective
watermark to include the buffered tail.

Durability contract

In flush-disabled / acks=0 mode the watermark raise is non-durable. Buffered
records are readable while the process lives, but they are LOST on a broker
restart: the buffer is in-memory and restore-from-S3 rebuilds from segments
only. The durable high-watermark in the metadata store is left untouched by this
change, so a restart never points the watermark past data that no longer exists.
This is a read-after-ack visibility fix for a running broker, not a durability
fix. The contract is stated in code on PartitionLog.BufferedHighWatermark and
at the fetch-handler call site.

Flush window

prepareFlush drains the buffer and builds a segment artifact, but the segment
is not registered until uploadFlush commits it after the S3 upload. Between
those steps an acknowledged offset was in neither the buffer (drained) nor a
committed segment, so it was briefly unreadable. This PR keeps the drained
batches readable for the duration of that window (cleared on commit and on the
upload-failure reset). Covered by TestPartitionLogFlushWindowOffsetReadable,
which drives the exact window.

maxBytes guard

WriteBuffer.RecordsFrom now treats maxBytes <= 0 as "first matching batch
only" rather than draining the whole buffered tail, so a malformed or zero
PartitionMaxBytes cannot produce an unbounded response. A positive cap still
returns at least the first matching batch in full so a read always makes
progress. Covered by TestWriteBufferRecordsFromMaxBytesGuard.

Observability

A debug log fires when a read is served from the write buffer or from the
in-flight flush batches, so the flush-disabled path is visible in broker logs.

Tests

  • TestHandleFetchReadAfterAckFlushDisabled (cmd/broker): end-to-end
    read-after-ack via the real fetch path under flush-on-ack disabled. Fails
    without the watermark raise.
  • TestHandleFetchDefaultFlushOnAckNoBufferFallback (cmd/broker): in the default
    path the durable watermark already covers every acknowledged offset and the
    watermark raise is a no-op.
  • TestPartitionLogReadAfterAckBeforeFlush, TestPartitionLogMultiFlushAllOffsetsReadable
    (pkg/storage): the buffer fallback and the segment path both serve every
    acknowledged offset.
  • TestPartitionLogFlushWindowOffsetReadable (pkg/storage): readability across
    the prepare/upload flush window.
  • TestWriteBufferRecordsFromMaxBytesGuard, TestWriteBufferDrainEmptiesBuffer
    (pkg/storage): the maxBytes contract and the empty-buffer-after-flush state.
  • go build ./..., go vet, gofmt, and go test ./pkg/storage/... ./cmd/broker/...
    are green.

Relation to the proxy consume fix

This change is broker-side and only affects the flush-disabled / acks=0
configuration. The proxy-side consume fix tracked in #157 addresses the consume
path in the default configuration. The two are independent: this PR is
defense-in-depth for the non-default flush-disabled config and does not change
behavior in the default path.

Note on the branch name

The branch name predates this corrected framing and overstates the effect. An
earlier draft of this PR attributed a large "acknowledged but unreadable at
volume" effect to a data-loss path. That was wrong: it traced to a test that
produced one record per produce request against the flush-on-ack broker (one
record per segment, expensive to read back one segment at a time, so the
consumer hit its read deadline after a fraction). The data was durable and
complete; a batched producer round-trips byte-clean. This PR is scoped to the
read-after-ack consistency for the flush-disabled / acks=0 path described
above and the test comments have been corrected accordingly.

Scalytics added 3 commits June 4, 2026 16:48
AppendBatch returns an AppendResult (the produce ACK basis) as soon as the
batch is buffered, but flush-to-segment only happens when a WriteBuffer
threshold trips, and flushing is evaluated only inside AppendBatch (no
background flusher). Read serves flushed segments only and returns
ErrOffsetOutOfRange for buffered offsets. So a just-acked record whose
partition then goes quiet stays unreadable (and is lost on broker restart,
since the buffer is in-memory), violating Kafka read-after-ack.

This test appends 10 batches with flush thresholds set so nothing flushes,
then asserts every acked offset is readable. It FAILS on the current code
(offset 0 -> ErrOffsetOutOfRange) and must pass once Read serves the buffer
(or produce flushes before acking under acks=all).

Existing tests use MaxBytes:1 so every append flushes immediately, which is
why this path was never exercised.

Refs: scalytics UPSTREAM/2026-06-04-kafscale-consume-readpath.md
…sumable

PartitionLog.Read served only flushed segments and returned ErrOffsetOutOfRange
for any offset still in the in-memory WriteBuffer. Because flush is
append-triggered (ShouldFlush is evaluated only inside AppendBatch; there is no
background flusher), a partition that goes quiet below the flush threshold keeps
its just-acked tail in the buffer, where it was unreadable — breaking Kafka's
read-after-ack contract (observed end-to-end as 1015 acked -> 588 readable on
v1.6.0).

Read now falls back to the buffer when the offset is not in a flushed segment:
new WriteBuffer.RecordsFrom(offset, maxBytes) returns the buffered batch bytes
for the requested offset onward, non-destructively. The fetch handler
(cmd/broker fetch -> plog.Read) picks this up unchanged.

Makes TestPartitionLogReadAfterAckBeforeFlush pass; full pkg/storage, pkg/broker
and cmd/broker suites stay green.

Note: this fixes READABILITY (read-after-ack). Durability-on-restart is separate
— the buffer is still in-memory, so acked-but-unflushed records are lost if the
broker restarts before flush. A complete acks=all guarantee additionally needs
flush-before-ack or a WAL; tracked separately.

Refs: scalytics UPSTREAM/2026-06-04-kafscale-consume-readpath.md
…ross rotations

Appends 30 batches with MaxBatches=3 (frequent flush rotations) and asserts every
acked offset stays readable. Passes over MemoryS3, isolating the end-to-end
'1019 acked -> ~32 readable' loss OUT of the pkg/storage state machine (no segment
overwrite, no drop across rotations). The live loss is therefore in the real S3
client / proxy fetch-forward / concurrency, not the storage logic.
…n flush-disabled mode

The storage-layer change in this branch makes PartitionLog.Read serve the
in-memory write buffer, but a real consumer never reaches those offsets: the
fetch handler bounds reads at the high-watermark from the metadata store, which
advances only on flush. With KAFSCALE_PRODUCE_SYNC_FLUSH=false (or acks=0) and
no flush threshold tripped, the watermark stays behind the acknowledged tail, so
the buffer fallback was unreachable end to end.

Add PartitionLog.BufferedHighWatermark and, gated on flushOnAck=false, raise the
effective fetch watermark to include the buffered tail. Acknowledged records are
now consumable end to end in flush-disabled mode. The raise is non-durable: those
records are lost on restart (in-memory buffer; restore is segment-only). It does
not touch the durable metadata-store offset and is a no-op in the default
flushOnAck=true path (buffer empty at fetch time).

Also:
- Keep batches drained by prepareFlush readable until uploadFlush commits the
  segment, closing a window where an acknowledged offset was in neither the
  buffer nor a committed segment.
- Make RecordsFrom maxBytes<=0 mean "first matching batch only" so a malformed
  or zero PartitionMaxBytes cannot produce an unbounded response.
- Debug log when a read is served from the buffer or the in-flight flush batches.

Tests:
- cmd/broker: end-to-end read-after-ack via the real fetch path under
  flushOnAck=false (fails without the watermark raise); no-op-in-default test.
- pkg/storage: flush-window readability, maxBytes guard, drain-empties-buffer.

Scrub the retracted volume data-loss numbers from the two storage test comments;
reframe them as read-after-ack at the storage layer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant