diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java index 4786305a4145..dc68323d7d0c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java @@ -118,6 +118,8 @@ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecor columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); } + jobConf.set(IOConstants.COLUMNS, columnNameProperty); + jobConf.set(IOConstants.COLUMNS_TYPES, columnTypeProperty); DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes, jobConf), jobConf); return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 9b817eb3642a..e4df0c91b965 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -37,10 +37,10 @@ import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,8 +120,8 @@ protected ParquetInputSplit getSplit( serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize()); } - schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata() - .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount(); + schemaSize = DataWritableReadSupport.getTableParquetSchema(jobConf, fileMetaData.getSchema(), + readContext.getReadSupportMetadata()).getFieldCount(); final List splitGroup = new ArrayList(); final long splitStart = fileSplit.getStart(); final long splitLength = fileSplit.getLength(); @@ -164,18 +164,29 @@ protected ParquetInputSplit getSplit( legacyConversionEnabled = DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(), conf); - split = new ParquetInputSplit(finalPath, - splitStart, - splitLength, - fileSplit.getLocations(), - filteredBlocks, - readContext.getRequestedSchema().toString(), - fileMetaData.getSchema().toString(), - fileMetaData.getKeyValueMetaData(), - readContext.getReadSupportMetadata()); + split = buildParquetInputSplit(finalPath, readContext.getRequestedSchema()); return split; } + private ParquetInputSplit buildParquetInputSplit(final Path finalPath, final MessageType requestedSchema) + throws IOException { + long filteredSize = 0; + for (BlockMetaData block : filteredBlocks) { + for (ColumnChunkMetaData column : block.getColumns()) { + if (requestedSchema.containsPath(column.getPath().toArray())) { + filteredSize += column.getTotalSize(); + } + } + } + + long[] rowGroupOffsets = new long[filteredBlocks.size()]; + for (int i = 0; i < rowGroupOffsets.length; i++) { + rowGroupOffsets[i] = filteredBlocks.get(i).getStartingPos(); + } + return new ParquetInputSplit(finalPath, fileSplit.getStart(), filteredSize, fileSplit.getLength(), + fileSplit.getLocations(), rowGroupOffsets); + } + @SuppressWarnings("deprecation") protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException { return ParquetFileReader.readFooter(jobConf, path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java index 0cd52fda1e59..8ab9d39fe19e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java @@ -13,13 +13,11 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; -import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageTypeParser; import java.util.Map; @@ -32,10 +30,9 @@ public class DataWritableRecordConverter extends RecordMaterializer metadata, TypeInfo hiveTypeInfo) { - this.root = new HiveStructConverter(requestedSchema, - MessageTypeParser.parseMessageType(metadata.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)), - metadata, hiveTypeInfo); + public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema, + final Map metadata, TypeInfo hiveTypeInfo) { + this.root = new HiveStructConverter(requestedSchema, tableSchema, metadata, hiveTypeInfo); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index cd093dd6a53a..e72ebf6290d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -50,6 +50,7 @@ import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; @@ -438,6 +439,20 @@ public static MessageType getRequestedSchema( } } + public static MessageType getTableParquetSchema(Configuration configuration, MessageType fileSchema, + Map metadata) { + String columnNames = configuration.get(IOConstants.COLUMNS); + String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES); + if (columnNames != null && !columnNames.isEmpty() && columnTypes != null && !columnTypes.isEmpty()) { + return getRequestedSchemaForIndexAccess( + Boolean.parseBoolean(metadata.get(PARQUET_COLUMN_INDEX_ACCESS)), + getColumnNames(columnNames), + getColumnTypes(columnTypes), + fileSchema); + } + return MessageTypeParser.parseMessageType(metadata.get(HIVE_TABLE_AS_PARQUET_SCHEMA)); + } + private static MessageType getRequestedSchemaForIndexAccess( boolean indexAccess, List columnNamesList, @@ -535,6 +550,7 @@ public RecordMaterializer prepareForRead(final Configuration conf } } - return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo); + MessageType tableSchema = getTableParquetSchema(configuration, fileSchema, metadata); + return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema, metadata, hiveTypeInfo); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java index 8d143ef518c1..2bef7af0c830 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java @@ -19,6 +19,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hive.common.util.HiveVersionInfo; @@ -49,6 +52,14 @@ public static void setSchema(final MessageType schema, final Configuration confi } public static MessageType getSchema(final Configuration configuration) { + String columnNames = configuration.get(IOConstants.COLUMNS); + String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES); + if (columnNames != null && !columnNames.isEmpty() && columnTypes != null && !columnTypes.isEmpty()) { + return HiveSchemaConverter.convert( + DataWritableReadSupport.getColumnNames(columnNames), + DataWritableReadSupport.getColumnTypes(columnTypes), + configuration); + } return MessageTypeParser.parseMessageType(configuration.get(PARQUET_HIVE_SCHEMA)); } diff --git a/ql/src/test/queries/clientpositive/parquet_complex_col_names.q b/ql/src/test/queries/clientpositive/parquet_complex_col_names.q new file mode 100644 index 000000000000..24fca89192f8 --- /dev/null +++ b/ql/src/test/queries/clientpositive/parquet_complex_col_names.q @@ -0,0 +1,32 @@ +set hive.support.quoted.identifiers=column; +set hive.fetch.task.conversion=none; +set hive.mapred.mode=nonstrict; + +set hive.vectorized.execution.enabled=true; + +drop table if exists tbl_bps_src; +drop table if exists parquet_bps_sink; + +create table tbl_bps_src (`rate(bps)` double, id int) stored as orc; +insert into tbl_bps_src values (12.5, 1), (25.0, 2), (37.5, 3); + +create table parquet_bps_sink (`rate(bps)` double, id int) stored as parquet; + +insert overwrite table parquet_bps_sink +select `rate(bps)`, id +from tbl_bps_src +order by id +limit 10; + +select `rate(bps)`, id from parquet_bps_sink order by id; + + +set hive.vectorized.execution.enabled=false; + +insert overwrite table parquet_bps_sink +select `rate(bps)`, id +from tbl_bps_src +order by id +limit 10; + +select `rate(bps)`, id from parquet_bps_sink order by id; diff --git a/ql/src/test/results/clientpositive/llap/parquet_complex_col_names.q.out b/ql/src/test/results/clientpositive/llap/parquet_complex_col_names.q.out new file mode 100644 index 000000000000..734cd16de6be --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/parquet_complex_col_names.q.out @@ -0,0 +1,96 @@ +PREHOOK: query: drop table if exists tbl_bps_src +PREHOOK: type: DROPTABLE +PREHOOK: Output: database:default +POSTHOOK: query: drop table if exists tbl_bps_src +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: database:default +PREHOOK: query: drop table if exists parquet_bps_sink +PREHOOK: type: DROPTABLE +PREHOOK: Output: database:default +POSTHOOK: query: drop table if exists parquet_bps_sink +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: database:default +PREHOOK: query: create table tbl_bps_src (`rate(bps)` double, id int) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_bps_src +POSTHOOK: query: create table tbl_bps_src (`rate(bps)` double, id int) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_bps_src +PREHOOK: query: insert into tbl_bps_src values (12.5, 1), (25.0, 2), (37.5, 3) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_bps_src +POSTHOOK: query: insert into tbl_bps_src values (12.5, 1), (25.0, 2), (37.5, 3) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_bps_src +POSTHOOK: Lineage: tbl_bps_src.id SCRIPT [] +POSTHOOK: Lineage: tbl_bps_src.rate(bps) SCRIPT [] +PREHOOK: query: create table parquet_bps_sink (`rate(bps)` double, id int) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_bps_sink +POSTHOOK: query: create table parquet_bps_sink (`rate(bps)` double, id int) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_bps_sink +PREHOOK: query: insert overwrite table parquet_bps_sink +select `rate(bps)`, id +from tbl_bps_src +order by id +limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_bps_src +PREHOOK: Output: default@parquet_bps_sink +POSTHOOK: query: insert overwrite table parquet_bps_sink +select `rate(bps)`, id +from tbl_bps_src +order by id +limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_bps_src +POSTHOOK: Output: default@parquet_bps_sink +POSTHOOK: Lineage: parquet_bps_sink.id SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:id, type:int, comment:null), ] +POSTHOOK: Lineage: parquet_bps_sink.rate(bps) SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:rate(bps), type:double, comment:null), ] +PREHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_bps_sink +#### A masked pattern was here #### +POSTHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_bps_sink +#### A masked pattern was here #### +12.5 1 +25.0 2 +37.5 3 +PREHOOK: query: insert overwrite table parquet_bps_sink +select `rate(bps)`, id +from tbl_bps_src +order by id +limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_bps_src +PREHOOK: Output: default@parquet_bps_sink +POSTHOOK: query: insert overwrite table parquet_bps_sink +select `rate(bps)`, id +from tbl_bps_src +order by id +limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_bps_src +POSTHOOK: Output: default@parquet_bps_sink +POSTHOOK: Lineage: parquet_bps_sink.id SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:id, type:int, comment:null), ] +POSTHOOK: Lineage: parquet_bps_sink.rate(bps) SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:rate(bps), type:double, comment:null), ] +PREHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_bps_sink +#### A masked pattern was here #### +POSTHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_bps_sink +#### A masked pattern was here #### +12.5 1 +25.0 2 +37.5 3