Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/core/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,16 @@ class IColumn : public COW<IColumn> {
"Method update_crc32c_batch is not supported for " + get_name());
}

// Hash NULL rows as this column type's default value, instead of skipping them like
// update_crc32c_batch(hashes, null_map). This keeps the legacy Nullable fixed-width hash
// semantics without mutating the source nested column.
virtual void update_crc32c_batch_default_on_null(uint32_t* __restrict hashes,
const uint8_t* __restrict null_map) const {
throw doris::Exception(
ErrorCode::NOT_IMPLEMENTED_ERROR,
"Method update_crc32c_batch_default_on_null is not supported for " + get_name());
}

// use range for one hash value to avoid virtual function call in loop
virtual void update_crc32c_single(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_map) const {
Expand Down
10 changes: 10 additions & 0 deletions be/src/core/column/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ void ColumnDecimal<T>::update_crc32c_batch(uint32_t* __restrict hashes,
}
}

template <PrimitiveType T>
void ColumnDecimal<T>::update_crc32c_batch_default_on_null(
uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const {
DCHECK(null_map != nullptr);
auto s = size();
for (size_t i = 0; i < s; ++i) {
hashes[i] = HashUtil::crc32c_fixed(null_map[i] ? value_type() : data[i], hashes[i]);
}
}

template <PrimitiveType T>
void ColumnDecimal<T>::update_crc32c_single(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_map) const {
Expand Down
3 changes: 3 additions & 0 deletions be/src/core/column/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class ColumnDecimal final : public COWHelper<IColumn, ColumnDecimal<T>> {
void update_crc32c_batch(uint32_t* __restrict hashes,
const uint8_t* __restrict null_map) const override;

void update_crc32c_batch_default_on_null(uint32_t* __restrict hashes,
const uint8_t* __restrict null_map) const override;

void update_crc32c_single(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_map) const override;

Expand Down
20 changes: 9 additions & 11 deletions be/src/core/column/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,16 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris::
void ColumnNullable::update_crc32c_batch(uint32_t* __restrict hashes,
const uint8_t* __restrict /* null_map */) const {
const auto* __restrict real_null_data = get_null_map_column().get_data().data();
if (_nested_column->support_replace_column_null_data()) {
// nullmap process is slow, replace null data to default value to avoid nullmap process
// This is an intentional in-place mutation inside a logically-const hash computation:
// null positions are overwritten with defaults so the inner hash loop needs no null checks.
// The invariant is that a column instance is not hashed concurrently through the same
// owner while this per-block hash path runs. Shared aliases are detached by mutate()
// before this normalized nested column is written back.
auto nested_mut = std::move(*static_cast<const IColumn::Ptr&>(_nested_column)).mutate();
nested_mut->replace_column_null_data(real_null_data);
static_cast<IColumn::Ptr&>(const_cast<IColumn::WrappedPtr&>(_nested_column)) =
std::move(nested_mut);
if (!has_null()) {
_nested_column->update_crc32c_batch(hashes, nullptr);
return;
}

if (_nested_column->support_replace_column_null_data()) {
// Keep the old optimized hash semantics: NULL rows are hashed as the nested type's
// default value. Do this in a read-only way; replacing NULL payloads in _nested_column
// from this const hash path can race with other readers of the same column.
_nested_column->update_crc32c_batch_default_on_null(hashes, real_null_data);
} else {
auto s = size();
for (int i = 0; i < s; ++i) {
Expand Down
23 changes: 20 additions & 3 deletions be/src/core/column/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,22 @@ uint32_t ColumnVector<T>::_zlib_crc32_hash(uint32_t hash, size_t idx) const {
}

template <PrimitiveType T>
uint32_t ColumnVector<T>::_crc32c_hash(uint32_t hash, size_t idx) const {
uint32_t ColumnVector<T>::_crc32c_hash_value(uint32_t hash, const value_type& value) const {
if constexpr (is_date_or_datetime(T)) {
char buf[64];
const auto& date_val = (const VecDateTimeValue&)data[idx];
const auto& date_val = (const VecDateTimeValue&)value;
auto len = date_val.to_buffer(buf);
return crc32c_extend(hash, (const uint8_t*)buf, len);
} else {
return HashUtil::crc32c_fixed(data[idx], hash);
return HashUtil::crc32c_fixed(value, hash);
}
}

template <PrimitiveType T>
uint32_t ColumnVector<T>::_crc32c_hash(uint32_t hash, size_t idx) const {
return _crc32c_hash_value(hash, data[idx]);
}

template <PrimitiveType T>
void ColumnVector<T>::update_crc32c_batch(uint32_t* __restrict hashes,
const uint8_t* __restrict null_map) const {
Expand All @@ -211,6 +216,18 @@ void ColumnVector<T>::update_crc32c_batch(uint32_t* __restrict hashes,
}
}

template <PrimitiveType T>
void ColumnVector<T>::update_crc32c_batch_default_on_null(
uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const {
DCHECK(null_map != nullptr);
auto s = size();
auto default_value = this->default_value();
for (size_t i = 0; i < s; ++i) {
hashes[i] = null_map[i] ? _crc32c_hash_value(hashes[i], default_value)
: _crc32c_hash(hashes[i], i);
}
}

template <PrimitiveType T>
void ColumnVector<T>::update_crc32c_single(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_map) const {
Expand Down
4 changes: 4 additions & 0 deletions be/src/core/column/column_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {
void update_crc32c_batch(uint32_t* __restrict hashes,
const uint8_t* __restrict null_map) const override;

void update_crc32c_batch_default_on_null(uint32_t* __restrict hashes,
const uint8_t* __restrict null_map) const override;

void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const override;

Expand Down Expand Up @@ -402,6 +405,7 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {

protected:
uint32_t _zlib_crc32_hash(uint32_t hash, size_t idx) const;
uint32_t _crc32c_hash_value(uint32_t hash, const value_type& value) const;
uint32_t _crc32c_hash(uint32_t hash, size_t idx) const;
Container data;
};
Expand Down
222 changes: 222 additions & 0 deletions be/test/core/column/column_nullable_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>

#include <atomic>
#include <thread>

#include "common/status.h"
#include "core/block/block.h"
#include "core/column/column_decimal.h"
#include "core/column/column_nullable_test.h"
#include "core/column/predicate_column.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
#include "core/field.h"
#include "core/types.h"
#include "testutil/column_helper.h"
#include "util/hash_util.hpp"

namespace doris {

Expand Down Expand Up @@ -131,6 +138,221 @@ TEST(ColumnNullableTest, SharedCreatePreservesImmutableSubcolumns) {
EXPECT_EQ(null_map_alias->size(), 1);
}

TEST(ColumnNullableTest, UpdateCrc32cBatchKeepsBlockInsertable) {
auto nested = ColumnInt32::create();
nested->insert_value(1);
nested->insert_value(2);
nested->insert_value(3);
nested->insert_value(4);

auto null_map = ColumnUInt8::create();
null_map->insert_value(0);
null_map->insert_value(1);
null_map->insert_value(0);
null_map->insert_value(1);

ColumnPtr nullable = ColumnNullable::create(std::move(nested), std::move(null_map));
const auto& nullable_ref = assert_cast<const ColumnNullable&>(*nullable);
const auto* nested_before_hash = nullable_ref.get_nested_column_ptr().get();

auto nullable_type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
Block block({ColumnWithTypeAndName(nullable, nullable_type, "v")});
ASSERT_TRUE(block.check_type_and_column().ok());

std::vector<uint32_t> hashes(block.rows(), 0);
nullable->update_crc32c_batch(hashes.data(), nullptr);

ASSERT_TRUE(block.check_type_and_column().ok());

const auto& nullable_after_hash =
assert_cast<const ColumnNullable&>(*block.get_by_position(0).column);
EXPECT_EQ(nullable_after_hash.get_nested_column_ptr().get(), nested_before_hash);

auto dst_block = block.clone_empty();
auto mutable_block = MutableBlock::create_unique(std::move(dst_block));
std::vector<uint32_t> indices = {0, 1, 2, 3};
ASSERT_TRUE(
mutable_block->add_rows(&block, indices.data(), indices.data() + indices.size()).ok());
EXPECT_EQ(mutable_block->rows(), block.rows());
}

TEST(ColumnNullableTest, UpdateCrc32cBatchDoesNotMutateSharedNestedColumn) {
auto nested_mut = ColumnInt32::create();
nested_mut->insert_value(10);
nested_mut->insert_value(20);
nested_mut->insert_value(30);
nested_mut->insert_value(40);
ColumnPtr nested = std::move(nested_mut);
ColumnPtr nested_alias = nested;

auto null_map_mut = ColumnUInt8::create();
null_map_mut->insert_value(0);
null_map_mut->insert_value(1);
null_map_mut->insert_value(0);
null_map_mut->insert_value(1);
ColumnPtr null_map = std::move(null_map_mut);

ColumnPtr nullable = ColumnNullable::create(nested, null_map);
const auto& nullable_ref = assert_cast<const ColumnNullable&>(*nullable);
EXPECT_EQ(nullable_ref.get_nested_column_ptr().get(), nested_alias.get());

auto expected_nested = nested->clone_resized(nested->size());
expected_nested->replace_column_null_data(
assert_cast<const ColumnUInt8&>(*null_map).get_data().data());
std::vector<uint32_t> expected_hashes(nullable->size(), 0);
expected_nested->update_crc32c_batch(expected_hashes.data(), nullptr);

std::vector<uint32_t> hashes(nullable->size(), 0);
nullable->update_crc32c_batch(hashes.data(), nullptr);
EXPECT_EQ(hashes, expected_hashes);

const auto& nullable_after_hash = assert_cast<const ColumnNullable&>(*nullable);
EXPECT_EQ(nullable_after_hash.get_nested_column_ptr().get(), nested_alias.get());
EXPECT_EQ(assert_cast<const ColumnInt32&>(*nested_alias).get_data()[1], 20);
EXPECT_EQ(
assert_cast<const ColumnInt32&>(nullable_after_hash.get_nested_column()).get_data()[1],
20);

auto nullable_type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
Block block({ColumnWithTypeAndName(nullable, nullable_type, "v")});
ASSERT_TRUE(block.check_type_and_column().ok());

auto dst_block = block.clone_empty();
auto mutable_block = MutableBlock::create_unique(std::move(dst_block));
std::vector<uint32_t> indices = {3, 2, 1, 0};
ASSERT_TRUE(
mutable_block->add_rows(&block, indices.data(), indices.data() + indices.size()).ok());
EXPECT_EQ(mutable_block->rows(), block.rows());
}

TEST(ColumnNullableTest, UpdateCrc32cBatchHashesNullAsNestedDefaultForWideType) {
auto nested_mut = ColumnInt64::create();
nested_mut->insert_value(10);
nested_mut->insert_value(20);
nested_mut->insert_value(30);
nested_mut->insert_value(40);
ColumnPtr nested = std::move(nested_mut);

auto null_map_mut = ColumnUInt8::create();
null_map_mut->insert_value(0);
null_map_mut->insert_value(1);
null_map_mut->insert_value(0);
null_map_mut->insert_value(1);
ColumnPtr null_map = std::move(null_map_mut);

ColumnPtr nullable = ColumnNullable::create(nested, null_map);

auto expected_nested = nested->clone_resized(nested->size());
expected_nested->replace_column_null_data(
assert_cast<const ColumnUInt8&>(*null_map).get_data().data());
constexpr uint32_t seed = 0xDEADBEEF;
std::vector<uint32_t> expected_hashes(nullable->size(), seed);
expected_nested->update_crc32c_batch(expected_hashes.data(), nullptr);

std::vector<uint32_t> hashes(nullable->size(), seed);
nullable->update_crc32c_batch(hashes.data(), nullptr);

EXPECT_EQ(hashes, expected_hashes);
EXPECT_NE(hashes[1], HashUtil::crc32c_null(seed));
EXPECT_NE(hashes[3], HashUtil::crc32c_null(seed));
}

TEST(ColumnNullableTest, UpdateCrc32cBatchHashesNullAsDecimalDefault) {
auto nested_mut = ColumnDecimal64::create(0, 2);
nested_mut->insert_value(Decimal64(1010));
nested_mut->insert_value(Decimal64(2020));
nested_mut->insert_value(Decimal64(3030));
nested_mut->insert_value(Decimal64(4040));
ColumnPtr nested = std::move(nested_mut);

auto null_map_mut = ColumnUInt8::create();
null_map_mut->insert_value(0);
null_map_mut->insert_value(1);
null_map_mut->insert_value(0);
null_map_mut->insert_value(1);
ColumnPtr null_map = std::move(null_map_mut);

ColumnPtr nullable = ColumnNullable::create(nested, null_map);

auto expected_nested = nested->clone_resized(nested->size());
expected_nested->replace_column_null_data(
assert_cast<const ColumnUInt8&>(*null_map).get_data().data());
constexpr uint32_t seed = 0xDEADBEEF;
std::vector<uint32_t> expected_hashes(nullable->size(), seed);
expected_nested->update_crc32c_batch(expected_hashes.data(), nullptr);

std::vector<uint32_t> hashes(nullable->size(), seed);
nullable->update_crc32c_batch(hashes.data(), nullptr);

EXPECT_EQ(hashes, expected_hashes);
EXPECT_NE(hashes[1], HashUtil::crc32c_null(seed));
EXPECT_NE(hashes[3], HashUtil::crc32c_null(seed));
}

TEST(ColumnNullableTest, ConcurrentUpdateCrc32cBatchAndInsertIndicesFrom) {
constexpr int rows = 4096;
constexpr int hash_threads = 4;
constexpr int insert_threads = 4;
constexpr int iterations = 2000;

auto nested = ColumnInt32::create();
auto null_map = ColumnUInt8::create();
for (int i = 0; i < rows; ++i) {
nested->insert_value(i);
null_map->insert_value(i % 3 == 0);
}

ColumnPtr nullable = ColumnNullable::create(std::move(nested), std::move(null_map));
std::vector<uint32_t> indices(rows);
for (uint32_t i = 0; i < rows; ++i) {
indices[i] = rows - i - 1;
}

std::atomic<bool> start = false;
std::atomic<bool> stop = false;
std::atomic<int> failures = 0;
std::vector<std::thread> threads;

for (int t = 0; t < hash_threads; ++t) {
threads.emplace_back([&] {
while (!start.load(std::memory_order_acquire)) {
}
std::vector<uint32_t> hashes(rows);
for (int i = 0; i < iterations && !stop.load(std::memory_order_relaxed); ++i) {
std::ranges::fill(hashes, 0);
nullable->update_crc32c_batch(hashes.data(), nullptr);
}
});
}

for (int t = 0; t < insert_threads; ++t) {
threads.emplace_back([&] {
while (!start.load(std::memory_order_acquire)) {
}
for (int i = 0; i < iterations && !stop.load(std::memory_order_relaxed); ++i) {
auto dst = ColumnNullable::create(ColumnInt32::create(), ColumnUInt8::create());
try {
dst->insert_indices_from(*nullable, indices.data(), indices.data() + rows);
if (dst->size() != rows) {
failures.fetch_add(1, std::memory_order_relaxed);
stop.store(true, std::memory_order_relaxed);
}
} catch (...) {
failures.fetch_add(1, std::memory_order_relaxed);
stop.store(true, std::memory_order_relaxed);
}
}
});
}

start.store(true, std::memory_order_release);
for (auto& thread : threads) {
thread.join();
}

EXPECT_EQ(failures.load(std::memory_order_relaxed), 0);
}

TEST(ColumnNullableTest, append_data_by_selector) {
auto srt_column = ColumnHelper::create_nullable_column<DataTypeInt64>(
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
Expand Down
Loading