Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions writers/batchwriter/batchwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m
limit.AddSlice(add)
}
if len(toFlush) > 0 || rest != nil || limit.ReachedLimit() {
// flush current batch
send()
if limit.Rows() > 0 {
send()
}
Comment thread
bbernays marked this conversation as resolved.
ticker.Reset(w.batchTimeout)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this go inside the if as well?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or have batch.SliceRecord return toFlush = 0?

}
for _, sliceToFlush := range toFlush {
Expand Down
51 changes: 51 additions & 0 deletions writers/batchwriter/batchwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,57 @@ func TestBatchUpserts(t *testing.T) {
}
}

// noEmptyBatchClient embeds testBatchClient and reports a test failure
// whenever WriteTableBatch is invoked with an empty messages slice, letting
// tests assert that the writer never performs an empty flush.
type noEmptyBatchClient struct {
	testBatchClient
	t *testing.T
}

// WriteTableBatch checks that messages is non-empty before delegating to the
// embedded testBatchClient. An empty batch is reported as a test failure.
func (c *noEmptyBatchClient) WriteTableBatch(ctx context.Context, name string, messages message.WriteInserts) error {
	if len(messages) > 0 {
		return c.testBatchClient.WriteTableBatch(ctx, name, messages)
	}
	// t.Error rather than t.Fatal: this may run on a worker goroutine, and
	// Fatal must only be called from the goroutine running the test function.
	c.t.Error("WriteTableBatch called with empty messages slice")
	return nil
}

// TestBatchNoEmptyFlush is a regression test ensuring WriteTableBatch is never called with
// an empty messages slice. This can happen when batchSizeBytes is so small that no row fits
// in the initial batch (SliceRecord returns add==nil while toFlush is non-empty), which
// previously caused send() to call WriteTableBatch with an empty resources slice.
func TestBatchNoEmptyFlush(t *testing.T) {
ctx := context.Background()

testClient := &noEmptyBatchClient{t: t}
// batchSizeBytes=1 ensures that no single row fits in the initial batch:
// SliceRecord returns add==nil, toFlush=[one record per row], rest=nil.
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment describing the WithBatchSizeBytes(1) scenario says SliceRecord returns ... rest=nil, but internal/batch.SliceRecord currently returns a non-nil remaining when the byte cap is too small (because getToFlush slices to single-row batches without clearing the original record). Please update this comment to reflect the actual SliceRecord behavior so future readers aren’t misled about why this regression case triggers.

Suggested change
// SliceRecord returns add==nil, toFlush=[one record per row], rest=nil.
// SliceRecord returns add==nil, toFlush=[one record per row], and a non-nil remaining
// record for the unsliced original batch.

Copilot uses AI. Check for mistakes.
wr, err := New(testClient, WithBatchSizeBytes(1))
if err != nil {
t.Fatal(err)
}

table := schema.Table{Name: "table1", Columns: []schema.Column{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}}
const numRows = 5
record := getRecord(table.ToArrowSchema(), numRows)
if err := wr.writeAll(ctx, []message.WriteMessage{&message.WriteInsert{Record: record}}); err != nil {
t.Fatal(err)
}

if err := wr.Flush(ctx); err != nil {
t.Fatal(err)
}
if err := wr.Close(ctx); err != nil {
t.Fatal(err)
}

// All rows must have been written via at least one non-empty batch.
if testClient.InsertsLen() == 0 {
t.Fatalf("expected at least 1 insert message, got 0")
}
}

func getRecord(sc *arrow.Schema, rows int) arrow.RecordBatch {
builder := array.NewRecordBuilder(memory.DefaultAllocator, sc)
defer builder.Release()
Expand Down
Loading