Skip to content

writer: per-writer queues + work-stealing encode pool#102

Draft
rgankema wants to merge 16 commits into
mainfrom
rg/batch-builder
Draft

writer: per-writer queues + work-stealing encode pool#102
rgankema wants to merge 16 commits into
mainfrom
rg/batch-builder

Conversation

@rgankema
Copy link
Copy Markdown

Summary

Replaces the encode pool's single MPMC channel + per-writer mutex with per-writer FIFO queues + work-stealing workers.

The problem this fixes

Workers committed to a specific writer at task-pull time. If two tasks for the same writer landed adjacently in the global FIFO, the second worker blocked on the writer's mutex even when tasks for other idle writers were queued behind it. Profiling showed ~25% pool waste in a real workload (8 workers, 14 concurrent writers).

The new design

  • Each WriterState owns a Mutex<VecDeque<RecordBatch>> + atomic queue_len + atomic busy flag.
  • A worker scans the active-writer snapshot, skips writers with empty queues, and CAS busy: false → true to claim one. It then drains that writer's queue to empty before releasing.
  • Wakeups use a single shared Notify with two safeguards:
    • Cascade on claim: a worker that wins a claim notifies a peer before draining, so concurrent producer notifications that collapse into one stored permit still rouse enough workers.
    • Re-check after release: after busy.store(false), the worker reloads queue_len and notifies again if a producer pushed during the release window.

Invariants

  • Per-writer FIFO: busy serializes pops, and pushes go to the back.
  • No stranded tasks: the cascade + post-release re-check covers the missed-wakeup races.
  • Close semantics unchanged: pending / done_notify are decremented and signaled exactly once per batch.
  • Panic handling unchanged: catch_unwind + first-error-wins on state.error.
  • FFI surface unchanged: iceberg_set_encode_workers, submit_batch, iceberg_writer_* all keep their signatures and semantics.

Test plan

  • `cargo build --release` clean
  • `cargo clippy --tests` clean (no new warnings)
  • `cargo test --lib` — 27 passed (25 pre-existing + 2 new)
  • Fairness test (new): 4 writers × 8 batches, 20ms delay; asserts per-writer FIFO and that every 4 consecutive completions span all 4 writers.
  • Stranded-task stress (new): 4 producer threads × 8 writers × 50 batches; asserts `pending` and `queue_len` reach zero on every writer and exactly 1,600 completions land.
  • Performance validation on the originating workload (waiting on author).
  • `make test` end-to-end against MinIO.

🤖 Generated with Claude Code

gbrgr and others added 16 commits May 11, 2026 10:33
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Arrow.jl returns Arrow.Date/Arrow.Timestamp wrappers, not Dates.Date/DateTime.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…rge Julia finalize arms

Removes ~130 lines of duplicated identity/scatter dispatch in append_numeric by
introducing two declarative macros. Merges JULIA_DATE/TIMESTAMP/TIMESTAMPTZ finalize
arms with their non-Julia counterparts since the Arrow output type is identical.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This reverts commit dfdb437.
Replace the global MPMC channel + per-writer mutex with per-writer FIFO
queues and a work-stealing worker pool. Workers scan the active-writer
set, CAS a `busy` flag to claim a writer, then drain its queue. This
removes the head-of-line blocking observed when multiple workers pulled
tasks for the same writer and serialized on its mutex.

Wakeup discipline: a single shared `Notify` plus a cascade — the worker
that wins a claim notifies a peer before draining, so concurrent
producer notifications that collapse to one stored permit still wake
enough workers. After releasing `busy`, the worker re-checks `queue_len`
and notifies again if a producer pushed during the release window.

Add a `#[cfg(test)]` encode hook plus two unit tests:
- fairness: 4 writers × 8 batches drain in round-robin, preserving
  per-writer FIFO.
- stranded-task stress: 1.6k submits across 8 writers from 4 producer
  threads all reach pending=0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The existing LZ4 codec maps to Compression::LZ4, the deprecated
Hadoop-framed variant whose per-page framing overhead is the usual
reason LZ4 underperforms Snappy in parquet benchmarks. Add LZ4_RAW = 5
as a separate FFI enum value mapping to Compression::LZ4_RAW (modern
raw blocks, parquet spec codec ID 7). LZ4 = 3 keeps its current
behavior for backward compatibility.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
robertbuessow added a commit that referenced this pull request May 21, 2026
Annotate classified_error(), classify(), and classify_iceberg() with
#[track_caller] so each call site embeds "[src/file.rs:line]" into the
detail field at compile time (zero runtime overhead).  Inlined
classify_iceberg() so it no longer delegates to classify() — otherwise
#[track_caller] would capture a line inside error_codes.rs instead of
the actual FFI site.  Converted all map_err(classify_iceberg) and
map_err(classify) callers to closure form (|e| classify_iceberg(e))
because bare function-pointer dispatch loses the caller attribute;
closure bodies preserve the source location.

IcebergException.detail in Julia now looks like:
  "Null scan pointer provided [src/full.rs:169]"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants