From 25e5919a28ce1a3808bc9b20d0a9b8cd37d2194d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Mar 2026 20:47:32 -0500 Subject: [PATCH 1/3] add more parquet options --- .../apache/beam/sdk/io/parquet/ParquetIO.java | 60 +++++++++++++++++- .../beam/sdk/io/parquet/ParquetIOTest.java | 63 +++++++++++++++++++ 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 24c18f382817..df900ed86399 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -69,6 +69,7 @@ import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -1005,8 +1006,11 @@ public static Sink sink(Schema schema) { return new AutoValue_ParquetIO_Sink.Builder() .setJsonSchema(schema.toString()) .setCompressionCodec(CompressionCodecName.SNAPPY) - // This resembles the default value for ParquetWriter.rowGroupSize. .setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE) + .setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) + .setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED) + .setEnableBloomFilter(false) + .setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK) .build(); } @@ -1022,6 +1026,14 @@ public abstract static class Sink implements FileIO.Sink { abstract int getRowGroupSize(); + abstract int getPageSize(); + + abstract boolean getEnableDictionary(); + + abstract boolean getEnableBloomFilter(); + + abstract int getMinRowCountForPageSizeCheck(); + abstract @Nullable Class getAvroDataModelClass(); abstract Builder toBuilder(); @@ -1036,6 +1048,14 @@ abstract static class Builder { abstract Builder setRowGroupSize(int rowGroupSize); + abstract Builder setPageSize(int pageSize); + + abstract Builder setEnableDictionary(boolean enableDictionary); + + abstract Builder setEnableBloomFilter(boolean enableBloomFilter); + + abstract Builder setMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck); + abstract Builder setAvroDataModelClass(Class modelClass); abstract Sink build(); @@ -1064,6 +1084,34 @@ public Sink withRowGroupSize(int rowGroupSize) { return toBuilder().setRowGroupSize(rowGroupSize).build(); } + /** Specify the page size for the Parquet writer. Defaults to {@code 1 MB}. */ + public Sink withPageSize(int pageSize) { + checkArgument(pageSize > 0, "pageSize must be positive"); + return toBuilder().setPageSize(pageSize).build(); + } + + /** Enable or disable dictionary encoding. Enabled by default. */ + public Sink withDictionaryEncoding(boolean enableDictionary) { + return toBuilder().setEnableDictionary(enableDictionary).build(); + } + + /** Enable or disable bloom filters. Disabled by default. */ + public Sink withBloomFilterEnabled(boolean enableBloomFilter) { + return toBuilder().setEnableBloomFilter(enableBloomFilter).build(); + } + + /** + * Specify the minimum number of rows to write before a page size check is performed. The writer + * buffers at least this many rows before checking whether the page size threshold has been + * reached. With large rows, the default ({@code 100}) can cause excessive memory use; set a + * lower value (e.g. {@code 1}) to flush pages more frequently. + */ + public Sink withMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck) { + checkArgument( + minRowCountForPageSizeCheck > 0, "minRowCountForPageSizeCheck must be positive"); + return toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build(); + } + /** * Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}. */ @@ -1079,6 +1127,7 @@ public void open(WritableByteChannel channel) throws IOException { Schema schema = new Schema.Parser().parse(getJsonSchema()); Class modelClass = getAvroDataModelClass(); + Configuration conf = SerializableConfiguration.newConfiguration(getConfiguration()); BeamParquetOutputFile beamParquetOutputFile = new BeamParquetOutputFile(Channels.newOutputStream(channel)); @@ -1088,8 +1137,13 @@ public void open(WritableByteChannel channel) throws IOException { .withSchema(schema) .withCompressionCodec(getCompressionCodec()) .withWriteMode(OVERWRITE) - .withConf(SerializableConfiguration.newConfiguration(getConfiguration())) - .withRowGroupSize(getRowGroupSize()); + .withConf(conf) + .withRowGroupSize(getRowGroupSize()) + .withPageSize(getPageSize()) + .withDictionaryEncoding(getEnableDictionary()) + .withBloomFilterEnabled(getEnableBloomFilter()) + .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck()); + if (modelClass != null) { try { builder.withDataModel(buildModelObject(modelClass)); diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index 7ee3ec5050fd..44ed59064cb9 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.io.UnsupportedEncodingException; @@ -57,8 +58,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; import org.junit.Rule; import org.junit.Test; @@ -518,6 +523,64 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() { readPipeline.run().waitUntilFinish(); } + @Test + public void testWriteWithWriterProperties() throws Exception { + int customPageSize = 256 * 1024; // 256 KB + int customMinRowCount = 5; + List records = generateGenericRecords(1000); + + mainPipeline + .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) + .apply( + FileIO.write() + .via( + ParquetIO.sink(SCHEMA) + .withPageSize(customPageSize) + .withDictionaryEncoding(false) + .withBloomFilterEnabled(true) + .withMinRowCountForPageSizeCheck(customMinRowCount)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + mainPipeline.run().waitUntilFinish(); + + // Read back the file metadata and verify the settings took effect. + File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith(".")); + assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0); + + org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI()); + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + + // Verify bloom filters were written: at least one column should have a bloom filter. + boolean hasBloomFilter = false; + for (BlockMetaData block : footer.getBlocks()) { + for (ColumnChunkMetaData col : block.getColumns()) { + if (col.getBloomFilterOffset() >= 0) { + hasBloomFilter = true; + } + } + } + assertTrue("Expected bloom filters to be present", hasBloomFilter); + + // Verify dictionary encoding was disabled: no columns should use dictionary pages. + for (BlockMetaData block : footer.getBlocks()) { + for (ColumnChunkMetaData col : block.getColumns()) { + assertEquals( + "Expected no dictionary page when dictionary encoding is disabled", + 0, + col.getDictionaryPageOffset()); + } + } + } + + // Verify the data still round-trips correctly. + PCollection readBack = + readPipeline.apply( + ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); + PAssert.that(readBack).containsInAnyOrder(records); + readPipeline.run().waitUntilFinish(); + } + /** Returns list of JSON representation of GenericRecords. */ private static List convertRecordsToJson(List records) { return records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList()); From 0a4812fdcdced79da6814c6c1b71f3dbe995d1d5 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Mar 2026 10:59:53 -0500 Subject: [PATCH 2/3] comments --- .../beam/sdk/io/parquet/ParquetIOTest.java | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index 44ed59064cb9..b16970f77359 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -61,7 +62,6 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; @@ -525,8 +525,6 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() { @Test public void testWriteWithWriterProperties() throws Exception { - int customPageSize = 256 * 1024; // 256 KB - int customMinRowCount = 5; List records = generateGenericRecords(1000); mainPipeline @@ -535,10 +533,10 @@ public void testWriteWithWriterProperties() throws Exception { FileIO.write() .via( ParquetIO.sink(SCHEMA) - .withPageSize(customPageSize) + .withPageSize(256 * 1024) .withDictionaryEncoding(false) .withBloomFilterEnabled(true) - .withMinRowCountForPageSizeCheck(customMinRowCount)) + .withMinRowCountForPageSizeCheck(5)) .to(temporaryFolder.getRoot().getAbsolutePath())); mainPipeline.run().waitUntilFinish(); @@ -552,25 +550,20 @@ public void testWriteWithWriterProperties() throws Exception { ParquetMetadata footer = reader.getFooter(); // Verify bloom filters were written: at least one column should have a bloom filter. - boolean hasBloomFilter = false; - for (BlockMetaData block : footer.getBlocks()) { - for (ColumnChunkMetaData col : block.getColumns()) { - if (col.getBloomFilterOffset() >= 0) { - hasBloomFilter = true; - } - } - } + boolean hasBloomFilter = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getBloomFilterOffset() >= 0); assertTrue("Expected bloom filters to be present", hasBloomFilter); - // Verify dictionary encoding was disabled: no columns should use dictionary pages. - for (BlockMetaData block : footer.getBlocks()) { - for (ColumnChunkMetaData col : block.getColumns()) { - assertEquals( - "Expected no dictionary page when dictionary encoding is disabled", - 0, - col.getDictionaryPageOffset()); - } - } + // Verify dictionary encoding was disabled: no columns should have dictionary pages. + // getDictionaryPageOffset() returns 0 when no dictionary page is present. + boolean hasDictionary = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getDictionaryPageOffset() > 0); + assertFalse( + "Expected no dictionary pages when dictionary encoding is disabled", hasDictionary); } // Verify the data still round-trips correctly. From 63243ecf3769d4ae6c45a7d1e053a384336801fc Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Mar 2026 11:28:54 -0500 Subject: [PATCH 3/3] more tests and use default --- .../apache/beam/sdk/io/parquet/ParquetIO.java | 2 +- .../beam/sdk/io/parquet/ParquetIOTest.java | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index df900ed86399..e6e4e27d74f3 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -1009,7 +1009,7 @@ public static Sink sink(Schema schema) { .setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE) .setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) .setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED) - .setEnableBloomFilter(false) + .setEnableBloomFilter(ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED) .setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK) .build(); } diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index b16970f77359..97f96452760f 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -523,6 +523,42 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() { readPipeline.run().waitUntilFinish(); } + @Test + public void testWriteWithDefaultWriterProperties() throws Exception { + List records = generateGenericRecords(1000); + + mainPipeline + .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) + .apply( + FileIO.write() + .via(ParquetIO.sink(SCHEMA)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + mainPipeline.run().waitUntilFinish(); + + File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith(".")); + assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0); + + org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI()); + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + + // Verify bloom filters are absent by default. + boolean hasBloomFilter = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getBloomFilterOffset() >= 0); + assertFalse("Expected no bloom filters by default", hasBloomFilter); + + // Verify dictionary encoding is enabled by default. + boolean hasDictionary = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getDictionaryPageOffset() > 0); + assertTrue("Expected dictionary pages to be present by default", hasDictionary); + } + } + @Test public void testWriteWithWriterProperties() throws Exception { List records = generateGenericRecords(1000);