diff --git a/cmd/broker/main.go b/cmd/broker/main.go index d999ea0..f713fd3 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -1820,6 +1820,22 @@ func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeade continue } nextOffset, offsetErr := h.waitForFetchData(ctx, topicName, part.Partition, part.FetchOffset, maxWait) + // In flush-disabled / acks=0 mode (KAFSCALE_PRODUCE_SYNC_FLUSH=false) + // no flush fires for the buffered tail, so the metadata store's + // high-watermark (nextOffset) lags the records already acknowledged to + // the producer. The fetch handler bounds reads at the watermark, so a + // consumer would otherwise never request those acknowledged offsets. + // Raise the effective watermark to include the in-memory buffered tail + // so acknowledged records are consumable (read-after-ack). This is a + // non-durable visibility raise: those buffered records are lost on a + // broker restart (the buffer is in-memory; restore is segment-only). In + // the default flushOnAck=true path every acknowledged produce is already + // flushed, the buffer is empty at fetch time, and this is a no-op. + if !h.flushOnAck && offsetErr == nil { + if buffered := plog.BufferedHighWatermark(); buffered > nextOffset { + nextOffset = buffered + } + } if offsetErr != nil { if errors.Is(offsetErr, metadata.ErrUnknownTopic) { p := kmsg.NewFetchResponseTopicPartition() diff --git a/cmd/broker/readafterack_fetch_test.go b/cmd/broker/readafterack_fetch_test.go new file mode 100644 index 0000000..93b642b --- /dev/null +++ b/cmd/broker/readafterack_fetch_test.go @@ -0,0 +1,227 @@ +// Copyright 2025 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "testing" + "time" + + "github.com/twmb/franz-go/pkg/kmsg" + + "github.com/KafScale/platform/pkg/metadata" + "github.com/KafScale/platform/pkg/protocol" + "github.com/KafScale/platform/pkg/storage" +) + +// newFlushDisabledHandler builds a handler whose per-acknowledgement flush is +// turned off and whose buffer thresholds never trip, so an acknowledged produce +// stays in the in-memory write buffer instead of being written to a segment. +// This is the KAFSCALE_PRODUCE_SYNC_FLUSH=false shape, reproduced without env. +func newFlushDisabledHandler(store metadata.Store) *handler { + h := newTestHandler(store) + h.flushOnAck = false + // Thresholds that never trip: no size flush (1 GiB), no time flush. + h.logConfig.Buffer = storage.WriteBufferConfig{ + MaxBytes: 1 << 30, + FlushInterval: 0, + } + return h +} + +// TestHandleFetchReadAfterAckFlushDisabled drives the REAL fetch path +// (handleFetch -> waitForFetchData -> high-watermark bound -> PartitionLog.Read), +// not a direct PartitionLog.Read call. It produces with acks=-1 under +// flushOnAck=false (nothing flushes), then fetches the just-acknowledged offset. +// +// A Kafka client reads up to the high-watermark the broker advertises. The fetch +// handler bounds the read by that watermark: FetchOffset == watermark returns +// empty, FetchOffset > watermark returns OFFSET_OUT_OF_RANGE, and only +// FetchOffset < watermark reaches PartitionLog.Read. The watermark comes from the +// metadata store and advances on flush. With flushOnAck=false and no threshold +// tripped, no flush fires, so the watermark stays at 0 and the acknowledged +// record is never requested through the real path. The storage-layer buffer +// fallback (PartitionLog.Read serving the buffer) is therefore not reachable +// end-to-end unless the broker also makes the buffered tail visible in the +// high-watermark it advertises on fetch. +// +// This test asserts the corrected end-to-end behavior: with flushOnAck=false the +// broker advertises a watermark that includes the buffered tail, so the +// acknowledged record is consumable. It fails when the watermark ignores the +// buffer. +func TestHandleFetchReadAfterAckFlushDisabled(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + h := newFlushDisabledHandler(store) + + const messages = 5 + for i := 0; i < messages; i++ { + produceReq := &kmsg.ProduceRequest{ + Acks: -1, + TimeoutMillis: 1000, + Topics: []kmsg.ProduceRequestTopic{ + { + Topic: "orders", + Partitions: []kmsg.ProduceRequestTopicPartition{ + { + Partition: 0, + Records: testBatchBytes(0, 0, 1), + }, + }, + }, + }, + } + if _, err := h.handleProduce(context.Background(), &protocol.RequestHeader{CorrelationID: int32(i + 1)}, produceReq); err != nil { + t.Fatalf("handleProduce %d: %v", i, err) + } + } + + // Nothing should have flushed: the metadata-store offset (durable + // high-watermark) is still 0. This is the precondition that makes the bug + // observable: without the fix, the fetch path bounds reads at this 0. + durable, err := store.NextOffset(context.Background(), "orders", 0) + if err != nil { + t.Fatalf("NextOffset: %v", err) + } + if durable != 0 { + t.Fatalf("precondition: expected durable high-watermark 0 (nothing flushed), got %d", durable) + } + + fetchReq := &kmsg.FetchRequest{ + MaxWaitMillis: 50, + Topics: []kmsg.FetchRequestTopic{ + { + Topic: "orders", + Partitions: []kmsg.FetchRequestTopicPartition{ + { + Partition: 0, + FetchOffset: 0, + PartitionMaxBytes: 1 << 20, + }, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := h.handleFetch(ctx, &protocol.RequestHeader{CorrelationID: 100, APIVersion: 11}, fetchReq) + if err != nil { + t.Fatalf("handleFetch: %v", err) + } + fetchResp := decodeKmsgResponse(t, 11, resp, kmsg.NewPtrFetchResponse) + if len(fetchResp.Topics) == 0 || len(fetchResp.Topics[0].Partitions) == 0 { + t.Fatalf("expected a topic partition in the fetch response") + } + p := fetchResp.Topics[0].Partitions[0] + if p.ErrorCode != 0 { + t.Fatalf("fetch returned error code %d for an acknowledged offset", p.ErrorCode) + } + if p.HighWatermark < int64(messages) { + t.Fatalf("read-after-ack: high-watermark %d does not include the %d acknowledged-but-buffered records; "+ + "a consumer bounded by this watermark never requests them", p.HighWatermark, messages) + } + if len(p.RecordBatches) == 0 { + t.Fatalf("read-after-ack: fetch at offset 0 returned no records for %d acknowledged-but-buffered messages", messages) + } +} + +// TestHandleFetchDefaultFlushOnAckNoBufferFallback backs the no-op-in-default +// claim. In the default configuration (KAFSCALE_PRODUCE_SYNC_FLUSH=true) every +// acknowledged produce is flushed before the ack, so the write buffer is empty +// at read time, the durable high-watermark already covers every acknowledged +// offset, and the flush-disabled visibility raise added for read-after-ack is a +// no-op (it only fires when flushOnAck is false). This asserts the buffer is +// empty after the acks and that the durable watermark already includes them, so +// the buffer fallback is never the source in the default path. +func TestHandleFetchDefaultFlushOnAckNoBufferFallback(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + h := newTestHandler(store) // flushOnAck defaults to true + + if !h.flushOnAck { + t.Fatalf("precondition: expected default flushOnAck=true") + } + + const messages = 5 + for i := 0; i < messages; i++ { + produceReq := &kmsg.ProduceRequest{ + Acks: -1, + TimeoutMillis: 1000, + Topics: []kmsg.ProduceRequestTopic{ + { + Topic: "orders", + Partitions: []kmsg.ProduceRequestTopicPartition{ + { + Partition: 0, + Records: testBatchBytes(0, 0, 1), + }, + }, + }, + }, + } + if _, err := h.handleProduce(context.Background(), &protocol.RequestHeader{CorrelationID: int32(i + 1)}, produceReq); err != nil { + t.Fatalf("handleProduce %d: %v", i, err) + } + } + + // Every ack flushed: the durable high-watermark already covers all messages. + durable, err := store.NextOffset(context.Background(), "orders", 0) + if err != nil { + t.Fatalf("NextOffset: %v", err) + } + if durable != int64(messages) { + t.Fatalf("default path: expected durable high-watermark %d (every ack flushed), got %d", messages, durable) + } + + // The flush-disabled visibility raise is a no-op here: the buffered + // high-watermark equals the durable one, which means there is no buffered + // tail beyond what is already flushed, so the buffer fallback can never be + // the source of a read in the default path. (The storage-layer assertion + // that the buffer itself is empty after a flush lives in + // pkg/storage: TestWriteBufferDrainEmptiesBuffer.) + plog, err := h.getPartitionLog(context.Background(), "orders", 0) + if err != nil { + t.Fatalf("getPartitionLog: %v", err) + } + if buffered := plog.BufferedHighWatermark(); buffered != durable { + t.Fatalf("default path: buffered high-watermark %d should equal durable %d (visibility raise must be a no-op)", buffered, durable) + } + + // And the records are consumable end-to-end (via segments, not the buffer). + fetchReq := &kmsg.FetchRequest{ + MaxWaitMillis: 50, + Topics: []kmsg.FetchRequestTopic{ + { + Topic: "orders", + Partitions: []kmsg.FetchRequestTopicPartition{ + {Partition: 0, FetchOffset: 0, PartitionMaxBytes: 1 << 20}, + }, + }, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := h.handleFetch(ctx, &protocol.RequestHeader{CorrelationID: 200, APIVersion: 11}, fetchReq) + if err != nil { + t.Fatalf("handleFetch: %v", err) + } + fetchResp := decodeKmsgResponse(t, 11, resp, kmsg.NewPtrFetchResponse) + if len(fetchResp.Topics) == 0 || len(fetchResp.Topics[0].Partitions) == 0 { + t.Fatalf("expected a topic partition in the fetch response") + } + if got := fetchResp.Topics[0].Partitions[0]; len(got.RecordBatches) == 0 { + t.Fatalf("default path: fetch returned no records (should be served from segments)") + } +} diff --git a/pkg/storage/buffer.go b/pkg/storage/buffer.go index 3a394c0..21fb699 100644 --- a/pkg/storage/buffer.go +++ b/pkg/storage/buffer.go @@ -90,6 +90,54 @@ func (b *WriteBuffer) Drain() []RecordBatch { return drained } +// RecordsFrom returns the raw bytes of buffered batches needed to serve a read +// starting at offset, concatenated, non-destructively. A batch is included when +// its last offset (BaseOffset+LastOffsetDelta) is >= offset, i.e. the batch that +// contains the requested offset plus every batch after it. +// +// maxBytes caps the result. The first matching batch is always returned in full +// even if it alone exceeds maxBytes, so a read can always make progress (this +// mirrors how Kafka returns at least one batch regardless of fetch.max.bytes). +// maxBytes <= 0 is treated as "first matching batch only": a non-positive cap +// returns a single bounded unit rather than draining the whole buffered tail, so +// a malformed or zero PartitionMaxBytes can never produce an unbounded response. +// +// Returns nil when no buffered batch reaches offset. +// +// This makes acked-but-not-yet-flushed records readable (Kafka read-after-ack): +// flush is append-triggered, so a partition that goes quiet leaves its tail in +// the buffer; without this the fetch path (segments only) returns +// ErrOffsetOutOfRange for those acked offsets. +func (b *WriteBuffer) RecordsFrom(offset int64, maxBytes int32) []byte { + b.mu.Lock() + defer b.mu.Unlock() + return recordsFromBatches(b.batches, offset, maxBytes) +} + +// recordsFromBatches concatenates the bytes of the batches needed to serve a +// read starting at offset. It implements the RecordsFrom contract and is shared +// with PartitionLog.Read so the live buffer and the in-flight (mid-flush) +// batches are served identically. Callers hold the lock that guards batches. +func recordsFromBatches(batches []RecordBatch, offset int64, maxBytes int32) []byte { + var out []byte + for i := range batches { + batch := batches[i] + if batch.BaseOffset+int64(batch.LastOffsetDelta) < offset { + continue + } + if len(out) > 0 { + // A first matching batch is already included. Stop unless this batch + // still fits under a positive cap. maxBytes <= 0 stops here, so the + // result is just the first matching batch. + if maxBytes <= 0 || len(out)+len(batch.Bytes) > int(maxBytes) { + break + } + } + out = append(out, batch.Bytes...) + } + return out +} + // Size returns the accumulated byte count (for tests/metrics). func (b *WriteBuffer) Size() int { b.mu.Lock() diff --git a/pkg/storage/buffer_recordsfrom_test.go b/pkg/storage/buffer_recordsfrom_test.go new file mode 100644 index 0000000..69a4cd3 --- /dev/null +++ b/pkg/storage/buffer_recordsfrom_test.go @@ -0,0 +1,116 @@ +// Copyright 2025 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import "testing" + +// bufferWithBatches appends n single-record batches of the given size, assigning +// sequential base offsets, and returns the buffer plus the per-batch byte size. +func bufferWithBatches(t *testing.T, n int, payload int) (*WriteBuffer, int) { + t.Helper() + b := NewWriteBuffer(WriteBufferConfig{MaxBytes: 1 << 30}) + var batchLen int + for i := 0; i < n; i++ { + batch, err := NewRecordBatchFromBytes(make([]byte, payload)) + if err != nil { + t.Fatalf("NewRecordBatchFromBytes: %v", err) + } + PatchRecordBatchBaseOffset(&batch, int64(i)) + batchLen = len(batch.Bytes) + b.Append(batch) + } + return b, batchLen +} + +// TestWriteBufferRecordsFromMaxBytesGuard pins the maxBytes contract, including +// the non-positive case, so a malformed or zero PartitionMaxBytes can never +// produce an unbounded response and a read always makes progress. +func TestWriteBufferRecordsFromMaxBytesGuard(t *testing.T) { + const n = 5 + b, batchLen := bufferWithBatches(t, n, 70) + allLen := n * batchLen + + t.Run("positive cap stops after the budget but returns at least one batch", func(t *testing.T) { + // A cap smaller than a single batch still returns exactly one batch. + got := b.RecordsFrom(0, 1) + if len(got) != batchLen { + t.Fatalf("maxBytes=1: expected first batch in full (%d bytes), got %d", batchLen, len(got)) + } + // A cap that fits two batches returns two, not more. + got = b.RecordsFrom(0, int32(2*batchLen)) + if len(got) != 2*batchLen { + t.Fatalf("maxBytes=2*batch: expected %d bytes, got %d", 2*batchLen, len(got)) + } + }) + + t.Run("maxBytes zero returns the first matching batch only", func(t *testing.T) { + got := b.RecordsFrom(0, 0) + if len(got) != batchLen { + t.Fatalf("maxBytes=0: expected first matching batch only (%d bytes), got %d", batchLen, len(got)) + } + }) + + t.Run("maxBytes negative returns the first matching batch only", func(t *testing.T) { + got := b.RecordsFrom(0, -1) + if len(got) != batchLen { + t.Fatalf("maxBytes<0: expected first matching batch only (%d bytes), got %d", batchLen, len(got)) + } + }) + + t.Run("large positive cap returns the whole matching tail", func(t *testing.T) { + got := b.RecordsFrom(0, 1<<20) + if len(got) != allLen { + t.Fatalf("large cap: expected whole tail (%d bytes), got %d", allLen, len(got)) + } + }) + + t.Run("offset past the tail returns nil", func(t *testing.T) { + if got := b.RecordsFrom(int64(n), 1<<20); got != nil { + t.Fatalf("offset past tail: expected nil, got %d bytes", len(got)) + } + }) + + t.Run("offset mid-tail starts at the matching batch", func(t *testing.T) { + got := b.RecordsFrom(2, 1<<20) + if len(got) != (n-2)*batchLen { + t.Fatalf("mid-tail: expected %d bytes from offset 2, got %d", (n-2)*batchLen, len(got)) + } + }) +} + +// TestWriteBufferDrainEmptiesBuffer backs the broker-side no-op-in-default claim +// at the storage layer: after a flush drains the buffer, RecordsFrom serves +// nothing, so the buffer fallback can never be the source of a read once the +// data is in a segment. In the default flushOnAck=true path every ack drains the +// buffer immediately, so this is the post-flush state at read time. +func TestWriteBufferDrainEmptiesBuffer(t *testing.T) { + b, _ := bufferWithBatches(t, 3, 70) + if got := b.RecordsFrom(0, 1<<20); len(got) == 0 { + t.Fatalf("precondition: expected buffered bytes before drain") + } + drained := b.Drain() + if len(drained) != 3 { + t.Fatalf("expected 3 drained batches, got %d", len(drained)) + } + for off := int64(0); off < 3; off++ { + if got := b.RecordsFrom(off, 1<<20); got != nil { + t.Fatalf("post-drain: buffer fallback served offset %d (%d bytes); buffer must be empty after a flush", off, len(got)) + } + } + if b.Size() != 0 { + t.Fatalf("post-drain: expected buffer size 0, got %d", b.Size()) + } +} diff --git a/pkg/storage/flushwindow_test.go b/pkg/storage/flushwindow_test.go new file mode 100644 index 0000000..b8a939c --- /dev/null +++ b/pkg/storage/flushwindow_test.go @@ -0,0 +1,97 @@ +// Copyright 2025 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "errors" + "testing" + + "github.com/KafScale/platform/pkg/cache" +) + +// TestPartitionLogFlushWindowOffsetReadable documents and guards the flush +// window: prepareFlush drains the buffer and builds a segment artifact, but the +// segment is not registered in l.segments until uploadFlush commits it after the +// S3 upload completes. Between those two steps an acknowledged offset is in +// neither the buffer (drained) nor a committed segment. +// +// This test drives that exact window by calling prepareFlush directly and then +// reading a drained offset before uploadFlush commits. It asserts the drained +// batches stay readable during the window so an in-flight flush never makes an +// acknowledged record briefly unreadable. +func TestPartitionLogFlushWindowOffsetReadable(t *testing.T) { + s3 := NewMemoryS3Client() + c := cache.NewSegmentCache(1 << 20) + // Thresholds that never auto-trip; we drive prepareFlush by hand to land + // precisely in the window. + log := NewPartitionLog("default", "orders", 0, 0, s3, c, PartitionLogConfig{ + Buffer: WriteBufferConfig{MaxBytes: 1 << 30}, + Segment: SegmentWriterConfig{IndexIntervalMessages: 1}, + }, nil, nil, nil) + + const n = 4 + for i := 0; i < n; i++ { + batch, err := NewRecordBatchFromBytes(make([]byte, 70)) + if err != nil { + t.Fatalf("NewRecordBatchFromBytes: %v", err) + } + if _, err := log.AppendBatch(context.Background(), batch); err != nil { + t.Fatalf("AppendBatch %d: %v", i, err) + } + } + + // Enter the flush window: drain the buffer into an artifact, mark flushing, + // but do NOT yet commit the segment to l.segments (that is uploadFlush's job). + log.mu.Lock() + artifact, err := log.prepareFlush() + log.mu.Unlock() + if err != nil { + t.Fatalf("prepareFlush: %v", err) + } + if artifact == nil { + t.Fatalf("prepareFlush returned no artifact; nothing was buffered") + } + + // During the window every acknowledged offset must still be readable. + for off := int64(0); off < n; off++ { + data, err := log.Read(context.Background(), off, 1<<20) + if err != nil { + if errors.Is(err, ErrOffsetOutOfRange) { + t.Fatalf("flush window: acknowledged offset %d is unreadable while a flush is in flight "+ + "(drained from buffer, segment not yet committed)", off) + } + t.Fatalf("Read offset %d during flush window: %v", off, err) + } + if len(data) == 0 { + t.Fatalf("flush window: Read offset %d returned no data while a flush is in flight", off) + } + } + + // Complete the flush; the offsets must remain readable afterwards. + if err := log.uploadFlush(context.Background(), artifact); err != nil { + t.Fatalf("uploadFlush: %v", err) + } + for off := int64(0); off < n; off++ { + data, err := log.Read(context.Background(), off, 1<<20) + if err != nil { + t.Fatalf("Read offset %d after flush committed: %v", off, err) + } + if len(data) == 0 { + t.Fatalf("Read offset %d after flush committed returned no data", off) + } + } +} diff --git a/pkg/storage/log.go b/pkg/storage/log.go index 4556c61..115fa24 100644 --- a/pkg/storage/log.go +++ b/pkg/storage/log.go @@ -60,6 +60,13 @@ type PartitionLog struct { flushCond *sync.Cond s3sem *semaphore.Weighted flushing bool + // flushingBatches holds the batches drained by prepareFlush but not yet + // committed to a segment by uploadFlush. During that window an acknowledged + // offset is in neither the live buffer (drained) nor l.segments (not yet + // registered); Read consults this slice so the record stays readable. Set + // under l.mu by prepareFlush, cleared under l.mu by uploadFlush on commit or + // on the upload-failure reset. + flushingBatches []RecordBatch } type segmentRange struct { @@ -261,6 +268,23 @@ func (l *PartitionLog) AppendBatch(ctx context.Context, batch RecordBatch) (*App return result, nil } +// BufferedHighWatermark returns the in-memory high-watermark: one past the last +// offset this log has assigned (whether that offset is in a flushed segment or +// still in the write buffer). It is NOT durable. The flushed/durable +// high-watermark is the metadata store's NextOffset, advanced only on flush. +// +// In flush-disabled / acks=0 mode no flush fires for the buffered tail, so the +// durable watermark lags. The fetch handler uses this value, gated on +// flushOnAck=false, to make acknowledged-but-buffered records consumable while +// the process lives. These records are LOST on restart: the buffer is in-memory +// and RestoreFromS3 rebuilds from segments only. This is a read-after-ack +// visibility helper for the non-default config, not a durability guarantee. +func (l *PartitionLog) BufferedHighWatermark() int64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.nextOffset +} + // EarliestOffset returns the lowest offset available in the log. func (l *PartitionLog) EarliestOffset() int64 { l.mu.Lock() @@ -329,6 +353,10 @@ func (l *PartitionLog) prepareFlush() (*SegmentArtifact, error) { return nil, fmt.Errorf("build segment: %w", err) } l.flushing = true + // Keep the drained batches readable until uploadFlush commits the segment to + // l.segments. Without this, an acknowledged offset is unreadable for the + // duration of the S3 upload (in neither the buffer nor a committed segment). + l.flushingBatches = batches return artifact, nil } @@ -366,6 +394,7 @@ func (l *PartitionLog) uploadFlush(ctx context.Context, artifact *SegmentArtifac if err := g.Wait(); err != nil { l.mu.Lock() l.flushing = false + l.flushingBatches = nil l.flushCond.Broadcast() l.mu.Unlock() return err @@ -385,6 +414,8 @@ func (l *PartitionLog) uploadFlush(ctx context.Context, artifact *SegmentArtifac l.indexEntries[artifact.BaseOffset] = artifact.RelativeIndex } l.flushing = false + // The segment is now in l.segments; the in-flight copy is no longer needed. + l.flushingBatches = nil l.flushCond.Broadcast() lastSegIdx := len(l.segments) - 1 l.mu.Unlock() @@ -461,6 +492,29 @@ func (l *PartitionLog) Read(ctx context.Context, offset int64, maxBytes int32) ( l.mu.Unlock() if !found { + // Not in any flushed segment. The offset may still be in the in-memory + // write buffer (acked but not yet flushed: flush is append-triggered, so + // a partition that goes quiet below the flush threshold keeps its tail + // buffered). Serve it so acked records are consumable (Kafka + // read-after-ack) instead of returning ErrOffsetOutOfRange. + if body := l.buffer.RecordsFrom(offset, maxBytes); len(body) > 0 { + // Observability for the flush-disabled / acks=0 path: this read was + // served from the in-memory buffer, not a durable segment. + l.logger().Debug("read served from write buffer (acked-but-unflushed)", + "topic", l.topic, "partition", l.partition, "offset", offset, "bytes", len(body)) + return body, nil + } + // Or the offset may be mid-flush: prepareFlush drained it from the buffer + // but uploadFlush has not yet committed the segment to l.segments. Serve + // the in-flight batches so the record stays readable across that window. + l.mu.Lock() + body := recordsFromBatches(l.flushingBatches, offset, maxBytes) + l.mu.Unlock() + if len(body) > 0 { + l.logger().Debug("read served from in-flight flush batches (flush window)", + "topic", l.topic, "partition", l.partition, "offset", offset, "bytes", len(body)) + return body, nil + } return nil, ErrOffsetOutOfRange } diff --git a/pkg/storage/multiflush_test.go b/pkg/storage/multiflush_test.go new file mode 100644 index 0000000..0e401f1 --- /dev/null +++ b/pkg/storage/multiflush_test.go @@ -0,0 +1,80 @@ +// Copyright 2025 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "errors" + "testing" + + "github.com/KafScale/platform/pkg/cache" +) + +// TestPartitionLogMultiFlushAllOffsetsReadable appends many batches across many +// flush rotations (MaxBatches=3 -> a flush roughly every 3 appends) and asserts +// that EVERY acknowledged offset is still readable afterwards: the offsets that +// rotated into segments via the flushed-segment path, and the buffered tail via +// the write-buffer fallback. It checks this in pure storage logic over a +// MemoryS3 backend (no network/S3 flakiness). +// +// If segments overwrite each other, are not registered in l.segments, the flush +// path drops drained batches, or the buffered tail is not served, mid-stream +// offsets become ErrOffsetOutOfRange and this test fails. +func TestPartitionLogMultiFlushAllOffsetsReadable(t *testing.T) { + s3 := NewMemoryS3Client() + c := cache.NewSegmentCache(1 << 20) + log := NewPartitionLog("default", "orders", 0, 0, s3, c, PartitionLogConfig{ + Buffer: WriteBufferConfig{ + MaxBatches: 3, // force frequent flush rotations + }, + Segment: SegmentWriterConfig{IndexIntervalMessages: 1}, + CacheEnabled: true, + }, nil, nil, nil) + + const n = 30 // ~10 flush rotations + a buffered tail + for i := 0; i < n; i++ { + batch, err := NewRecordBatchFromBytes(make([]byte, 70)) + if err != nil { + t.Fatalf("NewRecordBatchFromBytes: %v", err) + } + res, err := log.AppendBatch(context.Background(), batch) + if err != nil { + t.Fatalf("AppendBatch %d: %v", i, err) + } + if res.BaseOffset != int64(i) { + t.Fatalf("append %d: expected base offset %d, got %d", i, i, res.BaseOffset) + } + } + + missing := []int64{} + for off := int64(0); off < n; off++ { + data, err := log.Read(context.Background(), off, 1<<20) + if err != nil { + if errors.Is(err, ErrOffsetOutOfRange) { + missing = append(missing, off) + continue + } + t.Fatalf("Read offset %d: %v", off, err) + } + if len(data) == 0 { + missing = append(missing, off) + } + } + if len(missing) > 0 { + t.Fatalf("read-after-ack across flush rotations: %d/%d acknowledged offsets unreadable: %v", + len(missing), n, missing) + } +} diff --git a/pkg/storage/readafterack_test.go b/pkg/storage/readafterack_test.go new file mode 100644 index 0000000..664ef2d --- /dev/null +++ b/pkg/storage/readafterack_test.go @@ -0,0 +1,101 @@ +// Copyright 2025 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "errors" + "testing" + + "github.com/KafScale/platform/pkg/cache" +) + +// TestPartitionLogReadAfterAckBeforeFlush exercises read-after-ack at the +// storage layer: an acknowledged record that is still in the in-memory write +// buffer (not yet flushed to a segment) must be readable through +// PartitionLog.Read. +// +// AppendBatch returns an AppendResult (with assigned offsets) as soon as the +// batch is buffered; that return value is the basis for ACKing the produce to +// the client. Flushing to a segment happens only when a WriteBuffer threshold +// trips (MaxBytes/MaxMessages/MaxBatches/FlushInterval), evaluated inside +// AppendBatch (there is no background flusher). Before this change Read served +// only flushed segments and returned ErrOffsetOutOfRange for anything still in +// the buffer. +// +// Consequence at the storage layer: when produce to a partition stops below the +// flush threshold, the just-acknowledged tail stays in the buffer and Read +// returns ErrOffsetOutOfRange for it. This test asserts Read serves that +// buffered tail. +// +// Scope: this is the storage-layer half of read-after-ack. Whether a real Kafka +// consumer reaches these offsets through the fetch path additionally depends on +// the broker advertising a high-watermark that includes the buffered tail; that +// end-to-end path is covered by the broker test +// TestHandleFetchReadAfterAckFlushDisabled. The buffer is in-memory, so these +// records are readable while the process lives but lost on restart; this is not +// a durability guarantee. +// +// Existing tests use MaxBytes:1 so every append flushes immediately, which is +// why they never exercised this path. +func TestPartitionLogReadAfterAckBeforeFlush(t *testing.T) { + s3 := NewMemoryS3Client() + c := cache.NewSegmentCache(1024) + // Thresholds set so NOTHING auto-flushes: records stay in the buffer. + log := NewPartitionLog("default", "orders", 0, 0, s3, c, PartitionLogConfig{ + Buffer: WriteBufferConfig{ + MaxBytes: 1 << 30, // 1 GiB; never tripped by this test + MaxMessages: 0, + MaxBatches: 0, + FlushInterval: 0, // time-based flush disabled + }, + Segment: SegmentWriterConfig{IndexIntervalMessages: 1}, + }, nil, nil, nil) + + const n = 10 + for i := 0; i < n; i++ { + batch, err := NewRecordBatchFromBytes(make([]byte, 70)) + if err != nil { + t.Fatalf("NewRecordBatchFromBytes: %v", err) + } + res, err := log.AppendBatch(context.Background(), batch) + if err != nil { + t.Fatalf("AppendBatch %d: %v", i, err) + } + // A non-error AppendResult is the basis for ACKing this offset to the + // client: from the producer's point of view, offset res.BaseOffset is + // now committed. + if res.BaseOffset != int64(i) { + t.Fatalf("append %d: expected base offset %d, got %d", i, i, res.BaseOffset) + } + } + + // Every acked offset MUST be readable. On v1.6.0 nothing flushed (no + // threshold tripped), Read finds no segment and returns ErrOffsetOutOfRange. + for off := int64(0); off < n; off++ { + data, err := log.Read(context.Background(), off, 1<<20) + if err != nil { + if errors.Is(err, ErrOffsetOutOfRange) { + t.Fatalf("acks-but-unreadable: acked offset %d is not consumable "+ + "(buffered, not flushed; Read serves only segments): %v", off, err) + } + t.Fatalf("Read acked offset %d: %v", off, err) + } + if len(data) == 0 { + t.Fatalf("Read acked offset %d returned no data", off) + } + } +}