diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 24c18f382817..e6e4e27d74f3 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -69,6 +69,7 @@ import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -1005,8 +1006,11 @@ public static Sink sink(Schema schema) { return new AutoValue_ParquetIO_Sink.Builder() .setJsonSchema(schema.toString()) .setCompressionCodec(CompressionCodecName.SNAPPY) - // This resembles the default value for ParquetWriter.rowGroupSize. .setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE) + .setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) + .setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED) + .setEnableBloomFilter(ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED) + .setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK) .build(); } @@ -1022,6 +1026,14 @@ public abstract static class Sink implements FileIO.Sink { abstract int getRowGroupSize(); + abstract int getPageSize(); + + abstract boolean getEnableDictionary(); + + abstract boolean getEnableBloomFilter(); + + abstract int getMinRowCountForPageSizeCheck(); + abstract @Nullable Class getAvroDataModelClass(); abstract Builder toBuilder(); @@ -1036,6 +1048,14 @@ abstract static class Builder { abstract Builder setRowGroupSize(int rowGroupSize); + abstract Builder setPageSize(int pageSize); + + abstract Builder setEnableDictionary(boolean enableDictionary); + + abstract Builder setEnableBloomFilter(boolean enableBloomFilter); + + abstract Builder setMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck); + abstract Builder setAvroDataModelClass(Class modelClass); abstract Sink build(); @@ -1064,6 +1084,34 @@ public Sink withRowGroupSize(int rowGroupSize) { return toBuilder().setRowGroupSize(rowGroupSize).build(); } + /** Specify the page size for the Parquet writer. Defaults to {@code 1 MB}. */ + public Sink withPageSize(int pageSize) { + checkArgument(pageSize > 0, "pageSize must be positive"); + return toBuilder().setPageSize(pageSize).build(); + } + + /** Enable or disable dictionary encoding. Enabled by default. */ + public Sink withDictionaryEncoding(boolean enableDictionary) { + return toBuilder().setEnableDictionary(enableDictionary).build(); + } + + /** Enable or disable bloom filters. Disabled by default. */ + public Sink withBloomFilterEnabled(boolean enableBloomFilter) { + return toBuilder().setEnableBloomFilter(enableBloomFilter).build(); + } + + /** + * Specify the minimum number of rows to write before a page size check is performed. The writer + * buffers at least this many rows before checking whether the page size threshold has been + * reached. With large rows, the default ({@code 100}) can cause excessive memory use; set a + * lower value (e.g. {@code 1}) to flush pages more frequently. + */ + public Sink withMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck) { + checkArgument( + minRowCountForPageSizeCheck > 0, "minRowCountForPageSizeCheck must be positive"); + return toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build(); + } + /** * Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}. */ @@ -1079,6 +1127,7 @@ public void open(WritableByteChannel channel) throws IOException { Schema schema = new Schema.Parser().parse(getJsonSchema()); Class modelClass = getAvroDataModelClass(); + Configuration conf = SerializableConfiguration.newConfiguration(getConfiguration()); BeamParquetOutputFile beamParquetOutputFile = new BeamParquetOutputFile(Channels.newOutputStream(channel)); @@ -1088,8 +1137,13 @@ public void open(WritableByteChannel channel) throws IOException { .withSchema(schema) .withCompressionCodec(getCompressionCodec()) .withWriteMode(OVERWRITE) - .withConf(SerializableConfiguration.newConfiguration(getConfiguration())) - .withRowGroupSize(getRowGroupSize()); + .withConf(conf) + .withRowGroupSize(getRowGroupSize()) + .withPageSize(getPageSize()) + .withDictionaryEncoding(getEnableDictionary()) + .withBloomFilterEnabled(getEnableBloomFilter()) + .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck()); + if (modelClass != null) { try { builder.withDataModel(buildModelObject(modelClass)); diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index 7ee3ec5050fd..97f96452760f 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -21,12 +21,14 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.io.UnsupportedEncodingException; @@ -57,8 +59,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; import org.junit.Rule; import org.junit.Test; @@ -518,6 +523,93 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() { readPipeline.run().waitUntilFinish(); } + @Test + public void testWriteWithDefaultWriterProperties() throws Exception { + List records = generateGenericRecords(1000); + + mainPipeline + .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) + .apply( + FileIO.write() + .via(ParquetIO.sink(SCHEMA)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + mainPipeline.run().waitUntilFinish(); + + File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith(".")); + assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0); + + org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI()); + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + + // Verify bloom filters are absent by default. + boolean hasBloomFilter = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getBloomFilterOffset() >= 0); + assertFalse("Expected no bloom filters by default", hasBloomFilter); + + // Verify dictionary encoding is enabled by default. + boolean hasDictionary = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getDictionaryPageOffset() > 0); + assertTrue("Expected dictionary pages to be present by default", hasDictionary); + } + } + + @Test + public void testWriteWithWriterProperties() throws Exception { + List records = generateGenericRecords(1000); + + mainPipeline + .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) + .apply( + FileIO.write() + .via( + ParquetIO.sink(SCHEMA) + .withPageSize(256 * 1024) + .withDictionaryEncoding(false) + .withBloomFilterEnabled(true) + .withMinRowCountForPageSizeCheck(5)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + mainPipeline.run().waitUntilFinish(); + + // Read back the file metadata and verify the settings took effect. + File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith(".")); + assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0); + + org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI()); + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + + // Verify bloom filters were written: at least one column should have a bloom filter. + boolean hasBloomFilter = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getBloomFilterOffset() >= 0); + assertTrue("Expected bloom filters to be present", hasBloomFilter); + + // Verify dictionary encoding was disabled: no columns should have dictionary pages. + // getDictionaryPageOffset() returns 0 when no dictionary page is present. + boolean hasDictionary = + footer.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .anyMatch(col -> col.getDictionaryPageOffset() > 0); + assertFalse( + "Expected no dictionary pages when dictionary encoding is disabled", hasDictionary); + } + + // Verify the data still round-trips correctly. + PCollection readBack = + readPipeline.apply( + ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); + PAssert.that(readBack).containsInAnyOrder(records); + readPipeline.run().waitUntilFinish(); + } + /** Returns list of JSON representation of GenericRecords. */ private static List convertRecordsToJson(List records) { return records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList());