Commit fcb6ded

Merge pull request #1138 from Altinity/backports/antalya-25.8/87600
Antalya 25.8 Backport of ClickHouse#87600 - More fixes for parquet reader v3, enable it in stateless tests
2 parents c8c233e + c236f00

25 files changed: +413 -249 lines

src/DataTypes/NestedUtils.cpp
Lines changed: 2 additions & 5 deletions

@@ -48,11 +48,8 @@ std::string concatenateName(const std::string & nested_table_name, const std::st
   */
 std::pair<std::string, std::string> splitName(const std::string & name, bool reverse)
 {
-    auto idx = (reverse ? name.find_last_of('.') : name.find_first_of('.'));
-    if (idx == std::string::npos || idx == 0 || idx + 1 == name.size())
-        return {name, {}};
-
-    return {name.substr(0, idx), name.substr(idx + 1)};
+    auto res = splitName(std::string_view(name), reverse);
+    return {std::string(res.first), std::string(res.second)};
 }

 std::pair<std::string_view, std::string_view> splitName(std::string_view name, bool reverse)

src/DataTypes/NestedUtils.h
Lines changed: 2 additions & 0 deletions

@@ -17,6 +17,8 @@ namespace Nested
     std::string concatenateName(const std::string & nested_table_name, const std::string & nested_field_name);

     /// Splits name of compound identifier by first/last dot (depending on 'reverse' parameter).
+    /// If the name is not nested (no dot or dot at start/end),
+    /// returns {name, ""}.
     std::pair<std::string, std::string> splitName(const std::string & name, bool reverse = false);
     std::pair<std::string_view, std::string_view> splitName(std::string_view name, bool reverse = false);
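
For reference, a standalone sketch (not part of this commit) of the contract documented above. It mirrors the string_view implementation that the std::string overload now delegates to; splitNameSketch is a hypothetical name used only for this illustration:

    #include <cassert>
    #include <string>
    #include <utility>

    // Hypothetical re-implementation of the documented behavior; the real function lives in DB::Nested.
    std::pair<std::string, std::string> splitNameSketch(const std::string & name, bool reverse = false)
    {
        auto idx = reverse ? name.find_last_of('.') : name.find_first_of('.');
        if (idx == std::string::npos || idx == 0 || idx + 1 == name.size())
            return {name, {}};                                  // not nested: no dot, or dot at start/end
        return {name.substr(0, idx), name.substr(idx + 1)};
    }

    int main()
    {
        assert(splitNameSketch("t.x").first == "t");            // nested: split at the first dot
        assert(splitNameSketch("t.x.y").second == "x.y");
        assert(splitNameSketch("t.x.y", /*reverse=*/true).second == "y");
        assert(splitNameSketch("plain").second == "");          // not nested: {name, ""}
        assert(splitNameSketch(".x").second == "");             // dot at start: treated as not nested
    }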

src/Formats/FormatFactory.h
Lines changed: 1 addition & 1 deletion

@@ -178,7 +178,7 @@ class FormatFactory final : private boost::noncopyable
         UInt64 max_block_size,
         const std::optional<FormatSettings> & format_settings = std::nullopt,
         FormatParserSharedResourcesPtr parser_shared_resources = nullptr,
-        FormatFilterInfoPtr format_filter_info = std::make_shared<FormatFilterInfo>(),
+        FormatFilterInfoPtr format_filter_info = nullptr,
         // affects things like buffer sizes and parallel reading
         bool is_remote_fs = false,
         // allows to do: buf -> parallel read -> decompression,

src/Formats/FormatFilterInfo.cpp
Lines changed: 5 additions & 0 deletions

@@ -15,11 +15,16 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int ICEBERG_SPECIFICATION_VIOLATION;
 }

 void ColumnMapper::setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_)
 {
+    chassert(storage_encoding.empty());
     storage_encoding = std::move(storage_encoding_);
+    for (const auto & [column_name, field_id] : storage_encoding)
+        if (!field_id_to_clickhouse_name.emplace(field_id, column_name).second)
+            throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Duplicate field id {}", field_id);
 }

 std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>> ColumnMapper::makeMapping(
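
The added loop builds the reverse field_id -> ClickHouse-name map and relies on unordered_map::emplace returning false for an already-present key to detect duplicate field ids. A minimal standalone sketch of the same pattern (the function name and exception type here are illustrative, not ClickHouse APIs):

    #include <cstdint>
    #include <stdexcept>
    #include <string>
    #include <unordered_map>

    // Sketch of the duplicate-field-id check added above, outside of ClickHouse.
    std::unordered_map<int64_t, std::string>
    buildReverseMapping(const std::unordered_map<std::string, int64_t> & storage_encoding)
    {
        std::unordered_map<int64_t, std::string> field_id_to_name;
        for (const auto & [column_name, field_id] : storage_encoding)
            // emplace returns {iterator, false} if the key already exists,
            // which is how the duplicate field id is detected.
            if (!field_id_to_name.emplace(field_id, column_name).second)
                throw std::runtime_error("Duplicate field id " + std::to_string(field_id));
        return field_id_to_name;
    }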

src/Formats/FormatFilterInfo.h
Lines changed: 6 additions & 0 deletions

@@ -18,14 +18,20 @@ class ColumnMapper
 {
 public:
     /// clickhouse_column_name -> field_id
+    /// For tuples, the map contains both the tuple itself and all its elements, e.g. {t, t.x, t.y}.
+    /// Note that parquet schema reader has to apply the mapping to all tuple fields recursively
+    /// even if the whole tuple was requested, because the names of the fields may be different.
     void setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_);

     const std::unordered_map<String, Int64> & getStorageColumnEncoding() const { return storage_encoding; }
+    const std::unordered_map<Int64, String> & getFieldIdToClickHouseName() const { return field_id_to_clickhouse_name; }
+
     /// clickhouse_column_name -> format_column_name (just join the maps above by field_id).
     std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>> makeMapping(const std::unordered_map<Int64, String> & format_encoding);

 private:
     std::unordered_map<String, Int64> storage_encoding;
+    std::unordered_map<Int64, String> field_id_to_clickhouse_name;
 };

 using ColumnMapperPtr = std::shared_ptr<ColumnMapper>;
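
To illustrate the tuple note above: for a column `t Tuple(x, y)`, the storage encoding is expected to contain entries for the tuple itself and for each element. The field ids below are made up for the example:

    #include <cstdint>
    #include <string>
    #include <unordered_map>

    // Illustrative only: shape of the clickhouse_column_name -> field_id map for a tuple column.
    const std::unordered_map<std::string, std::int64_t> example_storage_encoding =
    {
        {"t",   1},
        {"t.x", 2},
        {"t.y", 3},
    };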

src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
Lines changed: 12 additions & 1 deletion

@@ -1556,7 +1556,18 @@ Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(
         auto arrow_field = table->schema()->GetFieldByName(column_name);

         if (parquet_columns_to_clickhouse)
-            column_name = parquet_columns_to_clickhouse->at(column_name);
+        {
+            auto column_name_it = parquet_columns_to_clickhouse->find(column_name);
+            if (column_name_it == parquet_columns_to_clickhouse->end())
+            {
+                throw Exception(
+                    ErrorCodes::LOGICAL_ERROR,
+                    "Column '{}' is not present in input data. Column name mapping has {} columns",
+                    column_name,
+                    parquet_columns_to_clickhouse->size());
+            }
+            column_name = column_name_it->second;
+        }

         if (case_insensitive_matching)
             boost::to_lower(column_name);
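
The replaced `at()` call would throw a bare std::out_of_range with no context; the new lookup reports which column is missing and how large the mapping is. A generic sketch of the same lookup-with-context pattern, using plain std exceptions instead of ClickHouse's Exception:

    #include <stdexcept>
    #include <string>
    #include <unordered_map>

    // Illustrative helper, not ClickHouse code: map a format column name with a useful error message.
    std::string mapColumnName(const std::unordered_map<std::string, std::string> & mapping,
                              const std::string & column_name)
    {
        auto it = mapping.find(column_name);
        if (it == mapping.end())
            throw std::runtime_error(
                "Column '" + column_name + "' is not present in input data. "
                "Column name mapping has " + std::to_string(mapping.size()) + " columns");
        return it->second;
    }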

src/Processors/Formats/Impl/Parquet/Decoding.cpp
Lines changed: 2 additions & 2 deletions

@@ -745,7 +745,7 @@ struct ByteStreamSplitDecoder : public PageDecoder

 bool PageDecoderInfo::canReadDirectlyIntoColumn(parq::Encoding::type encoding, size_t num_values, IColumn & col, std::span<char> & out) const
 {
-    if (encoding == parq::Encoding::PLAIN && fixed_size_converter && fixed_size_converter->isTrivial())
+    if (encoding == parq::Encoding::PLAIN && fixed_size_converter && physical_type != parq::Type::BOOLEAN && fixed_size_converter->isTrivial())
     {
         chassert(col.sizeOfValueIfFixed() == fixed_size_converter->input_size);
         out = col.insertRawUninitialized(num_values);

@@ -1417,7 +1417,7 @@ void GeoConverter::convertColumn(std::span<const char> chars, const UInt64 * off
 {
     col.reserve(col.size() + num_values);
     chassert(chars.size() >= offsets[num_values - 1]);
-    for (size_t i = 0; i < num_values; ++i)
+    for (ssize_t i = 0; i < ssize_t(num_values); ++i)
     {
         char * ptr = const_cast<char*>(chars.data() + offsets[i - 1]);
         size_t length = offsets[i] - offsets[i - 1] - separator_bytes;
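
The switch from `size_t` to `ssize_t` matters because the loop body reads `offsets[i - 1]`: with an unsigned index, `i - 1` wraps to SIZE_MAX at `i == 0` instead of the intended -1, presumably relying on the caller passing an offsets pointer for which index -1 is a valid read (ClickHouse's padded offset arrays allow that). A tiny sketch of the wraparound itself:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    int main()
    {
        std::size_t u = 0;
        // Unsigned arithmetic wraps: 0 - 1 == SIZE_MAX, so offsets[u - 1] would index
        // astronomically far out of bounds rather than "one element before the first".
        assert(u - 1 == SIZE_MAX);

        std::ptrdiff_t s = 0;
        assert(s - 1 == -1); // with a signed index, offsets[s - 1] addresses offsets[-1] as intended
    }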

src/Processors/Formats/Impl/Parquet/ReadManager.cpp
Lines changed: 2 additions & 1 deletion

@@ -356,6 +356,8 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro
         }
         case ReadStage::MainData:
         {
+            row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed);
+
             /// Must add to delivery_queue before advancing read_ptr to deliver subgroups in order.
             /// (If we advanced read_ptr first, another thread could start and finish reading the
             /// next subgroup before we add this one to delivery_queue, and ReadManager::read could

@@ -368,7 +370,6 @@
             size_t prev = row_group.read_ptr.exchange(row_subgroup_idx + 1);
             chassert(prev == row_subgroup_idx);
             advanced_read_ptr = prev + 1;
-            row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed);
            delivery_cv.notify_one();
             break; // proceed to advancing read_ptr
         }
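
Moving the stage store ahead of the delivery_queue insertion and the read_ptr exchange means any thread that discovers the subgroup through those structures already sees it marked Deliver. A toy sketch of the mark-before-publish idea; the names and the release store are illustrative, the real code synchronizes through delivery_queue and read_ptr:

    #include <atomic>
    #include <cstddef>

    // Toy illustration only: mark the item deliverable before making it discoverable,
    // so no thread can find a published item still labelled "reading".
    struct Item
    {
        std::atomic<int> stage{0}; // 0 = reading, 1 = deliverable
    };

    Item items[16];
    std::atomic<std::size_t> read_ptr{0};

    void publish(std::size_t idx)
    {
        items[idx].stage.store(1, std::memory_order_relaxed); // mark first...
        read_ptr.store(idx + 1, std::memory_order_release);   // ...then publish the index
    }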

src/Processors/Formats/Impl/Parquet/Reader.cpp
Lines changed: 58 additions & 33 deletions

@@ -274,31 +274,34 @@ void Reader::prefilterAndInitRowGroups()
     SchemaConverter schemer(file_metadata, options, &extended_sample_block);
     if (prewhere_info && !prewhere_info->remove_prewhere_column)
         schemer.external_columns.push_back(prewhere_info->prewhere_column_name);
+    schemer.column_mapper = format_filter_info->column_mapper.get();
     schemer.prepareForReading();
     primitive_columns = std::move(schemer.primitive_columns);
     total_primitive_columns_in_file = schemer.primitive_column_idx;
     output_columns = std::move(schemer.output_columns);

     /// Precalculate some column index mappings.

-    sample_block_to_output_columns_idx.resize(extended_sample_block.columns(), UINT64_MAX);
+    sample_block_to_output_columns_idx.resize(extended_sample_block.columns());
     for (size_t i = 0; i < output_columns.size(); ++i)
     {
         const auto & idx = output_columns[i].idx_in_output_block;
         if (idx.has_value())
         {
-            chassert(sample_block_to_output_columns_idx.at(*idx) == UINT64_MAX);
+            chassert(!sample_block_to_output_columns_idx.at(*idx).has_value());
             sample_block_to_output_columns_idx.at(*idx) = i;
         }
     }
-    chassert(std::all_of(sample_block_to_output_columns_idx.begin(), sample_block_to_output_columns_idx.end(), [](size_t x) { return x != UINT64_MAX; }));

     if (format_filter_info->key_condition)
     {
         for (size_t idx_in_output_block : format_filter_info->key_condition->getUsedColumns())
         {
-            size_t output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
-            const OutputColumnInfo & output_info = output_columns[output_idx];
+            const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
+            if (!output_idx.has_value())
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "KeyCondition uses PREWHERE output");
+            const OutputColumnInfo & output_info = output_columns[output_idx.value()];
+
             if (output_info.is_primitive)
                 primitive_columns[output_info.primitive_start].used_by_key_condition = idx_in_output_block;
         }

@@ -363,7 +366,11 @@
     const auto & column_conditions = static_cast<FilterInfoExt *>(format_filter_info->opaque.get())->column_conditions;
     for (const auto & [idx_in_output_block, key_condition] : column_conditions)
     {
-        const OutputColumnInfo & output_info = output_columns[sample_block_to_output_columns_idx.at(idx_in_output_block)];
+        const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
+        if (!output_idx.has_value())
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Column condition uses PREWHERE output");
+        const OutputColumnInfo & output_info = output_columns[output_idx.value()];
+
         if (!output_info.is_primitive)
             continue;
         primitive_columns[output_info.primitive_start].column_index_condition = key_condition.get();

@@ -602,44 +609,47 @@ void Reader::initializePrefetches()
 void Reader::preparePrewhere()
 {
     PrewhereInfoPtr prewhere_info = format_filter_info->prewhere_info;
-    if (!prewhere_info)
-        return;
+    if (prewhere_info)
+    {
+        /// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row
+        /// subgroup, in one thread per row group. Instead, we could extract single-column conditions
+        /// and run them after decoding the corresponding columns, in parallel.
+        /// (Still run multi-column conditions, like `col1 = 42 or col2 = 'yes'`, after reading all columns.)
+        /// Probably reuse tryBuildPrewhereSteps from MergeTree for splitting the expression.

-    /// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row
-    /// subgroup, in one thread per row group. Instead, we could extract single-column conditions
-    /// and run them after decoding the corresponding columns, in parallel.
-    /// (Still run multi-column conditions, like `col1 = 42 or col2 = 'yes'`, after reading all columns.)
-    /// Probably reuse tryBuildPrewhereSteps from MergeTree for splitting the expression.

-    /// Convert ActionsDAG to ExpressionActions.
-    ExpressionActionsSettings actions_settings;
-    if (prewhere_info->row_level_filter.has_value())
-    {
-        ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings);
+        /// Convert ActionsDAG to ExpressionActions.
+        ExpressionActionsSettings actions_settings;
+        if (prewhere_info->row_level_filter.has_value())
+        {
+            ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings);
+            prewhere_steps.push_back(PrewhereStep
+            {
+                .actions = std::move(actions),
+                .result_column_name = prewhere_info->row_level_column_name,
+            });
+        }
+        ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings);
         prewhere_steps.push_back(PrewhereStep
         {
             .actions = std::move(actions),
-            .result_column_name = prewhere_info->row_level_column_name
+            .result_column_name = prewhere_info->prewhere_column_name,
+            .need_filter = prewhere_info->need_filter,
         });
+        if (!prewhere_info->remove_prewhere_column)
+            prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name);
     }
-    ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings);
-    prewhere_steps.push_back(PrewhereStep
-    {
-        .actions = std::move(actions),
-        .result_column_name = prewhere_info->prewhere_column_name,
-        .need_filter = prewhere_info->need_filter,
-    });
-    if (!prewhere_info->remove_prewhere_column)
-        prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name);
-
     /// Look up expression inputs in extended_sample_block.
     for (PrewhereStep & step : prewhere_steps)
     {
         for (const auto & col : step.actions.getRequiredColumnsWithTypes())
         {
             size_t idx_in_output_block = extended_sample_block.getPositionByName(col.name, /* case_insensitive= */ false);
-            size_t output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
-            OutputColumnInfo & output_info = output_columns[output_idx];
+            const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
+            if (!output_idx.has_value())
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input");
+            OutputColumnInfo & output_info = output_columns[output_idx.value()];
+
             output_info.use_prewhere = true;
             bool only_for_prewhere = idx_in_output_block >= sample_block->columns();

@@ -649,7 +659,21 @@
                 primitive_columns[primitive_idx].only_for_prewhere = only_for_prewhere;
             }

-            step.input_column_idxs.push_back(output_idx);
+            step.input_column_idxs.push_back(output_idx.value());
+        }
+    }
+
+    /// Assert that sample_block_to_output_columns_idx is valid.
+    for (size_t i = 0; i < sample_block_to_output_columns_idx.size(); ++i)
+    {
+        /// (`prewhere_steps` has at most two elements)
+        size_t is_prewhere_output = std::count_if(prewhere_steps.begin(), prewhere_steps.end(),
+            [&](const PrewhereStep & step) { return step.idx_in_output_block == i; });
+        if (is_prewhere_output > 1 ||
+            /// Column must appear in exactly one of {output_columns, prewhere output}.
+            sample_block_to_output_columns_idx[i].has_value() != !is_prewhere_output)
+        {
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected column in sample block: {}", extended_sample_block.getByPosition(i).name);
         }
     }
 }

@@ -974,7 +998,8 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group)
         bytes_per_row += estimateColumnMemoryBytesPerRow(row_group.columns.at(i), row_group, primitive_columns.at(i));

         size_t n = size_t(options.format.parquet.prefer_block_bytes / std::max(bytes_per_row, 1.));
-        rows_per_subgroup = std::min(rows_per_subgroup, std::max(n, 1ul));
+        n = std::max(n, size_t(128)); // avoid super tiny blocks if something is wrong with stats
+        rows_per_subgroup = std::min(rows_per_subgroup, n);
     }
     chassert(rows_per_subgroup > 0);
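
The last hunk changes how the target subgroup size is derived from prefer_block_bytes and the estimated bytes per row: a bad estimate can no longer shrink subgroups below 128 rows. A standalone sketch of the arithmetic; the numbers below are examples, not ClickHouse defaults:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>

    // Illustrative helper mirroring the block-sizing arithmetic above.
    std::size_t pickRowsPerSubgroup(std::size_t rows_per_subgroup, double prefer_block_bytes, double bytes_per_row)
    {
        std::size_t n = static_cast<std::size_t>(prefer_block_bytes / std::max(bytes_per_row, 1.));
        n = std::max(n, std::size_t(128)); // avoid super tiny blocks if something is wrong with stats
        return std::min(rows_per_subgroup, n);
    }

    int main()
    {
        // A wildly overestimated bytes_per_row used to collapse the subgroup to 1 row; now it floors at 128.
        assert(pickRowsPerSubgroup(/*rows_per_subgroup=*/65536, /*prefer_block_bytes=*/16 << 20, /*bytes_per_row=*/100e6) == 128);
        // Normal case: ~16 MiB / 64 bytes per row is larger than the cap, so the cap wins.
        assert(pickRowsPerSubgroup(65536, 16 << 20, 64.0) == 65536);
    }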

src/Processors/Formats/Impl/Parquet/Reader.h
Lines changed: 7 additions & 9 deletions

@@ -24,21 +24,17 @@ namespace DB::Parquet
 {

 // TODO [parquet]:
-// * column_mapper
-// * allow_geoparquet_parser
+// * either multistage PREWHERE or make query optimizer selectively move parts of the condition to prewhere instead of the whole condition
 // * test on files from https://github.com/apache/parquet-testing
 // * check fields for false sharing, add cacheline padding as needed
 // * make sure userspace page cache read buffer supports readBigAt
-// * assert that memory usage is zero at the end, the reset()s are easy to miss
 // * support newer parquet versions: https://github.com/apache/parquet-format/blob/master/CHANGES.md
 // * make writer write DataPageV2
 // * make writer write PageEncodingStats
 // * make writer write DELTA_LENGTH_BYTE_ARRAY
 // * try adding [[unlikely]] to all ifs
 // * try adding __restrict to pointers on hot paths
 // * support or deprecate the preserve-order setting
-// * stats (reuse the ones from the other PR?)
-//   - number of row groups that were split into chunks
 // * add comments everywhere
 // * progress indication and estimating bytes to read; allow negative total_bytes_to_read?
 // * cache FileMetaData in something like SchemaCache

@@ -156,7 +152,7 @@ struct Reader
     size_t column_idx;
     /// Index in parquet `schema` (in FileMetaData).
     size_t schema_idx;
-    String name;
+    String name; // possibly mapped by ColumnMapper (e.g. using iceberg metadata)
     PageDecoderInfo decoder;

     DataTypePtr raw_decoded_type; // not Nullable

@@ -192,7 +188,7 @@ struct Reader

 struct OutputColumnInfo
 {
-    String name;
+    String name; // possibly mapped by ColumnMapper
     /// Range in primitive_columns.
     size_t primitive_start = 0;
     size_t primitive_end = 0;

@@ -455,8 +451,10 @@ struct Reader
     size_t total_primitive_columns_in_file = 0;
     std::vector<OutputColumnInfo> output_columns;
     /// Maps idx_in_output_block to index in output_columns. I.e.:
-    /// sample_block_to_output_columns_idx[output_columns[i].idx_in_output_block] = i
-    std::vector<size_t> sample_block_to_output_columns_idx;
+    /// sample_block_to_output_columns_idx[output_columns[i].idx_in_output_block] = i
+    /// nullopt if the column is produced by PREWHERE expression:
+    /// prewhere_steps[?].idx_in_output_block == i
+    std::vector<std::optional<size_t>> sample_block_to_output_columns_idx;

     /// sample_block with maybe some columns added at the end.
     /// The added columns are used as inputs to prewhere expression, then discarded.
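
With the element type now std::optional<size_t>, code walking the mapping has to treat nullopt as "produced by the PREWHERE expression, not read from the file". A small illustrative consumer, not Reader's code:

    #include <cstddef>
    #include <optional>
    #include <vector>

    // Illustrative only: collect the output_columns indices of positions actually backed by file data;
    // nullopt entries correspond to columns produced by PREWHERE.
    std::vector<std::size_t> columnsReadFromFile(const std::vector<std::optional<std::size_t>> & mapping)
    {
        std::vector<std::size_t> result;
        for (const auto & idx : mapping)
            if (idx.has_value())
                result.push_back(*idx);
        return result;
    }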
