From 05f0e9da86aff6f71902b1d518cfa0d01fcb6e91 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 27 Apr 2026 15:49:29 +0800
Subject: [PATCH 1/2] [SPARK-XXXXX][SQL][TESTS] Add comprehensive Parquet
 vectorized-reader benchmark coverage

### What changes were proposed in this pull request?

Add comprehensive benchmark coverage for the Parquet vectorized-read decode paths via three
new benchmark files plus an extension to the existing `VectorizedRleValuesReaderBenchmark`:

* `ParquetVectorUpdaterBenchmark` (new) - every `ParquetVectorUpdater` family obtained from
  `ParquetVectorUpdaterFactory`: identity (Boolean, Byte, Short, Integer, Long, Float,
  Double, Binary), type-converting (IntegerToLong, IntegerToDouble, FloatToDouble,
  DateToTimestampNTZ, DowncastLong), rebase (IntegerWithRebase, LongWithRebase,
  LongAsMicros), unsigned (UnsignedInteger, UnsignedLong), decimal (IntegerToDecimal,
  LongToDecimal, FixedLenByteArrayToDecimal; BinaryToDecimal is intentionally skipped, see
  the in-code comment), and FixedLenByteArray (FixedLenByteArrayUpdater,
  FixedLenByteArrayAsInt, FixedLenByteArrayAsLong).
* `VectorizedDeltaReaderBenchmark` (new) - all three delta decoders.
  Group A/B: DELTA_BINARY_PACKED INT32/INT64 read+skip across constant / monotonic /
  small-delta-random / wide-random distributions.
  Group C: DELTA_BYTE_ARRAY read+skip across prefix-overlap shapes.
  Group D: DELTA_LENGTH_BYTE_ARRAY read+skip across payload sizes.
  Group E: variant reads on DeltaBinaryPackedReader (readBytes, readShorts,
  readUnsignedIntegers, readUnsignedLongs, skipBytes, skipShorts, single-value
  readByte/Short/Integer/Long) plus DeltaByteArrayReader.readBinary(int len).
* `VectorizedPlainValuesReaderBenchmark` (new) - every public read/skip method on
  `VectorizedPlainValuesReader` across five groups: fixed-size bulk, conversion bulk
  (unsigned, with-rebase), variable-length, single-value, skip.
* `VectorizedRleValuesReaderBenchmark` (extension) - new groups added:
  Group E: row-index-filtered reads (exercises the with-filter path of `readBatchInternal` /
  `readBatchInternalWithDefLevels`); two filter shapes x three null ratios x with/without
  def-level materialization.
  Group F: per-call overhead of readBoolean / readInteger / readValueDictionaryId looped
  NUM_ROWS times.
  Group G: skipBooleans / skipIntegers across the same parameter sweeps as Groups A and B.

### Why are the changes needed?

Coverage is intentionally broad - every public read/skip method is included even when no
obvious optimization opportunity exists today, so the result files track the long-term
performance baseline of the Parquet decode surface and future iterative optimization does
not have to add benchmark coverage as a precursor.

### Implementation notes

* Updater instances are obtained via the production `ParquetVectorUpdaterFactory.getUpdater`
  entry point so the benchmark exercises the full configuration matrix (logical-type
  annotation, rebase mode, timezone) the production decoder uses. Tricky cases
  (`DowncastLongUpdater`, `BinaryToDecimalUpdater`, `FixedLenByteArrayToDecimalUpdater`)
  include a brief comment noting the routing predicate that selects them, since slight
  changes to the descriptor or target Spark type re-route to a different Updater.
* Each case pre-warms the decode path before `benchmark.addCase` to stabilize first-case
  JIT state (a follow-up to the SPARK-56522 review feedback).
* Variable-length cases call `vector.reset()` at the start of each iteration so the binary
  vector's child arrayData does not accumulate payload bytes across iterations.
* For row-index-filtered cases in `VectorizedRleValuesReaderBenchmark`, a fresh
  `ParquetReadState` is constructed per measurement iteration because `rowRanges` is
  iterated forward and not reset by the existing resetForNewBatch / resetForNewPage entry
  points.

### Does this PR introduce _any_ user-facing change?

No.
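The pre-warm convention described in the implementation notes can be sketched as follows. This is a minimal illustration only: `addWarmedCase` and the `decode` closure are hypothetical names, not helpers from this patch; `Benchmark.addCase` is the real Spark test-utility API the benchmarks use.

```scala
import org.apache.spark.benchmark.Benchmark

// Hypothetical sketch of the pre-warm pattern: run the decode path once,
// outside measurement, before registering the case. The goal is that the
// call site is already JIT-compiled when the first measured iteration of
// the first registered case runs, so case ordering does not skew results.
def addWarmedCase(benchmark: Benchmark, label: String)(decode: () => Unit): Unit = {
  decode()                                     // pre-warm pass; result and timing discarded
  benchmark.addCase(label) { _ => decode() }   // only these iterations are measured
}
```

In the actual patch each case body also rebuilds its reader (and calls `vector.reset()` for variable-length types) per iteration, so the pre-warm pass constructs a fresh reader of its own rather than sharing state with the measured runs.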
Benchmark-only addition.

### How was this patch tested?

* `build/sbt sql/Test/compile` clean (including scalastyle).
* Result files to be generated on GHA on JDK 17/21/25 to establish baseline.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
---
 ...stAccess.scala => ParquetTestAccess.scala} |  82 +++-
 .../ParquetVectorUpdaterBenchmark.scala       | 419 ++++++++++++++++++
 .../VectorizedDeltaReaderBenchmark.scala      | 367 +++++++++++++++
 ...VectorizedPlainValuesReaderBenchmark.scala | 385 ++++++++++++++++
 .../VectorizedRleValuesReaderBenchmark.scala  | 235 +++++++++-
 .../VectorizedRleValuesReaderSuite.scala      |  24 +-
 6 files changed, 1488 insertions(+), 24 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/{ParquetReadStateTestAccess.scala => ParquetTestAccess.scala} (57%)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaReaderBenchmark.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReaderBenchmark.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadStateTestAccess.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTestAccess.scala
similarity index 57%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadStateTestAccess.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTestAccess.scala
index d718ed49d7103..e008a7b0322e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadStateTestAccess.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTestAccess.scala
@@ -18,24 +18,37 @@ package org.apache.spark.sql.execution.datasources.parquet import java.lang.reflect.{InvocationTargetException, Method} +import java.time.ZoneId import java.util.PrimitiveIterator import org.apache.parquet.column.ColumnDescriptor +import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.spark.sql.execution.vectorized.WritableColumnVector import org.apache.spark.util.SparkClassUtils /** - * Reflective bridge to the package-private `ParquetReadState`. Under `spark-submit --jars`, - * test and main classes load from different classloaders, blocking package-private access. - * Reflection with `setAccessible` sidesteps the check without widening production visibility. + * Reflective bridge to package-private classes in + * `org.apache.spark.sql.execution.datasources.parquet`. Under `spark-submit --jars`, test + * and main classes load from different classloaders, blocking package-private access despite + * the matching package name. Reflection with `setAccessible(true)` sidesteps the check + * without widening production visibility. 
+ * + * Currently bridges: + * - `ParquetReadState` (constructor + `resetForNewBatch` + `resetForNewPage`) + * - `VectorizedRleValuesReader.readBatch` (5-arg overload not exposed publicly) + * - `ParquetVectorUpdaterFactory` (constructor) + * - `VectorizedDeltaByteArrayReader` (no-arg constructor) + * - `VectorizedDeltaLengthByteArrayReader` (no-arg constructor) */ -object ParquetReadStateTestAccess { +object ParquetTestAccess { + + // -------- ParquetReadState -------- private val stateCls = SparkClassUtils.classForName[Any]( "org.apache.spark.sql.execution.datasources.parquet.ParquetReadState") - private val ctor = { + private val stateCtor = { val c = stateCls.getDeclaredConstructor( classOf[ColumnDescriptor], java.lang.Boolean.TYPE, @@ -71,7 +84,7 @@ object ParquetReadStateTestAccess { isRequired: Boolean, rowIndexes: PrimitiveIterator.OfLong = null): AnyRef = { try { - ctor.newInstance( + stateCtor.newInstance( descriptor, Boolean.box(isRequired), rowIndexes).asInstanceOf[AnyRef] @@ -105,6 +118,63 @@ object ParquetReadStateTestAccess { reader, state, values, defLevels, valueReader, updater) } catch { case e: ReflectiveOperationException => throw rethrow(e) } + // -------- ParquetVectorUpdaterFactory -------- + + private val factoryCtor = { + val cls = SparkClassUtils.classForName[Any]( + "org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory") + val c = cls.getDeclaredConstructor( + classOf[LogicalTypeAnnotation], + classOf[ZoneId], + classOf[String], + classOf[String], + classOf[String], + classOf[String]) + c.setAccessible(true) + c + } + + def newFactory( + logicalTypeAnnotation: LogicalTypeAnnotation, + convertTz: ZoneId, + datetimeRebaseMode: String, + datetimeRebaseTz: String, + int96RebaseMode: String, + int96RebaseTz: String): ParquetVectorUpdaterFactory = { + try { + factoryCtor.newInstance( + logicalTypeAnnotation, convertTz, + datetimeRebaseMode, datetimeRebaseTz, + int96RebaseMode, 
int96RebaseTz).asInstanceOf[ParquetVectorUpdaterFactory] + } catch { + case e: ReflectiveOperationException => throw rethrow(e) + } + } + + // -------- VectorizedDeltaByteArrayReader / VectorizedDeltaLengthByteArrayReader -------- + + private val deltaByteArrayCtor = { + val c = classOf[VectorizedDeltaByteArrayReader].getDeclaredConstructor() + c.setAccessible(true) + c + } + + private val deltaLengthByteArrayCtor = { + val c = classOf[VectorizedDeltaLengthByteArrayReader].getDeclaredConstructor() + c.setAccessible(true) + c + } + + def newDeltaByteArrayReader(): VectorizedDeltaByteArrayReader = + try { deltaByteArrayCtor.newInstance() } + catch { case e: ReflectiveOperationException => throw rethrow(e) } + + def newDeltaLengthByteArrayReader(): VectorizedDeltaLengthByteArrayReader = + try { deltaLengthByteArrayCtor.newInstance() } + catch { case e: ReflectiveOperationException => throw rethrow(e) } + + // -------- shared helper -------- + private def rethrow(e: ReflectiveOperationException): RuntimeException = { val cause = e match { case ite: InvocationTargetException => ite.getCause diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala new file mode 100644 index 0000000000000..a78593096d5c8 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.nio.{ByteBuffer, ByteOrder} +import java.time.ZoneOffset + +import org.apache.parquet.bytes.ByteBufferInputStream +import org.apache.parquet.column.ColumnDescriptor +import org.apache.parquet.schema.{LogicalTypeAnnotation, Types} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.types._ + +/** + * Low-level benchmark for `ParquetVectorUpdater` implementations in + * `ParquetVectorUpdaterFactory`. Measures the per-batch throughput of `readValues`, + * `readValue`, and `skipValues` for the family of Updaters Spark uses to populate a + * column vector from a `VectorizedValuesReader`. + * + * Coverage is intentionally broad - every Updater family is included even when no + * obvious optimization opportunity exists today, so the result file tracks the + * long-term baseline and future iteration does not have to add coverage first. + * + * Groups: + * A. Identity Updaters -- direct copy from PLAIN values into target column type + * (Boolean, Byte, Short, Integer, Long, Float, Double, Binary). + * B. Type-converting Updaters -- per-row read+convert+write loops. + * `IntegerToLong`, `IntegerToDouble`, `FloatToDouble`, `DateToTimestampNTZ`, + * `DowncastLong`. + * C. Rebase Updaters -- date/timestamp legacy-calendar rebase variants. 
+ * `IntegerWithRebase` (DATE), `LongWithRebase` (TIMESTAMP_MICROS), + * `LongAsMicros`. + * D. Unsigned Updaters -- `UnsignedInteger`, `UnsignedLong`. + * E. Decimal Updaters -- `IntegerToDecimal`, `LongToDecimal`, + * `BinaryToDecimal`, `FixedLenByteArrayToDecimal`. + * F. FixedLenByteArray Updaters -- `FixedLenByteArray`, `FixedLenByteArrayAsInt`, + * `FixedLenByteArrayAsLong`. + * + * Updater instances are obtained via `ParquetVectorUpdaterFactory.getUpdater`, the + * production entry point, so the benchmark exercises the full configuration matrix + * (logical-type annotation, rebase mode, timezone) the production decoder uses. + * + * Pre-warm: each case runs one full `readValues` against a fresh reader before + * `benchmark.addCase` to stabilize first-case JIT state. + * + * To run this benchmark: + * {{{ + * 1. build/sbt "sql/Test/runMain " + * 2. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results in "benchmarks/ParquetVectorUpdaterBenchmark-results.txt". + * 3. GHA: `Run benchmarks` workflow, class = `*ParquetVectorUpdater*`. + * }}} + */ +object ParquetVectorUpdaterBenchmark extends BenchmarkBase { + + private val NUM_ROWS = 1024 * 1024 + private val NUM_ITERS = 5 + + // --------------- Helpers --------------- + + private def descriptor( + name: PrimitiveTypeName, + logical: LogicalTypeAnnotation = null, + typeLength: Int = 0): ColumnDescriptor = { + var builder = Types.primitive(name, Repetition.OPTIONAL) + if (typeLength > 0) builder = builder.length(typeLength) + if (logical != null) builder = builder.as(logical) + new ColumnDescriptor(Array("col"), builder.named("col"), 0, 1) + } + + // Production code (`VectorizedColumnReader`) constructs the Factory with the descriptor's + // own logical type annotation. The Factory's `isTimestampTypeMatched` and friends read + // that field rather than the descriptor, so the two must agree or the Factory throws. 
+ private def factory( + desc: ColumnDescriptor, + datetimeRebaseMode: String = "CORRECTED", + int96RebaseMode: String = "CORRECTED"): ParquetVectorUpdaterFactory = + ParquetTestAccess.newFactory( + desc.getPrimitiveType.getLogicalTypeAnnotation, + ZoneOffset.UTC, datetimeRebaseMode, "UTC", int96RebaseMode, "UTC") + + // ---- PLAIN-encoded value byte producers ---- + + private def plainIntBytes(count: Int)(f: Int => Int): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putInt(f(i)); i += 1 } + buf.array() + } + + private def plainLongBytes(count: Int)(f: Int => Long): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 8).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putLong(f(i)); i += 1 } + buf.array() + } + + private def plainFloatBytes(count: Int)(f: Int => Float): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putFloat(f(i)); i += 1 } + buf.array() + } + + private def plainDoubleBytes(count: Int)(f: Int => Double): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 8).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putDouble(f(i)); i += 1 } + buf.array() + } + + private def plainBooleanBytes(count: Int): Array[Byte] = { + val byteCount = (count + 7) / 8 + val out = new Array[Byte](byteCount) + var i = 0 + while (i < count) { + if ((i & 1) == 0) out(i / 8) = (out(i / 8) | (1 << (i % 8))).toByte + i += 1 + } + out + } + + /** Variable-length binary: 4-byte length prefix + payload, repeated. 
*/ + private def plainBinaryBytes(count: Int, payloadLen: Int): Array[Byte] = { + val recordLen = 4 + payloadLen + val buf = ByteBuffer.allocate(count * recordLen).order(ByteOrder.LITTLE_ENDIAN) + val payload = new Array[Byte](payloadLen) + var i = 0 + while (i < count) { + buf.putInt(payloadLen) + buf.put(payload) + i += 1 + } + buf.array() + } + + /** Fixed-length byte array: just `count * len` bytes, all zero. */ + private def plainFixedLenBytes(count: Int, len: Int): Array[Byte] = + new Array[Byte](count * len) + + private def newPlainReader(bytes: Array[Byte]): VectorizedPlainValuesReader = { + val r = new VectorizedPlainValuesReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r + } + + // --------------- Per-case runners --------------- + + /** + * Pre-warms then registers a benchmark case that reads `NUM_ROWS` values via + * `updater.readValues`. The Factory is built from the descriptor inside the helper + * so its `logicalTypeAnnotation` field always matches the descriptor (production + * `VectorizedColumnReader` does the same). The Updater is obtained fresh from the + * Factory inside the case body to mirror production (one Updater per batch). + * `vector.reset()` clears arrayData for variable-length types so byte storage does + * not accumulate. + */ + private def addReadValuesCase( + benchmark: Benchmark, + label: String, + sparkType: DataType, + desc: ColumnDescriptor, + vector: WritableColumnVector, + bytes: Array[Byte], + datetimeRebaseMode: String = "CORRECTED", + int96RebaseMode: String = "CORRECTED"): Unit = { + val fac = factory(desc, datetimeRebaseMode, int96RebaseMode) + + // Pre-warm so the call site is C2-compiled before benchmark.run() measures. 
+ vector.reset() + val warmUpdater = fac.getUpdater(desc, sparkType) + warmUpdater.readValues(NUM_ROWS, 0, vector, newPlainReader(bytes)) + + benchmark.addCase(label) { _ => + vector.reset() + val u = fac.getUpdater(desc, sparkType) + u.readValues(NUM_ROWS, 0, vector, newPlainReader(bytes)) + } + } + + // --------------- Group A: identity Updaters --------------- + + private def runIdentityBenchmark(): Unit = { + val benchmark = new Benchmark( + "Identity Updaters", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val intVec = new OnHeapColumnVector(NUM_ROWS, IntegerType) + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + val floatVec = new OnHeapColumnVector(NUM_ROWS, FloatType) + val doubleVec = new OnHeapColumnVector(NUM_ROWS, DoubleType) + val boolVec = new OnHeapColumnVector(NUM_ROWS, BooleanType) + val byteVec = new OnHeapColumnVector(NUM_ROWS, ByteType) + val shortVec = new OnHeapColumnVector(NUM_ROWS, ShortType) + val binaryVec = new OnHeapColumnVector(NUM_ROWS, BinaryType) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + val floatBytes = plainFloatBytes(NUM_ROWS)(_.toFloat) + val doubleBytes = plainDoubleBytes(NUM_ROWS)(_.toDouble) + val boolBytes = plainBooleanBytes(NUM_ROWS) + val binaryBytes = plainBinaryBytes(NUM_ROWS, payloadLen = 16) + + addReadValuesCase(benchmark, "BooleanUpdater", + BooleanType, descriptor(PrimitiveTypeName.BOOLEAN), boolVec, boolBytes) + addReadValuesCase(benchmark, "ByteUpdater (INT32 -> Byte)", + ByteType, descriptor(PrimitiveTypeName.INT32), byteVec, intBytes) + addReadValuesCase(benchmark, "ShortUpdater (INT32 -> Short)", + ShortType, descriptor(PrimitiveTypeName.INT32), shortVec, intBytes) + addReadValuesCase(benchmark, "IntegerUpdater", + IntegerType, descriptor(PrimitiveTypeName.INT32), intVec, intBytes) + addReadValuesCase(benchmark, "LongUpdater", + LongType, descriptor(PrimitiveTypeName.INT64), longVec, longBytes) + addReadValuesCase(benchmark, 
"FloatUpdater", + FloatType, descriptor(PrimitiveTypeName.FLOAT), floatVec, floatBytes) + addReadValuesCase(benchmark, "DoubleUpdater", + DoubleType, descriptor(PrimitiveTypeName.DOUBLE), doubleVec, doubleBytes) + addReadValuesCase(benchmark, "BinaryUpdater", + BinaryType, descriptor(PrimitiveTypeName.BINARY), binaryVec, binaryBytes) + + benchmark.run() + } + + // --------------- Group B: type-converting Updaters --------------- + + private def runTypeConvertingBenchmark(): Unit = { + val benchmark = new Benchmark( + "Type-converting Updaters", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + val doubleVec = new OnHeapColumnVector(NUM_ROWS, DoubleType) + val shortDecVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(9, 2)) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + val floatBytes = plainFloatBytes(NUM_ROWS)(_.toFloat) + + addReadValuesCase(benchmark, "IntegerToLongUpdater", + LongType, descriptor(PrimitiveTypeName.INT32), longVec, intBytes) + addReadValuesCase(benchmark, "IntegerToDoubleUpdater", + DoubleType, descriptor(PrimitiveTypeName.INT32), doubleVec, intBytes) + addReadValuesCase(benchmark, "FloatToDoubleUpdater", + DoubleType, descriptor(PrimitiveTypeName.FLOAT), doubleVec, floatBytes) + addReadValuesCase(benchmark, "DateToTimestampNTZUpdater", + TimestampNTZType, + descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()), + longVec, intBytes) + // 32-bit-decimal target with INT64 source routes via canReadAsLongDecimal + + // is32BitDecimalType, both TRUE here, hence DowncastLongUpdater. 
+ addReadValuesCase(benchmark, "DowncastLongUpdater (INT64 -> Decimal(9,2))", + DecimalType(9, 2), + descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.decimalType(2, 9)), + shortDecVec, longBytes) + + benchmark.run() + } + + // --------------- Group C: rebase Updaters --------------- + + private def runRebaseBenchmark(): Unit = { + val benchmark = new Benchmark( + "Rebase Updaters", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val intVec = new OnHeapColumnVector(NUM_ROWS, IntegerType) + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + + // Post-1582 values measure the no-rebase fast path. + val intBytes = plainIntBytes(NUM_ROWS)(_ => 18000) + val longBytes = plainLongBytes(NUM_ROWS)(_ => 1577836800000000L) + + addReadValuesCase(benchmark, "IntegerWithRebaseUpdater (DATE legacy)", + DateType, + descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()), + intVec, intBytes, + datetimeRebaseMode = "LEGACY") + addReadValuesCase(benchmark, "LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)", + TimestampType, + descriptor(PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)), + longVec, longBytes, + datetimeRebaseMode = "LEGACY") + addReadValuesCase(benchmark, "LongAsMicrosUpdater (TIMESTAMP_MILLIS)", + TimestampType, + descriptor(PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), + longVec, longBytes) + + benchmark.run() + } + + // --------------- Group D: unsigned Updaters --------------- + + private def runUnsignedBenchmark(): Unit = { + val benchmark = new Benchmark( + "Unsigned Updaters", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + val decimalVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(20, 0)) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + + addReadValuesCase(benchmark, "UnsignedIntegerUpdater 
(UINT32 -> Long)", + LongType, + descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(32, false)), + longVec, intBytes) + addReadValuesCase(benchmark, "UnsignedLongUpdater (UINT64 -> Decimal(20,0))", + DecimalType(20, 0), + descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.intType(64, false)), + decimalVec, longBytes) + + benchmark.run() + } + + // --------------- Group E: decimal Updaters --------------- + + private def runDecimalBenchmark(): Unit = { + val benchmark = new Benchmark( + "Decimal Updaters", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val shortDecVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(9, 2)) + val longDecVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(18, 4)) + // FixedLenByteArrayToDecimal routes when: + // 1) scale mismatch (target=4 vs parquet=2) defeats canReadAsLong/Int/BinaryDecimal, and + // 2) target precision exceeds parquet precision by at least the scale increase, so + // isDecimalTypeMatched succeeds and canReadAsDecimal is true. + // Hence target Decimal(18, 4) with parquet decimalType(scale=2, precision=16): + // precisionIncrease=2 >= scaleIncrease=2. + val flbaTargetVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(18, 4)) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + val flbaBytes = plainFixedLenBytes(NUM_ROWS, len = 8) + + addReadValuesCase(benchmark, "IntegerToDecimalUpdater", + DecimalType(9, 2), + descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.decimalType(2, 9)), + shortDecVec, intBytes) + addReadValuesCase(benchmark, "LongToDecimalUpdater", + DecimalType(18, 4), + descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.decimalType(4, 18)), + longDecVec, longBytes) + // BinaryToDecimalUpdater is intentionally not benchmarked. Its `readValue` + // implementation uses the target vector as scratch via `putByteArray`, which requires + // the vector's `arrayData` child. 
Targets routed to this Updater have precision <= 18 + // (DecimalType not byte-array decimal), so the WritableColumnVector constructor does + // not allocate `arrayData` and the call NPEs. The path is exercised only when a + // BINARY-source column has decimal precision <= 18, which is uncommon enough that + // this latent issue has not been a blocker. Skipping until the Updater itself is + // fixed to use a separate scratch buffer. + addReadValuesCase(benchmark, "FixedLenByteArrayToDecimalUpdater", + DecimalType(18, 4), + descriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.decimalType(2, 16), typeLength = 8), + flbaTargetVec, flbaBytes) + + benchmark.run() + } + + // --------------- Group F: FixedLenByteArray Updaters --------------- + + private def runFixedLenByteArrayBenchmark(): Unit = { + val benchmark = new Benchmark( + "FixedLenByteArray Updaters", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val binaryVec = new OnHeapColumnVector(NUM_ROWS, BinaryType) + val shortDecVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(9, 2)) + val longDecVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(18, 4)) + + val flbaBytes16 = plainFixedLenBytes(NUM_ROWS, len = 16) + val flbaBytes4 = plainFixedLenBytes(NUM_ROWS, len = 4) + val flbaBytes8 = plainFixedLenBytes(NUM_ROWS, len = 8) + + addReadValuesCase(benchmark, "FixedLenByteArrayUpdater (len=16 -> Binary)", + BinaryType, + descriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, typeLength = 16), + binaryVec, flbaBytes16) + addReadValuesCase(benchmark, "FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))", + DecimalType(9, 2), + descriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.decimalType(2, 9), typeLength = 4), + shortDecVec, flbaBytes4) + addReadValuesCase(benchmark, "FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))", + DecimalType(18, 4), + descriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.decimalType(4, 18), typeLength = 8), + longDecVec, 
flbaBytes8) + + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("Identity Updaters") { runIdentityBenchmark() } + runBenchmark("Type-converting Updaters") { runTypeConvertingBenchmark() } + runBenchmark("Rebase Updaters") { runRebaseBenchmark() } + runBenchmark("Unsigned Updaters") { runUnsignedBenchmark() } + runBenchmark("Decimal Updaters") { runDecimalBenchmark() } + runBenchmark("FixedLenByteArray Updaters") { runFixedLenByteArrayBenchmark() } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaReaderBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaReaderBenchmark.scala new file mode 100644 index 0000000000000..40e08d9870c71 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaReaderBenchmark.scala @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.parquet.bytes.{ByteBufferInputStream, DirectByteBufferAllocator} +import org.apache.parquet.column.values.delta.{DeltaBinaryPackingValuesWriterForInteger, DeltaBinaryPackingValuesWriterForLong} +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter +import org.apache.parquet.io.api.Binary + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector +import org.apache.spark.sql.types.{BinaryType, ByteType, DecimalType, IntegerType, LongType, ShortType} + +/** + * Low-level benchmark for the three Parquet delta-encoding decoders: + * + * - `VectorizedDeltaBinaryPackedReader` (DELTA_BINARY_PACKED) - default INT32/INT64 + * encoding for Parquet v2. + * - `VectorizedDeltaByteArrayReader` (DELTA_BYTE_ARRAY) - prefix+suffix string encoding. + * - `VectorizedDeltaLengthByteArrayReader` (DELTA_LENGTH_BYTE_ARRAY) - length-prefixed + * binary encoding. + * + * Coverage is intentionally broad - all three readers and their primary read/skip paths + * are included so the benchmark suite catches regressions across the full delta-decode + * surface, not just paths with an active optimization candidate. + * + * Groups: + * A. DELTA_BINARY_PACKED INT32 - readIntegers / skipIntegers across value distributions. + * B. DELTA_BINARY_PACKED INT64 - readLongs / skipLongs across value distributions. + * C. DELTA_BYTE_ARRAY - readBinary / skipBinary across prefix-overlap ratios. + * D. DELTA_LENGTH_BYTE_ARRAY - readBinary / skipBinary across payload sizes. + * E. Variant reads - byte/short/unsigned bulk variants of DELTA_BINARY_PACKED, single- + * value reads on all three readers, and skipBytes / skipShorts. + * + * To run this benchmark: + * {{{ + * 1. 
build/sbt "sql/Test/runMain " + * 2. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results in "benchmarks/VectorizedDeltaReaderBenchmark-results.txt". + * 3. GHA: `Run benchmarks` workflow, class = `*VectorizedDeltaReader*`. + * }}} + */ +object VectorizedDeltaReaderBenchmark extends BenchmarkBase { + + private val NUM_ROWS = 1024 * 1024 + private val NUM_ITERS = 5 + private val BLOCK_SIZE = 128 + private val MINI_BLOCK_NUM = 4 + private val PAGE_SIZE = 64 * 1024 + + // --------------- Group A: DELTA_BINARY_PACKED INT32 --------------- + + private def encodeDeltaInts(values: Array[Int]): Array[Byte] = { + val writer = new DeltaBinaryPackingValuesWriterForInteger( + BLOCK_SIZE, MINI_BLOCK_NUM, PAGE_SIZE, PAGE_SIZE, new DirectByteBufferAllocator) + var i = 0 + while (i < values.length) { writer.writeInteger(values(i)); i += 1 } + val out = writer.getBytes.toByteArray + writer.close() + out + } + + private def runDeltaIntBenchmark(): Unit = { + val benchmark = new Benchmark( + "DELTA_BINARY_PACKED INT32", NUM_ROWS.toLong, NUM_ITERS, output = output) + val vec = new OnHeapColumnVector(NUM_ROWS, IntegerType) + + val rng = new Random(42) + val distributions: Seq[(String, Array[Int])] = Seq( + "constant" -> Array.fill(NUM_ROWS)(0), + "monotonic" -> Array.tabulate(NUM_ROWS)(identity), + "small-delta random" -> Array.tabulate(NUM_ROWS)(_ => rng.nextInt(256)), + "wide random" -> Array.tabulate(NUM_ROWS)(_ => rng.nextInt()) + ) + + distributions.foreach { case (tag, values) => + val bytes = encodeDeltaInts(values) + + // Pre-warm. 
+ val warm = new VectorizedDeltaBinaryPackedReader + warm.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + warm.readIntegers(NUM_ROWS, vec, 0) + + benchmark.addCase(s"readIntegers, $tag") { _ => + val r = new VectorizedDeltaBinaryPackedReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r.readIntegers(NUM_ROWS, vec, 0) + } + + benchmark.addCase(s"skipIntegers, $tag") { _ => + val r = new VectorizedDeltaBinaryPackedReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r.skipIntegers(NUM_ROWS) + } + } + benchmark.run() + } + + // --------------- Group B: DELTA_BINARY_PACKED INT64 --------------- + + private def encodeDeltaLongs(values: Array[Long]): Array[Byte] = { + val writer = new DeltaBinaryPackingValuesWriterForLong( + BLOCK_SIZE, MINI_BLOCK_NUM, PAGE_SIZE, PAGE_SIZE, new DirectByteBufferAllocator) + var i = 0 + while (i < values.length) { writer.writeLong(values(i)); i += 1 } + val out = writer.getBytes.toByteArray + writer.close() + out + } + + private def runDeltaLongBenchmark(): Unit = { + val benchmark = new Benchmark( + "DELTA_BINARY_PACKED INT64", NUM_ROWS.toLong, NUM_ITERS, output = output) + val vec = new OnHeapColumnVector(NUM_ROWS, LongType) + + val rng = new Random(42) + val distributions: Seq[(String, Array[Long])] = Seq( + "constant" -> Array.fill(NUM_ROWS)(0L), + "monotonic" -> Array.tabulate(NUM_ROWS)(_.toLong), + "small-delta random" -> Array.tabulate(NUM_ROWS)(_ => rng.nextInt(256).toLong), + "wide random" -> Array.tabulate(NUM_ROWS)(_ => rng.nextLong()) + ) + + distributions.foreach { case (tag, values) => + val bytes = encodeDeltaLongs(values) + + val warm = new VectorizedDeltaBinaryPackedReader + warm.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + warm.readLongs(NUM_ROWS, vec, 0) + + benchmark.addCase(s"readLongs, $tag") { _ => + val r = new VectorizedDeltaBinaryPackedReader + r.initFromPage(NUM_ROWS, 
ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r.readLongs(NUM_ROWS, vec, 0) + } + + benchmark.addCase(s"skipLongs, $tag") { _ => + val r = new VectorizedDeltaBinaryPackedReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r.skipLongs(NUM_ROWS) + } + } + benchmark.run() + } + + // --------------- Group C: DELTA_BYTE_ARRAY --------------- + + private def encodeDeltaByteArray(values: Array[Array[Byte]]): Array[Byte] = { + val writer = new DeltaByteArrayWriter(PAGE_SIZE, PAGE_SIZE, new DirectByteBufferAllocator) + var i = 0 + while (i < values.length) { writer.writeBytes(Binary.fromConstantByteArray(values(i))); i += 1 } + val out = writer.getBytes.toByteArray + writer.close() + out + } + + /** + * Generate string-like binaries with `prefixOverlap` chars shared with the previous value + * and a random `suffixLen` of new bytes appended. Produces realistic delta-encoder input. + */ + private def generateDeltaByteArrayValues( + count: Int, prefixOverlap: Int, suffixLen: Int): Array[Array[Byte]] = { + val rng = new Random(42) + val values = new Array[Array[Byte]](count) + var prev: Array[Byte] = new Array[Byte](0) + var i = 0 + while (i < count) { + val keep = math.min(prefixOverlap, prev.length) + val v = new Array[Byte](keep + suffixLen) + System.arraycopy(prev, 0, v, 0, keep) + var j = 0 + while (j < suffixLen) { v(keep + j) = rng.nextInt().toByte; j += 1 } + values(i) = v + prev = v + i += 1 + } + values + } + + private def runDeltaByteArrayBenchmark(): Unit = { + val benchmark = new Benchmark( + "DELTA_BYTE_ARRAY", NUM_ROWS.toLong, NUM_ITERS, output = output) + val vec = new OnHeapColumnVector(NUM_ROWS, BinaryType) + + val shapes: Seq[(String, Int, Int)] = Seq( + ("no overlap, len=16", 0, 16), + ("half overlap, len=16", 8, 16), + ("full overlap, len=16", 16, 16), + ("half overlap, len=64", 32, 64) + ) + + shapes.foreach { case (tag, prefix, suffix) => + val values = generateDeltaByteArrayValues(NUM_ROWS, prefix, suffix) + 
val bytes = encodeDeltaByteArray(values) + + val warm = ParquetTestAccess.newDeltaByteArrayReader() + warm.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + vec.reset() + warm.readBinary(NUM_ROWS, vec, 0) + + benchmark.addCase(s"readBinary, $tag") { _ => + val r = ParquetTestAccess.newDeltaByteArrayReader() + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + vec.reset() // clear arrayData so payloads do not accumulate across iterations + r.readBinary(NUM_ROWS, vec, 0) + } + + benchmark.addCase(s"skipBinary, $tag") { _ => + val r = ParquetTestAccess.newDeltaByteArrayReader() + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r.skipBinary(NUM_ROWS) + } + } + benchmark.run() + } + + // --------------- Group D: DELTA_LENGTH_BYTE_ARRAY --------------- + + private def encodeDeltaLengthByteArray(values: Array[Array[Byte]]): Array[Byte] = { + val writer = new DeltaLengthByteArrayValuesWriter( + PAGE_SIZE, PAGE_SIZE, new DirectByteBufferAllocator) + var i = 0 + while (i < values.length) { writer.writeBytes(Binary.fromConstantByteArray(values(i))); i += 1 } + val out = writer.getBytes.toByteArray + writer.close() + out + } + + private def runDeltaLengthByteArrayBenchmark(): Unit = { + val benchmark = new Benchmark( + "DELTA_LENGTH_BYTE_ARRAY", NUM_ROWS.toLong, NUM_ITERS, output = output) + val vec = new OnHeapColumnVector(NUM_ROWS, BinaryType) + val rng = new Random(42) + + val payloadSizes = Seq(8, 32, 128, 512) + + payloadSizes.foreach { len => + val values = Array.tabulate(NUM_ROWS) { _ => + val v = new Array[Byte](len) + rng.nextBytes(v) + v + } + val bytes = encodeDeltaLengthByteArray(values) + + val warm = ParquetTestAccess.newDeltaLengthByteArrayReader() + warm.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + vec.reset() + warm.readBinary(NUM_ROWS, vec, 0) + + benchmark.addCase(s"readBinary, payloadLen=$len") { _ => + val r = 
ParquetTestAccess.newDeltaLengthByteArrayReader() + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + vec.reset() + r.readBinary(NUM_ROWS, vec, 0) + } + + benchmark.addCase(s"skipBinary, payloadLen=$len") { _ => + val r = ParquetTestAccess.newDeltaLengthByteArrayReader() + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r.skipBinary(NUM_ROWS) + } + } + benchmark.run() + } + + // --------------- Group E: variant reads --------------- + + private def runVariantReadsBenchmark(): Unit = { + val benchmark = new Benchmark( + "Variant reads", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val rng = new Random(42) + val intValues = Array.tabulate(NUM_ROWS)(_ => rng.nextInt()) + val longValues = Array.tabulate(NUM_ROWS)(_ => rng.nextLong()) + val intBytes = encodeDeltaInts(intValues) + val longBytes = encodeDeltaLongs(longValues) + + val byteVec = new OnHeapColumnVector(NUM_ROWS, ByteType) + val shortVec = new OnHeapColumnVector(NUM_ROWS, ShortType) + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + val unsignedLongVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(20, 0)) + + def newIntReader(): VectorizedDeltaBinaryPackedReader = { + val r = new VectorizedDeltaBinaryPackedReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(intBytes))) + r + } + def newLongReader(): VectorizedDeltaBinaryPackedReader = { + val r = new VectorizedDeltaBinaryPackedReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(longBytes))) + r + } + + // Bulk byte/short downcasts and unsigned conversions of DELTA_BINARY_PACKED data. 
+ benchmark.addCase("readBytes (INT32)") { _ => newIntReader().readBytes(NUM_ROWS, byteVec, 0) } + benchmark.addCase("readShorts (INT32)") { _ => + newIntReader().readShorts(NUM_ROWS, shortVec, 0) + } + benchmark.addCase("readUnsignedIntegers (INT32 -> Long)") { _ => + newIntReader().readUnsignedIntegers(NUM_ROWS, longVec, 0) + } + benchmark.addCase("readUnsignedLongs (INT64 -> Decimal(20,0))") { _ => + newLongReader().readUnsignedLongs(NUM_ROWS, unsignedLongVec, 0) + } + benchmark.addCase("skipBytes") { _ => newIntReader().skipBytes(NUM_ROWS) } + benchmark.addCase("skipShorts") { _ => newIntReader().skipShorts(NUM_ROWS) } + + // Per-call overhead of single-value reads. + benchmark.addCase("readByte (INT32 single-value)") { _ => + val r = newIntReader() + var i = 0; while (i < NUM_ROWS) { r.readByte(); i += 1 } + } + benchmark.addCase("readShort (INT32 single-value)") { _ => + val r = newIntReader() + var i = 0; while (i < NUM_ROWS) { r.readShort(); i += 1 } + } + benchmark.addCase("readInteger (INT32 single-value)") { _ => + val r = newIntReader() + var i = 0; while (i < NUM_ROWS) { r.readInteger(); i += 1 } + } + benchmark.addCase("readLong (INT64 single-value)") { _ => + val r = newLongReader() + var i = 0; while (i < NUM_ROWS) { r.readLong(); i += 1 } + } + + // Single-value readBinary(len) on DeltaByteArrayReader. Each call returns the + // next decoded value reconstructed from prefix/suffix state. 
+ val binValues = generateDeltaByteArrayValues(NUM_ROWS, prefixOverlap = 8, suffixLen = 16) + val binBytes = encodeDeltaByteArray(binValues) + val perRowLen = 8 + 16 // matches the values produced by generateDeltaByteArrayValues + benchmark.addCase("readBinary(len) (DELTA_BYTE_ARRAY single-value)") { _ => + val r = ParquetTestAccess.newDeltaByteArrayReader() + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(binBytes))) + var i = 0; while (i < NUM_ROWS) { r.readBinary(perRowLen); i += 1 } + } + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("DELTA_BINARY_PACKED INT32") { runDeltaIntBenchmark() } + runBenchmark("DELTA_BINARY_PACKED INT64") { runDeltaLongBenchmark() } + runBenchmark("DELTA_BYTE_ARRAY") { runDeltaByteArrayBenchmark() } + runBenchmark("DELTA_LENGTH_BYTE_ARRAY") { runDeltaLengthByteArrayBenchmark() } + runBenchmark("Variant reads") { runVariantReadsBenchmark() } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReaderBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReaderBenchmark.scala new file mode 100644 index 0000000000000..0b86656b828d6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReaderBenchmark.scala @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.parquet.bytes.ByteBufferInputStream + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector +import org.apache.spark.sql.types._ + +/** + * Low-level benchmark for `VectorizedPlainValuesReader`. Measures every public + * read/skip method, including paths that are already memcpy-optimal, so the + * benchmark suite tracks the long-term performance baseline of the PLAIN decode + * surface and is ready for future iterative optimization without first having + * to add coverage. + * + * Groups: + * A. Fixed-size bulk reads - readBooleans, readBytes, readShorts, readIntegers, + * readLongs, readFloats, readDoubles. These are at memcpy speed today; + * kept in the benchmark as regression tracking. + * B. Conversion bulk reads - readUnsignedIntegers, readUnsignedLongs, + * readIntegersWithRebase, readLongsWithRebase. Per-row loops with + * conversion; potential P3-style optimization candidates. + * C. Variable-length - readBinary(total, v, rowId), per-row slice + length + * decode pattern. + * D. Single-value reads - readBoolean, readByte, readShort, readInteger, + * readLong, readFloat, readDouble looped NUM_ROWS times. Measures per-call + * overhead. + * E. Skip paths - skipBooleans, skipBytes, skipShorts, skipIntegers, skipLongs, + * skipFloats, skipDoubles, skipBinary, skipFixedLenByteArray. + * + * To run this benchmark: + * {{{ + * 1. 
build/sbt "sql/Test/runMain "
+ *   2. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain "
+ *      Results in "benchmarks/VectorizedPlainValuesReaderBenchmark-results.txt".
+ *   3. GHA: `Run benchmarks` workflow, class = `*VectorizedPlainValuesReader*`.
+ * }}}
+ */
+object VectorizedPlainValuesReaderBenchmark extends BenchmarkBase {
+
+  private val NUM_ROWS = 1024 * 1024
+  private val NUM_ITERS = 5
+
+  // Volatile sink to prevent JIT from eliminating skip-only benchmark bodies. Trivial
+  // skip implementations are just `in.skip(N)` (a position increment); without an
+  // observable side effect, escape analysis and dead-code elimination can collapse the
+  // entire `newReader(...).skipX(...)` body to nothing, producing meaningless Infinity
+  // rates. Each skip case reads one long after the skip and accumulates into `sink`
+  // to anchor the work.
+  @volatile private var sink: Long = 0L
+
+  // Append 8 trailing 0x00 bytes so the post-skip anchoring `readLong()` always has
+  // data to consume even when the skip exhausts the original payload. Plain
+  // single-value reads internally pull 4-8 bytes (e.g., readByte reads via readInteger,
+  // readLong reads 8), so the pad must cover the largest such read.
+ private val SINK_PAD_BYTES = 8 + private def withTailPad(bytes: Array[Byte]): Array[Byte] = { + val out = new Array[Byte](bytes.length + SINK_PAD_BYTES) + System.arraycopy(bytes, 0, out, 0, bytes.length) + out + } + + // --------------- PLAIN-encoded byte producers --------------- + + private def plainIntBytes(count: Int)(f: Int => Int): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putInt(f(i)); i += 1 } + buf.array() + } + + private def plainLongBytes(count: Int)(f: Int => Long): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 8).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putLong(f(i)); i += 1 } + buf.array() + } + + private def plainFloatBytes(count: Int)(f: Int => Float): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putFloat(f(i)); i += 1 } + buf.array() + } + + private def plainDoubleBytes(count: Int)(f: Int => Double): Array[Byte] = { + val buf = ByteBuffer.allocate(count * 8).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < count) { buf.putDouble(f(i)); i += 1 } + buf.array() + } + + // PLAIN booleans are bit-packed: 8 booleans per byte. + private def plainBooleanBytes(count: Int): Array[Byte] = { + val byteCount = (count + 7) / 8 + val out = new Array[Byte](byteCount) + var i = 0 + while (i < count) { + if ((i & 1) == 0) out(i / 8) = (out(i / 8) | (1 << (i % 8))).toByte + i += 1 + } + out + } + + // 4-byte length prefix + payload, repeated. 
+ private def plainBinaryBytes(count: Int, payloadLen: Int): Array[Byte] = { + val recordLen = 4 + payloadLen + val buf = ByteBuffer.allocate(count * recordLen).order(ByteOrder.LITTLE_ENDIAN) + val payload = new Array[Byte](payloadLen) + var i = 0 + while (i < count) { + buf.putInt(payloadLen) + buf.put(payload) + i += 1 + } + buf.array() + } + + private def newReader(bytes: Array[Byte]): VectorizedPlainValuesReader = { + val r = new VectorizedPlainValuesReader + r.initFromPage(NUM_ROWS, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) + r + } + + /** Adds a case that runs `body` after pre-warming the body once. */ + private def addCase(benchmark: Benchmark, label: String)(body: () => Unit): Unit = { + body() + benchmark.addCase(label) { _ => body() } + } + + // --------------- Group A: fixed-size bulk reads --------------- + + private def runBulkBenchmark(): Unit = { + val benchmark = new Benchmark( + "Fixed-size bulk reads", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val intVec = new OnHeapColumnVector(NUM_ROWS, IntegerType) + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + val floatVec = new OnHeapColumnVector(NUM_ROWS, FloatType) + val doubleVec = new OnHeapColumnVector(NUM_ROWS, DoubleType) + val boolVec = new OnHeapColumnVector(NUM_ROWS, BooleanType) + val byteVec = new OnHeapColumnVector(NUM_ROWS, ByteType) + val shortVec = new OnHeapColumnVector(NUM_ROWS, ShortType) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + val floatBytes = plainFloatBytes(NUM_ROWS)(_.toFloat) + val doubleBytes = plainDoubleBytes(NUM_ROWS)(_.toDouble) + val boolBytes = plainBooleanBytes(NUM_ROWS) + + addCase(benchmark, "readBooleans") { () => + newReader(boolBytes).readBooleans(NUM_ROWS, boolVec, 0) + } + addCase(benchmark, "readBytes") { () => + newReader(intBytes).readBytes(NUM_ROWS, byteVec, 0) + } + addCase(benchmark, "readShorts") { () => + newReader(intBytes).readShorts(NUM_ROWS, shortVec, 0) + } + 
addCase(benchmark, "readIntegers") { () => + newReader(intBytes).readIntegers(NUM_ROWS, intVec, 0) + } + addCase(benchmark, "readLongs") { () => + newReader(longBytes).readLongs(NUM_ROWS, longVec, 0) + } + addCase(benchmark, "readFloats") { () => + newReader(floatBytes).readFloats(NUM_ROWS, floatVec, 0) + } + addCase(benchmark, "readDoubles") { () => + newReader(doubleBytes).readDoubles(NUM_ROWS, doubleVec, 0) + } + benchmark.run() + } + + // --------------- Group B: conversion bulk reads --------------- + + private def runConversionBenchmark(): Unit = { + val benchmark = new Benchmark( + "Conversion bulk reads", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val longVec = new OnHeapColumnVector(NUM_ROWS, LongType) + val intVec = new OnHeapColumnVector(NUM_ROWS, IntegerType) + val tsVec = new OnHeapColumnVector(NUM_ROWS, TimestampType) + // readUnsignedLongs stores UINT64 values via putByteArray into arrayData(), so the + // target vector must be a byte-array decimal (precision > 18) for arrayData to be + // allocated. Decimal(20, 0) is the canonical UINT64 mapping in production. + val uint64DecVec = new OnHeapColumnVector(NUM_ROWS, DecimalType(20, 0)) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + + // UINT32 -> Long via readUnsignedIntegers (per-row loop in the reader). + addCase(benchmark, "readUnsignedIntegers") { () => + newReader(intBytes).readUnsignedIntegers(NUM_ROWS, longVec, 0) + } + + // UINT64 -> Decimal(20, 0) via readUnsignedLongs (per-row loop). + addCase(benchmark, "readUnsignedLongs") { () => + uint64DecVec.reset() // clear arrayData so payloads do not accumulate across iterations + newReader(longBytes).readUnsignedLongs(NUM_ROWS, uint64DecVec, 0) + } + + // DATE legacy rebase. Use values that don't trigger rebase to measure the no-rebase + // fast path; failIfRebase=false is the common production setting. 
+ val safeDateBytes = plainIntBytes(NUM_ROWS)(_ => 18000) + addCase(benchmark, "readIntegersWithRebase, no rebase needed") { () => + newReader(safeDateBytes).readIntegersWithRebase(NUM_ROWS, intVec, 0, false) + } + + // TIMESTAMP_MICROS legacy rebase, no-rebase values. + val safeTsBytes = plainLongBytes(NUM_ROWS)(_ => 1577836800000000L) + addCase(benchmark, "readLongsWithRebase, no rebase needed") { () => + newReader(safeTsBytes).readLongsWithRebase(NUM_ROWS, tsVec, 0, false, "UTC") + } + + benchmark.run() + } + + // --------------- Group C: variable-length reads --------------- + + private def runVariableLengthBenchmark(): Unit = { + val benchmark = new Benchmark( + "Variable-length reads", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val binaryVec = new OnHeapColumnVector(NUM_ROWS, BinaryType) + + Seq(8, 32, 128, 512).foreach { payloadLen => + val bytes = plainBinaryBytes(NUM_ROWS, payloadLen) + addCase(benchmark, s"readBinary, payloadLen=$payloadLen") { () => + binaryVec.reset() // clear arrayData so payloads do not accumulate across iterations + newReader(bytes).readBinary(NUM_ROWS, binaryVec, 0) + } + } + benchmark.run() + } + + // --------------- Group D: single-value reads --------------- + + private def runSingleValueBenchmark(): Unit = { + val benchmark = new Benchmark( + "Single-value reads", NUM_ROWS.toLong, NUM_ITERS, output = output) + + val intBytes = plainIntBytes(NUM_ROWS)(i => i) + val longBytes = plainLongBytes(NUM_ROWS)(_.toLong) + val floatBytes = plainFloatBytes(NUM_ROWS)(_.toFloat) + val doubleBytes = plainDoubleBytes(NUM_ROWS)(_.toDouble) + val boolBytes = plainBooleanBytes(NUM_ROWS) + + addCase(benchmark, "readBoolean") { () => + val r = newReader(boolBytes) + var j = 0 + while (j < NUM_ROWS) { r.readBoolean(); j += 1 } + } + addCase(benchmark, "readByte") { () => + val r = newReader(intBytes) + var j = 0 + while (j < NUM_ROWS) { r.readByte(); j += 1 } + } + addCase(benchmark, "readShort") { () => + val r = newReader(intBytes) + var j = 
0
+      while (j < NUM_ROWS) { r.readShort(); j += 1 }
+    }
+    addCase(benchmark, "readInteger") { () =>
+      val r = newReader(intBytes)
+      var j = 0
+      while (j < NUM_ROWS) { r.readInteger(); j += 1 }
+    }
+    addCase(benchmark, "readLong") { () =>
+      val r = newReader(longBytes)
+      var j = 0
+      while (j < NUM_ROWS) { r.readLong(); j += 1 }
+    }
+    addCase(benchmark, "readFloat") { () =>
+      val r = newReader(floatBytes)
+      var j = 0
+      while (j < NUM_ROWS) { r.readFloat(); j += 1 }
+    }
+    addCase(benchmark, "readDouble") { () =>
+      val r = newReader(doubleBytes)
+      var j = 0
+      while (j < NUM_ROWS) { r.readDouble(); j += 1 }
+    }
+    benchmark.run()
+  }
+
+  // --------------- Group E: skip paths ---------------
+
+  private def runSkipBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      "Skip", NUM_ROWS.toLong, NUM_ITERS, output = output)
+
+    // Pad each input with SINK_PAD_BYTES trailing bytes so the post-skip readLong()
+    // anchors a side effect and prevents JIT from eliding the skip body.
+    val intBytes = withTailPad(plainIntBytes(NUM_ROWS)(i => i))
+    val longBytes = withTailPad(plainLongBytes(NUM_ROWS)(_.toLong))
+    val floatBytes = withTailPad(plainFloatBytes(NUM_ROWS)(_.toFloat))
+    val doubleBytes = withTailPad(plainDoubleBytes(NUM_ROWS)(_.toDouble))
+    val boolBytes = withTailPad(plainBooleanBytes(NUM_ROWS))
+
+    Seq(8, 32, 128, 512).foreach { payloadLen =>
+      val bytes = withTailPad(plainBinaryBytes(NUM_ROWS, payloadLen))
+      addCase(benchmark, s"skipBinary, payloadLen=$payloadLen") { () =>
+        val r = newReader(bytes)
+        r.skipBinary(NUM_ROWS)
+        sink += r.readLong()
+      }
+    }
+
+    Seq(4, 16, 64).foreach { len =>
+      val bytes = withTailPad(new Array[Byte](NUM_ROWS * len))
+      addCase(benchmark, s"skipFixedLenByteArray, len=$len") { () =>
+        val r = newReader(bytes)
+        r.skipFixedLenByteArray(NUM_ROWS, len)
+        sink += r.readLong()
+      }
+    }
+
+    addCase(benchmark, "skipBooleans") { () =>
+      val r = newReader(boolBytes)
+      r.skipBooleans(NUM_ROWS)
+      sink += r.readLong()
+    }
+    addCase(benchmark, "skipBytes") { () =>
+      val r =
newReader(intBytes) + r.skipBytes(NUM_ROWS) + sink += r.readLong() + } + addCase(benchmark, "skipShorts") { () => + val r = newReader(intBytes) + r.skipShorts(NUM_ROWS) + sink += r.readLong() + } + addCase(benchmark, "skipIntegers") { () => + val r = newReader(intBytes) + r.skipIntegers(NUM_ROWS) + sink += r.readLong() + } + addCase(benchmark, "skipLongs") { () => + val r = newReader(longBytes) + r.skipLongs(NUM_ROWS) + sink += r.readLong() + } + addCase(benchmark, "skipFloats") { () => + val r = newReader(floatBytes) + r.skipFloats(NUM_ROWS) + sink += r.readLong() + } + addCase(benchmark, "skipDoubles") { () => + val r = newReader(doubleBytes) + r.skipDoubles(NUM_ROWS) + sink += r.readLong() + } + + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("Fixed-size bulk reads") { runBulkBenchmark() } + runBenchmark("Conversion bulk reads") { runConversionBenchmark() } + runBenchmark("Variable-length reads") { runVariableLengthBenchmark() } + runBenchmark("Single-value reads") { runSingleValueBenchmark() } + runBenchmark("Skip") { runSkipBenchmark() } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderBenchmark.scala index 008b18cdcab16..6f99633a6a354 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderBenchmark.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.ByteBuffer +import java.util.PrimitiveIterator +import java.util.stream.LongStream import scala.util.Random @@ -40,6 +42,20 @@ import org.apache.spark.sql.types.{BooleanType, IntegerType} * path used when the caller needs materialized definition 
levels (e.g., nested columns). * D. readBatch nullable without def-level materialization -- the `readBatchInternal` path used * for flat nullable columns where only null/non-null disposition matters. + * E. readBatch with row-index filtering -- exercises the with-filter code path through + * `readBatchInternal{WithDefLevels}`'s range checks when Parquet column-index filtering is + * active. Sweep over two filter shapes (single contiguous range, alternating windows) and + * the same null ratios as C/D. + * F. Single-value reads -- per-call overhead of `readBoolean`, `readInteger`, + * `readValueDictionaryId` looped NUM_ROWS times. Establishes baseline against the bulk + * Group A/B paths. + * G. skipBooleans / skipIntegers -- forward-skip path used by row-index filtering and + * pushdown. RLE + PACKED across the same parameter sweeps as A/B. + * + * Not yet covered (deferred): `readBatchRepeated` and `readIntegersRepeated` for nested + * columns require setting up a `ParquetReadState` with `maxRepetitionLevel > 0`, a separate + * def-levels reader, and encoded rep-level streams; better added together with the matching + * suite-level coverage in a focused follow-up. * * Cold = fresh reader per iteration (exercises cold `currentBuffer` growth). * Reused = reader pre-warmed outside the timed region; inside is only `initFromPage` + read. 
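The two Group E filter shapes named above (a single contiguous kept range, and alternating kept/skipped windows) can be illustrated as standalone kept-row-index iterators, which is the form a row-range filter ultimately consumes. This is a sketch only; the helper names `contiguousKept`, `alternatingKept`, and `drain` are hypothetical and not part of this patch:

```scala
import java.util.PrimitiveIterator
import java.util.stream.LongStream

// Shape 1: a single contiguous kept range [start, start + count).
def contiguousKept(start: Long, count: Long): PrimitiveIterator.OfLong =
  LongStream.range(start, start + count).iterator()

// Shape 2: alternating windows -- window k, covering rows [k*w, (k+1)*w),
// is kept exactly when k is even, giving ~50% kept rows with many
// skip/read transitions.
def alternatingKept(totalRows: Long, w: Long): PrimitiveIterator.OfLong =
  LongStream.range(0L, totalRows).filter(i => (i / w) % 2 == 0).iterator()

// Collect an index iterator into a List for inspection.
def drain(it: PrimitiveIterator.OfLong): List[Long] = {
  val buf = scala.collection.mutable.ListBuffer[Long]()
  while (it.hasNext) buf += it.nextLong()
  buf.toList
}
```

With `totalRows = 10` and `w = 3`, windows 0 and 2 are kept, so draining the alternating shape yields indices 0, 1, 2, 6, 7, 8.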
@@ -68,10 +84,24 @@ object VectorizedRleValuesReaderBenchmark extends BenchmarkBase { // --------------- ReadState helpers (delegate to shared reflection bridge) --------------- private def newReadState(maxDef: Int, valuesInPage: Int): AnyRef = { - val state = ParquetReadStateTestAccess.newState( + val state = ParquetTestAccess.newState( intColumnDescriptor(maxDef), maxDef == 0) - ParquetReadStateTestAccess.resetForNewBatch(state, BATCH_SIZE) - ParquetReadStateTestAccess.resetForNewPage(state, valuesInPage, 0L) + ParquetTestAccess.resetForNewBatch(state, BATCH_SIZE) + ParquetTestAccess.resetForNewPage(state, valuesInPage, 0L) + state + } + + // State variant with a fresh row-index iterator. `rowRanges` inside ParquetReadState is + // iterated forward and never reset, so Group E measurements must construct a new state per + // benchmark iteration. The iterator is built from `indexFactory` on each call. + private def newReadStateWithRowIndexes( + maxDef: Int, + valuesInPage: Int, + indexFactory: () => PrimitiveIterator.OfLong): AnyRef = { + val state = ParquetTestAccess.newState( + intColumnDescriptor(maxDef), maxDef == 0, indexFactory()) + ParquetTestAccess.resetForNewBatch(state, BATCH_SIZE) + ParquetTestAccess.resetForNewPage(state, valuesInPage, 0L) state } @@ -258,7 +288,7 @@ object VectorizedRleValuesReaderBenchmark extends BenchmarkBase { benchmark.addCase( f"nullRatio=${nullRatio}%.1f, $clusterTag") { _ => reader.initFromPage(NUM_ROWS, toInputStream(bytes)) - ParquetReadStateTestAccess.resetForNewPage( + ParquetTestAccess.resetForNewPage( state, NUM_ROWS, 0L) runBatches(reader, state, values, defLevelsVec, factory()) } @@ -277,13 +307,188 @@ object VectorizedRleValuesReaderBenchmark extends BenchmarkBase { var produced = 0 while (produced < NUM_ROWS) { val toRead = math.min(BATCH_SIZE, NUM_ROWS - produced) - ParquetReadStateTestAccess.resetForNewBatch(state, toRead) - ParquetReadStateTestAccess.readBatch( + ParquetTestAccess.resetForNewBatch(state, toRead) + 
ParquetTestAccess.readBatch(
+        reader, state, values, defLevelsVec, valueReader, integerUpdater)
+      produced += toRead
+    }
+  }
+
+  // --------------- Group E: readBatch with row-index filtering ---------------
+
+  /** Kept row indices: contiguous range `[keptStart, keptStart + keptCount)`. */
+  private def contiguousIndexFactory(
+      keptStart: Long,
+      keptCount: Long): () => PrimitiveIterator.OfLong =
+    () => LongStream.range(keptStart, keptStart + keptCount).iterator()
+
+  /** Kept row indices: every other `windowSize`-row window starting at index 0. */
+  private def alternatingWindowsIndexFactory(
+      totalRows: Long,
+      windowSize: Long): () => PrimitiveIterator.OfLong = { () =>
+    new PrimitiveIterator.OfLong {
+      private var i = 0L
+      // Window k in [k*windowSize, (k+1)*windowSize) is kept when k is even. Advance
+      // past skipped windows eagerly so hasNext never reports a value that nextLong
+      // cannot produce (e.g., when the trailing window is a skipped one).
+      private def advance(): Unit = {
+        while (i < totalRows && (i / windowSize) % 2 != 0) i += 1
+      }
+      override def hasNext: Boolean = { advance(); i < totalRows }
+      override def nextLong(): Long = {
+        advance()
+        if (i >= totalRows) throw new NoSuchElementException
+        val out = i
+        i += 1
+        out
+      }
+    }
+  }
+
+  private def runRowRangeFilterBenchmark(
+      label: String,
+      buildValueReader: Int => ValueReaderFactory,
+      materializeDefLevels: Boolean): Unit = {
+    val benchmark = new Benchmark(
+      label, NUM_ROWS.toLong, NUM_ITERS, output = output)
+    val values = new OnHeapColumnVector(NUM_ROWS, IntegerType)
+    val defLevelsVec: WritableColumnVector =
+      if (materializeDefLevels) new OnHeapColumnVector(NUM_ROWS, IntegerType)
+      else null
+
+    // One contiguous range covering the middle 50% of rows; and alternating 1000-row windows
+    // (50% kept, but with many skip/read transitions inside each batch).
+ val filterShapes: Seq[(String, () => PrimitiveIterator.OfLong)] = Seq( + "contiguous 50%" -> contiguousIndexFactory(NUM_ROWS / 4L, NUM_ROWS / 2L), + "alt 1000-row windows" -> alternatingWindowsIndexFactory(NUM_ROWS.toLong, 1000L) + ) + + val nullRatios = Seq(0.0, 0.3, 0.9) + + filterShapes.foreach { case (shapeTag, indexFactory) => + nullRatios.foreach { nullRatio => + val defLevels = + packedFriendlyDefLevels(NUM_ROWS, nullRatio, clustered = false) + val nonNullCount = defLevels.count(_ == 1) + val bytes = encodeRle(defLevels, bitWidth = 1) + val factory = buildValueReader(nonNullCount) + + // Pre-warm the full pipeline with a fresh state so JIT has seen the with-filter path. + val reader = new VectorizedRleValuesReader(1, false) + reader.initFromPage(NUM_ROWS, toInputStream(bytes)) + val warmState = + newReadStateWithRowIndexes(maxDef = 1, valuesInPage = NUM_ROWS, indexFactory) + runBatches(reader, warmState, values, defLevelsVec, factory()) + + benchmark.addCase( + f"nullRatio=${nullRatio}%.1f, $shapeTag") { _ => + reader.initFromPage(NUM_ROWS, toInputStream(bytes)) + // `rowRanges` in ParquetReadState is iterated forward and not reset by + // resetForNewPage/Batch, so we must construct a fresh state per measurement + // iteration. Iterator construction cost is small compared to decoding NUM_ROWS. + val state = + newReadStateWithRowIndexes(maxDef = 1, valuesInPage = NUM_ROWS, indexFactory) + runBatches(reader, state, values, defLevelsVec, factory()) + } + } + } + benchmark.run() + } + + // --------------- Group F: single-value reads --------------- + + private def runSingleValueBenchmark(): Unit = { + val benchmark = new Benchmark( + "Single-value reads", NUM_ROWS.toLong, NUM_ITERS, output = output) + + // Boolean - bitWidth=1, alternating values (forces PACKED). 
+    val boolBytes = encodeRle(packedFriendlyBooleans(NUM_ROWS, 0.5), bitWidth = 1)
+    val boolWarm = new VectorizedRleValuesReader(1, false)
+    boolWarm.initFromPage(NUM_ROWS, toInputStream(boolBytes))
+    var i = 0
+    while (i < NUM_ROWS) { boolWarm.readBoolean(); i += 1 }
+
+    benchmark.addCase("readBoolean") { _ =>
+      val r = new VectorizedRleValuesReader(1, false)
+      r.initFromPage(NUM_ROWS, toInputStream(boolBytes))
+      var j = 0
+      while (j < NUM_ROWS) { r.readBoolean(); j += 1 }
+    }
+
+    // readInteger / readValueDictionaryId across bitWidths. Use random PACKED data so each
+    // call reads a fresh value (RLE-only data would short-circuit too aggressively).
+    Seq(4, 8, 12, 20).foreach { bitWidth =>
+      val bytes = encodeRle(packedFriendlyDictIds(NUM_ROWS, bitWidth), bitWidth)
+
+      val warmInt = new VectorizedRleValuesReader(bitWidth, false)
+      warmInt.initFromPage(NUM_ROWS, toInputStream(bytes))
+      var k = 0
+      while (k < NUM_ROWS) { warmInt.readInteger(); k += 1 }
+
+      benchmark.addCase(s"readInteger, bitWidth=$bitWidth") { _ =>
+        val r = new VectorizedRleValuesReader(bitWidth, false)
+        r.initFromPage(NUM_ROWS, toInputStream(bytes))
+        var j = 0
+        while (j < NUM_ROWS) { r.readInteger(); j += 1 }
+      }
+
+      benchmark.addCase(s"readValueDictionaryId, bitWidth=$bitWidth") { _ =>
+        val r = new VectorizedRleValuesReader(bitWidth, false)
+        r.initFromPage(NUM_ROWS, toInputStream(bytes))
+        var j = 0
+        while (j < NUM_ROWS) { r.readValueDictionaryId(); j += 1 }
+      }
+    }
+    benchmark.run()
+  }
+
+  // --------------- Group G: skip paths ---------------
+
+  private def runSkipBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      "Skip", NUM_ROWS.toLong, NUM_ITERS, output = output)
+
+    // skipBooleans across the same true-ratio sweep as Group A.
+    Seq(0.0, 0.5, 1.0).foreach { trueRatio =>
+      val bytes = encodeRle(
+        packedFriendlyBooleans(NUM_ROWS, trueRatio), bitWidth = 1)
+
+      val warm = new VectorizedRleValuesReader(1, false)
+      warm.initFromPage(NUM_ROWS, toInputStream(bytes))
+      warm.skipBooleans(NUM_ROWS)
+
+      benchmark.addCase(f"skipBooleans, trueRatio=${trueRatio}%.1f") { _ =>
+        val r = new VectorizedRleValuesReader(1, false)
+        r.initFromPage(NUM_ROWS, toInputStream(bytes))
+        r.skipBooleans(NUM_ROWS)
+      }
+    }
+
+    // skipIntegers across the same bitWidth sweep as Group B; PACKED + RLE shapes.
+    Seq(4, 8, 12, 20).foreach { bitWidth =>
+      val packedBytes = encodeRle(
+        packedFriendlyDictIds(NUM_ROWS, bitWidth), bitWidth)
+      val rleBytes = encodeRle(Array.fill(NUM_ROWS)(0), bitWidth)
+
+      val warmPacked = new VectorizedRleValuesReader(bitWidth, false)
+      warmPacked.initFromPage(NUM_ROWS, toInputStream(packedBytes))
+      warmPacked.skipIntegers(NUM_ROWS)
+
+      benchmark.addCase(s"skipIntegers PACKED, bitWidth=$bitWidth") { _ =>
+        val r = new VectorizedRleValuesReader(bitWidth, false)
+        r.initFromPage(NUM_ROWS, toInputStream(packedBytes))
+        r.skipIntegers(NUM_ROWS)
+      }
+
+      val warmRle = new VectorizedRleValuesReader(bitWidth, false)
+      warmRle.initFromPage(NUM_ROWS, toInputStream(rleBytes))
+      warmRle.skipIntegers(NUM_ROWS)
+
+      benchmark.addCase(s"skipIntegers RLE, bitWidth=$bitWidth") { _ =>
+        val r = new VectorizedRleValuesReader(bitWidth, false)
+        r.initFromPage(NUM_ROWS, toInputStream(rleBytes))
+        r.skipIntegers(NUM_ROWS)
+      }
+    }
+    benchmark.run()
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("Boolean decode") {
       runBooleanBenchmark()
@@ -303,5 +508,23 @@ object VectorizedRleValuesReaderBenchmark extends BenchmarkBase {
         plainIntFactory,
         materializeDefLevels = false)
     }
+    runBenchmark("Nullable batch decode with row-index filtering (with def-levels)") {
+      runRowRangeFilterBenchmark(
+        "Nullable batch with def-levels, row-index filtered",
+        plainIntFactory,
+        materializeDefLevels = true)
+    }
+    runBenchmark("Nullable batch decode with row-index filtering (without def-levels)") {
+      runRowRangeFilterBenchmark(
+        "Nullable batch without def-levels, row-index filtered",
+        plainIntFactory,
+        materializeDefLevels = false)
+    }
+    runBenchmark("Single-value reads") {
+      runSingleValueBenchmark()
+    }
+    runBenchmark("Skip") {
+      runSkipBenchmark()
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderSuite.scala
index ae3dba15e3764..a0c1f59d29655 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReaderSuite.scala
@@ -158,8 +158,8 @@ private object VectorizedRleValuesReaderSuite {
     val valueReader = new VectorizedPlainValuesReader
     valueReader.initFromPage(
       nonNullCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainBytes)))
-    val state = ParquetReadStateTestAccess.newState(intColumnDescriptor(maxDef), maxDef == 0)
-    ParquetReadStateTestAccess.resetForNewPage(state, n, 0L)
+    val state = ParquetTestAccess.newState(intColumnDescriptor(maxDef), maxDef == 0)
+    ParquetTestAccess.resetForNewPage(state, n, 0L)

     var produced = 0
     var expectedValueIdx = 0
@@ -167,9 +167,9 @@ private object VectorizedRleValuesReaderSuite {
       val toRead = math.min(batchSize, n - produced)
       val values = new OnHeapColumnVector(toRead, IntegerType)
       val defLevelsVec = new OnHeapColumnVector(toRead, IntegerType)
-      ParquetReadStateTestAccess.resetForNewBatch(state, toRead)
+      ParquetTestAccess.resetForNewBatch(state, toRead)
       val defLevelsArg: WritableColumnVector = if (withDefLevels) defLevelsVec else null
-      ParquetReadStateTestAccess.readBatch(
+      ParquetTestAccess.readBatch(
         reader, state, values, defLevelsArg, valueReader, integerUpdater)

       var expectedNullsInBatch = 0
@@ -223,14 +223,14 @@ private object VectorizedRleValuesReaderSuite {
     val valueReader = new VectorizedPlainValuesReader
     valueReader.initFromPage(
       nonNullCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainBytes)))
-    val state = ParquetReadStateTestAccess.newState(
+    val state = ParquetTestAccess.newState(
       intColumnDescriptor(maxDef), maxDef == 0, longIterator(includedPositions))
-    ParquetReadStateTestAccess.resetForNewPage(state, n, 0L)
+    ParquetTestAccess.resetForNewPage(state, n, 0L)

     val size = includedPositions.length
     val values = new OnHeapColumnVector(size, IntegerType)
-    ParquetReadStateTestAccess.resetForNewBatch(state, size)
-    ParquetReadStateTestAccess.readBatch(reader, state, values, null, valueReader, integerUpdater)
+    ParquetTestAccess.resetForNewBatch(state, size)
+    ParquetTestAccess.readBatch(reader, state, values, null, valueReader, integerUpdater)

     val prefixNonNulls = defLevels.scanLeft(0) { (c, d) =>
       c + (if (d == maxDef) 1 else 0)
@@ -262,7 +262,7 @@ private object VectorizedRleValuesReaderSuite {
     val bitWidth = if (maxDef == 0) 0 else 32 - Integer.numberOfLeadingZeros(maxDef)
     val reader = new VectorizedRleValuesReader(bitWidth, false)
     val state =
-      ParquetReadStateTestAccess.newState(intColumnDescriptor(maxDef), maxDef == 0)
+      ParquetTestAccess.newState(intColumnDescriptor(maxDef), maxDef == 0)

     var pageFirstRow = 0L
     pages.foreach { pageDefLevels =>
@@ -275,15 +275,15 @@ private object VectorizedRleValuesReaderSuite {
       val valueReader = new VectorizedPlainValuesReader
       valueReader.initFromPage(
         nonNullCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainBytes)))
-      ParquetReadStateTestAccess.resetForNewPage(state, pageN, pageFirstRow)
+      ParquetTestAccess.resetForNewPage(state, pageN, pageFirstRow)

       var produced = 0
       var expectedValueIdx = 0
       while (produced < pageN) {
         val toRead = math.min(batchSize, pageN - produced)
         val values = new OnHeapColumnVector(toRead, IntegerType)
-        ParquetReadStateTestAccess.resetForNewBatch(state, toRead)
-        ParquetReadStateTestAccess.readBatch(
+        ParquetTestAccess.resetForNewBatch(state, toRead)
+        ParquetTestAccess.readBatch(
           reader, state, values, null, valueReader, integerUpdater)

         var i = 0

From aaf157f66ef42aee83a4231dac8a4a4ebc9832bf Mon Sep 17 00:00:00 2001
From: LuciferYang
Date: Tue, 28 Apr 2026 11:40:35 +0000
Subject: [PATCH 2/2] Benchmark results for
 org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReaderBenchmark
 (JDK 17, Scala 2.13, split 1 of 1)

---
 ...orizedRleValuesReaderBenchmark-results.txt | 164 +++++++++++++-----
 1 file changed, 118 insertions(+), 46 deletions(-)

diff --git a/sql/core/benchmarks/VectorizedRleValuesReaderBenchmark-results.txt b/sql/core/benchmarks/VectorizedRleValuesReaderBenchmark-results.txt
index 10def40f3e1e8..749296283bd35 100644
--- a/sql/core/benchmarks/VectorizedRleValuesReaderBenchmark-results.txt
+++ b/sql/core/benchmarks/VectorizedRleValuesReaderBenchmark-results.txt
@@ -2,81 +2,153 @@ Boolean decode
 ================================================================================================

-OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1010-azure
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 RLE readBooleans decode:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-cold reader, trueRatio=0.0                            0              0           0      47464.1           0.0       1.0X
-reused reader, trueRatio=0.0                          0              0           0      47485.6           0.0       1.0X
-cold reader, trueRatio=0.1                            1              1           0        894.1           1.1       0.0X
-reused reader, trueRatio=0.1                          1              1           0        897.4           1.1       0.0X
-cold reader, trueRatio=0.5                            1              1           0       1027.8           1.0       0.0X
-reused reader, trueRatio=0.5                          1              1           0       1029.3           1.0       0.0X
-cold reader, trueRatio=0.9                            1              1           0        893.8           1.1       0.0X
-reused reader, trueRatio=0.9                          1              1           0        896.6           1.1       0.0X
-cold reader, trueRatio=1.0                            0              0           0      47421.1           0.0       1.0X
-reused reader, trueRatio=1.0                          0              0           0      47485.6           0.0       1.0X
+cold reader, trueRatio=0.0                            0              0           0      66239.8           0.0       1.0X
+reused reader, trueRatio=0.0                          0              0           0      57887.6           0.0       0.9X
+cold reader, trueRatio=0.1                            1              1           0        893.5           1.1       0.0X
+reused reader, trueRatio=0.1                          1              1           0        895.6           1.1       0.0X
+cold reader, trueRatio=0.5                            1              1           0       1018.7           1.0       0.0X
+reused reader, trueRatio=0.5                          1              1           0       1029.4           1.0       0.0X
+cold reader, trueRatio=0.9                            1              1           0        891.9           1.1       0.0X
+reused reader, trueRatio=0.9                          1              1           0        894.8           1.1       0.0X
+cold reader, trueRatio=1.0                            0              0           0      67001.7           0.0       1.0X
+reused reader, trueRatio=1.0                          0              0           0      72380.5           0.0       1.1X


 ================================================================================================
 Integer decode
 ================================================================================================

-OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1010-azure
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 RLE readIntegers dictionary-id decode:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PACKED cold, bitWidth=4                               2              2           0        488.1           2.0       1.0X
-PACKED reused, bitWidth=4                             2              2           0        485.9           2.1       1.0X
-RLE, bitWidth=4                                       0              0           0      19116.1           0.1      39.2X
-PACKED cold, bitWidth=8                               2              2           0        482.1           2.1       1.0X
-PACKED reused, bitWidth=8                             2              2           0        479.7           2.1       1.0X
-RLE, bitWidth=8                                       0              0           0      18567.1           0.1      38.0X
-PACKED cold, bitWidth=12                              3              3           0        362.1           2.8       0.7X
-PACKED reused, bitWidth=12                            3              3           0        361.3           2.8       0.7X
-RLE, bitWidth=12                                      0              0           0      19126.9           0.1      39.2X
-PACKED cold, bitWidth=20                              3              3           0        308.6           3.2       0.6X
-PACKED reused, bitWidth=20                            3              3           0        306.5           3.3       0.6X
-RLE, bitWidth=20                                      0              0           0      19074.4           0.1      39.1X
+PACKED cold, bitWidth=4                               2              2           0        505.6           2.0       1.0X
+PACKED reused, bitWidth=4                             2              2           0        504.6           2.0       1.0X
+RLE, bitWidth=4                                       0              0           0      18249.1           0.1      36.1X
+PACKED cold, bitWidth=8                               2              2           0        497.6           2.0       1.0X
+PACKED reused, bitWidth=8                             2              2           0        496.2           2.0       1.0X
+RLE, bitWidth=8                                       0              0           0      18123.0           0.1      35.8X
+PACKED cold, bitWidth=12                              3              3           0        370.2           2.7       0.7X
+PACKED reused, bitWidth=12                            3              3           0        369.6           2.7       0.7X
+RLE, bitWidth=12                                      0              0           0      18573.3           0.1      36.7X
+PACKED cold, bitWidth=20                              3              3           0        315.1           3.2       0.6X
+PACKED reused, bitWidth=20                            3              3           0        316.2           3.2       0.6X
+RLE, bitWidth=20                                      0              0           0      18570.0           0.1      36.7X


 ================================================================================================
 Nullable batch decode with def-level materialization
 ================================================================================================

-OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1010-azure
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Nullable batch with def-levels:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-nullRatio=0.0, n/a                                    0              0           0       6441.9           0.2       1.0X
-nullRatio=0.1, random                                 9              9           0        113.0           8.9       0.0X
-nullRatio=0.1, clustered                              7              7           0        144.8           6.9       0.0X
-nullRatio=0.3, random                                13             13           0         78.9          12.7       0.0X
-nullRatio=0.3, clustered                              7              7           0        156.6           6.4       0.0X
-nullRatio=0.5, random                                15             15           0         71.9          13.9       0.0X
-nullRatio=0.5, clustered                              7              7           0        159.2           6.3       0.0X
-nullRatio=0.9, random                                 8              8           0        124.8           8.0       0.0X
-nullRatio=0.9, clustered                              6              6           0        171.3           5.8       0.0X
-nullRatio=1.0, random                                 0              0           0       8031.7           0.1       1.2X
+nullRatio=0.0, n/a                                    0              0           0       6431.5           0.2       1.0X
+nullRatio=0.1, random                                 9              9           0        114.6           8.7       0.0X
+nullRatio=0.1, clustered                              7              7           0        159.7           6.3       0.0X
+nullRatio=0.3, random                                13             13           0         80.1          12.5       0.0X
+nullRatio=0.3, clustered                              7              7           1        157.8           6.3       0.0X
+nullRatio=0.5, random                                14             15           0         72.7          13.7       0.0X
+nullRatio=0.5, clustered                              6              7           0        162.0           6.2       0.0X
+nullRatio=0.9, random                                 8              8           0        126.4           7.9       0.0X
+nullRatio=0.9, clustered                              6              6           0        174.0           5.7       0.0X
+nullRatio=1.0, random                                 0              0           0       8062.5           0.1       1.3X


 ================================================================================================
 Nullable batch decode without def-level materialization
 ================================================================================================

-OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1010-azure
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Nullable batch without def-levels:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-nullRatio=0.0, n/a                                    0              0           0      11037.9           0.1       1.0X
-nullRatio=0.1, random                                 8              8           0        139.2           7.2       0.0X
-nullRatio=0.1, clustered                              6              6           0        190.1           5.3       0.0X
-nullRatio=0.3, random                                11             11           0         96.7          10.3       0.0X
-nullRatio=0.3, clustered                              6              6           0        188.5           5.3       0.0X
-nullRatio=0.5, random                                12             12           0         86.9          11.5       0.0X
-nullRatio=0.5, clustered                              6              6           0        188.0           5.3       0.0X
-nullRatio=0.9, random                                 7              7           0        149.4           6.7       0.0X
-nullRatio=0.9, clustered                              5              5           0        197.9           5.1       0.0X
-nullRatio=1.0, random                                 0              0           0      11675.8           0.1       1.1X
+nullRatio=0.0, n/a                                    0              0           0      11054.0           0.1       1.0X
+nullRatio=0.1, random                                 7              8           0        140.6           7.1       0.0X
+nullRatio=0.1, clustered                              5              5           0        193.2           5.2       0.0X
+nullRatio=0.3, random                                11             11           0         97.4          10.3       0.0X
+nullRatio=0.3, clustered                              6              6           0        184.4           5.4       0.0X
+nullRatio=0.5, random                                12             12           0         87.7          11.4       0.0X
+nullRatio=0.5, clustered                              5              6           0        191.6           5.2       0.0X
+nullRatio=0.9, random                                 7              7           0        151.7           6.6       0.0X
+nullRatio=0.9, clustered                              5              5           0        200.8           5.0       0.0X
+nullRatio=1.0, random                                 0              0           0      11662.5           0.1       1.1X
+
+
+================================================================================================
+Nullable batch decode with row-index filtering (with def-levels)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
+AMD EPYC 7763 64-Core Processor
+Nullable batch with def-levels, row-index filtered:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------
+nullRatio=0.0, contiguous 50%                                    1              2           0        763.7           1.3       1.0X
+nullRatio=0.3, contiguous 50%                                    9              9           0        117.3           8.5       0.2X
+nullRatio=0.9, contiguous 50%                                    7              7           0        157.5           6.3       0.2X
+nullRatio=0.0, alt 1000-row windows                              3              3           0        418.8           2.4       0.5X
+nullRatio=0.3, alt 1000-row windows                             10             10           0        103.8           9.6       0.1X
+nullRatio=0.9, alt 1000-row windows                              8              8           0        134.7           7.4       0.2X
+
+
+================================================================================================
+Nullable batch decode with row-index filtering (without def-levels)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
+AMD EPYC 7763 64-Core Processor
+Nullable batch without def-levels, row-index filtered:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------------------
+nullRatio=0.0, contiguous 50%                                    1              1           0        865.4           1.2       1.0X
+nullRatio=0.3, contiguous 50%                                    8              8           0        128.8           7.8       0.1X
+nullRatio=0.9, contiguous 50%                                    6              6           0        173.4           5.8       0.2X
+nullRatio=0.0, alt 1000-row windows                              2              2           0        425.9           2.3       0.5X
+nullRatio=0.3, alt 1000-row windows                              9              9           0        111.3           9.0       0.1X
+nullRatio=0.9, alt 1000-row windows                              7              7           0        143.3           7.0       0.2X
+
+
+================================================================================================
+Single-value reads
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
+AMD EPYC 7763 64-Core Processor
+Single-value reads:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+readBoolean                                           4              4           0        287.0           3.5       1.0X
+readInteger, bitWidth=4                               4              4           0        275.7           3.6       1.0X
+readValueDictionaryId, bitWidth=4                     4              4           0        275.5           3.6       1.0X
+readInteger, bitWidth=8                               4              4           0        273.7           3.7       1.0X
+readValueDictionaryId, bitWidth=8                     4              4           0        273.4           3.7       1.0X
+readInteger, bitWidth=12                              5              5           1        230.1           4.3       0.8X
+readValueDictionaryId, bitWidth=12                    5              5           0        229.3           4.4       0.8X
+readInteger, bitWidth=20                              5              5           0        207.9           4.8       0.7X
+readValueDictionaryId, bitWidth=20                    5              5           0        207.3           4.8       0.7X
+
+
+================================================================================================
+Skip
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
+AMD EPYC 7763 64-Core Processor
+Skip:                                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+skipBooleans, trueRatio=0.0                           0              0           0   20971520.0           0.0       1.0X
+skipBooleans, trueRatio=0.5                           2              2           0        569.4           1.8       0.0X
+skipBooleans, trueRatio=1.0                           0              0           0   21399510.2           0.0       1.0X
+skipIntegers PACKED, bitWidth=4                       2              2           0        522.7           1.9       0.0X
+skipIntegers RLE, bitWidth=4                          0              0           0   20971520.0           0.0       1.0X
+skipIntegers PACKED, bitWidth=8                       2              2           0        516.6           1.9       0.0X
+skipIntegers RLE, bitWidth=8                          0              0           0   21399510.2           0.0       1.0X
+skipIntegers PACKED, bitWidth=12                      3              3           0        382.4           2.6       0.0X
+skipIntegers RLE, bitWidth=12                         0              0           0   17476266.7           0.0       0.8X
+skipIntegers PACKED, bitWidth=20                      3              3           0        323.0           3.1       0.0X
+skipIntegers RLE, bitWidth=20                         0              0           0   17476266.7           0.0       0.8X