51 changes: 0 additions & 51 deletions be/src/storage/iterator/vgeneric_iterators.cpp
@@ -25,7 +25,6 @@
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/data_type/data_type.h"
#include "storage/cache/schema_cache.h"
#include "storage/field.h"
@@ -46,36 +45,13 @@ using namespace ErrorCode;
Status VStatisticsIterator::init(const StorageReadOptions& opts) {
if (!_init) {
_push_down_agg_type_opt = opts.push_down_agg_type_opt;
_tablet_schema = opts.tablet_schema;

// COUNT_NULL needs to actually read nullmap pages, so the column iterators must be
// fully initialized with a valid ColumnIteratorOptions (file_reader, stats, io_ctx).
// Other agg types (COUNT, MINMAX, MIX) only use zone-map metadata and never open
// pages, so they do not need init.
const bool need_iter_init = (_push_down_agg_type_opt == TPushAggOp::COUNT_NULL);
ColumnIteratorOptions iter_opts {
.use_page_cache = opts.use_page_cache,
.file_reader = _segment->file_reader().get(),
.stats = opts.stats,
.io_ctx = opts.io_ctx,
};

for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
auto unique_id = _schema.column(cid)->unique_id();
if (_column_iterators_map.count(unique_id) < 1) {
RETURN_IF_ERROR(_segment->new_column_iterator(
opts.tablet_schema->column(cid), &_column_iterators_map[unique_id], &opts));
if (need_iter_init) {
RETURN_IF_ERROR(_column_iterators_map[unique_id]->init(iter_opts));
// Seek to ordinal 0 once during init so that the page iterator
// is properly positioned for sequential read_null_map() calls
// in next_batch(). We must NOT seek again in next_batch() —
// doing so would reset the iterator to ordinal 0 on every batch
// and cause rows to be re-read/double-counted for segments
// larger than MAX_ROW_SIZE_IN_COUNT (65535) rows.
RETURN_IF_ERROR(_column_iterators_map[unique_id]->seek_to_ordinal(0));
}
}
_column_iterators.push_back(_column_iterators_map[unique_id].get());
}
@@ -100,33 +76,6 @@ Status VStatisticsIterator::next_batch(Block* block) {
for (auto& column : columns) {
column->insert_many_defaults(size);
}
} else if (_push_down_agg_type_opt == TPushAggOp::COUNT_NULL) {
for (int i = 0; i < (int)columns.size(); ++i) {
auto& column = columns[i];
auto cid = _schema.column_id(i);
auto& tablet_column = _tablet_schema->column(cid);

if (tablet_column.is_nullable()) {
auto& nullable_col = assert_cast<ColumnNullable&>(*column);
auto& nested_col = nullable_col.get_nested_column();

// Read the real nullmap for this column from the current position.
// Do NOT seek back to ordinal 0 here: the column iterator already
// starts at ordinal 0 after init(), and each call to read_null_map
// advances it sequentially. Seeking to 0 on every next_batch() call
// would cause large segments (> MAX_ROW_SIZE_IN_COUNT rows) to have
// their first portion re-read and counted multiple times, producing
// a result higher than the true non-null count.
size_t read_rows = size;
auto& null_map_data = nullable_col.get_null_map_data();
RETURN_IF_ERROR(_column_iterators[i]->read_null_map(&read_rows, null_map_data));

// nested column needs one default value per row
nested_col.insert_many_defaults(size);
} else {
column->insert_many_defaults(size);
}
}
} else {
for (int i = 0; i < columns.size(); ++i) {
RETURN_IF_ERROR(_column_iterators[i]->next_batch_of_zone_map(&size, columns[i]));
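The seek-once-at-init contract described in the two comment blocks above can be condensed into a small standalone model. This is a minimal sketch, not engine code: MockColumnIterator, its vector-backed null map, and the count_non_null driver are illustrative stand-ins; only the MAX_ROW_SIZE_IN_COUNT cap (65535) and the seek/read sequence mirror what this diff removes. With reseek_each_batch enabled and more than 65535 rows, the head of the segment is re-read on every batch and the count comes out too high, which is exactly the double-counting the comments warn about.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for a column iterator over one segment's null flags (1 = null).
struct MockColumnIterator {
    const std::vector<uint8_t>* null_map;
    size_t pos = 0; // current ordinal
    void seek_to_ordinal(size_t ord) { pos = ord; }
    // Reads at most *n flags from the current ordinal, advancing it.
    void read_null_map(size_t* n, std::vector<uint8_t>& out) {
        *n = std::min(*n, null_map->size() - pos);
        out.assign(null_map->begin() + pos, null_map->begin() + pos + *n);
        pos += *n;
    }
};

constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535; // per-batch row cap

size_t count_non_null(MockColumnIterator& it, size_t total_rows, bool reseek_each_batch) {
    it.seek_to_ordinal(0); // the one and only seek, done at "init" time
    size_t non_null = 0;
    std::vector<uint8_t> batch;
    for (size_t done = 0; done < total_rows;) {
        if (reseek_each_batch) {
            it.seek_to_ordinal(0); // the bug: every batch re-reads the segment head
        }
        size_t n = std::min(total_rows - done, MAX_ROW_SIZE_IN_COUNT);
        it.read_null_map(&n, batch);
        if (n == 0) break;
        for (size_t i = 0; i < n; ++i) {
            non_null += (batch[i] == 0);
        }
        done += n;
    }
    // With reseek_each_batch == true and total_rows > MAX_ROW_SIZE_IN_COUNT,
    // rows 0..65534 are counted once per batch, inflating the result.
    return non_null;
}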
1 change: 0 additions & 1 deletion be/src/storage/iterator/vgeneric_iterators.h
@@ -63,7 +63,6 @@ class VStatisticsIterator : public RowwiseIterator {
private:
std::shared_ptr<Segment> _segment;
const Schema& _schema;
std::shared_ptr<TabletSchema> _tablet_schema;
size_t _target_rows = 0;
size_t _output_rows = 0;
bool _init = false;
110 changes: 0 additions & 110 deletions be/src/storage/segment/column_reader.cpp
@@ -1252,27 +1252,6 @@ Status MapFileColumnIterator::set_access_paths(const TColumnAccessPaths& all_acc
return Status::OK();
}

Status MapFileColumnIterator::read_null_map(size_t* n, NullMap& null_map) {
if (!_map_reader->is_nullable()) {
return Status::InternalError("read_null_map is not supported for non-nullable column");
}
if (!_null_iterator) {
// Schema-change scenario: column became nullable but old segment has no null data.
null_map.resize(*n);
memset(null_map.data(), 0, *n);
return Status::OK();
}
auto null_col = ColumnUInt8::create();
auto null_col_ptr = null_col->get_ptr();
size_t read_rows = *n;
bool has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&read_rows, null_col_ptr, &has_null));
*n = read_rows;
null_map.resize(read_rows);
memcpy(null_map.data(), null_col->get_data().data(), read_rows);
return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////

StructFileColumnIterator::StructFileColumnIterator(
@@ -1490,27 +1469,6 @@ Status StructFileColumnIterator::set_access_paths(
return Status::OK();
}

Status StructFileColumnIterator::read_null_map(size_t* n, NullMap& null_map) {
if (!_struct_reader->is_nullable()) {
return Status::InternalError("read_null_map is not supported for non-nullable column");
}
if (!_null_iterator) {
// Schema-change scenario: column became nullable but old segment has no null data.
null_map.resize(*n);
memset(null_map.data(), 0, *n);
return Status::OK();
}
auto null_col = ColumnUInt8::create();
auto null_col_ptr = null_col->get_ptr();
size_t read_rows = *n;
bool has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&read_rows, null_col_ptr, &has_null));
*n = read_rows;
null_map.resize(read_rows);
memcpy(null_map.data(), null_col->get_data().data(), read_rows);
return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////
Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) {
RETURN_IF_ERROR(_offset_iterator->init(opts));
@@ -1769,27 +1727,6 @@ Status ArrayFileColumnIterator::set_access_paths(const TColumnAccessPaths& all_a
return Status::OK();
}

Status ArrayFileColumnIterator::read_null_map(size_t* n, NullMap& null_map) {
if (!_array_reader->is_nullable()) {
return Status::InternalError("read_null_map is not supported for non-nullable column");
}
if (!_null_iterator) {
// Schema-change scenario: column became nullable but old segment has no null data.
null_map.resize(*n);
memset(null_map.data(), 0, *n);
return Status::OK();
}
auto null_col = ColumnUInt8::create();
auto null_col_ptr = null_col->get_ptr();
size_t read_rows = *n;
bool has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&read_rows, null_col_ptr, &has_null));
*n = read_rows;
null_map.resize(read_rows);
memcpy(null_map.data(), null_col->get_data().data(), read_rows);
return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////

FileColumnIterator::FileColumnIterator(std::shared_ptr<ColumnReader> reader) : _reader(reader) {}
@@ -1964,53 +1901,6 @@ Status FileColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst, bool* ha
return Status::OK();
}

Status FileColumnIterator::read_null_map(size_t* n, NullMap& null_map) {
if (!_reader->is_nullable()) {
return Status::InternalError("read_null_map is not supported for non-nullable column");
}

null_map.resize(*n);
size_t remaining = *n;
size_t offset = 0;

while (remaining > 0) {
if (!_page.has_remaining()) {
bool eos = false;
RETURN_IF_ERROR(_load_next_page(&eos));
if (eos) {
break;
}
}

if (!_page.has_null) {
size_t nrows_in_page = std::min(remaining, _page.remaining());
memset(null_map.data() + offset, 0, nrows_in_page);
offset += nrows_in_page;
_current_ordinal += nrows_in_page;
_page.offset_in_page += nrows_in_page;
remaining -= nrows_in_page;
continue;
}

size_t nrows_in_page = std::min(remaining, _page.remaining());
size_t this_run = 0;
while (this_run < nrows_in_page) {
bool is_null = false;
size_t run_len = _page.null_decoder.GetNextRun(&is_null, nrows_in_page - this_run);
memset(null_map.data() + offset + this_run, is_null ? 1 : 0, run_len);
this_run += run_len;
}

offset += nrows_in_page;
_page.offset_in_page += nrows_in_page;
_current_ordinal += nrows_in_page;
remaining -= nrows_in_page;
}

*n = offset;
return Status::OK();
}

Status FileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
MutableColumnPtr& dst) {
if (_reading_flag == ReadingFlag::SKIP_READING) {
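Two shapes of read_null_map are removed in this file: the composite iterators (Map, Struct, Array) delegate to their _null_iterator and copy the resulting UInt8 column out, while the leaf FileColumnIterator walks data pages itself and expands (is_null, run_len) pairs from the page's null-run decoder. A minimal sketch of that run expansion follows, assuming the flags arrive as runs the way GetNextRun hands them out; RleRun and expand_null_runs are illustrative names, not the engine's decoder API, and page loading / eos handling are omitted.

#include <cstdint>
#include <cstring>
#include <vector>

struct RleRun {
    bool is_null;
    size_t len;
};

// Expands run-length-encoded null flags into one byte per row, mirroring the
// memset-per-run inner loop of the removed FileColumnIterator::read_null_map.
void expand_null_runs(const std::vector<RleRun>& runs, std::vector<uint8_t>& null_map) {
    size_t offset = 0;
    for (const RleRun& run : runs) {
        null_map.resize(offset + run.len);
        std::memset(null_map.data() + offset, run.is_null ? 1 : 0, run.len);
        offset += run.len;
    }
}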
17 changes: 2 additions & 15 deletions be/src/storage/segment/column_reader.h
@@ -30,9 +30,8 @@

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h" // for Status
#include "core/column/column_array.h" // ColumnArray
#include "core/column/column_nullable.h" // NullMap
#include "common/status.h" // for Status
#include "core/column/column_array.h" // ColumnArray
#include "core/data_type/data_type.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
@@ -361,10 +360,6 @@ class ColumnIterator {

virtual bool is_all_dict_encoding() const { return false; }

virtual Status read_null_map(size_t* n, NullMap& null_map) {
return Status::NotSupported("read_null_map not implemented");
}

virtual Status set_access_paths(const TColumnAccessPaths& all_access_paths,
const TColumnAccessPaths& predicate_access_paths) {
if (!predicate_access_paths.empty()) {
@@ -463,8 +458,6 @@ class FileColumnIterator final : public ColumnIterator {

bool is_all_dict_encoding() const override { return _is_all_dict_encoding; }

Status read_null_map(size_t* n, NullMap& null_map) override;

Status init_prefetcher(const SegmentPrefetchParams& params) override;
void collect_prefetchers(
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
@@ -590,8 +583,6 @@ class MapFileColumnIterator final : public ColumnIterator {

void remove_pruned_sub_iterators() override;

Status read_null_map(size_t* n, NullMap& null_map) override;

private:
std::shared_ptr<ColumnReader> _map_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
@@ -633,8 +624,6 @@ class StructFileColumnIterator final : public ColumnIterator {
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
PrefetcherInitMethod init_method) override;

Status read_null_map(size_t* n, NullMap& null_map) override;

private:
std::shared_ptr<ColumnReader> _struct_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
@@ -674,8 +663,6 @@ class ArrayFileColumnIterator final : public ColumnIterator {
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
PrefetcherInitMethod init_method) override;

Status read_null_map(size_t* n, NullMap& null_map) override;

private:
std::shared_ptr<ColumnReader> _array_reader = nullptr;
std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
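Because the base-class default being removed above returns Status::NotSupported, a caller that only holds a ColumnIterator* had to treat read_null_map as optional. A minimal sketch of such a caller, reusing the Status, NullMap, and ColumnIterator declarations from this header; the helper name count_nulls and its fallback policy (propagating the error) are illustrative, not code from this repository.

Status count_nulls(ColumnIterator* iter, size_t rows, size_t* null_count) {
    NullMap null_map;
    size_t n = rows;
    // Leaf iterators fill null_map and clamp n to what was actually read;
    // iterators without an override return Status::NotSupported.
    RETURN_IF_ERROR(iter->read_null_map(&n, null_map));
    *null_count = 0;
    for (size_t i = 0; i < n; ++i) {
        *null_count += null_map[i]; // NullMap stores 1 for null rows
    }
    return Status::OK();
}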
4 changes: 0 additions & 4 deletions be/src/storage/segment/variant/variant_column_reader.h
@@ -452,10 +452,6 @@ class VariantRootColumnIterator : public ColumnIterator {

ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); }

Status read_null_map(size_t* n, NullMap& null_map) override {
return _inner_iter->read_null_map(n, null_map);
}

Status init_prefetcher(const SegmentPrefetchParams& params) override;
void collect_prefetchers(
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
@@ -1358,9 +1358,6 @@ public PlanFragment visitPhysicalStorageLayerAggregate(
case MIX:
pushAggOp = TPushAggOp.MIX;
break;
case COUNT_NULL:
pushAggOp = TPushAggOp.COUNT_NULL;
break;
default:
throw new AnalysisException("Unsupported storage layer aggregate: "
+ storageLayerAggregate.getAggOp());