diff --git a/be/src/vec/exec/format/table/parquet_metadata_reader.cpp b/be/src/vec/exec/format/table/parquet_metadata_reader.cpp new file mode 100644 index 00000000000000..cc506f2d2edf1a --- /dev/null +++ b/be/src/vec/exec/format/table/parquet_metadata_reader.cpp @@ -0,0 +1,881 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/format/table/parquet_metadata_reader.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "io/hdfs_builder.h" +#include "io/io_common.h" +#include "runtime/runtime_state.h" +#include "util/string_util.h" +#include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/string_view.h" +#include "vec/core/block.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/parquet/parquet_thrift_util.h" +#include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_file_metadata.h" +#include "vec/exec/format/table/parquet_utils.h" + +namespace doris::vectorized { + +using namespace parquet_utils; + +class ParquetMetadataReader::ModeHandler { +public: + explicit ModeHandler(RuntimeState* state) : _state(state) {} + virtual ~ModeHandler() = default; + + virtual void init_slot_pos_map(const std::vector& slots) = 0; + virtual Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector& columns) = 0; + +protected: + RuntimeState* _state = nullptr; + + static std::unordered_map _build_name_to_pos_map( + const std::vector& slots) { + std::unordered_map name_to_pos; + name_to_pos.reserve(slots.size()); + for (size_t i = 0; i < slots.size(); ++i) { + name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast(i)); + } + return name_to_pos; + } + + template + static void _init_slot_pos_map(const std::unordered_map& name_to_pos, + const std::array& column_names, + std::array* slot_pos) { + slot_pos->fill(-1); + for (size_t i = 0; i < column_names.size(); ++i) { + auto it = name_to_pos.find(column_names[i]); + if (it != name_to_pos.end()) { + (*slot_pos)[i] = it->second; + } + } + } +}; + +class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler { +public: 
+ explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector& columns) override { + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + RETURN_IF_ERROR(_append_schema_node(path, field, columns)); + } + return Status::OK(); + } + +private: + std::array _slot_pos {}; + + static std::string _repetition_type_to_string(tparquet::FieldRepetitionType::type type) { + switch (type) { + case tparquet::FieldRepetitionType::REQUIRED: + return "REQUIRED"; + case tparquet::FieldRepetitionType::OPTIONAL: + return "OPTIONAL"; + case tparquet::FieldRepetitionType::REPEATED: + return "REPEATED"; + default: + return "UNKNOWN"; + } + } + + Status _append_schema_node(const std::string& path, const FieldSchema& field, + std::vector& columns) { + auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... 
args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward(args)...); + } + }; + + insert_if_requested(SCHEMA_FILE_NAME, insert_string, path); + insert_if_requested(SCHEMA_NAME, insert_string, field.parquet_schema.name); + + if (field.parquet_schema.__isset.type) { + insert_if_requested(SCHEMA_TYPE, insert_string, + physical_type_to_string(field.parquet_schema.type)); + } else { + insert_if_requested(SCHEMA_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.type_length) { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int64, + static_cast(field.parquet_schema.type_length)); + } else { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_null); + } + + if (field.parquet_schema.__isset.repetition_type) { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_string, + _repetition_type_to_string(field.parquet_schema.repetition_type)); + } else { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_null); + } + + int64_t num_children = field.parquet_schema.__isset.num_children + ? 
static_cast(field.parquet_schema.num_children) + : 0; + insert_if_requested(SCHEMA_NUM_CHILDREN, insert_int64, static_cast(num_children)); + + if (field.parquet_schema.__isset.converted_type) { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_string, + converted_type_to_string(field.parquet_schema.converted_type)); + } else { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.scale) { + insert_if_requested(SCHEMA_SCALE, insert_int64, + static_cast(field.parquet_schema.scale)); + } else { + insert_if_requested(SCHEMA_SCALE, insert_null); + } + + if (field.parquet_schema.__isset.precision) { + insert_if_requested(SCHEMA_PRECISION, insert_int64, + static_cast(field.parquet_schema.precision)); + } else { + insert_if_requested(SCHEMA_PRECISION, insert_null); + } + + if (field.parquet_schema.__isset.field_id) { + insert_if_requested(SCHEMA_FIELD_ID, insert_int64, + static_cast(field.parquet_schema.field_id)); + } else { + insert_if_requested(SCHEMA_FIELD_ID, insert_null); + } + + std::string logical = logical_type_to_string(field.parquet_schema); + if (logical.empty()) { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_null); + } else { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string, logical); + } + + for (const auto& child : field.children) { + RETURN_IF_ERROR(_append_schema_node(path, child, columns)); + } + return Status::OK(); + } +}; + +class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector& slots) override { + std::unordered_map name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if 
(thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + std::unordered_map path_map; + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + build_path_map(field, "", &path_map); + } + + const int kv_pos = _slot_pos[META_KEY_VALUE_METADATA]; + bool has_kv_map = false; + Field kv_map_field; + if (kv_pos >= 0 && thrift_meta.__isset.key_value_metadata && + !thrift_meta.key_value_metadata.empty()) { + Array keys; + Array values; + keys.reserve(thrift_meta.key_value_metadata.size()); + values.reserve(thrift_meta.key_value_metadata.size()); + for (const auto& kv : thrift_meta.key_value_metadata) { + keys.emplace_back(Field::create_field(doris::StringView(kv.key))); + if (kv.__isset.value) { + values.emplace_back( + Field::create_field(doris::StringView(kv.value))); + } else { + values.emplace_back(Field {}); + } + } + Map map_value; + map_value.reserve(2); + map_value.emplace_back(Field::create_field(std::move(keys))); + map_value.emplace_back(Field::create_field(std::move(values))); + kv_map_field = Field::create_field(std::move(map_value)); + has_kv_map = true; + } + + for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) { + const auto& row_group = thrift_meta.row_groups[rg_index]; + Int64 row_group_num_rows = static_cast(row_group.num_rows); + Int64 row_group_num_columns = static_cast(row_group.columns.size()); + Int64 row_group_bytes = static_cast(row_group.total_byte_size); + Int64 row_group_compressed_bytes = 0; + if (row_group.__isset.total_compressed_size) { + row_group_compressed_bytes = static_cast(row_group.total_compressed_size); + } else { + for (const auto& col_chunk : row_group.columns) { + if (!col_chunk.__isset.meta_data) { + continue; + } + row_group_compressed_bytes += col_chunk.meta_data.total_compressed_size; + } + } + + for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) { + const auto& column_chunk = row_group.columns[col_idx]; + if 
(!column_chunk.__isset.meta_data) { + continue; + } + const auto& column_meta = column_chunk.meta_data; + std::string path_in_schema = join_path(column_meta.path_in_schema); + const FieldSchema* schema_field = nullptr; + auto it = path_map.find(path_in_schema); + if (it != path_map.end()) { + schema_field = it->second; + } + + auto insert_if_requested = [&](MetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward(args)...); + } + }; + + insert_if_requested(META_FILE_NAME, insert_string, + column_chunk.__isset.file_path ? column_chunk.file_path : path); + insert_if_requested(META_ROW_GROUP_ID, insert_int64, static_cast(rg_index)); + insert_if_requested(META_ROW_GROUP_NUM_ROWS, insert_int64, row_group_num_rows); + insert_if_requested(META_ROW_GROUP_NUM_COLUMNS, insert_int64, + row_group_num_columns); + insert_if_requested(META_ROW_GROUP_BYTES, insert_int64, row_group_bytes); + insert_if_requested(META_COLUMN_ID, insert_int64, static_cast(col_idx)); + + // `ColumnChunk.file_offset` is deprecated and can be 0 even when page offsets are present. + // Fall back to the first page (dictionary/data) offset to provide a useful value. + Int64 file_offset = static_cast(column_chunk.file_offset); + if (file_offset == 0) { + if (column_meta.__isset.dictionary_page_offset) { + file_offset = static_cast(column_meta.dictionary_page_offset); + } else { + file_offset = static_cast(column_meta.data_page_offset); + } + } + insert_if_requested(META_FILE_OFFSET, insert_int64, file_offset); + insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values); + insert_if_requested(META_PATH_IN_SCHEMA, insert_string, path_in_schema); + insert_if_requested(META_TYPE, insert_string, + physical_type_to_string(column_meta.type)); + + if (column_meta.__isset.statistics) { + static const cctz::time_zone kUtc0 = cctz::utc_time_zone(); + const cctz::time_zone& ctz = _state != nullptr ? 
_state->timezone_obj() : kUtc0; + + const auto& stats = column_meta.statistics; + + if (stats.__isset.min) { + insert_if_requested(META_STATS_MIN, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.min, ctz)); + } else { + insert_if_requested(META_STATS_MIN, insert_null); + } + if (stats.__isset.max) { + insert_if_requested(META_STATS_MAX, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.max, ctz)); + } else { + insert_if_requested(META_STATS_MAX, insert_null); + } + + if (stats.__isset.null_count) { + insert_if_requested(META_STATS_NULL_COUNT, insert_int64, stats.null_count); + } else { + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + } + if (stats.__isset.distinct_count) { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_int64, + stats.distinct_count); + } else { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + } + + // Prefer min_value/max_value, but fall back to deprecated min/max so the column + // is still populated for older files. 
+ std::string encoded_min_value; + std::string encoded_max_value; + bool has_min_value = false; + bool has_max_value = false; + if (stats.__isset.min_value) { + encoded_min_value = stats.min_value; + has_min_value = true; + } else if (stats.__isset.min) { + encoded_min_value = stats.min; + has_min_value = true; + } + if (stats.__isset.max_value) { + encoded_max_value = stats.max_value; + has_max_value = true; + } else if (stats.__isset.max) { + encoded_max_value = stats.max; + has_max_value = true; + } + if (has_min_value) { + insert_if_requested(META_STATS_MIN_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_min_value, ctz)); + } else { + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + } + if (has_max_value) { + insert_if_requested(META_STATS_MAX_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_max_value, ctz)); + } else { + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + } + + if (stats.__isset.is_min_value_exact) { + insert_if_requested(META_MIN_IS_EXACT, insert_bool, + stats.is_min_value_exact); + } else { + insert_if_requested(META_MIN_IS_EXACT, insert_null); + } + if (stats.__isset.is_max_value_exact) { + insert_if_requested(META_MAX_IS_EXACT, insert_bool, + stats.is_max_value_exact); + } else { + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + } else { + insert_if_requested(META_STATS_MIN, insert_null); + insert_if_requested(META_STATS_MAX, insert_null); + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + insert_if_requested(META_MIN_IS_EXACT, insert_null); + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + + insert_if_requested(META_COMPRESSION, insert_string, + compression_to_string(column_meta.codec)); + insert_if_requested(META_ENCODINGS, 
insert_string, + encodings_to_string(column_meta.encodings)); + + if (column_meta.__isset.index_page_offset) { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64, + column_meta.index_page_offset); + } else { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null); + } + if (column_meta.__isset.dictionary_page_offset) { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64, + column_meta.dictionary_page_offset); + } else { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null); + } + insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64, + column_meta.data_page_offset); + + insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64, + column_meta.total_compressed_size); + insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64, + column_meta.total_uncompressed_size); + + if (kv_pos >= 0) { + if (has_kv_map) { + columns[kv_pos]->insert(kv_map_field); + } else { + insert_null(columns[kv_pos]); + } + } + + if (column_meta.__isset.bloom_filter_offset) { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_int64, + column_meta.bloom_filter_offset); + } else { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_null); + } + if (column_meta.__isset.bloom_filter_length) { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_int64, + static_cast(column_meta.bloom_filter_length)); + } else { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_null); + } + + insert_if_requested(META_ROW_GROUP_COMPRESSED_BYTES, insert_int64, + row_group_compressed_bytes); + } + } + return Status::OK(); + } + +private: + std::array _slot_pos {}; +}; + +class ParquetFileMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetFileMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kFileMetadataColumnNames, &_slot_pos); + } + + Status 
append_rows(const std::string& path, FileMetaData* metadata, + std::vector& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + + auto insert_if_requested = [&](FileMetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward(args)...); + } + }; + + insert_if_requested(FILE_META_FILE_NAME, insert_string, path); + if (thrift_meta.__isset.created_by) { + insert_if_requested(FILE_META_CREATED_BY, insert_string, thrift_meta.created_by); + } else { + insert_if_requested(FILE_META_CREATED_BY, insert_null); + } + insert_if_requested(FILE_META_NUM_ROWS, insert_int64, + static_cast(thrift_meta.num_rows)); + insert_if_requested(FILE_META_NUM_ROW_GROUPS, insert_int64, + static_cast(thrift_meta.row_groups.size())); + insert_if_requested(FILE_META_FORMAT_VERSION, insert_int64, + static_cast(thrift_meta.version)); + if (thrift_meta.__isset.encryption_algorithm) { + const auto& algo = thrift_meta.encryption_algorithm; + std::string algo_name; + if (algo.__isset.AES_GCM_V1) { + algo_name = "AES_GCM_V1"; + } else if (algo.__isset.AES_GCM_CTR_V1) { + algo_name = "AES_GCM_CTR_V1"; + } + if (!algo_name.empty()) { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_string, algo_name); + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + if (thrift_meta.__isset.footer_signing_key_metadata) { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_string, + thrift_meta.footer_signing_key_metadata); + } else { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_null); + } + return Status::OK(); + } + +private: + std::array _slot_pos {}; +}; + +class ParquetKeyValueModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetKeyValueModeHandler(RuntimeState* state) : ModeHandler(state) {} 
+ + void init_slot_pos_map(const std::vector& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kKeyValueColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (!thrift_meta.__isset.key_value_metadata || thrift_meta.key_value_metadata.empty()) { + return Status::OK(); + } + + auto insert_if_requested = [&](KeyValueColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward(args)...); + } + }; + + for (const auto& kv : thrift_meta.key_value_metadata) { + insert_if_requested(KV_FILE_NAME, insert_string, path); + insert_if_requested(KV_KEY, insert_string, kv.key); + if (kv.__isset.value) { + insert_if_requested(KV_VALUE, insert_string, kv.value); + } else { + insert_if_requested(KV_VALUE, insert_null); + } + } + return Status::OK(); + } + +private: + std::array _slot_pos {}; +}; + +class ParquetBloomProbeModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + ParquetBloomProbeModeHandler(RuntimeState* state, TFileType::type file_type, + std::map properties, std::string column, + std::string literal) + : ModeHandler(state), + _file_type(file_type), + _properties(std::move(properties)), + _column(std::move(column)), + _literal(std::move(literal)) {} + + void init_slot_pos_map(const std::vector& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kBloomProbeColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector& columns) override { + const FieldSchema* schema = metadata->schema().get_column(_column); + if (schema == nullptr) { + return Status::InvalidArgument( + fmt::format("Column '{}' not found for parquet_bloom_probe", _column)); + } + int 
parquet_col_id = schema->physical_column_index; + PrimitiveType primitive_type = _get_primitive(schema->data_type); + if (!ParquetPredicate::bloom_filter_supported(primitive_type)) { + return Status::InvalidArgument( + fmt::format("Column '{}' type {} does not support parquet bloom filter probe", + _column, primitive_type)); + } + + std::string encoded_literal; + RETURN_IF_ERROR( + _encode_literal(schema->physical_type, primitive_type, _literal, &encoded_literal)); + + io::FileSystemProperties system_properties; + system_properties.system_type = _file_type; + system_properties.properties = _properties; + io::FileDescription file_desc; + file_desc.path = path; + io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader( + system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr)); + io::IOContext io_ctx; + + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + for (size_t rg_idx = 0; rg_idx < thrift_meta.row_groups.size(); ++rg_idx) { + if (parquet_col_id < 0 || + parquet_col_id >= thrift_meta.row_groups[rg_idx].columns.size()) { + return Status::InvalidArgument(fmt::format( + "Invalid column index {} for parquet_bloom_probe", parquet_col_id)); + } + const auto& column_chunk = thrift_meta.row_groups[rg_idx].columns[parquet_col_id]; + std::optional excludes; + if (column_chunk.__isset.meta_data && + column_chunk.meta_data.__isset.bloom_filter_offset) { + ParquetPredicate::ColumnStat stat; + auto st = ParquetPredicate::read_bloom_filter(column_chunk.meta_data, file_reader, + &io_ctx, &stat); + if (st.ok() && stat.bloom_filter) { + bool might_contain = stat.bloom_filter->test_bytes(encoded_literal.data(), + encoded_literal.size()); + excludes = !might_contain; + } + } + _emit_row(path, static_cast(rg_idx), excludes, columns); + } + return Status::OK(); + } + +private: + std::array _slot_pos {}; + TFileType::type _file_type; + std::map _properties; + 
std::string _column; + std::string _literal; + + PrimitiveType _get_primitive(const DataTypePtr& type) const { + if (auto nullable = typeid_cast(type.get())) { + return nullable->get_nested_type()->get_primitive_type(); + } + return type->get_primitive_type(); + } + + Status _encode_literal(tparquet::Type::type physical_type, PrimitiveType primitive_type, + const std::string& literal, std::string* out) const { + try { + switch (physical_type) { + case tparquet::Type::INT32: { + int64_t v = std::stoll(literal); + int32_t v32 = static_cast(v); + out->assign(reinterpret_cast(&v32), sizeof(int32_t)); + return Status::OK(); + } + case tparquet::Type::INT64: { + int64_t v = std::stoll(literal); + out->assign(reinterpret_cast(&v), sizeof(int64_t)); + return Status::OK(); + } + case tparquet::Type::FLOAT: { + float v = std::stof(literal); + out->assign(reinterpret_cast(&v), sizeof(float)); + return Status::OK(); + } + case tparquet::Type::DOUBLE: { + double v = std::stod(literal); + out->assign(reinterpret_cast(&v), sizeof(double)); + return Status::OK(); + } + case tparquet::Type::BYTE_ARRAY: { + // For string/blob, use raw bytes from the literal. 
+ *out = literal; + return Status::OK(); + } + default: + break; + } + } catch (const std::exception& e) { + return Status::InvalidArgument(fmt::format( + "Failed to parse literal '{}' for parquet bloom probe: {}", literal, e.what())); + } + return Status::NotSupported( + fmt::format("Physical type {} for column '{}' not supported in parquet_bloom_probe", + physical_type, _column)); + } + + void _emit_row(const std::string& path, Int64 row_group_id, std::optional excludes, + std::vector& columns) { + if (_slot_pos[BLOOM_FILE_NAME] >= 0) { + insert_string(columns[_slot_pos[BLOOM_FILE_NAME]], path); + } + if (_slot_pos[BLOOM_ROW_GROUP_ID] >= 0) { + insert_int32(columns[_slot_pos[BLOOM_ROW_GROUP_ID]], static_cast(row_group_id)); + } + if (_slot_pos[BLOOM_EXCLUDES] >= 0) { + int32_t excludes_val = -1; // -1: no bloom filter present + if (excludes.has_value()) { + excludes_val = excludes.value() ? 1 : 0; + } + insert_int32(columns[_slot_pos[BLOOM_EXCLUDES]], excludes_val); + } + } +}; + +ParquetMetadataReader::ParquetMetadataReader(std::vector slots, + RuntimeState* state, RuntimeProfile* profile, + TMetaScanRange scan_range) + : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) { + (void)profile; +} + +ParquetMetadataReader::~ParquetMetadataReader() = default; + +Status ParquetMetadataReader::init_reader() { + RETURN_IF_ERROR(_init_from_scan_range(_scan_range)); + if (_mode_type == Mode::SCHEMA) { + _mode_handler = std::make_unique(_state); + } else if (_mode_type == Mode::FILE_METADATA) { + _mode_handler = std::make_unique(_state); + } else if (_mode_type == Mode::KEY_VALUE_METADATA) { + _mode_handler = std::make_unique(_state); + } else if (_mode_type == Mode::BLOOM_PROBE) { + _mode_handler = std::make_unique( + _state, _file_type, _properties, _bloom_column, _bloom_literal); + } else { + _mode_handler = std::make_unique(_state); + } + _mode_handler->init_slot_pos_map(_slots); + return Status::OK(); +} + +Status 
ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) { + if (!scan_range.__isset.parquet_params) { + return Status::InvalidArgument( + "Missing parquet parameters for parquet_meta table function"); + } + const TParquetMetadataParams& params = scan_range.parquet_params; + std::vector resolved_paths; + if (scan_range.__isset.serialized_splits && !scan_range.serialized_splits.empty()) { + resolved_paths.assign(scan_range.serialized_splits.begin(), + scan_range.serialized_splits.end()); + } else if (params.__isset.paths && !params.paths.empty()) { + resolved_paths.assign(params.paths.begin(), params.paths.end()); + } else { + return Status::InvalidArgument("Property 'path' must be set for parquet_meta"); + } + _paths.swap(resolved_paths); + + if (params.__isset.mode) { + _mode = params.mode; + } else { + _mode = MODE_METADATA; // default + } + + if (params.__isset.file_type) { + _file_type = params.file_type; + } else { + return Status::InvalidArgument("Property 'file_type' must be set for parquet_metadata"); + } + if (params.__isset.properties) { + _properties = params.properties; + } + if (params.__isset.bloom_column) { + _bloom_column = params.bloom_column; + } + if (params.__isset.bloom_literal) { + _bloom_literal = params.bloom_literal; + } + std::string lower_mode = _mode; + std::ranges::transform(lower_mode, lower_mode.begin(), + [](unsigned char c) { return std::tolower(c); }); + if (lower_mode == MODE_SCHEMA) { + _mode_type = Mode::SCHEMA; + _mode = MODE_SCHEMA; + } else if (lower_mode == MODE_FILE_METADATA) { + _mode_type = Mode::FILE_METADATA; + _mode = MODE_FILE_METADATA; + } else if (lower_mode == MODE_KEY_VALUE_METADATA) { + _mode_type = Mode::KEY_VALUE_METADATA; + _mode = MODE_KEY_VALUE_METADATA; + } else if (lower_mode == MODE_BLOOM_PROBE) { + _mode_type = Mode::BLOOM_PROBE; + _mode = MODE_BLOOM_PROBE; + } else { + _mode_type = Mode::METADATA; + _mode = MODE_METADATA; + } + if (_mode_type == Mode::BLOOM_PROBE && 
(_bloom_column.empty() || _bloom_literal.empty())) { + return Status::InvalidArgument( + "Properties 'bloom_column' and 'bloom_literal' must be set for " + "parquet_bloom_probe"); + } + return Status::OK(); +} + +Status ParquetMetadataReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_eof) { + *eof = true; + *read_rows = 0; + return Status::OK(); + } + + // Scanner may call multiple times; we surface data once and mark eof on the next call. + // When reusing a Block, wipe row data but keep column structure intact. + bool mem_reuse = block->mem_reuse(); + std::vector columns(_slots.size()); + if (mem_reuse) { + block->clear_column_data(); + for (size_t i = 0; i < _slots.size(); ++i) { + columns[i] = block->get_by_position(i).column->assume_mutable(); + } + } else { + for (size_t i = 0; i < _slots.size(); ++i) { + columns[i] = _slots[i]->get_empty_mutable_column(); + } + } + + size_t rows_before = block->rows(); + RETURN_IF_ERROR(_build_rows(columns)); + + if (!mem_reuse) { + for (size_t i = 0; i < _slots.size(); ++i) { + block->insert(ColumnWithTypeAndName( + std::move(columns[i]), _slots[i]->get_data_type_ptr(), _slots[i]->col_name())); + } + } else { + columns.clear(); + } + + size_t produced = block->rows() - rows_before; + *read_rows = produced; + _eof = true; + *eof = (produced == 0); + return Status::OK(); +} + +// Iterate all configured paths and append metadata rows into the provided columns. +Status ParquetMetadataReader::_build_rows(std::vector& columns) { + for (const auto& path : _paths) { + RETURN_IF_ERROR(_append_file_rows(path, columns)); + } + return Status::OK(); +} + +// Open a single Parquet file, read its footer, and dispatch to schema/metadata handlers. 
+Status ParquetMetadataReader::_append_file_rows(const std::string& path, + std::vector& columns) { + io::FileSystemProperties system_properties; + system_properties.system_type = _file_type; + system_properties.properties = _properties; + if (_file_type == TFileType::FILE_HDFS) { + system_properties.hdfs_params = ::doris::parse_properties(system_properties.properties); + } + io::FileDescription file_desc; + file_desc.path = path; + io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader( + system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr)); + + std::unique_ptr file_metadata; + size_t meta_size = 0; + io::IOContext io_ctx; + RETURN_IF_ERROR( + parse_thrift_footer(file_reader, &file_metadata, &meta_size, &io_ctx, false, false)); + + if (_mode_handler == nullptr) { + return Status::InternalError( + "Parquet metadata reader is not initialized with mode handler"); + } + return _mode_handler->append_rows(path, file_metadata.get(), columns); +} + +Status ParquetMetadataReader::close() { + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/parquet_metadata_reader.h b/be/src/vec/exec/format/table/parquet_metadata_reader.h new file mode 100644 index 00000000000000..1e712534b222b4 --- /dev/null +++ b/be/src/vec/exec/format/table/parquet_metadata_reader.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include "common/factory_creator.h" +#include "common/status.h" +#include "runtime/descriptors.h" +#include "vec/exec/format/generic_reader.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; +namespace io { +class FileReader; +} // namespace io +} // namespace doris + +namespace doris::vectorized { +class Block; + +// Lightweight reader that surfaces Parquet footer metadata as a table-valued scan. +// It reads only file footers (no data pages) and emits either schema rows or +// row-group/column statistics based on `mode`. +class ParquetMetadataReader : public GenericReader { + ENABLE_FACTORY_CREATOR(ParquetMetadataReader); + +public: + class ModeHandler; + + ParquetMetadataReader(std::vector slots, RuntimeState* state, + RuntimeProfile* profile, TMetaScanRange scan_range); + ~ParquetMetadataReader() override; + + Status init_reader(); + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status close() override; + +private: + Status _init_from_scan_range(const TMetaScanRange& scan_range); + Status _build_rows(std::vector& columns); + Status _append_file_rows(const std::string& path, std::vector& columns); + + enum class Mode { SCHEMA, METADATA, FILE_METADATA, KEY_VALUE_METADATA, BLOOM_PROBE }; + + RuntimeState* _state = nullptr; + std::vector _slots; + TMetaScanRange _scan_range; + std::vector _paths; + // File system type and properties for remote Parquet access. 
+ TFileType::type _file_type = TFileType::FILE_LOCAL; + std::map _properties; + std::string _mode; + Mode _mode_type = Mode::METADATA; + std::string _bloom_column; + std::string _bloom_literal; + bool _eof = false; + std::unique_ptr _mode_handler; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/parquet_utils.cpp b/be/src/vec/exec/format/table/parquet_utils.cpp new file mode 100644 index 00000000000000..2b9fbf6f21812a --- /dev/null +++ b/be/src/vec/exec/format/table/parquet_utils.cpp @@ -0,0 +1,436 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/format/table/parquet_utils.h" + +#include + +#include +#include +#include +#include +#include + +#include "util/string_util.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/common/assert_cast.h" +#include "vec/common/unaligned.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/parquet/parquet_column_convert.h" + +namespace doris::vectorized::parquet_utils { +namespace { + +template +void insert_numeric_impl(MutableColumnPtr& column, T value) { + if (auto* nullable_column = check_and_get_column(column.get())) { + auto& nested = nullable_column->get_nested_column(); + assert_cast(nested).insert_value(value); + nullable_column->push_false_to_nullmap(1); + } else { + assert_cast(*column).insert_value(value); + } +} + +} // namespace + +std::string join_path(const std::vector& items) { + return join(items, "."); +} + +void insert_int32(MutableColumnPtr& column, Int32 value) { + insert_numeric_impl(column, value); +} + +void insert_int64(MutableColumnPtr& column, Int64 value) { + insert_numeric_impl(column, value); +} + +void insert_bool(MutableColumnPtr& column, bool value) { + insert_numeric_impl(column, static_cast(value)); +} + +void insert_string(MutableColumnPtr& column, const std::string& value) { + if (auto* nullable = check_and_get_column(column.get())) { + nullable->get_null_map_data().push_back(0); + auto& nested = nullable->get_nested_column(); + assert_cast(nested).insert_data(value.c_str(), value.size()); + } else { + assert_cast(*column).insert_data(value.c_str(), value.size()); + } +} + +void insert_null(MutableColumnPtr& column) { + if (auto* nullable = check_and_get_column(column.get())) { + nullable->get_null_map_data().push_back(1); + nullable->get_nested_column().insert_default(); + } else { + column->insert_default(); + } +} + +std::string 
physical_type_to_string(tparquet::Type::type type) { + switch (type) { + case tparquet::Type::BOOLEAN: + return "BOOLEAN"; + case tparquet::Type::INT32: + return "INT32"; + case tparquet::Type::INT64: + return "INT64"; + case tparquet::Type::INT96: + return "INT96"; + case tparquet::Type::FLOAT: + return "FLOAT"; + case tparquet::Type::DOUBLE: + return "DOUBLE"; + case tparquet::Type::BYTE_ARRAY: + return "BYTE_ARRAY"; + case tparquet::Type::FIXED_LEN_BYTE_ARRAY: + return "FIXED_LEN_BYTE_ARRAY"; + default: + return "UNKNOWN"; + } +} + +std::string compression_to_string(tparquet::CompressionCodec::type codec) { + switch (codec) { + case tparquet::CompressionCodec::UNCOMPRESSED: + return "UNCOMPRESSED"; + case tparquet::CompressionCodec::SNAPPY: + return "SNAPPY"; + case tparquet::CompressionCodec::GZIP: + return "GZIP"; + case tparquet::CompressionCodec::LZO: + return "LZO"; + case tparquet::CompressionCodec::BROTLI: + return "BROTLI"; + case tparquet::CompressionCodec::LZ4: + return "LZ4"; + case tparquet::CompressionCodec::ZSTD: + return "ZSTD"; + case tparquet::CompressionCodec::LZ4_RAW: + return "LZ4_RAW"; + default: + return "UNKNOWN"; + } +} + +std::string converted_type_to_string(tparquet::ConvertedType::type type) { + switch (type) { + case tparquet::ConvertedType::UTF8: + return "UTF8"; + case tparquet::ConvertedType::MAP: + return "MAP"; + case tparquet::ConvertedType::MAP_KEY_VALUE: + return "MAP_KEY_VALUE"; + case tparquet::ConvertedType::LIST: + return "LIST"; + case tparquet::ConvertedType::ENUM: + return "ENUM"; + case tparquet::ConvertedType::DECIMAL: + return "DECIMAL"; + case tparquet::ConvertedType::DATE: + return "DATE"; + case tparquet::ConvertedType::TIME_MILLIS: + return "TIME_MILLIS"; + case tparquet::ConvertedType::TIME_MICROS: + return "TIME_MICROS"; + case tparquet::ConvertedType::TIMESTAMP_MILLIS: + return "TIMESTAMP_MILLIS"; + case tparquet::ConvertedType::TIMESTAMP_MICROS: + return "TIMESTAMP_MICROS"; + case 
tparquet::ConvertedType::UINT_8: + return "UINT_8"; + case tparquet::ConvertedType::UINT_16: + return "UINT_16"; + case tparquet::ConvertedType::UINT_32: + return "UINT_32"; + case tparquet::ConvertedType::UINT_64: + return "UINT_64"; + case tparquet::ConvertedType::INT_8: + return "INT_8"; + case tparquet::ConvertedType::INT_16: + return "INT_16"; + case tparquet::ConvertedType::INT_32: + return "INT_32"; + case tparquet::ConvertedType::INT_64: + return "INT_64"; + case tparquet::ConvertedType::JSON: + return "JSON"; + case tparquet::ConvertedType::BSON: + return "BSON"; + case tparquet::ConvertedType::INTERVAL: + return "INTERVAL"; + default: + return "UNKNOWN"; + } +} + +std::string logical_type_to_string(const tparquet::SchemaElement& element) { + if (element.__isset.logicalType) { + const auto& logical = element.logicalType; + if (logical.__isset.STRING) { + return "STRING"; + } else if (logical.__isset.MAP) { + return "MAP"; + } else if (logical.__isset.LIST) { + return "LIST"; + } else if (logical.__isset.ENUM) { + return "ENUM"; + } else if (logical.__isset.DECIMAL) { + return "DECIMAL"; + } else if (logical.__isset.DATE) { + return "DATE"; + } else if (logical.__isset.TIME) { + return "TIME"; + } else if (logical.__isset.TIMESTAMP) { + return "TIMESTAMP"; + } else if (logical.__isset.INTEGER) { + return "INTEGER"; + } else if (logical.__isset.UNKNOWN) { + return "UNKNOWN"; + } else if (logical.__isset.JSON) { + return "JSON"; + } else if (logical.__isset.BSON) { + return "BSON"; + } else if (logical.__isset.UUID) { + return "UUID"; + } else if (logical.__isset.FLOAT16) { + return "FLOAT16"; + } else if (logical.__isset.VARIANT) { + return "VARIANT"; + } else if (logical.__isset.GEOMETRY) { + return "GEOMETRY"; + } else if (logical.__isset.GEOGRAPHY) { + return "GEOGRAPHY"; + } + } + if (element.__isset.converted_type) { + return converted_type_to_string(element.converted_type); + } + return ""; +} + +std::string encodings_to_string(const std::vector& 
encodings) { + std::vector parts; + parts.reserve(encodings.size()); + for (auto encoding : encodings) { + switch (encoding) { + case tparquet::Encoding::PLAIN: + parts.emplace_back("PLAIN"); + break; + case tparquet::Encoding::PLAIN_DICTIONARY: + parts.emplace_back("PLAIN_DICTIONARY"); + break; + case tparquet::Encoding::RLE: + parts.emplace_back("RLE"); + break; + case tparquet::Encoding::BIT_PACKED: + parts.emplace_back("BIT_PACKED"); + break; + case tparquet::Encoding::DELTA_BINARY_PACKED: + parts.emplace_back("DELTA_BINARY_PACKED"); + break; + case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: + parts.emplace_back("DELTA_LENGTH_BYTE_ARRAY"); + break; + case tparquet::Encoding::DELTA_BYTE_ARRAY: + parts.emplace_back("DELTA_BYTE_ARRAY"); + break; + case tparquet::Encoding::RLE_DICTIONARY: + parts.emplace_back("RLE_DICTIONARY"); + break; + default: + parts.emplace_back("UNKNOWN"); + break; + } + } + return fmt::format("{}", fmt::join(parts, ",")); +} + +bool try_get_statistics_encoded_value(const tparquet::Statistics& statistics, bool is_min, + std::string* encoded_value) { + if (is_min) { + if (statistics.__isset.min_value) { + *encoded_value = statistics.min_value; + return true; + } + if (statistics.__isset.min) { + *encoded_value = statistics.min; + return true; + } + } else { + if (statistics.__isset.max_value) { + *encoded_value = statistics.max_value; + return true; + } + if (statistics.__isset.max) { + *encoded_value = statistics.max; + return true; + } + } + encoded_value->clear(); + return false; +} + +std::string bytes_to_hex_string(const std::string& bytes) { + static constexpr char kHexDigits[] = "0123456789ABCDEF"; + std::string hex; + hex.resize(bytes.size() * 2); + for (size_t i = 0; i < bytes.size(); ++i) { + auto byte = static_cast(bytes[i]); + hex[i * 2] = kHexDigits[byte >> 4]; + hex[i * 2 + 1] = kHexDigits[byte & 0x0F]; + } + return fmt::format("0x{}", hex); +} + +std::string decode_statistics_value(const FieldSchema* schema_field, + 
tparquet::Type::type physical_type, + const std::string& encoded_value, const cctz::time_zone& ctz) { + if (encoded_value.empty()) { + return ""; + } + if (schema_field == nullptr) { + return bytes_to_hex_string(encoded_value); + } + + auto logical_data_type = remove_nullable(schema_field->data_type); + auto converter = parquet::PhysicalToLogicalConverter::get_converter( + schema_field, logical_data_type, logical_data_type, &ctz); + if (!converter || !converter->support()) { + return bytes_to_hex_string(encoded_value); + } + + ColumnPtr physical_column; + switch (physical_type) { + case tparquet::Type::type::BOOLEAN: { + if (encoded_value.size() != sizeof(UInt8)) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = ColumnUInt8::create(); + physical_col->insert_value(doris::unaligned_load(encoded_value.data())); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::INT32: { + if (encoded_value.size() != sizeof(Int32)) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = ColumnInt32::create(); + physical_col->insert_value(doris::unaligned_load(encoded_value.data())); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::INT64: { + if (encoded_value.size() != sizeof(Int64)) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = ColumnInt64::create(); + physical_col->insert_value(doris::unaligned_load(encoded_value.data())); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::FLOAT: { + if (encoded_value.size() != sizeof(Float32)) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = ColumnFloat32::create(); + physical_col->insert_value(doris::unaligned_load(encoded_value.data())); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::DOUBLE: { + if (encoded_value.size() != sizeof(Float64)) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = 
ColumnFloat64::create(); + physical_col->insert_value(doris::unaligned_load(encoded_value.data())); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::BYTE_ARRAY: { + auto physical_col = ColumnString::create(); + physical_col->insert_data(encoded_value.data(), encoded_value.size()); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: { + int32_t type_length = schema_field->parquet_schema.__isset.type_length + ? schema_field->parquet_schema.type_length + : 0; + if (type_length <= 0) { + type_length = static_cast(encoded_value.size()); + } + if (static_cast(type_length) != encoded_value.size()) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = ColumnUInt8::create(); + physical_col->resize(type_length); + memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size()); + physical_column = std::move(physical_col); + break; + } + case tparquet::Type::type::INT96: { + constexpr size_t kInt96Size = 12; + if (encoded_value.size() != kInt96Size) { + return bytes_to_hex_string(encoded_value); + } + auto physical_col = ColumnInt8::create(); + physical_col->resize(kInt96Size); + memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size()); + physical_column = std::move(physical_col); + break; + } + default: + return bytes_to_hex_string(encoded_value); + } + + ColumnPtr logical_column; + if (converter->is_consistent()) { + logical_column = physical_column; + } else { + logical_column = logical_data_type->create_column(); + if (Status st = converter->physical_convert(physical_column, logical_column); !st.ok()) { + return bytes_to_hex_string(encoded_value); + } + } + + if (logical_column->size() != 1) { + return bytes_to_hex_string(encoded_value); + } + DataTypeSerDe::FormatOptions options; + options.timezone = &ctz; + return logical_data_type->to_string(*logical_column, 0, options); +} + +void build_path_map(const 
FieldSchema& field, const std::string& prefix, + std::unordered_map* map) { + std::string current = prefix.empty() ? field.name : fmt::format("{}.{}", prefix, field.name); + if (field.children.empty()) { + (*map)[current] = &field; + } else { + for (const auto& child : field.children) { + build_path_map(child, current, map); + } + } +} + +} // namespace doris::vectorized::parquet_utils diff --git a/be/src/vec/exec/format/table/parquet_utils.h b/be/src/vec/exec/format/table/parquet_utils.h new file mode 100644 index 00000000000000..0f966ab0676f60 --- /dev/null +++ b/be/src/vec/exec/format/table/parquet_utils.h @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include +#include +#include + +#include "cctz/time_zone.h" +#include "vec/core/types.h" +#include "vec/exec/format/parquet/schema_desc.h" + +namespace doris::vectorized::parquet_utils { + +inline constexpr const char* MODE_SCHEMA = "parquet_schema"; +inline constexpr const char* MODE_METADATA = "parquet_metadata"; +inline constexpr const char* MODE_FILE_METADATA = "parquet_file_metadata"; +inline constexpr const char* MODE_KEY_VALUE_METADATA = "parquet_kv_metadata"; +inline constexpr const char* MODE_BLOOM_PROBE = "parquet_bloom_probe"; + +enum SchemaColumnIndex : size_t { + SCHEMA_FILE_NAME = 0, + SCHEMA_NAME, + SCHEMA_TYPE, + SCHEMA_TYPE_LENGTH, + SCHEMA_REPETITION_TYPE, + SCHEMA_NUM_CHILDREN, + SCHEMA_CONVERTED_TYPE, + SCHEMA_SCALE, + SCHEMA_PRECISION, + SCHEMA_FIELD_ID, + SCHEMA_LOGICAL_TYPE, + SCHEMA_COLUMN_COUNT +}; + +enum MetadataColumnIndex : size_t { + META_FILE_NAME = 0, + META_ROW_GROUP_ID, + META_ROW_GROUP_NUM_ROWS, + META_ROW_GROUP_NUM_COLUMNS, + META_ROW_GROUP_BYTES, + META_COLUMN_ID, + META_FILE_OFFSET, + META_NUM_VALUES, + META_PATH_IN_SCHEMA, + META_TYPE, + META_STATS_MIN, + META_STATS_MAX, + META_STATS_NULL_COUNT, + META_STATS_DISTINCT_COUNT, + META_STATS_MIN_VALUE, + META_STATS_MAX_VALUE, + META_COMPRESSION, + META_ENCODINGS, + META_INDEX_PAGE_OFFSET, + META_DICTIONARY_PAGE_OFFSET, + META_DATA_PAGE_OFFSET, + META_TOTAL_COMPRESSED_SIZE, + META_TOTAL_UNCOMPRESSED_SIZE, + META_KEY_VALUE_METADATA, + META_BLOOM_FILTER_OFFSET, + META_BLOOM_FILTER_LENGTH, + META_MIN_IS_EXACT, + META_MAX_IS_EXACT, + META_ROW_GROUP_COMPRESSED_BYTES, + META_COLUMN_COUNT +}; + +enum FileMetadataColumnIndex : size_t { + FILE_META_FILE_NAME = 0, + FILE_META_CREATED_BY, + FILE_META_NUM_ROWS, + FILE_META_NUM_ROW_GROUPS, + FILE_META_FORMAT_VERSION, + FILE_META_ENCRYPTION_ALGORITHM, + FILE_META_FOOTER_SIGNING_KEY_METADATA, + FILE_META_COLUMN_COUNT +}; + +enum KeyValueColumnIndex : size_t { KV_FILE_NAME = 0, KV_KEY, KV_VALUE, KV_COLUMN_COUNT }; + 
+enum BloomProbeColumnIndex : size_t { + BLOOM_FILE_NAME = 0, + BLOOM_ROW_GROUP_ID, + BLOOM_EXCLUDES, + BLOOM_COLUMN_COUNT +}; + +inline constexpr std::array kSchemaColumnNames = { + "file_name", "name", "type", "type_length", "repetition_type", "num_children", + "converted_type", "scale", "precision", "field_id", "logical_type"}; + +inline constexpr std::array kMetadataColumnNames = { + "file_name", + "row_group_id", + "row_group_num_rows", + "row_group_num_columns", + "row_group_bytes", + "column_id", + "file_offset", + "num_values", + "path_in_schema", + "type", + "stats_min", + "stats_max", + "stats_null_count", + "stats_distinct_count", + "stats_min_value", + "stats_max_value", + "compression", + "encodings", + "index_page_offset", + "dictionary_page_offset", + "data_page_offset", + "total_compressed_size", + "total_uncompressed_size", + "key_value_metadata", + "bloom_filter_offset", + "bloom_filter_length", + "min_is_exact", + "max_is_exact", + "row_group_compressed_bytes"}; + +inline constexpr std::array kFileMetadataColumnNames = { + "file_name", + "created_by", + "num_rows", + "num_row_groups", + "format_version", + "encryption_algorithm", + "footer_signing_key_metadata"}; + +inline constexpr std::array kKeyValueColumnNames = {"file_name", + "key", "value"}; + +inline constexpr std::array kBloomProbeColumnNames = { + "file_name", "row_group_id", "bloom_filter_excludes"}; + +std::string join_path(const std::vector& items); + +void insert_int32(MutableColumnPtr& column, Int32 value); +void insert_int64(MutableColumnPtr& column, Int64 value); +void insert_bool(MutableColumnPtr& column, bool value); +void insert_string(MutableColumnPtr& column, const std::string& value); +void insert_null(MutableColumnPtr& column); + +std::string physical_type_to_string(tparquet::Type::type type); +std::string compression_to_string(tparquet::CompressionCodec::type codec); +std::string converted_type_to_string(tparquet::ConvertedType::type type); +std::string 
logical_type_to_string(const tparquet::SchemaElement& element); +std::string encodings_to_string(const std::vector& encodings); + +bool try_get_statistics_encoded_value(const tparquet::Statistics& statistics, bool is_min, + std::string* encoded_value); +std::string bytes_to_hex_string(const std::string& bytes); +std::string decode_statistics_value(const FieldSchema* schema_field, + tparquet::Type::type physical_type, + const std::string& encoded_value, const cctz::time_zone& ctz); + +void build_path_map(const FieldSchema& field, const std::string& prefix, + std::unordered_map* map); + +} // namespace doris::vectorized::parquet_utils diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp index 4f5da317947d7b..e47451c77d7798 100644 --- a/be/src/vec/exec/scan/meta_scanner.cpp +++ b/be/src/vec/exec/scan/meta_scanner.cpp @@ -44,6 +44,7 @@ #include "vec/core/types.h" #include "vec/exec/format/table/iceberg_sys_table_jni_reader.h" #include "vec/exec/format/table/paimon_sys_table_jni_reader.h" +#include "vec/exec/format/table/parquet_metadata_reader.h" namespace doris { class RuntimeProfile; @@ -84,6 +85,11 @@ Status MetaScanner::_open_impl(RuntimeState* state) { static_cast(reader.get()) ->set_col_name_to_block_idx(&_src_block_name_to_idx); _reader = std::move(reader); + } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PARQUET) { + auto reader = ParquetMetadataReader::create_unique(_tuple_desc->slots(), state, _profile, + _scan_range.meta_scan_range); + RETURN_IF_ERROR(reader->init_reader()); + _reader = std::move(reader); } else { RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); } diff --git a/be/test/vec/exec/format/parquet/parquet_utils_test.cpp b/be/test/vec/exec/format/parquet/parquet_utils_test.cpp new file mode 100644 index 00000000000000..ba87eb2d00284d --- /dev/null +++ b/be/test/vec/exec/format/parquet/parquet_utils_test.cpp @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/parquet_utils.h" + +#include + +#include + +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::vectorized::parquet_utils { + +TEST(ParquetUtilsTest, JoinPath) { + EXPECT_EQ("a.b.c", join_path({"a", "b", "c"})); + EXPECT_EQ("", join_path({})); +} + +TEST(ParquetUtilsTest, InsertNumericAndString) { + { + auto col = ColumnInt32::create(); + MutableColumnPtr ptr = col->get_ptr(); + insert_int32(ptr, 42); + ASSERT_EQ(1, ptr->size()); + EXPECT_EQ(42, assert_cast(*ptr).get_data()[0]); + } + { + auto col = ColumnInt64::create(); + MutableColumnPtr ptr = col->get_ptr(); + insert_int64(ptr, 123456789LL); + ASSERT_EQ(1, ptr->size()); + EXPECT_EQ(123456789LL, assert_cast(*ptr).get_data()[0]); + } + { + auto col = ColumnUInt8::create(); + MutableColumnPtr ptr = col->get_ptr(); + insert_bool(ptr, true); + insert_bool(ptr, false); + ASSERT_EQ(2, ptr->size()); + const auto& data = assert_cast(*ptr).get_data(); + EXPECT_EQ(1, data[0]); + EXPECT_EQ(0, data[1]); + } + { + auto col = ColumnString::create(); + MutableColumnPtr ptr = 
col->get_ptr(); + insert_string(ptr, "abc"); + ASSERT_EQ(1, ptr->size()); + EXPECT_EQ("abc", assert_cast(*ptr).get_data_at(0).to_string()); + } +} + +TEST(ParquetUtilsTest, InsertIntoNullable) { + { + auto nullable = ColumnNullable::create(ColumnInt32::create(), ColumnUInt8::create()); + MutableColumnPtr ptr = nullable->get_ptr(); + insert_int32(ptr, 7); + insert_null(ptr); + ASSERT_EQ(2, ptr->size()); + const auto& nested_data = + assert_cast(nullable->get_nested_column()).get_data(); + EXPECT_EQ(7, nested_data[0]); + EXPECT_EQ(0, nested_data[1]); + EXPECT_EQ(0, nullable->get_null_map_data()[0]); + EXPECT_EQ(1, nullable->get_null_map_data()[1]); + } + { + auto nullable = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create()); + MutableColumnPtr ptr = nullable->get_ptr(); + insert_string(ptr, "xyz"); + insert_null(ptr); + ASSERT_EQ(2, ptr->size()); + EXPECT_EQ("xyz", nullable->get_nested_column().get_data_at(0).to_string()); + EXPECT_EQ("", nullable->get_nested_column().get_data_at(1).to_string()); + EXPECT_EQ(0, nullable->get_null_map_data()[0]); + EXPECT_EQ(1, nullable->get_null_map_data()[1]); + } +} + +TEST(ParquetUtilsTest, TypeToString) { + EXPECT_EQ("INT32", physical_type_to_string(tparquet::Type::INT32)); + EXPECT_EQ("UNKNOWN", physical_type_to_string(static_cast(-1))); + + EXPECT_EQ("SNAPPY", compression_to_string(tparquet::CompressionCodec::SNAPPY)); + EXPECT_EQ("UNKNOWN", compression_to_string(static_cast(-1))); + + EXPECT_EQ("UINT_32", converted_type_to_string(tparquet::ConvertedType::UINT_32)); + EXPECT_EQ("UNKNOWN", converted_type_to_string(static_cast(-1))); +} + +TEST(ParquetUtilsTest, LogicalTypeToString) { + { + tparquet::SchemaElement element; + tparquet::LogicalType logical; + logical.__set_STRING(tparquet::StringType()); + element.__set_logicalType(logical); + EXPECT_EQ("STRING", logical_type_to_string(element)); + } + { + tparquet::SchemaElement element; + element.__set_converted_type(tparquet::ConvertedType::UTF8); + 
EXPECT_EQ("UTF8", logical_type_to_string(element)); + } + { + tparquet::SchemaElement element; + EXPECT_EQ("", logical_type_to_string(element)); + } +} + +TEST(ParquetUtilsTest, EncodingsToString) { + std::vector encodings = { + tparquet::Encoding::PLAIN, tparquet::Encoding::RLE_DICTIONARY, + tparquet::Encoding::DELTA_BYTE_ARRAY, static_cast(-1)}; + EXPECT_EQ("PLAIN,RLE_DICTIONARY,DELTA_BYTE_ARRAY,UNKNOWN", encodings_to_string(encodings)); +} + +TEST(ParquetUtilsTest, TryGetStatisticsEncodedValue) { + std::string value; + { + tparquet::Statistics stats; + stats.__set_min_value("min_value"); + stats.__set_min("min_deprecated"); + EXPECT_TRUE(try_get_statistics_encoded_value(stats, true, &value)); + EXPECT_EQ("min_value", value); + } + { + tparquet::Statistics stats; + stats.__set_min("min_only"); + EXPECT_TRUE(try_get_statistics_encoded_value(stats, true, &value)); + EXPECT_EQ("min_only", value); + } + { + tparquet::Statistics stats; + stats.__set_max_value("max_value"); + stats.__set_max("max_deprecated"); + EXPECT_TRUE(try_get_statistics_encoded_value(stats, false, &value)); + EXPECT_EQ("max_value", value); + } + { + tparquet::Statistics stats; + stats.__set_max("max_only"); + EXPECT_TRUE(try_get_statistics_encoded_value(stats, false, &value)); + EXPECT_EQ("max_only", value); + } + { + tparquet::Statistics stats; + value = "stale"; + EXPECT_FALSE(try_get_statistics_encoded_value(stats, true, &value)); + EXPECT_EQ("", value); + } +} + +TEST(ParquetUtilsTest, BytesToHexString) { + std::string bytes; + bytes.push_back(static_cast(0x00)); + bytes.push_back(static_cast(0x7F)); + bytes.push_back(static_cast(0xFF)); + EXPECT_EQ("0x007FFF", bytes_to_hex_string(bytes)); +} + +TEST(ParquetUtilsTest, DecodeStatisticsValue) { + auto tz = cctz::utc_time_zone(); + EXPECT_EQ("", decode_statistics_value(nullptr, tparquet::Type::BYTE_ARRAY, "", tz)); + EXPECT_EQ("0x616263", decode_statistics_value(nullptr, tparquet::Type::BYTE_ARRAY, "abc", tz)); + + FieldSchema field; + 
field.name = "col"; + field.parquet_schema.__set_type(tparquet::Type::INT32); + field.parquet_schema.__set_repetition_type(tparquet::FieldRepetitionType::OPTIONAL); + field.data_type = std::make_shared(); + field.physical_type = tparquet::Type::INT32; + + Int32 value = 12345; + std::string encoded(sizeof(Int32), '\0'); + memcpy(encoded.data(), &value, sizeof(Int32)); + EXPECT_EQ("12345", decode_statistics_value(&field, tparquet::Type::INT32, encoded, tz)); +} + +TEST(ParquetUtilsTest, BuildPathMap) { + FieldSchema root; + root.name = "root"; + FieldSchema child_a; + child_a.name = "a"; + FieldSchema child_b; + child_b.name = "b"; + FieldSchema child_c; + child_c.name = "c"; + child_b.children.push_back(child_c); + root.children.push_back(child_a); + root.children.push_back(child_b); + + std::unordered_map map; + build_path_map(root, "", &map); + ASSERT_EQ(2, map.size()); + EXPECT_EQ("a", map["root.a"]->name); + EXPECT_EQ("c", map["root.b.c"]->name); +} + +} // namespace doris::vectorized::parquet_utils diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index b46ee271c59392..a2fb673b12f220 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -33,6 +33,10 @@ import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta; +import org.apache.doris.nereids.trees.expressions.functions.table.ParquetBloomProbe; +import org.apache.doris.nereids.trees.expressions.functions.table.ParquetFileMetadata; +import org.apache.doris.nereids.trees.expressions.functions.table.ParquetKvMetadata; +import 
org.apache.doris.nereids.trees.expressions.functions.table.ParquetMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; import org.apache.doris.nereids.trees.expressions.functions.table.Query; @@ -69,7 +73,11 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Query.class, "query"), tableValued(PartitionValues.class, "partition_values"), tableValued(File.class, "file"), - tableValued(Http.class, "http") + tableValued(Http.class, "http"), + tableValued(ParquetMeta.class, "parquet_meta"), + tableValued(ParquetFileMetadata.class, "parquet_file_metadata"), + tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"), + tableValued(ParquetBloomProbe.class, "parquet_bloom_probe") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetBloomProbe.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetBloomProbe.java new file mode 100644 index 00000000000000..02619acb7807d5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetBloomProbe.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_bloom_probe */ +public class ParquetBloomProbe extends TableValuedFunction { + public ParquetBloomProbe(Properties properties) { + super("parquet_bloom_probe", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + arguments.put("mode", "parquet_bloom_probe"); + return new ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build ParquetMetadataTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetFileMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetFileMetadata.java new file mode 100644 index 00000000000000..f1bbe354fc5561 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetFileMetadata.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_file_metadata */ +public class ParquetFileMetadata extends TableValuedFunction { + public ParquetFileMetadata(Properties properties) { + super("parquet_file_metadata", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + arguments.put("mode", "parquet_file_metadata"); + return new 
ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build ParquetMetadataTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetKvMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetKvMetadata.java new file mode 100644 index 00000000000000..ee54b088030903 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetKvMetadata.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_kv_metadata */ +public class ParquetKvMetadata extends TableValuedFunction { + public ParquetKvMetadata(Properties properties) { + super("parquet_kv_metadata", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + arguments.put("mode", "parquet_kv_metadata"); + return new ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build ParquetMetadataTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetMeta.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetMeta.java new file mode 100644 index 00000000000000..48f891a7a6e8a1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetMeta.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_meta */ +public class ParquetMeta extends TableValuedFunction { + public ParquetMeta(Properties properties) { + super("parquet_meta", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build ParquetMetadataTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitParquetMeta(this, context); + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index e831be0a6cff17..ee1ccd76478bd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta; +import org.apache.doris.nereids.trees.expressions.functions.table.ParquetMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; import org.apache.doris.nereids.trees.expressions.functions.table.Query; @@ -120,6 +121,10 @@ default R visitNumbers(Numbers numbers, C context) { return visitTableValuedFunction(numbers, context); } + default R visitParquetMeta(ParquetMeta parquetMeta, C context) { + return visitTableValuedFunction(parquetMeta, context); + } + default R visitS3(S3 s3, C context) { return visitTableValuedFunction(s3, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java new file mode 100644 index 00000000000000..2ba434bc5ac17b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java @@ -0,0 +1,394 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.datasource.property.storage.LocalProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet 
metadata. + * Currently works in the following modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + MODE_BLOOM_PROBE); + + private static final ImmutableList PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + // Align with DuckDB parquet_schema() output.
+ new Column("file_name", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true), + new Column("name", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true), + new Column("type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true), + new Column("type_length", PrimitiveType.BIGINT, true), + new Column("repetition_type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true), + new Column("num_children", PrimitiveType.BIGINT, true), + new Column("converted_type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true), + new Column("scale", PrimitiveType.BIGINT, true), + new Column("precision", PrimitiveType.BIGINT, true), + new Column("field_id", PrimitiveType.BIGINT, true), + new Column("logical_type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true) + ); + + private static final ImmutableList PARQUET_METADATA_COLUMNS = ImmutableList.of( + // Align with DuckDB parquet_metadata() output. + new Column("file_name", ScalarType.createStringType(), true), + new Column("row_group_id", PrimitiveType.BIGINT, true), + new Column("row_group_num_rows", PrimitiveType.BIGINT, true), + new Column("row_group_num_columns", PrimitiveType.BIGINT, true), + new Column("row_group_bytes", PrimitiveType.BIGINT, true), + new Column("column_id", PrimitiveType.BIGINT, true), + new Column("file_offset", PrimitiveType.BIGINT, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("path_in_schema", ScalarType.createStringType(), true), + new Column("type", ScalarType.createStringType(), true), + new Column("stats_min", ScalarType.createStringType(), true), + new Column("stats_max", ScalarType.createStringType(), true), + new Column("stats_null_count", PrimitiveType.BIGINT, true), + new Column("stats_distinct_count", PrimitiveType.BIGINT, true), + new Column("stats_min_value", ScalarType.createStringType(), true), + new Column("stats_max_value", ScalarType.createStringType(), true), + new 
Column("compression", ScalarType.createStringType(), true), + new Column("encodings", ScalarType.createStringType(), true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("key_value_metadata", new MapType( + ScalarType.createVarbinaryType(ScalarType.MAX_VARBINARY_LENGTH), + ScalarType.createVarbinaryType(ScalarType.MAX_VARBINARY_LENGTH)), true), + new Column("bloom_filter_offset", PrimitiveType.BIGINT, true), + new Column("bloom_filter_length", PrimitiveType.BIGINT, true), + new Column("min_is_exact", PrimitiveType.BOOLEAN, true), + new Column("max_is_exact", PrimitiveType.BOOLEAN, true), + new Column("row_group_compressed_bytes", PrimitiveType.BIGINT, true) + ); + + private static final ImmutableList PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new Column("num_row_groups", PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + // 1 = excluded by BF, 0 = might contain, -1 = no 
bloom filter in file + new Column("bloom_filter_excludes", PrimitiveType.INT, true) + ); + + private final List paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). + private final TFileType fileType; + private final Map properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map params) throws AnalysisException { + Map normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawUri = normalizedParams.get(ExternalFileTableValuedFunction.URI_KEY); + boolean uriProvided = !Strings.isNullOrEmpty(rawUri); + String rawPath = uriProvided ? rawUri : normalizedParams.get(LocalProperties.PROP_FILE_PATH); + if (Strings.isNullOrEmpty(rawPath)) { + throw new AnalysisException( + "Property 'uri' or 'file_path' is required for parquet_meta"); + } + String parsedPath = rawPath.trim(); + if (parsedPath.isEmpty()) { + throw new AnalysisException( + "Property 'uri' or 'file_path' must contain at least one location"); + } + + String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA); + mode = rawMode.toLowerCase(); + if (!SUPPORTED_MODES.contains(mode)) { + throw new AnalysisException("Unsupported mode '" + rawMode + "' for parquet_meta"); + } + String tmpBloomColumn = null; + String tmpBloomLiteral = null; + if (MODE_BLOOM_PROBE.equals(mode)) { + tmpBloomColumn = normalizedParams.get(COLUMN); + tmpBloomLiteral = normalizedParams.get(VALUE); + if (Strings.isNullOrEmpty(tmpBloomColumn) || Strings.isNullOrEmpty(tmpBloomLiteral)) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + tmpBloomColumn = tmpBloomColumn.trim(); + tmpBloomLiteral = tmpBloomLiteral.trim(); + if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) { + throw new AnalysisException( + "Missing 'column' or 'value' for 
mode parquet_bloom_probe"); + } + } + + String scheme = null; + try { + scheme = new URI(parsedPath).getScheme(); + } catch (URISyntaxException e) { + scheme = null; + } + if (uriProvided) { + if (Strings.isNullOrEmpty(scheme)) { + throw new AnalysisException("Property 'uri' must contain a scheme for parquet_meta"); + } + } else if (!Strings.isNullOrEmpty(scheme)) { + throw new AnalysisException("Property 'file_path' must not contain a scheme for parquet_meta"); + } + Map storageParams = new HashMap<>(normalizedParams); + // StorageProperties detects provider by "uri". + if (uriProvided) { + storageParams.put(ExternalFileTableValuedFunction.URI_KEY, parsedPath); + } else { + // Local file path, hint local fs support. + storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true"); + storageParams.put(LocalProperties.PROP_FILE_PATH, parsedPath); + } + StorageProperties storageProperties; + try { + storageProperties = StorageProperties.createPrimary(storageParams); + } catch (RuntimeException e) { + throw new AnalysisException( + "Failed to parse storage properties for parquet_meta: " + e.getMessage(), e); + } + this.fileType = mapToFileType(storageProperties.getType()); + Map backendProps = storageProperties.getBackendConfigProperties(); + String normalizedPath; + try { + normalizedPath = storageProperties.validateAndNormalizeUri(parsedPath); + } catch (UserException e) { + throw new AnalysisException( + "Failed to normalize parquet_meta paths: " + e.getMessage(), e); + } + this.properties = backendProps; + + // Expand any glob patterns (e.g. *.parquet) to concrete file paths. + List normalizedPaths = + expandGlobPath(normalizedPath, storageProperties, storageParams, this.fileType); + + this.paths = ImmutableList.copyOf(normalizedPaths); + this.bloomColumn = tmpBloomColumn; + this.bloomLiteral = tmpBloomLiteral; + } + + /** + * Expand a wildcard path to matching files. 
+ */ + private static List expandGlobPath(String inputPath, + StorageProperties storageProperties, + Map storageParams, + TFileType fileType) throws AnalysisException { + if (Strings.isNullOrEmpty(inputPath)) { + return Collections.emptyList(); + } + if (!containsWildcards(inputPath)) { + return ImmutableList.of(inputPath); + } + List expanded = + expandSingleGlob(inputPath, storageProperties, storageParams, fileType); + if (expanded.isEmpty()) { + throw new AnalysisException("No files matched parquet_meta path patterns: " + inputPath); + } + return expanded; + } + + private static boolean containsWildcards(String path) { + if (Strings.isNullOrEmpty(path)) { + return false; + } + return path.contains("*") || path.contains("[") || path.contains("{"); + } + + private static List expandSingleGlob(String pattern, + StorageProperties storageProperties, + Map storageParams, + TFileType fileType) throws AnalysisException { + if (fileType == TFileType.FILE_LOCAL) { + Map localProps = new HashMap<>(storageParams); + // Allow Local TVF to pick any alive backend when backend_id is not provided. + localProps.putIfAbsent(LocalTableValuedFunction.PROP_SHARED_STORAGE, "true"); + localProps.putIfAbsent(FileFormatConstants.PROP_FORMAT, FileFormatConstants.FORMAT_PARQUET); + // Local TVF expects the uri/path in properties; storageParams already contains it. 
+ LocalTableValuedFunction localTvf = new LocalTableValuedFunction(localProps); + return localTvf.getFileStatuses().stream() + .filter(status -> !status.isIsDir()) + .map(TBrokerFileStatus::getPath) + .collect(Collectors.toList()); + } + if (fileType == TFileType.FILE_HTTP) { + throw new AnalysisException("Glob patterns are not supported for file type: " + fileType); + } + if (storageProperties == null) { + throw new AnalysisException("Storage properties is required for glob pattern: " + pattern); + } + if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) { + return globRemoteWithBroker(pattern, storageParams); + } + throw new AnalysisException("Glob patterns are not supported for file type: " + fileType); + } + + private static List globRemoteWithBroker(String pattern, + Map storageParams) throws AnalysisException { + List remoteFiles = new ArrayList<>(); + BrokerDesc brokerDesc = new BrokerDesc("ParquetMetaTvf", storageParams); + try { + BrokerUtil.parseFile(pattern, brokerDesc, remoteFiles); + } catch (UserException e) { + throw new AnalysisException("Failed to expand glob pattern '" + pattern + "': " + + e.getMessage(), e); + } + return remoteFiles.stream() + .filter(file -> !file.isIsDir()) + .map(TBrokerFileStatus::getPath) + .collect(Collectors.toList()); + } + + /** + * Map FE storage type to BE file type. 
+ */ + private static TFileType mapToFileType(StorageProperties.Type type) throws AnalysisException { + switch (type) { + case HDFS: + case OSS_HDFS: + return TFileType.FILE_HDFS; + case HTTP: + return TFileType.FILE_HTTP; + case LOCAL: + return TFileType.FILE_LOCAL; + case S3: + case OSS: + case OBS: + case COS: + case GCS: + case MINIO: + case AZURE: + return TFileType.FILE_S3; + default: + throw new AnalysisException("Unsupported storage type for parquet_meta: " + type); + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.PARQUET; + } + + @Override + public TMetaScanRange getMetaScanRange(List requiredFields) { + TParquetMetadataParams parquetParams = new TParquetMetadataParams(); + parquetParams.setPaths(paths); + parquetParams.setMode(mode); + parquetParams.setFileType(fileType); + parquetParams.setProperties(properties); + if (MODE_BLOOM_PROBE.equals(mode)) { + parquetParams.setBloomColumn(bloomColumn); + parquetParams.setBloomLiteral(bloomLiteral); + } + + TMetaScanRange scanRange = new TMetaScanRange(); + scanRange.setMetadataType(TMetadataType.PARQUET); + scanRange.setParquetParams(parquetParams); + // Fan out: one file per split so MetadataScanNode can distribute across BEs. 
+ if (paths != null && !paths.isEmpty()) { + scanRange.setSerializedSplits(paths); + } + return scanRange; + } + + @Override + public String getTableName() { + return "ParquetMetadataTableValuedFunction<" + mode + ">"; + } + + @Override + public List getTableColumns() { + if (MODE_SCHEMA.equals(mode)) { + return PARQUET_SCHEMA_COLUMNS; + } + if (MODE_FILE_METADATA.equals(mode)) { + return PARQUET_FILE_METADATA_COLUMNS; + } + if (MODE_KV_METADATA.equals(mode)) { + return PARQUET_KV_METADATA_COLUMNS; + } + if (MODE_BLOOM_PROBE.equals(mode)) { + return PARQUET_BLOOM_PROBE_COLUMNS; + } + return PARQUET_METADATA_COLUMNS; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index b7bdbcaa35bf71..7621ca58587e29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,6 +79,23 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map copy = new HashMap<>(params); + copy.put("mode", "parquet_file_metadata"); + return new ParquetMetadataTableValuedFunction(copy); + } + case ParquetMetadataTableValuedFunction.NAME_KV_METADATA: { + Map copy = new HashMap<>(params); + copy.put("mode", "parquet_kv_metadata"); + return new ParquetMetadataTableValuedFunction(copy); + } + case ParquetMetadataTableValuedFunction.NAME_BLOOM_PROBE: { + Map copy = new HashMap<>(params); + copy.put("mode", "parquet_bloom_probe"); + return new ParquetMetadataTableValuedFunction(copy); + } case GroupCommitTableValuedFunction.NAME: return new GroupCommitTableValuedFunction(params); case QueryTableValueFunction.NAME: diff --git 
a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index edcdd2c676bf84..690027d247a52d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -636,6 +636,15 @@ struct TQueriesMetadataParams { struct TMetaCacheStatsParams { } +struct TParquetMetadataParams { + 1: optional list paths + 2: optional string mode + 3: optional Types.TFileType file_type + 4: optional map properties + 5: optional string bloom_column + 6: optional string bloom_literal +} + struct TMetaScanRange { 1: optional Types.TMetadataType metadata_type 2: optional TIcebergMetadataParams iceberg_params // deprecated @@ -655,6 +664,7 @@ struct TMetaScanRange { 14: optional map hadoop_props 15: optional string serialized_table; 16: optional list serialized_splits; + 17: optional TParquetMetadataParams parquet_params; } // Specification of an individual data range which is held in its entirety diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index e558e51d022c7d..e2abaaace9c9d6 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -753,6 +753,7 @@ enum TMetadataType { PARTITION_VALUES = 10, HUDI = 11, PAIMON = 12, + PARQUET = 13, } // deprecated @@ -775,4 +776,3 @@ const i32 TSNAPSHOT_REQ_VERSION1 = 3; // corresponding to alpha rowset const i32 TSNAPSHOT_REQ_VERSION2 = 4; // corresponding to beta rowset // the snapshot request should always set prefer snapshot version to TPREFER_SNAPSHOT_REQ_VERSION const i32 TPREFER_SNAPSHOT_REQ_VERSION = TSNAPSHOT_REQ_VERSION2; - diff --git a/regression-test/data/external_table_p0/tvf/bloommeta.parquet b/regression-test/data/external_table_p0/tvf/bloommeta.parquet new file mode 100644 index 00000000000000..b3d620e0f6c5d7 Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/bloommeta.parquet differ diff --git a/regression-test/data/external_table_p0/tvf/empty.parquet b/regression-test/data/external_table_p0/tvf/empty.parquet new file mode 100644 index 
00000000000000..21a8bec4f913c6 Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/empty.parquet differ diff --git a/regression-test/data/external_table_p0/tvf/kvmeta.parquet b/regression-test/data/external_table_p0/tvf/kvmeta.parquet new file mode 100644 index 00000000000000..1043aa9eb7718e Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/kvmeta.parquet differ diff --git a/regression-test/data/external_table_p0/tvf/meta.parquet b/regression-test/data/external_table_p0/tvf/meta.parquet new file mode 100644 index 00000000000000..8c9166d03af992 Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/meta.parquet differ diff --git a/regression-test/data/external_table_p0/tvf/test_parquet_meta_tvf.out b/regression-test/data/external_table_p0/tvf/test_parquet_meta_tvf.out new file mode 100644 index 00000000000000..521286473d4122 --- /dev/null +++ b/regression-test/data/external_table_p0/tvf/test_parquet_meta_tvf.out @@ -0,0 +1,155 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !parquet_metadata_s3 -- +0 5 4 236 0 4 5 normal_int INT32 1 5 1 \N 1 +0 5 4 236 1 65 5 string_col BYTE_ARRAY \N \N 1 \N a +0 5 4 236 2 130 5 date_col INT32 2023-01-01 2023-01-05 1 \N 2023-01-01 +0 5 4 236 3 191 5 decimal_col FIXED_LEN_BYTE_ARRAY 10.00 50.00 1 \N 10.00 + +-- !parquet_metadata_default_mode -- +0 5 4 236 0 4 5 normal_int INT32 1 5 1 \N 1 +0 5 4 236 1 65 5 string_col BYTE_ARRAY \N \N 1 \N a +0 5 4 236 2 130 5 date_col INT32 2023-01-01 2023-01-05 1 \N 2023-01-01 +0 5 4 236 3 191 5 decimal_col FIXED_LEN_BYTE_ARRAY 10.00 50.00 1 \N 10.00 + +-- !parquet_schema -- +date_col INT32 \N OPTIONAL 0 DATE \N \N \N DATE +decimal_col FIXED_LEN_BYTE_ARRAY 5 OPTIONAL 0 DECIMAL 2 10 \N DECIMAL +normal_int INT32 \N OPTIONAL 0 \N \N \N \N \N +string_col BYTE_ARRAY \N OPTIONAL 0 UTF8 \N \N \N STRING + +-- !parquet_metadata_empty -- + +-- !parquet_kv_metadata -- +app kvmeta_test +note it's ok +ver v1 + +-- !parquet_file_metadata -- +parquet-cpp-arrow version 21.0.0 5 1 2 + +-- !parquet_file_metadata_s3_glob -- +3 + +-- !parquet_bloom_probe -- +0 0 +1 0 +2 1 + +-- !parquet_bloom_probe_no_bf -- +0 -1 + +-- !parquet_mapping -- +0 5 4 236 0 4 5 normal_int INT32 1 5 1 \N 1 +0 5 4 236 1 65 5 string_col BYTE_ARRAY \N \N 1 \N a +0 5 4 236 2 130 5 date_col INT32 2023-01-01 2023-01-05 1 \N 2023-01-01 +0 5 4 236 3 191 5 decimal_col FIXED_LEN_BYTE_ARRAY 10.00 50.00 1 \N 10.00 + +-- !parquet_schema_hdfs -- +1 + +-- !parquet_file_metadata_hdfs -- +1 + +-- !parquet_kv_metadata_hdfs -- +0 + +-- !parquet_metadata_local -- +//meta.parquet 0 5 4 236 0 4 5 normal_int INT32 1 5 1 \N 1 5 SNAPPY PLAIN,RLE,RLE_DICTIONARY \N 4 36 61 57 
{0x4152524F573A736368656D61:0x2F2F2F2F2F7A414241414151414141414141414B41417741426741464141674143674141414141424241414D4141414143414149414141414241414941414141424141414141514141414441414141416641414141455141414141454141414159502F2F2F77414141516351414141414A414141414151414141414141414141437741414147526C59326C7459577866593239734141674144414145414167414341414141416F4141414143414141416E502F2F2F774141415167514141414149414141414151414141414141414141434141414147526864475666593239734141414741416741426741474141414141414141414E442F2F2F384141414546454141414143414141414145414141414141414141416F414141427A64484A70626D6466593239734141414541415141424141414142414146414149414159414277414D41414141454141514141414141414142416841414141416B4141414142414141414141414141414B41414141626D39796257467358326C75644141414341414D41416741427741494141414141414141415341414141414141414141} \N \N \N \N 252 +//meta.parquet 0 5 4 236 1 65 5 string_col BYTE_ARRAY \N \N 1 \N a e SNAPPY PLAIN,RLE,RLE_DICTIONARY \N 65 101 65 61 {0x4152524F573A736368656D61:0x2F2F2F2F2F7A414241414151414141414141414B41417741426741464141674143674141414141424241414D4141414143414149414141414241414941414141424141414141514141414441414141416641414141455141414141454141414159502F2F2F77414141516351414141414A414141414151414141414141414141437741414147526C59326C7459577866593239734141674144414145414167414341414141416F4141414143414141416E502F2F2F774141415167514141414149414141414151414141414141414141434141414147526864475666593239734141414741416741426741474141414141414141414E442F2F2F384141414546454141414143414141414145414141414141414141416F414141427A64484A70626D6466593239734141414541415141424141414142414146414149414159414277414D41414141454141514141414141414142416841414141416B4141414142414141414141414141414B41414141626D39796257467358326C75644141414341414D41416741427741494141414141414141415341414141414141414141} \N \N \N \N 252 +//meta.parquet 0 5 4 236 2 130 5 date_col INT32 2023-01-01 2023-01-05 1 \N 2023-01-01 2023-01-05 SNAPPY 
PLAIN,RLE,RLE_DICTIONARY \N 130 162 61 57 {0x4152524F573A736368656D61:0x2F2F2F2F2F7A414241414151414141414141414B41417741426741464141674143674141414141424241414D4141414143414149414141414241414941414141424141414141514141414441414141416641414141455141414141454141414159502F2F2F77414141516351414141414A414141414151414141414141414141437741414147526C59326C7459577866593239734141674144414145414167414341414141416F4141414143414141416E502F2F2F774141415167514141414149414141414151414141414141414141434141414147526864475666593239734141414741416741426741474141414141414141414E442F2F2F384141414546454141414143414141414145414141414141414141416F414141427A64484A70626D6466593239734141414541415141424141414142414146414149414159414277414D41414141454141514141414141414142416841414141416B4141414142414141414141414141414B41414141626D39796257467358326C75644141414341414D41416741427741494141414141414141415341414141414141414141} \N \N \N \N 252 +//meta.parquet 0 5 4 236 3 191 5 decimal_col FIXED_LEN_BYTE_ARRAY 10.00 50.00 1 \N 10.00 50.00 SNAPPY PLAIN,RLE,RLE_DICTIONARY \N 191 227 65 61 {0x4152524F573A736368656D61:0x2F2F2F2F2F7A414241414151414141414141414B41417741426741464141674143674141414141424241414D4141414143414149414141414241414941414141424141414141514141414441414141416641414141455141414141454141414159502F2F2F77414141516351414141414A414141414151414141414141414141437741414147526C59326C7459577866593239734141674144414145414167414341414141416F4141414143414141416E502F2F2F774141415167514141414149414141414151414141414141414141434141414147526864475666593239734141414741416741426741474141414141414141414E442F2F2F384141414546454141414143414141414145414141414141414141416F414141427A64484A70626D6466593239734141414541415141424141414142414146414149414159414277414D41414141454141514141414141414142416841414141416B4141414142414141414141414141414B41414141626D39796257467358326C75644141414341414D41416741427741494141414141414141415341414141414141414141} \N \N \N \N 252 + +-- !parquet_schema_local -- +//meta.parquet 
date_col INT32 \N OPTIONAL 0 DATE \N \N \N DATE +//meta.parquet decimal_col FIXED_LEN_BYTE_ARRAY 5 OPTIONAL 0 DECIMAL 2 10 \N DECIMAL +//meta.parquet normal_int INT32 \N OPTIONAL 0 \N \N \N \N \N +//meta.parquet string_col BYTE_ARRAY \N OPTIONAL 0 UTF8 \N \N \N STRING + +-- !parquet_metadata_empty_local -- + +-- !parquet_file_metadata_local -- +//meta.parquet parquet-cpp-arrow version 21.0.0 5 1 2 \N \N + +-- !parquet_file_metadata_local_glob -- +3 + +-- !parquet_kv_metadata_local -- +//kvmeta.parquet app kvmeta_test +//kvmeta.parquet note it's ok +//kvmeta.parquet ver v1 + +-- !parquet_bloom_probe_local -- +//bloommeta.parquet 0 0 +//bloommeta.parquet 1 0 +//bloommeta.parquet 2 1 + +-- !parquet_bloom_probe_no_bf_local -- +//meta.parquet 0 -1 + +-- !parquet_metadata_local_mapping -- +0 5 4 236 0 4 5 normal_int INT32 1 5 1 \N 1 +0 5 4 236 1 65 5 string_col BYTE_ARRAY \N \N 1 \N a +0 5 4 236 2 130 5 date_col INT32 2023-01-01 2023-01-05 1 \N 2023-01-01 +0 5 4 236 3 191 5 decimal_col FIXED_LEN_BYTE_ARRAY 10.00 50.00 1 \N 10.00 + +-- !parquet_schema_local_comp -- +0 0 0 + +-- !parquet_schema_local_comp_arr -- +0 0 0 + +-- !parquet_schema_local_t -- +0 0 0 0 0 0 0 0 + +-- !parquet_metadata_local_comp -- +0 0 id INT32 25 +0 1 m1.key_value.key INT32 94 +0 2 m1.key_value.value.list.element DOUBLE 444 +0 3 m2.key_value.key BYTE_ARRAY 25 +0 4 m2.key_value.value.key_value.key BYTE_ARRAY 147 +0 5 m2.key_value.value.key_value.value DOUBLE 147 + +-- !parquet_kv_metadata_local_comp -- +//comp.parquet org.apache.spark.sql.parquet.row.metadata 
{"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"m1","type":{"type":"map","keyType":"integer","valueType":{"type":"array","elementType":"double","containsNull":true},"valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"m2","type":{"type":"map","keyType":"string","valueType":{"type":"map","keyType":"string","valueType":"double","valueContainsNull":true},"valueContainsNull":true},"nullable":true,"metadata":{}}]} + +-- !parquet_file_metadata_local_comp -- +//comp.parquet parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 25 1 + +-- !parquet_metadata_local_comp_arr -- +0 0 id INT32 25 +0 1 aa.list.element.list.element DOUBLE 4770 +0 2 am.list.element.key_value.key BYTE_ARRAY 1998 +0 3 am.list.element.key_value.value INT32 1998 + +-- !parquet_kv_metadata_local_comp_arr -- +//comp_arr.parquet org.apache.spark.sql.parquet.row.metadata {"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"aa","type":{"type":"array","elementType":{"type":"array","elementType":"double","containsNull":true},"containsNull":true},"nullable":true,"metadata":{}},{"name":"am","type":{"type":"array","elementType":{"type":"map","keyType":"string","valueType":"integer","valueContainsNull":true},"containsNull":true},"nullable":true,"metadata":{}}]} + +-- !parquet_file_metadata_local_comp_arr -- +//comp_arr.parquet parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 25 1 + +-- !parquet_metadata_local_t -- +0 0 id INT32 25 +0 1 arr_arr.list.element.list.element BYTE_ARRAY 341 +0 10 map_arr.key_value.value.list.element DOUBLE 473 +0 11 map_struct.key_value.key INT96 118 +0 12 map_struct.key_value.value.vin BYTE_ARRAY 118 +0 13 map_struct.key_value.value.charge_id INT32 118 +0 14 map_struct.key_value.value.start_time DOUBLE 118 +0 15 struct_arr_map.aa.list.element BYTE_ARRAY 106 +0 16 struct_arr_map.mm.key_value.key INT32 25 +0 17 
struct_arr_map.mm.key_value.value BYTE_ARRAY 25 +0 2 arr_map.list.element.key_value.key BYTE_ARRAY 127 +0 3 arr_map.list.element.key_value.value INT32 127 +0 4 arr_struct.list.element.vin BYTE_ARRAY 137 +0 5 arr_struct.list.element.charge_id INT32 137 +0 6 map_map.key_value.key BYTE_ARRAY 25 +0 7 map_map.key_value.value.key_value.key BYTE_ARRAY 139 +0 8 map_map.key_value.value.key_value.value DOUBLE 139 +0 9 map_arr.key_value.key INT32 102 + +-- !parquet_kv_metadata_local_t -- +//t.parquet org.apache.spark.sql.parquet.row.metadata {"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"arr_arr","type":{"type":"array","elementType":{"type":"array","elementType":"string","containsNull":true},"containsNull":true},"nullable":true,"metadata":{}},{"name":"arr_map","type":{"type":"array","elementType":{"type":"map","keyType":"string","valueType":"date","valueContainsNull":true},"containsNull":true},"nullable":true,"metadata":{}},{"name":"arr_struct","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"charge_id","type":"integer","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"map_map","type":{"type":"map","keyType":"string","valueType":{"type":"map","keyType":"string","valueType":"double","valueContainsNull":true},"valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"map_arr","type":{"type":"map","keyType":"integer","valueType":{"type":"array","elementType":"double","containsNull":true},"valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"map_struct","type":{"type":"map","keyType":"timestamp","valueType":{"type":"struct","fields":[{"name":"vin","type":"string","nullable":true,"metadata":{}},{"name":"charge_id","type":"integer","nullable":true,"metadata":{}},{"name":"start_time","type":"double","nullable":true,"metadata":{}}]},"valueContainsNull":true},"nullable":true,"metadata":{}},{
"name":"struct_arr_map","type":{"type":"struct","fields":[{"name":"aa","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"mm","type":{"type":"map","keyType":"date","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]} + +-- !parquet_file_metadata_local_t -- +//t.parquet parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 25 1 + diff --git a/regression-test/suites/external_table_p0/tvf/test_parquet_meta_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_parquet_meta_tvf.groovy new file mode 100644 index 00000000000000..1ac07b6463b41c --- /dev/null +++ b/regression-test/suites/external_table_p0/tvf/test_parquet_meta_tvf.groovy @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_parquet_meta_tvf", "p0,external,external_docker,tvf") { + // use nereids planner + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + List> backends = sql """ show backends """ + assertTrue(backends.size() > 0) + + def ak = getS3AK() + def sk = getS3SK() + def endpoint = getS3Endpoint() + def region = getS3Region() + def bucket = context.config.otherConfigs.get("s3BucketName") + def basePath = "s3://${bucket}/regression/datalake/pipeline_data" + + // parquet_metadata (S3) + // Note: Prefer asserting on stable metadata columns; avoid relying on host-specific/local-only paths. + order_qt_parquet_metadata_s3 """ + select + row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, + file_offset, num_values, path_in_schema, type, stats_min, stats_max, stats_null_count, + stats_distinct_count, stats_min_value + from parquet_meta( + "uri" = "${basePath}/meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_metadata" + ) + order by row_group_id, column_id; + """ + + // default mode: parquet_metadata + order_qt_parquet_metadata_default_mode """ + select + row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, + file_offset, num_values, path_in_schema, type, stats_min, stats_max, stats_null_count, + stats_distinct_count, stats_min_value + from parquet_meta( + "uri" = "${basePath}/meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}" + ) + order by row_group_id, column_id; + """ + + // parquet_schema + order_qt_parquet_schema """ + select + name, type, type_length, repetition_type, num_children, converted_type, + scale, precision, field_id, logical_type + from parquet_meta( + "uri" = "${basePath}/meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = 
"${endpoint}", + "region" = "${region}", + "mode" = "parquet_schema" + ) + order by name; + """ + + // empty parquet + order_qt_parquet_metadata_empty """ + select * from parquet_meta( + "uri" = "${basePath}/empty.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_metadata" + ); + """ + + // kv metadata + order_qt_parquet_kv_metadata """ + select `key`, `value` from parquet_meta( + "uri" = "${basePath}/kvmeta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_kv_metadata" + ) + order by `key`; + """ + + // file metadata + order_qt_parquet_file_metadata """ + select created_by, num_rows, num_row_groups, format_version from parquet_meta( + "uri" = "${basePath}/meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_file_metadata" + ); + """ + + // file metadata (S3 glob) + order_qt_parquet_file_metadata_s3_glob """ + select count(*) from parquet_meta( + "uri" = "${basePath}/*meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_file_metadata" + ); + """ + + // bloom probe + order_qt_parquet_bloom_probe """ + select row_group_id, bloom_filter_excludes from parquet_meta( + "uri" = "${basePath}/bloommeta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_bloom_probe", + "column" = "col", + "value" = 500 + ) + order by row_group_id; + """ + + // bloom probe: column without bloom filter + order_qt_parquet_bloom_probe_no_bf """ + select row_group_id, bloom_filter_excludes from parquet_meta( + "uri" = "${basePath}/meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", 
+ "region" = "${region}", + "mode" = "parquet_bloom_probe", + "column" = "normal_int", + "value" = 500 + ) + order by row_group_id; + """ + + // mapping select + order_qt_parquet_mapping """ + select row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, + file_offset, num_values, path_in_schema, type, stats_min, stats_max, stats_null_count, + stats_distinct_count, stats_min_value + from parquet_meta( + "uri" = "${basePath}/meta.parquet", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_metadata" + ); + """ + + // parquet_metadata (HDFS, hive3): reuse group0 parquet files in test_hdfs_parquet_group0.groovy + String hdfs_port = context.config.otherConfigs.get("hive3HdfsPort") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + def hdfsUserName = "doris" + def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}" + def uri = "" + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/delta_length_byte_array.parquet" + order_qt_parquet_schema_hdfs """ + select count(*) from parquet_meta( + "uri" = "${uri}", + "hadoop.username" = "${hdfsUserName}", + "mode" = "parquet_schema" + ); + """ + + uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/datapage_v1-snappy-compressed-checksum.parquet" + order_qt_parquet_file_metadata_hdfs """ + select count(*) from parquet_meta( + "uri" = "${uri}", + "hadoop.username" = "${hdfsUserName}", + "mode" = "parquet_file_metadata" + ); + """ + + uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/column_chunk_key_value_metadata.parquet" + order_qt_parquet_kv_metadata_hdfs """ + select count(*) from parquet_meta( + "uri" = "${uri}", + "hadoop.username" = "${hdfsUserName}", + "mode" = "parquet_kv_metadata" + ); + """ + 
} finally { + } + } + + // local parquet_meta: scp files to every BE, then query by local file_path + def dataFilePath = context.config.dataPath + "/external_table_p0/tvf/" + def outFilePath="/" + mkdirRemotePathOnAllBE("root", outFilePath) + + def localMetaFile = "${dataFilePath}/meta.parquet" + def localEmptyFile = "${dataFilePath}/empty.parquet" + def localKvMetaFile = "${dataFilePath}/kvmeta.parquet" + def localBloomMetaFile = "${dataFilePath}/bloommeta.parquet" + def localCompFile = "${dataFilePath}/comp.parquet" + def localCompArrFile = "${dataFilePath}/comp_arr.parquet" + def localTFile = "${dataFilePath}/t.parquet" + + for (List backend : backends) { + def beHost = backend[1] + scpFiles("root", beHost, localMetaFile, outFilePath, false) + scpFiles("root", beHost, localEmptyFile, outFilePath, false) + scpFiles("root", beHost, localKvMetaFile, outFilePath, false) + scpFiles("root", beHost, localBloomMetaFile, outFilePath, false) + scpFiles("root", beHost, localCompFile, outFilePath, false) + scpFiles("root", beHost, localCompArrFile, outFilePath, false) + scpFiles("root", beHost, localTFile, outFilePath, false) + } + + order_qt_parquet_metadata_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_metadata" + ); + """ + + order_qt_parquet_schema_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_schema" + ); + """ + + order_qt_parquet_metadata_empty_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/empty.parquet", + "mode" = "parquet_metadata" + ); + """ + + order_qt_parquet_file_metadata_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_file_metadata" + ); + """ + + // file metadata (local glob) + order_qt_parquet_file_metadata_local_glob """ + select count(*) from parquet_meta( + "file_path" = "${outFilePath}/*meta.parquet", + "mode" = "parquet_file_metadata" + ); + """ + 
+ order_qt_parquet_kv_metadata_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/kvmeta.parquet", + "mode" = "parquet_kv_metadata" + ); + """ + + order_qt_parquet_bloom_probe_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/bloommeta.parquet", + "mode" = "parquet_bloom_probe", + "column" = "col", + "value" = 500 + ); + """ + + order_qt_parquet_bloom_probe_no_bf_local """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_bloom_probe", + "column" = "normal_int", + "value" = 500 + ); + """ + + order_qt_parquet_metadata_local_mapping """ + select + row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, + file_offset, num_values, path_in_schema, type, stats_min, stats_max, stats_null_count, + stats_distinct_count, stats_min_value + from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_metadata" + ); + """ + + // complex types: schema validation + order_qt_parquet_schema_local_comp """ + select + sum(if(name = 'm1', 1, 0)) as m1_nodes, + sum(if(name = 'm2', 1, 0)) as m2_nodes, + max(case + when logical_type in ('MAP', 'LIST') or converted_type in ('MAP', 'LIST') then 1 + else 0 + end) as has_complex + from parquet_meta( + "file_path" = "${outFilePath}/comp.parquet", + "mode" = "parquet_schema" + ); + """ + + order_qt_parquet_schema_local_comp_arr """ + select + sum(if(name = 'aa', 1, 0)) as aa_nodes, + sum(if(name = 'am', 1, 0)) as am_nodes, + max(case + when logical_type in ('MAP', 'LIST') or converted_type in ('MAP', 'LIST') then 1 + else 0 + end) as has_complex + from parquet_meta( + "file_path" = "${outFilePath}/comp_arr.parquet", + "mode" = "parquet_schema" + ); + """ + + order_qt_parquet_schema_local_t """ + select + sum(if(name = 'arr_arr', 1, 0)) as arr_arr_nodes, + sum(if(name = 'arr_map', 1, 0)) as arr_map_nodes, + sum(if(name = 'arr_struct', 1, 0)) as arr_struct_nodes, + sum(if(name = 'map_map', 1, 0)) as 
map_map_nodes, + sum(if(name = 'map_arr', 1, 0)) as map_arr_nodes, + sum(if(name = 'map_struct', 1, 0)) as map_struct_nodes, + sum(if(name = 'struct_arr_map', 1, 0)) as struct_arr_map_nodes, + max(case + when logical_type in ('MAP', 'LIST') or converted_type in ('MAP', 'LIST') then 1 + else 0 + end) as has_complex + from parquet_meta( + "file_path" = "${outFilePath}/t.parquet", + "mode" = "parquet_schema" + ); + """ + + // complex types: metadata/kv/file modes + order_qt_parquet_metadata_local_comp """ + select row_group_id, column_id, path_in_schema, type, num_values + from parquet_meta( + "file_path" = "${outFilePath}/comp.parquet", + "mode" = "parquet_metadata" + ) + order by row_group_id, column_id; + """ + + order_qt_parquet_kv_metadata_local_comp """ + select * from parquet_meta( + "file_path" = "${outFilePath}/comp.parquet", + "mode" = "parquet_kv_metadata" + ); + """ + + order_qt_parquet_file_metadata_local_comp """ + select file_name, created_by, num_rows, num_row_groups + from parquet_meta( + "file_path" = "${outFilePath}/comp.parquet", + "mode" = "parquet_file_metadata" + ); + """ + + order_qt_parquet_metadata_local_comp_arr """ + select row_group_id, column_id, path_in_schema, type, num_values + from parquet_meta( + "file_path" = "${outFilePath}/comp_arr.parquet", + "mode" = "parquet_metadata" + ) + order by row_group_id, column_id; + """ + + order_qt_parquet_kv_metadata_local_comp_arr """ + select * from parquet_meta( + "file_path" = "${outFilePath}/comp_arr.parquet", + "mode" = "parquet_kv_metadata" + ); + """ + + order_qt_parquet_file_metadata_local_comp_arr """ + select file_name, created_by, num_rows, num_row_groups + from parquet_meta( + "file_path" = "${outFilePath}/comp_arr.parquet", + "mode" = "parquet_file_metadata" + ); + """ + + order_qt_parquet_metadata_local_t """ + select row_group_id, column_id, path_in_schema, type, num_values + from parquet_meta( + "file_path" = "${outFilePath}/t.parquet", + "mode" = "parquet_metadata" + ) + order by 
row_group_id, column_id; + """ + + order_qt_parquet_kv_metadata_local_t """ + select * from parquet_meta( + "file_path" = "${outFilePath}/t.parquet", + "mode" = "parquet_kv_metadata" + ); + """ + + order_qt_parquet_file_metadata_local_t """ + select file_name, created_by, num_rows, num_row_groups + from parquet_meta( + "file_path" = "${outFilePath}/t.parquet", + "mode" = "parquet_file_metadata" + ); + """ + + // test exception + test { + sql """ select * from parquet_meta(); """ + exception "Property 'uri' or 'file_path' is required for parquet_meta" + } + + test { + sql """ + select * from parquet_meta( + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "endpoint" = "${endpoint}", + "region" = "${region}", + "mode" = "parquet_metadata" + ); + """ + exception "Property 'uri' or 'file_path' is required for parquet_meta" + } + + test { + sql """ + select * from parquet_meta( + "uri" = " ", + "mode" = "parquet_metadata" + ); + """ + exception "Property 'uri' or 'file_path' must contain at least one location" + } + + test { + sql """ + select * from parquet_meta( + "uri" = "meta.parquet", + "mode" = "parquet_metadata" + ); + """ + exception "Property 'uri' must contain a scheme for parquet_meta" + } + + test { + sql """ + select * from parquet_meta( + "file_path" = "s3://bucket/path.parquet", + "mode" = "parquet_metadata" + ); + """ + exception "Property 'file_path' must not contain a scheme for parquet_meta" + } + + test { + sql """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_unknown" + ); + """ + exception "Unsupported mode 'parquet_unknown' for parquet_meta" + } + + test { + sql """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = "parquet_bloom_probe", + "value" = 1 + ); + """ + exception "Missing 'column' or 'value' for mode parquet_bloom_probe" + } + + test { + sql """ + select * from parquet_meta( + "file_path" = "${outFilePath}/meta.parquet", + "mode" = 
"parquet_bloom_probe", + "column" = "col" + ); + """ + exception "Missing 'column' or 'value' for mode parquet_bloom_probe" + } + + test { + sql """ + select * from parquet_meta( + "file_path" = "${outFilePath}/__parquet_meta_tvf_no_match_*.parquet", + "mode" = "parquet_metadata" + ); + """ + exception "failed to glob" + } +}