Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions be/src/core/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,13 @@ bool is_recursively_exclusive(const IColumn& column) {
}

bool exclusive = true;
IColumn::ColumnCallback callback = [&](IColumn::WrappedPtr& subcolumn) {
IColumn::ColumnCallback callback = [&](const IColumn& subcolumn) {
if (!exclusive) {
return;
}
const ColumnPtr& subcolumn_ptr = const_cast<const IColumn::WrappedPtr&>(subcolumn);
DCHECK(subcolumn_ptr);
exclusive = is_recursively_exclusive(*subcolumn_ptr);
exclusive = is_recursively_exclusive(subcolumn);
};
// `for_each_subcolumn` only exposes a mutable callback type. This callback
// only reads the wrapped pointers and never calls the non-const accessors.
const_cast<IColumn&>(column).for_each_subcolumn(callback);
column.for_each_subcolumn(callback);
return exclusive;
}

Expand Down
37 changes: 16 additions & 21 deletions be/src/core/column/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,22 @@ std::string IColumn::dump_structure() const {
std::stringstream res;
res << get_name() << "(size = " << size();

ColumnCallback callback = [&](ColumnPtr& subcolumn) {
res << ", " << subcolumn->dump_structure();
ColumnCallback callback = [&](const IColumn& subcolumn) {
res << ", " << subcolumn.dump_structure();
};

// simply read using for_each_subcolumn without modification; const_cast can be used.
const_cast<IColumn*>(this)->for_each_subcolumn(callback);
for_each_subcolumn(callback);

res << ")";
return res.str();
}

int IColumn::count_const_column() const {
int count = is_column_const(*this) ? 1 : 0;
ColumnCallback callback = [&](ColumnPtr& subcolumn) {
count += subcolumn->count_const_column();
ColumnCallback callback = [&](const IColumn& subcolumn) {
count += subcolumn.count_const_column();
};
// simply read using for_each_subcolumn without modification; const_cast can be used.
const_cast<IColumn*>(this)->for_each_subcolumn(callback);
for_each_subcolumn(callback);
return count;
}

Expand Down Expand Up @@ -95,13 +93,12 @@ bool IColumn::column_boolean_check() const {
};

bool is_valid = check_boolean_is_zero_or_one(*this);
ColumnCallback callback = [&](ColumnPtr& subcolumn) {
if (!subcolumn->column_boolean_check()) {
ColumnCallback callback = [&](const IColumn& subcolumn) {
if (!subcolumn.column_boolean_check()) {
is_valid = false;
}
};
// simply read using for_each_subcolumn without modification; const_cast can be used.
const_cast<IColumn*>(this)->for_each_subcolumn(callback);
for_each_subcolumn(callback);
return is_valid;
}

Expand All @@ -122,13 +119,12 @@ bool IColumn::null_map_check() const {
};

bool is_valid = check_null_map_is_zero_or_one(*this);
ColumnCallback callback = [&](ColumnPtr& subcolumn) {
if (!subcolumn->null_map_check()) {
ColumnCallback callback = [&](const IColumn& subcolumn) {
if (!subcolumn.null_map_check()) {
is_valid = false;
}
};
// simply read using for_each_subcolumn without modification; const_cast can be used.
const_cast<IColumn*>(this)->for_each_subcolumn(callback);
for_each_subcolumn(callback);
return is_valid;
}

Expand Down Expand Up @@ -231,15 +227,14 @@ bool is_column_const(const IColumn& column) {
}

void IColumn::check_const_only_in_top_level() const {
ColumnCallback throw_if_const = [&](WrappedPtr& column) {
const ColumnPtr& col = const_cast<const WrappedPtr&>(column);
if (is_column_const(*col)) {
ColumnCallback throw_if_const = [&](const IColumn& column) {
if (is_column_const(column)) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"const column is not allowed to be nested, but got {}",
col->get_name());
column.get_name());
}
};
const_cast<IColumn*>(this)->for_each_subcolumn(throw_if_const);
for_each_subcolumn(throw_if_const);
}

} // namespace doris
33 changes: 22 additions & 11 deletions be/src/core/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,27 @@ class IColumn : public COW<IColumn> {

/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
/// Shallow: doesn't do recursive calls; don't do call for itself.
using ColumnCallback = std::function<void(WrappedPtr&)>;
using ImutableColumnCallback = std::function<void(const IColumn&)>;
virtual void for_each_subcolumn(ColumnCallback) {}
using ColumnCallback = std::function<void(const IColumn&)>;
virtual void for_each_subcolumn(ColumnCallback) const {}

protected:
virtual void mutate_subcolumns() {}

static void mutate_subcolumn(WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
}

template <typename ColumnType>
static void mutate_subcolumn(typename ColumnType::WrappedPtr& subcolumn) {
auto mutated = std::move(*static_cast<const typename ColumnType::Ptr&>(subcolumn)).mutate();
auto typed_mutated = ColumnType::cast_to_column_mutptr(
assert_cast<ColumnType*, TypeCheckOnRelease::DISABLE>(mutated.get()));
mutated = nullptr;
static_cast<typename ColumnType::Ptr&>(subcolumn) = std::move(typed_mutated);
}

public:
/// Columns have equal structure.
/// If true - you can use "compare_at", "insert_from", etc. methods.
virtual bool structure_equals(const IColumn&) const {
Expand All @@ -580,10 +597,7 @@ class IColumn : public COW<IColumn> {
// exclusive nodes are reused through the COW fast path.
MutablePtr mutate() const&& {
MutablePtr res = shallow_mutate();
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
res->mutate_subcolumns();
return res;
}

Expand All @@ -594,10 +608,7 @@ class IColumn : public COW<IColumn> {
static MutablePtr mutate(Ptr ptr) {
MutablePtr res = ptr->shallow_mutate(); /// Now use_count is 2.
ptr.reset(); /// Reset use_count to 1.
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
res->mutate_subcolumns();
return res;
}

Expand Down
17 changes: 8 additions & 9 deletions be/src/core/column/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "core/field.h"
#include "core/string_ref.h"
#include "core/types.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -216,14 +215,14 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
return get_offsets()[i] - get_offsets()[i - 1];
}

void for_each_subcolumn(ColumnCallback callback) override {
IColumn::WrappedPtr offsets_column(std::move(static_cast<ColumnOffsets::Ptr&>(offsets)));
Defer defer([&] {
static_cast<ColumnOffsets::Ptr&>(offsets) =
cast_to_column<ColumnOffsets>(static_cast<const IColumn::Ptr&>(offsets_column));
});
callback(offsets_column);
callback(data);
void mutate_subcolumns() override {
mutate_subcolumn<ColumnOffsets>(offsets);
mutate_subcolumn(data);
}

void for_each_subcolumn(ColumnCallback callback) const override {
callback(*static_cast<const ColumnOffsets::Ptr&>(offsets));
callback(*static_cast<const IColumn::Ptr&>(data));
}

ColumnPtr convert_column_if_overflow() override {
Expand Down
6 changes: 5 additions & 1 deletion be/src/core/column/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
}
}

void for_each_subcolumn(ColumnCallback callback) override { callback(data); }
void mutate_subcolumns() override { mutate_subcolumn(data); }

void for_each_subcolumn(ColumnCallback callback) const override {
callback(*static_cast<const IColumn::Ptr&>(data));
}

bool structure_equals(const IColumn& rhs) const override {
if (const auto* rhs_concrete = check_and_get_column<ColumnConst>(&rhs)) {
Expand Down
20 changes: 10 additions & 10 deletions be/src/core/column/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "core/string_ref.h"
#include "core/types.h"
#include "exec/common/sip_hash.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -74,15 +73,16 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {

std::string get_name() const override;

void for_each_subcolumn(ColumnCallback callback) override {
IColumn::WrappedPtr offsets(std::move(static_cast<COffsets::Ptr&>(offsets_column)));
Defer defer([&] {
static_cast<COffsets::Ptr&>(offsets_column) =
cast_to_column<COffsets>(static_cast<const IColumn::Ptr&>(offsets));
});
callback(keys_column);
callback(values_column);
callback(offsets);
void mutate_subcolumns() override {
mutate_subcolumn(keys_column);
mutate_subcolumn(values_column);
mutate_subcolumn<COffsets>(offsets_column);
}

void for_each_subcolumn(ColumnCallback callback) const override {
callback(*static_cast<const IColumn::Ptr&>(keys_column));
callback(*static_cast<const IColumn::Ptr&>(values_column));
callback(*static_cast<const COffsets::Ptr&>(offsets_column));
}

void sanity_check() const override {
Expand Down
15 changes: 7 additions & 8 deletions be/src/core/column/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "core/typeid_cast.h"
#include "core/types.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -250,14 +249,14 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
return get_ptr();
}

void for_each_subcolumn(ColumnCallback callback) override {
callback(_nested_column);
void mutate_subcolumns() override {
mutate_subcolumn(_nested_column);
mutate_subcolumn<ColumnUInt8>(_null_map);
}

IColumn::WrappedPtr null_map(std::move(static_cast<ColumnUInt8::Ptr&>(_null_map)));
Defer defer([&] {
_null_map = cast_to_column<ColumnUInt8>(static_cast<const IColumn::Ptr&>(null_map));
});
callback(null_map);
void for_each_subcolumn(ColumnCallback callback) const override {
callback(*static_cast<const IColumn::Ptr&>(_nested_column));
callback(*static_cast<const ColumnUInt8::Ptr&>(_null_map));
}

bool structure_equals(const IColumn& rhs) const override {
Expand Down
10 changes: 8 additions & 2 deletions be/src/core/column/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,15 @@ bool ColumnStruct::has_enough_capacity(const IColumn& src) const {
return true;
}

void ColumnStruct::for_each_subcolumn(ColumnCallback callback) {
void ColumnStruct::mutate_subcolumns() {
for (auto& column : columns) {
callback(column);
mutate_subcolumn(column);
}
}

void ColumnStruct::for_each_subcolumn(ColumnCallback callback) const {
for (const auto& column : columns) {
callback(*static_cast<const IColumn::Ptr&>(column));
}
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/core/column/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;
void for_each_subcolumn(ColumnCallback callback) override;
void mutate_subcolumns() override;
void for_each_subcolumn(ColumnCallback callback) const override;
bool structure_equals(const IColumn& rhs) const override;

size_t tuple_size() const { return columns.size(); }
Expand Down
31 changes: 23 additions & 8 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
#include "exprs/aggregate/aggregate_function.h"
#include "exprs/json_functions.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"
#include "util/json/path_in_data.h"
#include "util/jsonb_document.h"
#include "util/jsonb_document_cast.h"
Expand Down Expand Up @@ -826,19 +825,28 @@ size_t ColumnVariant::allocated_bytes() const {
return res;
}

void ColumnVariant::for_each_subcolumn(ColumnCallback callback) {
void ColumnVariant::mutate_subcolumns() {
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
callback(part);
mutate_subcolumn(part);
}
}
callback(serialized_sparse_column);
callback(serialized_doc_value_column);
// callback may be filter, so the row count may be changed
mutate_subcolumn(serialized_sparse_column);
mutate_subcolumn(serialized_doc_value_column);
num_rows = serialized_sparse_column->size();
ENABLE_CHECK_CONSISTENCY(this);
}

void ColumnVariant::for_each_subcolumn(ColumnCallback callback) const {
for (const auto& entry : subcolumns) {
for (const auto& part : entry->data.data) {
callback(*static_cast<const IColumn::Ptr&>(part));
}
}
callback(*static_cast<const IColumn::Ptr&>(serialized_sparse_column));
callback(*static_cast<const IColumn::Ptr&>(serialized_doc_value_column));
}

void ColumnVariant::insert_from(const IColumn& src, size_t n) {
const auto* src_v = assert_cast<const ColumnVariant*>(&src);
ENABLE_CHECK_CONSISTENCY(src_v);
Expand Down Expand Up @@ -2375,7 +2383,7 @@ size_t ColumnVariant::filter(const Filter& filter) {
for (auto& subcolumn : subcolumns) {
subcolumn->data.num_rows = count;
}
for_each_subcolumn([&](auto& part) {
auto filter_part = [&](IColumn::WrappedPtr& part) {
if (part->size() != count) {
if (part->is_exclusive()) {
const auto result_size = part->filter(filter);
Expand All @@ -2390,7 +2398,14 @@ size_t ColumnVariant::filter(const Filter& filter) {
part = part->filter(filter, count);
}
}
});
};
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
filter_part(part);
}
}
filter_part(serialized_sparse_column);
filter_part(serialized_doc_value_column);
}
num_rows = count;
ENABLE_CHECK_CONSISTENCY(this);
Expand Down
3 changes: 2 additions & 1 deletion be/src/core/column/column_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {

bool has_enough_capacity(const IColumn& src) const override { return false; }

void for_each_subcolumn(ColumnCallback callback) override;
void mutate_subcolumns() override;
void for_each_subcolumn(ColumnCallback callback) const override;

// Do nothing, call try_insert instead
void insert(const Field& field) override { try_insert(field); }
Expand Down
Loading
Loading