[SPARK-55652][SQL] Optimize VectorizedPlainValuesReader.readShorts() with direct array access for heap buffers #54441

Closed
LuciferYang wants to merge 4 commits into apache:master from LuciferYang:readShorts

Conversation

LuciferYang (Contributor) commented Feb 24, 2026

What changes were proposed in this pull request?

This PR optimizes VectorizedPlainValuesReader.readShorts by introducing a new batch write method putShortsFromIntsLittleEndian in WritableColumnVector, OnHeapColumnVector, and OffHeapColumnVector.

In Parquet, SHORT values are stored as 4-byte little-endian integers. The previous implementation read each value individually via ByteBuffer.getInt() and called putShort() per element, incurring a virtual method dispatch per value and preventing JIT vectorization.
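For illustration, the previous per-element path can be sketched roughly as follows. This is a simplified standalone sketch, not the exact Spark source: the real code writes each value into a `WritableColumnVector` via `putShort()` rather than filling a local array, but the per-call overhead it demonstrates is the same.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class OldReadShortsSketch {
    // Simplified sketch of the old per-element path: each SHORT is stored
    // as a 4-byte little-endian int in the Parquet page and read one at a time.
    static short[] readShorts(ByteBuffer buffer, int count) {
        short[] out = new short[count];
        for (int i = 0; i < count; i++) {
            // ByteBuffer.getInt() performs bounds checking and a byte-order
            // branch on every call; the narrowing cast stands in for putShort().
            out[i] = (short) buffer.getInt();
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(-32768).putInt(32767);
        buf.flip();
        short[] values = readShorts(buf, 2);
        System.out.println(values[0] + " " + values[1]); // -32768 32767
    }
}
```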

The new approach:

  1. Adds putShortsFromIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) as an abstract method in WritableColumnVector, with implementations in both OnHeapColumnVector and OffHeapColumnVector.
  2. The implementations use Platform.getInt to read directly from the underlying byte[], handle big-endian platforms by reversing bytes outside the loop, and write directly to shortData[] (OnHeap) or off-heap memory via Platform.putShort (OffHeap).
  3. readShorts in VectorizedPlainValuesReader delegates to putShortsFromIntsLittleEndian when buffer.hasArray() is true, matching the pattern already established by readIntegers, readLongs, readFloats, and readDoubles.
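The steps above can be sketched with a minimal, hypothetical on-heap implementation. Spark's actual version reads via `Platform.getInt` (Unsafe) and handles big-endian platforms by reversing bytes outside the loop; this portable sketch instead assembles each little-endian int with shifts, which the JIT can similarly inline and vectorize.

```java
public class PutShortsSketch {
    // Hedged sketch of an on-heap putShortsFromIntsLittleEndian-style loop:
    // read 4-byte little-endian ints straight from the source byte[] and
    // narrow them into the short[] backing array of the column vector.
    static void putShortsFromIntsLittleEndian(
            short[] shortData, int rowId, int count, byte[] src, int srcIndex) {
        for (int i = 0; i < count; i++) {
            int offset = srcIndex + i * 4;
            // Assemble the little-endian int; only the low 16 bits survive
            // the narrowing cast to short.
            int v = (src[offset] & 0xFF)
                  | ((src[offset + 1] & 0xFF) << 8)
                  | ((src[offset + 2] & 0xFF) << 16)
                  | ((src[offset + 3] & 0xFF) << 24);
            shortData[rowId + i] = (short) v;
        }
    }

    public static void main(String[] args) {
        // Two little-endian ints: 12345 (0x00003039) and -50 (0xFFFFFFCE).
        byte[] src = {0x39, 0x30, 0, 0, (byte) 0xCE, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        short[] dst = new short[2];
        putShortsFromIntsLittleEndian(dst, 0, 2, src, 0);
        System.out.println(dst[0] + " " + dst[1]); // 12345 -50
    }
}
```

The whole batch loop operates on a plain `byte[]` and `short[]`, so there is no per-element virtual dispatch and no per-call bounds-check/byte-order branch.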

Why are the changes needed?

The previous implementation of readShorts did not take advantage of the hasArray() fast path that other fixed-width type readers (readIntegers, readLongs, etc.) already use. This caused unnecessary overhead from:

  • Per-element virtual method dispatch via putShort()
  • ByteBuffer.getInt() overhead including internal bounds checking and byte-order branching on every call

By pushing the batch operation into WritableColumnVector and operating directly on the underlying array, the JIT compiler can more effectively inline and vectorize the tight loop, eliminating these overheads for the common heap-buffer case.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Passed GitHub Actions, and added a new test scenario in ColumnarBatchSuite to cover WritableColumnVector#putShortsFromIntsLittleEndian
  • Kept a copy of the original implementation as OldVectorizedPlainValuesReader and compared the latency of the old and new readShorts methods using JMH:
Benchmark code:
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import org.apache.parquet.bytes.ByteBufferInputStream;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
@Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
public class VectorizedPlainValuesReaderJMHBenchmark {

    // ==================== Parameters ====================

    @Param({"10000000"})
    private int numValues;

    // ==================== Test Data ====================

    private byte[] shortData;
    private static final int BATCH_SIZE = 4096;

    // Readers and streams for each scenario
    private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
    private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
    private VectorizedPlainValuesReader newSingleBufferOffHeapReader;
    private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;

    // ==================== State Classes ====================
    @State(Scope.Thread)
    public static class OnHeapColumnVectorState {
        public WritableColumnVector shortColumn;
        @Setup(Level.Iteration)
        public void setup() {
            shortColumn = new OnHeapColumnVector(BATCH_SIZE, DataTypes.ShortType);
        }
        @TearDown(Level.Iteration)
        public void tearDown() {
            shortColumn.close();
        }
        @Setup(Level.Invocation)
        public void reset() {
            shortColumn.reset();
        }
    }

    // ==================== Setup ====================

    @Setup(Level.Trial)
    public void setupTrial() {
        Random random = new Random(42);
        shortData = generateShortData(numValues, random);
    }

    @TearDown(Level.Trial)
    public void tearDownTrial() {
    }

    @Setup(Level.Invocation)
    public void setupInvocation() throws IOException {
        // OnHeap SingleBuffer
        newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
        newSingleBufferOnHeapReader.initFromPage(numValues, createSingleBufferInputStream(shortData));
        oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
        oldSingleBufferOnHeapReader.initFromPage(numValues, createSingleBufferInputStream(shortData));
        // OffHeap SingleBuffer
        newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
        newSingleBufferOffHeapReader.initFromPage(numValues, createDirectSingleBufferInputStream(shortData));
        oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
        oldSingleBufferOffHeapReader.initFromPage(numValues, createDirectSingleBufferInputStream(shortData));
    }

    // ==================== Data Generation ====================
    private byte[] generateShortData(int count, Random random) {
        ByteBuffer buffer = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < count; i++) {
            buffer.putInt(random.nextInt(65536) - 32768);
        }
        return buffer.array();
    }

    // ==================== ByteBufferInputStream Creation ====================

    private ByteBufferInputStream createSingleBufferInputStream(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        return ByteBufferInputStream.wrap(buffer);
    }

    private ByteBuffer createDirectBuffer(byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
        buffer.put(data);
        buffer.flip();
        return buffer;
    }

    private ByteBufferInputStream createDirectSingleBufferInputStream(byte[] data) {
        ByteBuffer buffer = createDirectBuffer(data);
        return ByteBufferInputStream.wrap(buffer);
    }

    // ====================================================================================
    // readShorts OnHeap
    // ====================================================================================

    @Benchmark
    public void readShorts_onHeap_New(OnHeapColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            newSingleBufferOnHeapReader.readShorts(Math.min(BATCH_SIZE, numValues - i), state.shortColumn, 0);
        }
    }

    @Benchmark
    public void readShorts_onHeap_Old(OnHeapColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            oldSingleBufferOnHeapReader.readShorts(Math.min(BATCH_SIZE, numValues - i), state.shortColumn, 0);
        }
    }

    // ====================================================================================
    // readShorts offHeap
    // ====================================================================================

    @Benchmark
    public void readShorts_offHeap_New(OnHeapColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            newSingleBufferOffHeapReader.readShorts(Math.min(BATCH_SIZE, numValues - i), state.shortColumn, 0);
        }
    }

    @Benchmark
    public void readShorts_offHeap_Old(OnHeapColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            oldSingleBufferOffHeapReader.readShorts(Math.min(BATCH_SIZE, numValues - i), state.shortColumn, 0);
        }
    }

    // ==================== Main Method ====================

    public static void main(String[] args) throws RunnerException {
        String filter = args.length > 0 ? args[0] : VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
        Options opt = new OptionsBuilder()
                .include(filter)
                .build();

        new Runner(opt).run();
    }
}

Run build/sbt "sql/Test/runMain org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark" to execute the benchmark.

Benchmark results:

  • Java 17.0.18+8-LTS
Benchmark                                                       (numValues)  Mode  Cnt     Score    Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New     10000000  avgt   10  4048.579 ± 54.466  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_Old     10000000  avgt   10  3952.443 ± 29.947  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_New      10000000  avgt   10  4358.785 ± 45.051  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_Old      10000000  avgt   10  6775.679 ± 75.302  us/op
  • Java 21.0.10+7-LTS
Benchmark                                                       (numValues)  Mode  Cnt     Score    Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New     10000000  avgt   10  3050.606 ± 57.169  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_Old     10000000  avgt   10  7206.623 ± 29.275  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_New      10000000  avgt   10  3252.563 ± 44.564  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_Old      10000000  avgt   10  7145.537 ±  8.843  us/op

The results show the optimized on-heap path is roughly 36% faster on Java 17 and 54% faster on Java 21, while the off-heap path shows no significant regression.

Was this patch authored or co-authored using generative AI tooling?

The benchmark code used for performance testing was generated by GitHub Copilot.

@LuciferYang LuciferYang marked this pull request as draft February 24, 2026 05:52
@LuciferYang LuciferYang marked this pull request as ready for review February 24, 2026 06:19
pan3793 (Member) commented Feb 24, 2026

The benchmark result of JDK 17 VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New does not seem good.

BTW, which CPU are you using for benchmarking?

LuciferYang (Contributor Author) commented:
> The benchmark result of JDK 17 VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New does not seem good.
>
> BTW, which CPU are you using for benchmarking?

The off-heap path is not the optimization target; it only needs to show no significant performance regression relative to the previous implementation.

LuciferYang (Contributor Author) commented Feb 24, 2026

> BTW, which CPU are you using for benchmarking?

@pan3793 I hardcoded the benchmark commands, re-ran them on a GitHub Actions hosted runner, and updated the results above. Interestingly, on Java 21 the old code performs worse than it does on Java 17, while the new code picks up some unintended additional gains.

LuciferYang (Contributor Author) commented:
Merged into master. Thanks @HyukjinKwon @pan3793

dongjoon-hyun (Member) commented:
Oh, nice improvement. Thank you, @LuciferYang !

> nearly 50%+ performance improvement.
