Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
Expand Down Expand Up @@ -1005,8 +1006,11 @@ public static Sink sink(Schema schema) {
return new AutoValue_ParquetIO_Sink.Builder()
.setJsonSchema(schema.toString())
.setCompressionCodec(CompressionCodecName.SNAPPY)
// This resembles the default value for ParquetWriter.rowGroupSize.
.setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
.setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
.setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED)
.setEnableBloomFilter(ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)
.setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
.build();
}

Expand All @@ -1022,6 +1026,14 @@ public abstract static class Sink implements FileIO.Sink<GenericRecord> {

abstract int getRowGroupSize();

abstract int getPageSize();

abstract boolean getEnableDictionary();

abstract boolean getEnableBloomFilter();

abstract int getMinRowCountForPageSizeCheck();

abstract @Nullable Class<? extends GenericData> getAvroDataModelClass();

abstract Builder toBuilder();
Expand All @@ -1036,6 +1048,14 @@ abstract static class Builder {

abstract Builder setRowGroupSize(int rowGroupSize);

abstract Builder setPageSize(int pageSize);

abstract Builder setEnableDictionary(boolean enableDictionary);

abstract Builder setEnableBloomFilter(boolean enableBloomFilter);

abstract Builder setMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck);

abstract Builder setAvroDataModelClass(Class<? extends GenericData> modelClass);

abstract Sink build();
Expand Down Expand Up @@ -1064,6 +1084,34 @@ public Sink withRowGroupSize(int rowGroupSize) {
return toBuilder().setRowGroupSize(rowGroupSize).build();
}

/** Sets the Parquet writer page size; the default is {@code 1 MB}. */
public Sink withPageSize(int pageSize) {
  checkArgument(pageSize > 0, "pageSize must be positive");
  Builder updated = toBuilder().setPageSize(pageSize);
  return updated.build();
}

/** Turns dictionary encoding on or off; it is on by default. */
public Sink withDictionaryEncoding(boolean enableDictionary) {
  Builder updated = toBuilder().setEnableDictionary(enableDictionary);
  return updated.build();
}

/** Turns bloom filter generation on or off; it is off by default. */
public Sink withBloomFilterEnabled(boolean enableBloomFilter) {
  Builder updated = toBuilder().setEnableBloomFilter(enableBloomFilter);
  return updated.build();
}

/**
 * Sets the minimum number of rows the writer buffers before it checks whether the page size
 * threshold has been reached. The default of {@code 100} can hold too much data in memory when
 * individual rows are large; choosing a smaller value (down to {@code 1}) makes pages flush
 * sooner.
 */
public Sink withMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck) {
  checkArgument(
      minRowCountForPageSizeCheck > 0, "minRowCountForPageSizeCheck must be positive");
  Builder updated = toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck);
  return updated.build();
}

/**
* Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}.
*/
Expand All @@ -1079,6 +1127,7 @@ public void open(WritableByteChannel channel) throws IOException {

Schema schema = new Schema.Parser().parse(getJsonSchema());
Class<? extends GenericData> modelClass = getAvroDataModelClass();
Configuration conf = SerializableConfiguration.newConfiguration(getConfiguration());

BeamParquetOutputFile beamParquetOutputFile =
new BeamParquetOutputFile(Channels.newOutputStream(channel));
Expand All @@ -1088,8 +1137,13 @@ public void open(WritableByteChannel channel) throws IOException {
.withSchema(schema)
.withCompressionCodec(getCompressionCodec())
.withWriteMode(OVERWRITE)
.withConf(SerializableConfiguration.newConfiguration(getConfiguration()))
.withRowGroupSize(getRowGroupSize());
.withConf(conf)
.withRowGroupSize(getRowGroupSize())
.withPageSize(getPageSize())
.withDictionaryEncoding(getEnableDictionary())
.withBloomFilterEnabled(getEnableBloomFilter())
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck());

if (modelClass != null) {
try {
builder.withDataModel(buildModelObject(modelClass));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -57,8 +59,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -518,6 +523,93 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
readPipeline.run().waitUntilFinish();
}

@Test
public void testWriteWithDefaultWriterProperties() throws Exception {
  List<GenericRecord> inputRecords = generateGenericRecords(1000);

  mainPipeline
      .apply(Create.of(inputRecords).withCoder(AvroCoder.of(SCHEMA)))
      .apply(
          FileIO.<GenericRecord>write()
              .via(ParquetIO.sink(SCHEMA))
              .to(temporaryFolder.getRoot().getAbsolutePath()));
  mainPipeline.run().waitUntilFinish();

  File[] written = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
  assertTrue("Expected at least one output file", written != null && written.length > 0);

  // Inspect the footer of one written file and confirm the writer defaults were applied.
  org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(written[0].toURI());
  try (ParquetFileReader reader =
      ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
    ParquetMetadata metadata = reader.getFooter();

    // Bloom filters are off by default: every column chunk should report a negative offset.
    boolean noBloomFilters =
        metadata.getBlocks().stream()
            .flatMap(blockMeta -> blockMeta.getColumns().stream())
            .noneMatch(column -> column.getBloomFilterOffset() >= 0);
    assertTrue("Expected no bloom filters by default", noBloomFilters);

    // Dictionary encoding is on by default: some column chunk should carry a dictionary page.
    boolean dictionaryPagesPresent =
        metadata.getBlocks().stream()
            .flatMap(blockMeta -> blockMeta.getColumns().stream())
            .filter(column -> column.getDictionaryPageOffset() > 0)
            .findFirst()
            .isPresent();
    assertTrue("Expected dictionary pages to be present by default", dictionaryPagesPresent);
  }
}

@Test
public void testWriteWithWriterProperties() throws Exception {
  List<GenericRecord> inputRecords = generateGenericRecords(1000);

  mainPipeline
      .apply(Create.of(inputRecords).withCoder(AvroCoder.of(SCHEMA)))
      .apply(
          FileIO.<GenericRecord>write()
              .via(
                  ParquetIO.sink(SCHEMA)
                      .withPageSize(256 * 1024)
                      .withDictionaryEncoding(false)
                      .withBloomFilterEnabled(true)
                      .withMinRowCountForPageSizeCheck(5))
              .to(temporaryFolder.getRoot().getAbsolutePath()));
  mainPipeline.run().waitUntilFinish();

  // Inspect the footer of one written file to confirm the custom settings took effect.
  File[] written = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
  assertTrue("Expected at least one output file", written != null && written.length > 0);

  org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(written[0].toURI());
  try (ParquetFileReader reader =
      ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
    ParquetMetadata metadata = reader.getFooter();

    // Bloom filters were requested, so some column chunk should report a non-negative offset.
    boolean bloomFiltersPresent =
        metadata.getBlocks().stream()
            .flatMap(blockMeta -> blockMeta.getColumns().stream())
            .filter(column -> column.getBloomFilterOffset() >= 0)
            .findFirst()
            .isPresent();
    assertTrue("Expected bloom filters to be present", bloomFiltersPresent);

    // Dictionary encoding was disabled; getDictionaryPageOffset() returns 0 when no
    // dictionary page exists, so every column chunk should report a non-positive offset.
    boolean noDictionaryPages =
        metadata.getBlocks().stream()
            .flatMap(blockMeta -> blockMeta.getColumns().stream())
            .noneMatch(column -> column.getDictionaryPageOffset() > 0);
    assertTrue(
        "Expected no dictionary pages when dictionary encoding is disabled", noDictionaryPages);
  }

  // The customized writer settings must not affect what readers get back.
  PCollection<GenericRecord> roundTrip =
      readPipeline.apply(
          ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() + "/*"));
  PAssert.that(roundTrip).containsInAnyOrder(inputRecords);
  readPipeline.run().waitUntilFinish();
}

/** Returns list of JSON representation of GenericRecords. */
private static List<String> convertRecordsToJson(List<GenericRecord> records) {
return records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList());
Expand Down
Loading