Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ Status ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_
MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
const ColumnInt32* dict_column) {
auto res = ColumnString::create();
if (_dict_items.empty()) {
if (dict_column->size() > 0) {
LOG(ERROR) << "Attempt to convert dict column with empty dictionary, column size: "
<< dict_column->size();
}
return res;
}
std::vector<StringRef> dict_values(dict_column->size());
const auto& data = dict_column->get_data();
for (size_t i = 0; i < dict_column->size(); ++i) {
Expand All @@ -106,7 +113,7 @@ Status ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, Data
size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
if (doris_column->is_column_dictionary()) {
ColumnDictI32& dict_column = assert_cast<ColumnDictI32&>(*doris_column);
if (dict_column.dict_size() == 0) {
if (dict_column.dict_size() == 0 && !_dict_items.empty()) {
// If the dictionary grows too big, whether in size or in number of distinct values,
// the encoding will fall back to the plain encoding.
dict_column.insert_many_dict_data(_dict_items.data(),
Expand Down
9 changes: 8 additions & 1 deletion be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
ColumnSelectVector& select_vector, bool is_dict_filter) {
size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
if (doris_column->is_column_dictionary() &&
assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0 && !_dict_items.empty()) {
std::vector<StringRef> dict_items;

char* dict_item_address = (char*)_dict.get();
Expand Down Expand Up @@ -213,6 +213,13 @@ class FixLengthDictDecoder final : public BaseDictDecoder {

MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override {
auto res = ColumnString::create();
if (_dict_items.empty()) {
if (dict_column->size() > 0) {
LOG(ERROR) << "Attempt to convert dict column with empty dictionary, column size: "
<< dict_column->size();
}
return res;
}
std::vector<StringRef> dict_values;
dict_values.reserve(dict_column->size());
const auto& data = dict_column->get_data();
Expand Down
20 changes: 18 additions & 2 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,21 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
if (!dict_loaded) {
// Load and decompress dictionary page from file
if (_block_compress_codec != nullptr) {
auto dict_num = header->dictionary_page_header.num_values;
if (dict_num == 0 && uncompressed_size != 0) {
return Status::IOError(
"Dictionary page's num_values is {} but uncompressed_size is {}", dict_num,
uncompressed_size);
}
Slice compressed_data;
RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
Slice dict_slice(dict_data.get(), uncompressed_size);
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice));
if (dict_num != 0) {
RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice));
}

// Decide whether to cache decompressed or compressed dictionary based on threshold
// If uncompressed_page_size == 0, should_cache_decompressed will return true
bool cache_payload_decompressed = should_cache_decompressed(header, _metadata);

if (_page_read_ctx.enable_parquet_file_page_cache &&
Expand All @@ -431,17 +440,24 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
std::vector<uint8_t> empty_levels; // Dictionary pages don't have levels
if (cache_payload_decompressed) {
// Cache the decompressed dictionary page
// If dict_num == 0, `dict_slice` will be empty
_insert_page_into_cache(empty_levels, dict_slice);
_chunk_statistics.page_cache_decompressed_write_counter += 1;
} else {
if (config::enable_parquet_cache_compressed_pages) {
DCHECK(!compressed_data.empty());
// Cache the compressed dictionary page
_insert_page_into_cache(empty_levels,
Slice(compressed_data.data, compressed_data.size));
_chunk_statistics.page_cache_compressed_write_counter += 1;
}
}
}
// When dict_num == 0, `get_page_data` was not called, so the page data must be skipped explicitly.
// `_insert_page_into_cache` uses `_page_reader`, so `skip_page_data` must run after `_insert_page_into_cache`.
if (dict_num == 0) {
_page_reader->skip_page_data();
}
} else {
Slice dict_slice;
RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/parquet/vparquet_page_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ inline bool should_cache_decompressed(const tparquet::PageHeader* header,
const tparquet::ColumnMetaData& metadata) {
if (header->compressed_page_size <= 0) return true;
if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return true;
if (header->uncompressed_page_size == 0) return true;

double ratio = static_cast<double>(header->uncompressed_page_size) /
static_cast<double>(header->compressed_page_size);
Expand Down
Loading