Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecor
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}

jobConf.set(IOConstants.COLUMNS, columnNameProperty);
jobConf.set(IOConstants.COLUMNS_TYPES, columnTypeProperty);
DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes, jobConf), jobConf);

return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -120,8 +120,8 @@
serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
}

schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
schemaSize = DataWritableReadSupport.getTableParquetSchema(jobConf, fileMetaData.getSchema(),
readContext.getReadSupportMetadata()).getFieldCount();
final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
final long splitStart = fileSplit.getStart();
final long splitLength = fileSplit.getLength();
Expand Down Expand Up @@ -164,18 +164,29 @@
legacyConversionEnabled =
DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(), conf);

split = new ParquetInputSplit(finalPath,
splitStart,
splitLength,
fileSplit.getLocations(),
filteredBlocks,
readContext.getRequestedSchema().toString(),
fileMetaData.getSchema().toString(),
fileMetaData.getKeyValueMetaData(),
readContext.getReadSupportMetadata());
split = buildParquetInputSplit(finalPath, readContext.getRequestedSchema());
return split;
}

private ParquetInputSplit buildParquetInputSplit(final Path finalPath, final MessageType requestedSchema)

Check warning on line 171 in ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ParquetInputSplit"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ5ll8LrYz4-StK4ja7c&open=AZ5ll8LrYz4-StK4ja7c&pullRequest=6506
throws IOException {
long filteredSize = 0;
for (BlockMetaData block : filteredBlocks) {
for (ColumnChunkMetaData column : block.getColumns()) {
if (requestedSchema.containsPath(column.getPath().toArray())) {
filteredSize += column.getTotalSize();
}
}
}

long[] rowGroupOffsets = new long[filteredBlocks.size()];
for (int i = 0; i < rowGroupOffsets.length; i++) {
rowGroupOffsets[i] = filteredBlocks.get(i).getStartingPos();
}
return new ParquetInputSplit(finalPath, fileSplit.getStart(), filteredSize, fileSplit.getLength(),

Check warning on line 186 in ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ParquetInputSplit"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ5ll8LrYz4-StK4ja7d&open=AZ5ll8LrYz4-StK4ja7d&pullRequest=6506
fileSplit.getLocations(), rowGroupOffsets);
}

@SuppressWarnings("deprecation")
protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
return ParquetFileReader.readFooter(jobConf, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.convert;

import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageTypeParser;

import java.util.Map;

Expand All @@ -32,10 +30,9 @@ public class DataWritableRecordConverter extends RecordMaterializer<ArrayWritabl

private final HiveStructConverter root;

public DataWritableRecordConverter(final GroupType requestedSchema, final Map<String, String> metadata, TypeInfo hiveTypeInfo) {
this.root = new HiveStructConverter(requestedSchema,
MessageTypeParser.parseMessageType(metadata.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)),
metadata, hiveTypeInfo);
public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema,
final Map<String, String> metadata, TypeInfo hiveTypeInfo) {
this.root = new HiveStructConverter(requestedSchema, tableSchema, metadata, hiveTypeInfo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
Expand Down Expand Up @@ -438,6 +439,20 @@ public static MessageType getRequestedSchema(
}
}

public static MessageType getTableParquetSchema(Configuration configuration, MessageType fileSchema,
Map<String, String> metadata) {
String columnNames = configuration.get(IOConstants.COLUMNS);
String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
if (columnNames != null && !columnNames.isEmpty() && columnTypes != null && !columnTypes.isEmpty()) {
return getRequestedSchemaForIndexAccess(
Boolean.parseBoolean(metadata.get(PARQUET_COLUMN_INDEX_ACCESS)),
getColumnNames(columnNames),
getColumnTypes(columnTypes),
fileSchema);
}
return MessageTypeParser.parseMessageType(metadata.get(HIVE_TABLE_AS_PARQUET_SCHEMA));
}

private static MessageType getRequestedSchemaForIndexAccess(
boolean indexAccess,
List<String> columnNamesList,
Expand Down Expand Up @@ -535,6 +550,7 @@ public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration conf
}
}

return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
MessageType tableSchema = getTableParquetSchema(configuration, fileSchema, metadata);
return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema, metadata, hiveTypeInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;

import org.apache.hive.common.util.HiveVersionInfo;
Expand Down Expand Up @@ -49,6 +52,14 @@ public static void setSchema(final MessageType schema, final Configuration confi
}

public static MessageType getSchema(final Configuration configuration) {
String columnNames = configuration.get(IOConstants.COLUMNS);
String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
if (columnNames != null && !columnNames.isEmpty() && columnTypes != null && !columnTypes.isEmpty()) {
return HiveSchemaConverter.convert(
DataWritableReadSupport.getColumnNames(columnNames),
DataWritableReadSupport.getColumnTypes(columnTypes),
configuration);
}
return MessageTypeParser.parseMessageType(configuration.get(PARQUET_HIVE_SCHEMA));
}

Expand Down
32 changes: 32 additions & 0 deletions ql/src/test/queries/clientpositive/parquet_complex_col_names.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
set hive.support.quoted.identifiers=column;
set hive.fetch.task.conversion=none;
set hive.mapred.mode=nonstrict;

set hive.vectorized.execution.enabled=true;

drop table if exists tbl_bps_src;
drop table if exists parquet_bps_sink;

create table tbl_bps_src (`rate(bps)` double, id int) stored as orc;
insert into tbl_bps_src values (12.5, 1), (25.0, 2), (37.5, 3);

create table parquet_bps_sink (`rate(bps)` double, id int) stored as parquet;

insert overwrite table parquet_bps_sink
select `rate(bps)`, id
from tbl_bps_src
order by id
limit 10;

select `rate(bps)`, id from parquet_bps_sink order by id;


set hive.vectorized.execution.enabled=false;

insert overwrite table parquet_bps_sink
select `rate(bps)`, id
from tbl_bps_src
order by id
limit 10;

select `rate(bps)`, id from parquet_bps_sink order by id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
PREHOOK: query: drop table if exists tbl_bps_src
PREHOOK: type: DROPTABLE
PREHOOK: Output: database:default
POSTHOOK: query: drop table if exists tbl_bps_src
POSTHOOK: type: DROPTABLE
POSTHOOK: Output: database:default
PREHOOK: query: drop table if exists parquet_bps_sink
PREHOOK: type: DROPTABLE
PREHOOK: Output: database:default
POSTHOOK: query: drop table if exists parquet_bps_sink
POSTHOOK: type: DROPTABLE
POSTHOOK: Output: database:default
PREHOOK: query: create table tbl_bps_src (`rate(bps)` double, id int) stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@tbl_bps_src
POSTHOOK: query: create table tbl_bps_src (`rate(bps)` double, id int) stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_bps_src
PREHOOK: query: insert into tbl_bps_src values (12.5, 1), (25.0, 2), (37.5, 3)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl_bps_src
POSTHOOK: query: insert into tbl_bps_src values (12.5, 1), (25.0, 2), (37.5, 3)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_bps_src
POSTHOOK: Lineage: tbl_bps_src.id SCRIPT []
POSTHOOK: Lineage: tbl_bps_src.rate(bps) SCRIPT []
PREHOOK: query: create table parquet_bps_sink (`rate(bps)` double, id int) stored as parquet
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@parquet_bps_sink
POSTHOOK: query: create table parquet_bps_sink (`rate(bps)` double, id int) stored as parquet
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@parquet_bps_sink
PREHOOK: query: insert overwrite table parquet_bps_sink
select `rate(bps)`, id
from tbl_bps_src
order by id
limit 10
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_bps_src
PREHOOK: Output: default@parquet_bps_sink
POSTHOOK: query: insert overwrite table parquet_bps_sink
select `rate(bps)`, id
from tbl_bps_src
order by id
limit 10
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_bps_src
POSTHOOK: Output: default@parquet_bps_sink
POSTHOOK: Lineage: parquet_bps_sink.id SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:id, type:int, comment:null), ]
POSTHOOK: Lineage: parquet_bps_sink.rate(bps) SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:rate(bps), type:double, comment:null), ]
PREHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id
PREHOOK: type: QUERY
PREHOOK: Input: default@parquet_bps_sink
#### A masked pattern was here ####
POSTHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id
POSTHOOK: type: QUERY
POSTHOOK: Input: default@parquet_bps_sink
#### A masked pattern was here ####
12.5 1
25.0 2
37.5 3
PREHOOK: query: insert overwrite table parquet_bps_sink
select `rate(bps)`, id
from tbl_bps_src
order by id
limit 10
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_bps_src
PREHOOK: Output: default@parquet_bps_sink
POSTHOOK: query: insert overwrite table parquet_bps_sink
select `rate(bps)`, id
from tbl_bps_src
order by id
limit 10
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_bps_src
POSTHOOK: Output: default@parquet_bps_sink
POSTHOOK: Lineage: parquet_bps_sink.id SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:id, type:int, comment:null), ]
POSTHOOK: Lineage: parquet_bps_sink.rate(bps) SIMPLE [(tbl_bps_src)tbl_bps_src.FieldSchema(name:rate(bps), type:double, comment:null), ]
PREHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id
PREHOOK: type: QUERY
PREHOOK: Input: default@parquet_bps_sink
#### A masked pattern was here ####
POSTHOOK: query: select `rate(bps)`, id from parquet_bps_sink order by id
POSTHOOK: type: QUERY
POSTHOOK: Input: default@parquet_bps_sink
#### A masked pattern was here ####
12.5 1
25.0 2
37.5 3
Loading