feat(encoding): support u128 bitpacking for decimal128 columns #6858

Open. LuciferYang wants to merge 4 commits into lance-format:main from LuciferYang:feat/decimal128-u128-bitpacking

LuciferYang commented May 20, 2026

Closes #6857.

What

Extends the miniblock inline-bitpacking chooser to also consider bits = 128 and adds u128 BitPacking with per-chunk kernel dispatch (NarrowU32 / NarrowU64 / SequentialU128 / Memcpy, keyed by Stat::BitWidth). Decimal128 columns whose values fit in 64 bits or fewer now encode/decode through FastLanes SIMD lanes instead of a scalar bit-stream kernel; widths 65..=127 still use the scalar u128 kernel, and width 128 is a plain copy.

Impact

On-disk size

Measured on TPC-DS SF=100 store_sales (288 M rows, 12 × decimal128(7,2) columns):

| Metric        | Before | After      |
|---------------|--------|------------|
| On-disk size  | 34 GiB | 15.873 GiB |
| Bytes per row | ~127   | ~59        |

~53 % reduction, schema / row count / file format version (v2.1) unchanged.

Decode / encode throughput

Apple M-series, criterion 100 samples × 5s / case, single Decimal128(38, 0) column × 5 M rows (80 MB of logical i128 data, i.e. 5 M × 16 bytes; the GiB/s figures below are derived from this byte count). Each variant pins every chunk's Stat::BitWidth to the target dispatch arm.

Encode (cargo bench -p lance-encoding --bench encoder -- encode_decimal128):

| bit_width (arm)      | without bp128         | with bp128 + dispatch | Δ time |
|----------------------|-----------------------|-----------------------|--------|
| 24 (NarrowU32)       | 39.01 ms / 1.91 GiB/s | 11.60 ms / 6.43 GiB/s | −70 %  |
| 40 (NarrowU64)       | 35.42 ms / 2.10 GiB/s | 14.82 ms / 5.03 GiB/s | −58 %  |
| 100 (SequentialU128) | 34.05 ms / 2.19 GiB/s | 38.32 ms / 1.94 GiB/s | +13 %  |
| 128 (Memcpy)         | 35.08 ms / 2.12 GiB/s | 29.40 ms / 2.53 GiB/s | −16 %  |

Decode (cargo bench -p lance-encoding --bench decoder -- decode_decimal128):

| bit_width (arm)      | without bp128         | with bp128 + dispatch  | Δ time |
|----------------------|-----------------------|------------------------|--------|
| 24 (NarrowU32)       | 6.19 ms / 12.04 GiB/s | 15.55 ms / 4.79 GiB/s  | +151 % |
| 40 (NarrowU64)       | 6.32 ms / 11.80 GiB/s | 16.63 ms / 4.48 GiB/s  | +163 % |
| 100 (SequentialU128) | 6.23 ms / 11.97 GiB/s | 15.38 ms / 4.84 GiB/s  | +147 % |
| 128 (Memcpy)         | 6.29 ms / 11.84 GiB/s | 6.37 ms / 11.70 GiB/s  | parity |

Decode adds the cost of decompression that didn't exist on the plain-MiniBlock path; encode-narrow regimes are net faster because the SIMD pack replaces the generic-fixed-width path's ~2 GiB/s ceiling. The −53 % disk size typically wins net wall-time when storage / network bandwidth is the bottleneck; pure CPU-bound in-memory decode is the explicit trade-off.
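
For readers checking the throughput and Δ columns by hand, here is a minimal sketch of the arithmetic, assuming the logical payload is 5,000,000 rows × 16 bytes as stated in the bench description. The function names are illustrative and are not part of the PR.

```rust
/// Convert a measured wall time into throughput in GiB/s.
/// `logical_bytes` is the uncompressed payload size (rows × 16 for i128).
fn throughput_gib_per_s(logical_bytes: u64, elapsed_ms: f64) -> f64 {
    const GIB: f64 = (1u64 << 30) as f64;
    let seconds = elapsed_ms / 1000.0;
    logical_bytes as f64 / seconds / GIB
}

/// Relative change in wall time as a percentage (negative means faster).
fn delta_time_percent(before_ms: f64, after_ms: f64) -> f64 {
    (after_ms - before_ms) / before_ms * 100.0
}
```

With 80,000,000 bytes and 39.01 ms this yields about 1.91 GiB/s, and 39.01 ms → 11.60 ms yields about −70 %, matching the first encode row.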

End-to-end (read + write, local-disk + MinIO)

Apple M-series, lance Rust public API end-to-end. 5,000,000 rows × 6 × decimal128(7,2), deterministic xorshift values within precision-7 range so every chunk's Stat::BitWidth lands on the NarrowU32 dispatch arm. 5 iterations per scenario; iter 0 is cold, warm mean is over the 4 subsequent iterations. All 8 runs produced an identical per-column sum, so this is a correctness-preserving A/B. Read times are end-to-end (Dataset::open + scan + per-column arrow::compute::sum).

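| Metric                   | without bp128 | with bp128 + dispatch | Δ       |
|--------------------------|---------------|-----------------------|---------|
| Local-disk write         | 206.5 ms      | 51.6 ms               | −75.0 % |
| Local-disk read          | 35.2 ms       | 27.2 ms               | −22.7 % |
| MinIO write              | 362.4 ms      | 264.3 ms              | −27.1 % |
| MinIO read               | 153.1 ms      | 37.2 ms               | −75.7 % |
| On-disk size per version | ~451 MiB      | ~87 MiB               | −80.8 % |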

Local-disk read is mostly CPU-bound on warm page cache, and is still net faster — chunked decode plus 5.2× less data on disk outweighs the slower pure-kernel decode in practice. MinIO read is bandwidth-bound and the size reduction maps directly into the speedup. Both write regimes are dominated by the SIMD pack replacing the generic-fixed-width path's ~2 GiB/s ceiling; the wider gap on local-disk write reflects that local fsync is not the bottleneck. Reproducer + the bench tool's full source are in a PR comment below.

Changes

  • rust/compression/bitpacking/src/lib.rs — scalar u128 BitPacking kernel.
  • rust/lance-encoding/src/encodings/physical/bitpacking.rs — u128 miniblock encode/decode wiring + per-chunk dispatch via u128_kernel_for(bit_width) (single source of truth for both pack and unpack; one match arm per kernel).
  • rust/lance-encoding/src/compression.rs — chooser now matches bits ∈ {8, 16, 32, 64, 128}.
  • rust/lance-encoding/src/statistics.rs — stat plumbing for the 128 case + OR-fold sign-safety contract test.
  • rust/lance-encoding/benches/{decoder,encoder}.rs — bench_decode_decimal128 / bench_encode_decimal128 covering all four non-trivial dispatch arms.

No new public API, no on-wire format change (the new bit-width is already valid for v2.1 readers — the encoder just didn't previously emit it).

Testing

  • u128 round-trip unit tests across in-line and out-of-line value distributions, including the width = 63 boundary.
  • Per-chunk dispatch correctness: width-24 NarrowU32, width-40 NarrowU64, width-80 SequentialU128, width-128 Memcpy, mixed-widths-per-chunk, exhaustive 0..=128 u128_kernel_for table + roundtrip.
  • Sign-safety regression: all-negatives and mixed-sign-small-magnitude pin that any negative i128 routes to the Memcpy arm via the Stat::BitWidth OR-fold contract.
  • unchunk_u128_dispatch corruption guards: under-length buffer, oversized num_values, header out-of-range, body length mismatch, and the num_values == 0 short-circuit (both negative and positive coverage).
  • Two proptest fuzzers: single-width × full bw range, and multi-chunk independent widths.
  • End-to-end compress → decompress test through the miniblock path.
  • Full Spark V2 CTAS rewrite of TPC-DS SF=100 store_sales verifies row count, schema, and reads back identically.

decimal128(7,2) columns were stored at full 128-bit width without
compression because BitPacking only supported u8/u16/u32/u64. This
adds scalar u128 bitpacking, reducing decimal128 storage from 131
bits/value to ~24 bits/value (5.6x compression on TPC-DS store_sales).

File size: 34 GiB → 16 GiB for store_sales SF=100.
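
To illustrate what scalar bitpacking of u128 values means here, the following is a minimal sketch of a plain sequential bit-stream: each value contributes only its low `width` bits, stored back to back. It is written for clarity (one bit at a time) and is an assumption-level illustration, not the kernel added in `rust/compression/bitpacking/src/lib.rs`, and it does not reproduce the FastLanes layout. `pack_bits` and `unpack_bits` are hypothetical names.

```rust
/// Pack the low `width` bits of each value back to back, least significant
/// bit first. Callers must ensure every value fits in `width` bits.
fn pack_bits(values: &[u128], width: u32) -> Vec<u8> {
    assert!(width <= 128);
    let total_bits = values.len() * width as usize;
    let mut out = vec![0u8; total_bits.div_ceil(8)];
    let mut bit_pos = 0usize;
    for &v in values {
        for i in 0..width {
            if (v >> i) & 1 == 1 {
                out[bit_pos / 8] |= 1 << (bit_pos % 8);
            }
            bit_pos += 1;
        }
    }
    out
}

/// Inverse of `pack_bits`: read `count` values of `width` bits each.
fn unpack_bits(packed: &[u8], width: u32, count: usize) -> Vec<u128> {
    assert!(width <= 128);
    let mut out = Vec::with_capacity(count);
    let mut bit_pos = 0usize;
    for _ in 0..count {
        let mut v = 0u128;
        for i in 0..width {
            let bit = (packed[bit_pos / 8] >> (bit_pos % 8)) & 1;
            v |= (bit as u128) << i;
            bit_pos += 1;
        }
        out.push(v);
    }
    out
}
```

Four values at width 24 occupy 12 bytes instead of 64, which is where the storage reduction for decimal128(7,2) data comes from.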

claude bot left a comment

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

github-actions bot added the enhancement label May 20, 2026
codecov Bot commented May 20, 2026

Codecov Report

❌ Patch coverage is 92.39598% with 53 lines in your changes missing coverage. Please review.

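| Files with missing lines                              | Patch % | Lines                        |
|-------------------------------------------------------|---------|------------------------------|
| ...ance-encoding/src/encodings/physical/bitpacking.rs | 90.56%  | 47 Missing and 6 partials ⚠️ |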

LuciferYang marked this pull request as draft May 20, 2026 13:32
LuciferYang commented May 20, 2026

Decoding performance has degraded. I need to explore possible optimizations to decide whether to move forward with this change.

Per-chunk u128 inline bitpacking dispatch for decimal128 columns.
Each chunk's `Stat::BitWidth` selects one of five kernels via the
single source of truth `u128_kernel_for(bit_width)`:

| bit_width | kernel                                         |
|-----------|------------------------------------------------|
| 0         | zero-fill, no body bytes                       |
| 1..=32    | reinterpret body as &[u32], FastLanes u32 SIMD |
| 33..=64   | reinterpret body as &[u64], FastLanes u64 SIMD |
| 65..=127  | scalar sequential u128 bit-stream              |
| 128       | memcpy identity (one u128 per value)           |

Encoder (`pack_u128_chunk`) and decoder (`unpack_u128_chunk`) both
dispatch through the same enum, so the dispatch table is a single
place to change. The 16-byte chunk header layout is unchanged and
the body byte count stays at `bit_width * ELEMS_PER_CHUNK / 8` for
every kernel; what changes at the same `bit_width` is the body
byte interpretation. This is documented in the module rustdoc as a
deliberate format-contract design — `bit_width` IS the kernel
selector, no separate negotiation channel — and is safe because
u128 inline bitpacking is unreleased so no in-the-wild reader can
mis-decode.
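
The dispatch table above can be sketched as a single match, assuming `ELEMS_PER_CHUNK` is 1024 (the miniblock chunk size mentioned in the bench tool). The enum name `U128Kernel`, the `Option` return type, and the `body_bytes` helper are assumptions made for this illustration; the PR's actual `u128_kernel_for` signature may differ.

```rust
/// Values per miniblock chunk. Assumed to be 1024 for this sketch.
const ELEMS_PER_CHUNK: usize = 1024;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum U128Kernel {
    Zero,           // bit_width == 0: zero-fill, no body bytes
    NarrowU32,      // 1..=32: body handled as u32 lanes
    NarrowU64,      // 33..=64: body handled as u64 lanes
    SequentialU128, // 65..=127: scalar u128 bit-stream
    Memcpy,         // 128: one u128 per value, copied as-is
}

/// Map a chunk's bit width to its kernel. Returns `None` for widths that
/// cannot occur for a 128-bit value (a hypothetical choice for this sketch).
fn u128_kernel_for(bit_width: u32) -> Option<U128Kernel> {
    match bit_width {
        0 => Some(U128Kernel::Zero),
        1..=32 => Some(U128Kernel::NarrowU32),
        33..=64 => Some(U128Kernel::NarrowU64),
        65..=127 => Some(U128Kernel::SequentialU128),
        128 => Some(U128Kernel::Memcpy),
        _ => None,
    }
}

/// Body size in bytes; identical for every kernel at a given bit width.
fn body_bytes(bit_width: u32) -> usize {
    bit_width as usize * ELEMS_PER_CHUNK / 8
}
```

Because pack and unpack would both call the same function, changing a range boundary in one place changes it for both directions.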

# Performance

q4 (6× decimal128 sum) on store_sales SF=100, MinIO 100ms RTT,
JMH avgt=10:

| Format                  | q4 wall-time             | vs PR#6858 baseline |
|-------------------------|--------------------------|---------------------|
| parquet                 |  2667.779 ± 139.766 ms   | —                   |
| lance_bp128 (this PR)   |  5573.007 ± 127.962 ms   | 3.7× speedup; -73%  |
| lance (plain MiniBlock) |  6420.842 ± 118.629 ms   | —                   |

Disk size unchanged at 16 GiB / 305 fragments (header + body byte
count are identical to the PR#6858 baseline; only the kernel
choice changes per chunk). The IO savings (-53% vs plain
MiniBlock) now translate into a net win on high-RTT object
storage instead of being dwarfed by scalar u128 unpack CPU cost.

# Sign safety (load-bearing invariant)

Narrow branches reinterpret `&[u128]` as `&[u32]` / `&[u64]` and
require every value to satisfy `v >> bit_width == 0`. This is
guaranteed by `FixedWidthDataBlock::max_bit_widths`, whose
OR-fold + `leading_zeros` algorithm guarantees the high lanes are
all zero for any `bit_width < 128`. Negative i128 values (after
`bytemuck::cast_slice` to u128) have the high bit set and force
`bit_width = 128`, routing them to the memcpy branch. Defenses:

- Module-level rustdoc statement (`bitpacking.rs:64-71`).
- `pack_u128_chunk` `# Safety` block citing the upstream source.
- Release-build `assert!(src[0] >> bit_width == 0)` sentinel on
  both NarrowU32 and NarrowU64 entry points (defense-in-depth
  against a hypothetical future `Stat::BitWidth` regression that
  would otherwise silently truncate).
- Per-value `debug_assert!` in dev/test builds.
- Pinned by `test_bit_width_or_fold_invariant_for_u128_narrow_dispatch`
  in `statistics.rs` covering all five regimes including `-1i128`.
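
A minimal sketch of the OR-fold + `leading_zeros` idea, assuming it reduces to folding all values with bitwise OR and counting the used bits. The function name `or_fold_bit_width` is hypothetical; the real logic lives in `FixedWidthDataBlock::max_bit_widths` and may differ in detail.

```rust
/// Bit width needed to hold every value in the chunk, computed from the
/// bitwise OR of all values reinterpreted as u128.
fn or_fold_bit_width(values: &[i128]) -> u32 {
    // `as u128` keeps the bit pattern, so any negative value sets bit 127.
    let folded = values.iter().fold(0u128, |acc, &v| acc | v as u128);
    128 - folded.leading_zeros()
}
```

Any chunk containing a negative value folds to a word with bit 127 set, so the width is 128 and the chunk lands on the memcpy arm. For non-negative chunks, every value `v` satisfies `v >> bit_width == 0`, which is the precondition the narrow arms rely on.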

# Soundness

`append_u128_chunk` writes the body via safe `Vec::resize(..,
0u128)` + `&mut [u128]` slice indexing. The remaining `unsafe {
pack_u128_chunk(..) }` call has an inline SAFETY comment proving
all three preconditions (`src.len() == ELEMS_PER_CHUNK`,
`bit_width <= 128`, `dest.len() * 16 == bit_width * ELEMS_PER_CHUNK
/ 8`). Panic safety: u128 has no invalid bit patterns and no Drop,
so a panic mid-pack leaves the Vec at its declared length holding
well-defined u128s. Compile-time `assert!(ELEMS_PER_CHUNK
.is_multiple_of(128))` catches future drift in the framing
constant.
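
As a rough illustration of the safe-append pattern described above, the sketch below grows a `Vec<u128>` with zeroed words and hands back a mutable slice for the packer to fill. It assumes `ELEMS_PER_CHUNK` is 1024 and uses `%` rather than `is_multiple_of` for the compile-time check; `reserve_chunk_body` is a hypothetical helper, not the PR's `append_u128_chunk`.

```rust
/// Values per miniblock chunk. Assumed to be 1024 for this sketch.
const ELEMS_PER_CHUNK: usize = 1024;

// Compile-time guard: the framing math below assumes whole 128-value groups.
const _: () = assert!(ELEMS_PER_CHUNK % 128 == 0);

/// Append zero-initialised space for one chunk body and return it.
/// The returned slice satisfies `len * 16 == bit_width * ELEMS_PER_CHUNK / 8`.
fn reserve_chunk_body(dest: &mut Vec<u128>, bit_width: usize) -> &mut [u128] {
    assert!(bit_width <= 128);
    let words = bit_width * ELEMS_PER_CHUNK / 8 / 16;
    let start = dest.len();
    // Safe growth: every new word is a valid u128 (zero) before packing.
    dest.resize(start + words, 0u128);
    &mut dest[start..]
}
```

Since the new words are already valid zeros, a panic during packing cannot expose uninitialised memory; the Vec simply holds partially written but well-defined values.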

# Decoder boundary validation

`unchunk_u128_dispatch` validates 4 corruption modes before any
`unsafe` call:

- under-length buffer (< 16 bytes header) → `InvalidInput`
- `num_values > ELEMS_PER_CHUNK` → `InvalidInput`
- header out of range (`raw > 128`) → `InvalidInput`
- body length mismatch → `InvalidInput`

`num_values == 0` short-circuits *after* validation to skip the
16 KiB scratch unpack. All 4 paths plus the short-circuit have
explicit negative tests + 1 positive happy-path test.
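
The four checks can be sketched as follows. The header layout is an assumption: this sketch treats the 16-byte header as a little-endian u128 carrying the bit width, which the text above does not spell out. `validate_chunk` and `ChunkError` are hypothetical names; the PR reports these cases as `InvalidInput`.

```rust
const ELEMS_PER_CHUNK: usize = 1024; // assumed chunk size
const HEADER_BYTES: usize = 16;

#[derive(Debug, PartialEq, Eq)]
enum ChunkError {
    UnderLength,
    TooManyValues,
    HeaderOutOfRange,
    BodyLengthMismatch,
}

/// Validate one chunk buffer and return its bit width.
/// Runs entirely in safe code, before any unpack kernel is called.
fn validate_chunk(buf: &[u8], num_values: usize) -> Result<u32, ChunkError> {
    if buf.len() < HEADER_BYTES {
        return Err(ChunkError::UnderLength);
    }
    if num_values > ELEMS_PER_CHUNK {
        return Err(ChunkError::TooManyValues);
    }
    let mut header = [0u8; HEADER_BYTES];
    header.copy_from_slice(&buf[..HEADER_BYTES]);
    let raw = u128::from_le_bytes(header); // assumed header encoding
    if raw > 128 {
        return Err(ChunkError::HeaderOutOfRange);
    }
    let bit_width = raw as u32;
    let expected_body = bit_width as usize * ELEMS_PER_CHUNK / 8;
    if buf.len() - HEADER_BYTES != expected_body {
        return Err(ChunkError::BodyLengthMismatch);
    }
    Ok(bit_width)
}
```

A caller would check `num_values == 0` only after this function returns `Ok`, mirroring the short-circuit ordering described above.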

# Test coverage

- 5 per-regime hand tests (widths 24, 40, 80, 128 all-negatives,
  mixed-sign-small-magnitude memcpy routing).
- Exhaustive `test_u128_kernel_for_exhaustive_table_and_roundtrip`
  iterating `0..=128`, forces high-bit injection per width.
- `test_inline_bitpack_u128_mixed_widths_per_chunk_dispatch` for
  cross-chunk independence.
- 4 negative tests for decoder entry-point validation.
- 1 positive test for `num_values == 0` short-circuit.
- 1 sign-safety regression test in `statistics.rs`.
- 2 proptests:
  - `proptest_inline_bitpack_u128_roundtrip`: single-width × full
    range × 1..=3072 values × 128-case proptest budget.
  - `proptest_inline_bitpack_u128_mixed_widths`: 1..=3 chunks ×
    independent widths × xorshift seed=0 fixed-point salt.

# Scope

This dispatch lives only in the `InlineBitpacking` (miniblock)
path. `OutOfLineBitpacking` (full-zip) is intentionally NOT
touched — full-zip frames whole pages with a single bit_width
and has no per-chunk header to carry a kernel selector, so the
same machinery would either need a new format field or change
cross-page framing semantics. Documented at the
`OutOfLineBitpacking` rustdoc block.

# Out-of-scope (not included; deferred to follow-ups)

- Stack scratch sizes (16 KiB worst case acceptable for now).
- Per-chunk `vec![]` allocations.
- `kernel_body_bytes(kernel, bit_width)` helper to centralize the
  framing math currently duplicated across 3 sites.
- `assert_eq!(bit_widths_array.len(), data.num_values
  .div_ceil(ELEMS_PER_CHUNK))` end-to-end contract pin.
- Per-call SAFETY comment on the decoder `unpack_u128_chunk` call
  (function-level `# Safety` block already covers it).
- `BlockDecompressor` u128 arm not directly tested (routes
  through the same `unchunk_u128_dispatch` as `MiniBlockDecompressor`).
- File size 1916 lines (cohesive single-module dispatch).

# Verification

- `cargo build -p lance-encoding --tests` ✓
- `cargo test -p lance-encoding --lib` ✓ (389 passed, 0 failed,
  5 ignored — full unfiltered suite)
- `cargo clippy --release -p lance-encoding --tests -- -D warnings` ✓
- `cargo fmt -p lance-encoding -- --check` ✓

Adds two criterion benches that characterize the per-chunk u128 bitpacking
dispatch end-to-end through the public encode_batch / decode_batch APIs.
Each bench scans four bit_width regimes corresponding to the four
non-trivial dispatch arms (the bw=0 Zero arm has no observable per-value
cost and is omitted):

| bit_width | dispatch arm     | typical workload                 |
|-----------|------------------|----------------------------------|
| 24        | NarrowU32 (SIMD) | decimal128(7,2)-typical          |
| 40        | NarrowU64 (SIMD) | decimal128(12,2) / store_sales   |
| 100       | SequentialU128   | scalar-fallback (rare in OLTP)   |
| 128       | Memcpy           | full-precision / signed columns  |

Per-chunk Stat::BitWidth is pinned to the target bw deterministically:

- bw < 128: every value forced to [2^(bw-1), 2^bw - 1] by setting bit
  (bw-1) and masking the higher bits → per-chunk OR-fold = bw exactly.
- bw == 128: bounded negatives in [-(2^64), -1]. Every negative i128 has
  bit 127 set in its u128 reinterpretation → OR-fold = u128::MAX → bw =
  128. The bounded magnitude (≤ 2^64) keeps every value within
  Decimal128(38, 0) precision regardless of arrow's data-validation
  policy across versions.
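
The two pinning rules above can be sketched like this. `pin_to_bit_width` and `bounded_negative` are hypothetical helper names chosen for the illustration; the bench source itself is not reproduced here.

```rust
/// Force `raw` into [2^(bw-1), 2^bw - 1] for 1 <= bw < 128:
/// mask away bits at or above `bw`, then set bit `bw - 1`.
fn pin_to_bit_width(raw: u128, bw: u32) -> u128 {
    assert!((1..128).contains(&bw));
    let mask = (1u128 << bw) - 1;
    (raw & mask) | (1u128 << (bw - 1))
}

/// Map any u64 to a negative i128 in [-(2^64), -1]. Its u128 bit pattern
/// always has bit 127 set, so the OR-fold width is 128.
fn bounded_negative(raw: u64) -> i128 {
    -(raw as i128) - 1
}
```

Because every pinned value has bit `bw - 1` set and nothing above it, a single value is enough to make the chunk's OR-fold width equal `bw` exactly.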

Style matches the existing bench_decode_compressed / bench_encode_compressed:
single product code path, parameter-scan over bit_width buckets, public
API (no dispatch table reflection), 5M rows × 16 bytes = 80 MB throughput
per case. Fixed seed 0xDEAD_BEEF (with `| 1` salt to defend against the
xorshift seed=0 fixed point) for reproducibility.
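
The reason for the `| 1` salt is that zero is a fixed point of xorshift: every step XORs the state with shifted copies of itself, so a zero state stays zero forever. A small sketch using the 13/7/17 shift triple that appears in the bench tool (the function name is illustrative):

```rust
/// One xorshift64 step with shifts 13, 7, 17.
fn xorshift64_step(mut state: u64) -> u64 {
    state ^= state << 13;
    state ^= state >> 7;
    state ^= state << 17;
    state
}
```

Seeding with `seed | 1` guarantees a nonzero starting state, and since each step is an invertible linear map, a nonzero state never collapses to zero afterwards.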

Local numbers on Apple M-series, criterion 100 samples × 5s/case:

decode_decimal128:
  bw024_narrow_u32:      15.55 ms / 4.79 GiB/s
  bw040_narrow_u64:      16.63 ms / 4.48 GiB/s
  bw100_sequential_u128: 15.38 ms / 4.84 GiB/s
  bw128_memcpy:           6.37 ms / 11.70 GiB/s

encode_decimal128:
  bw024_narrow_u32:      11.60 ms / 6.43 GiB/s
  bw040_narrow_u64:      14.82 ms / 5.03 GiB/s
  bw100_sequential_u128: 38.32 ms / 1.94 GiB/s
  bw128_memcpy:          29.40 ms / 2.53 GiB/s

The encode side cleanly shows SequentialU128 as the slowest arm (≈2.6×
the NarrowU64 time, ≈3.3× the NarrowU32 time), motivating the per-chunk dispatch:
data with bit_width ≤ 64 routes to FastLanes SIMD instead of the
scalar bit-stream kernel. Decode is dominated by pipeline overhead
(scheduler, repdef, runtime) so kernel differences are compressed,
matching the end-to-end JMH numbers (-10% q4 wall-time on
100ms-RTT MinIO vs the kernel-level ~3× speedup observed at the
BitPacking trait boundary).

Run with:
  cargo bench -p lance-encoding --bench decoder -- decode_decimal128
  cargo bench -p lance-encoding --bench encoder -- encode_decimal128

Two CI failures on PR#6858:

1. typos check fails on `mis-decoded` (line 61) — the `mis` token trips
   crate-ci/typos. Reword to `decode incorrectly`.

2. rustdoc -D warnings fails on
   `[crate::statistics::FixedWidthDataBlock::max_bit_widths]` (line 68) —
   `max_bit_widths` is a private fn (no `pub`), so the intra-doc link
   cannot resolve. Drop the `[]` link syntax and keep the reference as
   plain code formatting; reader can grep the codebase if curious.

Verification:
- `RUSTDOCFLAGS="-D warnings" cargo doc -p lance-encoding --no-deps` ✓
- `cargo fmt -p lance-encoding -- --check` ✓
@LuciferYang

Reproducer + bench-tool source for the End-to-end A/B in the description

The numbers in the "End-to-end (read + write, local-disk + MinIO)" section of the PR description were produced by a local-only example that is intentionally not part of this PR (it lives outside the test suite, purely as an A/B-numbers tool). The source is reproduced here so the table can be regenerated independently.

# drop the source below at rust/examples/src/decimal128_bp128_ab.rs,
# add the [[example]] entry below to rust/examples/Cargo.toml,
# then:
cargo build --release --example decimal128_bp128_ab -p lance-examples
B=./target/release/examples/decimal128_bp128_ab
"$B" write file:///tmp/dec128.lance              --rows 5000000 --iters 5
"$B" read  file:///tmp/dec128.lance                                --iters 5
"$B" write s3://benchmark/dec128.lance --s3-endpoint http://localhost:9000 --rows 5000000 --iters 5
"$B" read  s3://benchmark/dec128.lance --s3-endpoint http://localhost:9000                  --iters 5

The --s3-endpoint flag forces allow_http=true and minioadmin creds for a local MinIO. Same binary covers s3://... and file://... URIs.

rust/examples/Cargo.toml — [[example]] entry to add
[[example]]
name = "decimal128_bp128_ab"
path = "src/decimal128_bp128_ab.rs"
rust/examples/src/decimal128_bp128_ab.rs — full source
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
//
// LOCAL-ONLY: this example is intentionally NOT git-tracked. It produces A/B
// numbers (bp128 OFF vs bp128 ON) for PR#6858 by reading and writing
// decimal128 datasets via the public lance Rust API. Covers four scenarios
// per branch — { local-disk, MinIO } × { read, write } — driven by the same
// binary via the `read` / `write` subcommands. To get an A/B:
//   1) check out the pre-bp128 commit (e.g. `29a8f92eb`), `cargo run` against
//      a separate dst path, capture numbers.
//   2) check out the current PR branch, `cargo run` again, compare.
//
// This file + the corresponding `[[example]]` entry in `rust/examples/Cargo.toml`
// are the only artifacts you need to keep around. Both are intentionally
// untracked; restore Cargo.toml before committing the upstream branch.

#![allow(clippy::print_stdout)]

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::Decimal128Array;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::{RecordBatch, RecordBatchIterator};
use clap::{Parser, Subcommand};
use futures::StreamExt;
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::{ObjectStoreParams, StorageOptionsAccessor};

/// 6 decimal128(7,2) columns from TPC-DS `store_sales`. Same set used by
/// the JMH q4 benchmark — full-scan + per-column sum.
const COLUMNS: &[&str] = &[
    "ss_wholesale_cost",
    "ss_list_price",
    "ss_sales_price",
    "ss_ext_sales_price",
    "ss_ext_wholesale_cost",
    "ss_ext_list_price",
];

#[derive(Parser, Debug)]
#[command(about = "A/B read+write throughput bench for decimal128 bp128.")]
struct Args {
    #[command(subcommand)]
    cmd: Cmd,

    /// Optional MinIO/S3 endpoint URL. If set, also forces
    /// `allow_http=true` and uses minioadmin creds.
    #[arg(long, global = true)]
    s3_endpoint: Option<String>,
}

#[derive(Subcommand, Debug)]
enum Cmd {
    /// Open an existing lance dataset, scan 6 decimal128 columns,
    /// per-column sum, report wall time per iteration.
    Read {
        /// Lance dataset URI (e.g.
        /// `s3://benchmark/tpcds-sf-100/store_sales.lance` or
        /// `file:///path/to/store_sales.lance`).
        uri: String,
        /// Number of measurement iterations. First is "cold", rest are
        /// "warm" (lance runtime / object_store cache effects only —
        /// no OS page-cache warming guaranteed).
        #[arg(long, default_value_t = 5)]
        iters: usize,
    },
    /// Generate `--rows` rows of 6 decimal128(7,2) columns in-memory and
    /// write them to a fresh lance dataset (overwrite mode). Reports
    /// wall time per iteration.
    Write {
        /// Destination lance dataset URI.
        dst: String,
        /// Number of rows to generate per iteration.
        #[arg(long, default_value_t = 1_000_000)]
        rows: usize,
        /// Number of measurement iterations.
        #[arg(long, default_value_t = 5)]
        iters: usize,
        /// Number of rows per RecordBatch. Lance miniblock chunk is 1024
        /// values; larger batch = fewer encoder.commit() calls but bigger
        /// in-memory footprint. 64K is a typical Spark-side batch.
        #[arg(long, default_value_t = 65_536)]
        batch_rows: usize,
    },
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    let args = Args::parse();

    let mut storage_options: HashMap<String, String> = HashMap::new();
    if let Some(endpoint) = &args.s3_endpoint {
        storage_options.insert("endpoint".to_string(), endpoint.clone());
        storage_options.insert("allow_http".to_string(), "true".to_string());
        storage_options.insert("aws_region".to_string(), "us-east-1".to_string());
        storage_options.insert("aws_access_key_id".to_string(), "minioadmin".to_string());
        storage_options.insert(
            "aws_secret_access_key".to_string(),
            "minioadmin".to_string(),
        );
    }

    match args.cmd {
        Cmd::Read { uri, iters } => bench_read(uri, iters, storage_options).await,
        Cmd::Write {
            dst,
            rows,
            iters,
            batch_rows,
        } => bench_write(dst, rows, iters, batch_rows, storage_options).await,
    }
}

async fn bench_read(
    uri: String,
    iters: usize,
    storage_options: HashMap<String, String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let dataset: Dataset = DatasetBuilder::from_uri(&uri)
        .with_storage_options(storage_options)
        .load()
        .await?;
    println!(
        "opened {uri} — version={}, rows={}, fragments={}",
        dataset.version().version,
        dataset.count_rows(None).await?,
        dataset.get_fragments().len(),
    );

    let mut wall_ms = Vec::with_capacity(iters);
    for i in 0..iters {
        let label = if i == 0 { "cold" } else { "warm" };
        let started = Instant::now();
        let mut total_rows: usize = 0;
        let mut sums: Vec<i128> = vec![0; COLUMNS.len()];

        let mut scanner = dataset.scan();
        scanner.project(&COLUMNS.to_vec())?;
        let mut stream = scanner.try_into_stream().await?;

        while let Some(batch_res) = stream.next().await {
            let batch = batch_res?;
            total_rows += batch.num_rows();
            for (col_idx, col_name) in COLUMNS.iter().enumerate() {
                let col = batch
                    .column_by_name(col_name)
                    .unwrap_or_else(|| panic!("missing column {col_name}"));
                let arr = col
                    .as_any()
                    .downcast_ref::<Decimal128Array>()
                    .expect("decimal128 column");
                if let Some(s) = sum(arr) {
                    sums[col_idx] = sums[col_idx].wrapping_add(s);
                }
            }
        }

        let elapsed_ms = started.elapsed().as_secs_f64() * 1000.0;
        wall_ms.push(elapsed_ms);
        println!(
            "iter {i:>2} ({label:<4}): {elapsed_ms:>9.1} ms · rows={total_rows} · sum0={}",
            sums[0]
        );
    }

    summarize("read", &wall_ms);
    Ok(())
}

async fn bench_write(
    dst: String,
    rows: usize,
    iters: usize,
    batch_rows: usize,
    storage_options: HashMap<String, String>,
) -> Result<(), Box<dyn std::error::Error>> {
    println!(
        "writing {rows} rows × {} cols decimal128(7,2) → {dst} · batch_rows={batch_rows} · iters={iters}",
        COLUMNS.len()
    );

    let mut wall_ms = Vec::with_capacity(iters);
    for i in 0..iters {
        let label = if i == 0 { "cold" } else { "warm" };
        let batches = synth_decimal128_batches(rows, batch_rows, 0xDEAD_BEEF + i as u64);
        let schema = batches[0].schema();

        let started = Instant::now();
        let iter = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
        let store_params = if storage_options.is_empty() {
            None
        } else {
            let accessor = Arc::new(StorageOptionsAccessor::with_static_options(
                storage_options.clone(),
            ));
            Some(ObjectStoreParams {
                storage_options_accessor: Some(accessor),
                ..Default::default()
            })
        };
        let write_params = WriteParams {
            mode: WriteMode::Overwrite,
            store_params,
            ..Default::default()
        };
        Dataset::write(iter, &dst, Some(write_params)).await?;
        let elapsed_ms = started.elapsed().as_secs_f64() * 1000.0;
        wall_ms.push(elapsed_ms);
        println!("iter {i:>2} ({label:<4}): {elapsed_ms:>9.1} ms");
    }

    summarize("write", &wall_ms);
    Ok(())
}

/// Build `ceil(total_rows / batch_rows)` RecordBatches of 6 decimal128(7,2)
/// columns. Values are deterministic xorshift64 modulo 10^7 (within
/// precision-7 range), so OR-fold per-chunk lands at roughly bw=24 — the
/// realistic NarrowU32 dispatch arm for `decimal128(7,2)` data.
fn synth_decimal128_batches(total_rows: usize, batch_rows: usize, seed: u64) -> Vec<RecordBatch> {
    let schema = Arc::new(Schema::new(
        COLUMNS
            .iter()
            .map(|name| Field::new(*name, DataType::Decimal128(7, 2), false))
            .collect::<Vec<_>>(),
    ));

    let mut state = seed | 1;
    let mut next = || {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        // 7-digit decimal range: 0..=9_999_999 (10^7 - 1), which fits in 24 bits.
        (state % 10_000_000) as i128
    };

    let mut batches = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        let n = remaining.min(batch_rows);
        let cols: Vec<Arc<dyn arrow::array::Array>> = (0..COLUMNS.len())
            .map(|_| {
                let values: Vec<i128> = (0..n).map(|_| next()).collect();
                Arc::new(
                    Decimal128Array::from(values)
                        .with_precision_and_scale(7, 2)
                        .unwrap(),
                ) as Arc<dyn arrow::array::Array>
            })
            .collect();
        batches.push(RecordBatch::try_new(schema.clone(), cols).unwrap());
        remaining -= n;
    }
    batches
}

fn summarize(label: &str, wall_ms: &[f64]) {
    if wall_ms.len() < 2 {
        return;
    }
    let warm = &wall_ms[1..];
    let mean = warm.iter().sum::<f64>() / warm.len() as f64;
    let min = warm.iter().cloned().fold(f64::INFINITY, f64::min);
    let max = warm.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    println!(
        "{label} warm summary (n={}): mean={mean:.1} ms · min={min:.1} ms · max={max:.1} ms",
        warm.len()
    );
}

@LuciferYang

Spark end-to-end Query (TPC-DS SF=100 store_sales, 6 × decimal128(7,2) full-scan + sum)

A more user-facing data point: the same dispatch + dataset shape as the in-PR end-to-end table, but driven through a real Spark 4.0 query (spark.read.format("lance").load(...).sql(...)) reading from MinIO over the JNI bridge — the closest proxy to "what users actually feel" when running a Spark aggregation against a decimal128-heavy table.

-- query (full-scan + sum on 6 decimal128(7,2) columns)
SELECT sum(ss_wholesale_cost), sum(ss_list_price), sum(ss_sales_price),
       sum(ss_ext_sales_price), sum(ss_net_paid), sum(ss_net_profit)
FROM store_sales

Setup — TPC-DS SF=100 store_sales (~288 M rows, 23 columns, 6 of which are the decimal128(7,2) columns above), MinIO local, Spark 4.0.0 / Scala 2.13. Both Lance datasets were CTAS-materialized from the same source via spark.read.format("lance").write.format("lance") so the only thing that differs is the encoder version. The bp128 dataset uses the feat/decimal128-u128-bitpacking liblance_jni.dylib (release build, 150,413,664 bytes, sha256=5afc770d624c9e90ab4f96ecb7b93ce01c539f1fbd3e51097970bb2aa1324d9b), pinned via the JMH harness's startup fingerprint.

JMH config — @BenchmarkMode(AverageTime) / @OutputTimeUnit(MILLISECONDS) / @Warmup(iterations = 5, time = 10 s) / @Measurement(iterations = 10, time = 10 s) / @Fork(1).

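| Format                      | Query mean (ms) | 99.9 % CI | On-disk size | Δ vs Lance baseline |
|-----------------------------|-----------------|-----------|--------------|---------------------|
| Lance (baseline, no bp128)  | 6410.229        | ± 113.802 | 34 GiB       |                     |
| Lance with bp128 + dispatch | 5750.606        | ± 475.850 | 16 GiB       | −10.3 %             |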

The −10 % wall-time win is more modest than the −76 % we see on the lance Rust API MinIO read of the same shape — bp128 only optimizes the decode kernel, and the Spark end-to-end stack has additional overhead outside that scope which this PR doesn't touch (we haven't fully attributed where it lives). The disk-side win (34 GiB → 16 GiB, a 2.1× reduction; matches the −53 % already in the PR description) is the same regardless of who's reading. For S3-style deployments where compute is decoupled from storage, the size cut alone usually pays for itself before any decode delta is considered.

LuciferYang marked this pull request as ready for review May 22, 2026 03:18

claude bot left a comment

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

summaryzb added a commit to summaryzb/lance that referenced this pull request May 22, 2026
Squash of upstream PR lance-format#6858 (4 commits):
- feat(encoding): add u128 bitpacking support for decimal128 columns
- feat(encoding): per-chunk u128 bitpacking dispatch (u32/u64/u128/memcpy)
- test(encoding): add Decimal128 inline-bitpacking decode/encode benches
- fix(encoding): rustdoc intra-doc link + typo in u128 dispatch module

Per-chunk Stat::BitWidth selects one of five kernels via
u128_kernel_for(bit_width):
  0       -> zero-fill
  1..=32  -> reinterpret as &[u32], FastLanes u32 SIMD
  33..=64 -> reinterpret as &[u64], FastLanes u64 SIMD
  65..=127-> scalar sequential u128 bit-stream
  128     -> memcpy identity

Co-Authored-By: yangjie01 <yangjie01@baidu.com>
Change-Id: I7744f629f880d397fac1d104f1db61755f21ab6b