diff --git a/be/src/core/column/column.h b/be/src/core/column/column.h index 033960ab5e0da8..9e25b42d0d141e 100644 --- a/be/src/core/column/column.h +++ b/be/src/core/column/column.h @@ -405,6 +405,16 @@ class IColumn : public COW { "Method update_crc32c_batch is not supported for " + get_name()); } + // Hash NULL rows as this column type's default value, instead of skipping them like + // update_crc32c_batch(hashes, null_map). This keeps the legacy Nullable fixed-width hash + // semantics without mutating the source nested column. + virtual void update_crc32c_batch_default_on_null(uint32_t* __restrict hashes, + const uint8_t* __restrict null_map) const { + throw doris::Exception( + ErrorCode::NOT_IMPLEMENTED_ERROR, + "Method update_crc32c_batch_default_on_null is not supported for " + get_name()); + } + // use range for one hash value to avoid virtual function call in loop virtual void update_crc32c_single(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_map) const { diff --git a/be/src/core/column/column_decimal.cpp b/be/src/core/column/column_decimal.cpp index 662b016059acbc..ca830a93e55023 100644 --- a/be/src/core/column/column_decimal.cpp +++ b/be/src/core/column/column_decimal.cpp @@ -193,6 +193,16 @@ void ColumnDecimal::update_crc32c_batch(uint32_t* __restrict hashes, } } +template +void ColumnDecimal::update_crc32c_batch_default_on_null( + uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const { + DCHECK(null_map != nullptr); + auto s = size(); + for (size_t i = 0; i < s; ++i) { + hashes[i] = HashUtil::crc32c_fixed(null_map[i] ? value_type() : data[i], hashes[i]); + } +} + template void ColumnDecimal::update_crc32c_single(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_map) const { diff --git a/be/src/core/column/column_decimal.h b/be/src/core/column/column_decimal.h index f474f1b8a8b9c3..4d6344fdfcd52a 100644 --- a/be/src/core/column/column_decimal.h +++ b/be/src/core/column/column_decimal.h @@ -172,6 +172,9 @@ class ColumnDecimal final : public COWHelper> { void update_crc32c_batch(uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const override; + void update_crc32c_batch_default_on_null(uint32_t* __restrict hashes, + const uint8_t* __restrict null_map) const override; + void update_crc32c_single(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_map) const override; diff --git a/be/src/core/column/column_nullable.cpp b/be/src/core/column/column_nullable.cpp index c7ce9ab8f26511..7a560dde4b85e1 100644 --- a/be/src/core/column/column_nullable.cpp +++ b/be/src/core/column/column_nullable.cpp @@ -184,18 +184,16 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris:: void ColumnNullable::update_crc32c_batch(uint32_t* __restrict hashes, const uint8_t* __restrict /* null_map */) const { const auto* __restrict real_null_data = get_null_map_column().get_data().data(); - if (_nested_column->support_replace_column_null_data()) { - // nullmap process is slow, replace null data to default value to avoid nullmap process - // This is an intentional in-place mutation inside a logically-const hash computation: - // null positions are overwritten with defaults so the inner hash loop needs no null checks. - // The invariant is that a column instance is not hashed concurrently through the same - // owner while this per-block hash path runs. Shared aliases are detached by mutate() - // before this normalized nested column is written back. - auto nested_mut = std::move(*static_cast(_nested_column)).mutate(); - nested_mut->replace_column_null_data(real_null_data); - static_cast(const_cast(_nested_column)) = - std::move(nested_mut); + if (!has_null()) { _nested_column->update_crc32c_batch(hashes, nullptr); + return; + } + + if (_nested_column->support_replace_column_null_data()) { + // Keep the old optimized hash semantics: NULL rows are hashed as the nested type's + // default value. Do this in a read-only way; replacing NULL payloads in _nested_column + // from this const hash path can race with other readers of the same column. + _nested_column->update_crc32c_batch_default_on_null(hashes, real_null_data); } else { auto s = size(); for (int i = 0; i < s; ++i) { diff --git a/be/src/core/column/column_vector.cpp b/be/src/core/column/column_vector.cpp index d0e532871dff1d..dabe4716356b59 100644 --- a/be/src/core/column/column_vector.cpp +++ b/be/src/core/column/column_vector.cpp @@ -183,17 +183,22 @@ uint32_t ColumnVector::_zlib_crc32_hash(uint32_t hash, size_t idx) const { } template -uint32_t ColumnVector::_crc32c_hash(uint32_t hash, size_t idx) const { +uint32_t ColumnVector::_crc32c_hash_value(uint32_t hash, const value_type& value) const { if constexpr (is_date_or_datetime(T)) { char buf[64]; - const auto& date_val = (const VecDateTimeValue&)data[idx]; + const auto& date_val = (const VecDateTimeValue&)value; auto len = date_val.to_buffer(buf); return crc32c_extend(hash, (const uint8_t*)buf, len); } else { - return HashUtil::crc32c_fixed(data[idx], hash); + return HashUtil::crc32c_fixed(value, hash); } } +template +uint32_t ColumnVector::_crc32c_hash(uint32_t hash, size_t idx) const { + return _crc32c_hash_value(hash, data[idx]); +} + template void ColumnVector::update_crc32c_batch(uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const { @@ -211,6 +216,18 @@ void ColumnVector::update_crc32c_batch(uint32_t* __restrict hashes, } } +template +void ColumnVector::update_crc32c_batch_default_on_null( + uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const { + DCHECK(null_map != nullptr); + auto s = size(); + auto default_value = this->default_value(); + for (size_t i = 0; i < s; ++i) { + hashes[i] = null_map[i] ? _crc32c_hash_value(hashes[i], default_value) + : _crc32c_hash(hashes[i], i); + } +} + template void ColumnVector::update_crc32c_single(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_map) const { diff --git a/be/src/core/column/column_vector.h b/be/src/core/column/column_vector.h index a1bae982c43cf9..699f0115eb36f8 100644 --- a/be/src/core/column/column_vector.h +++ b/be/src/core/column/column_vector.h @@ -254,6 +254,9 @@ class ColumnVector final : public COWHelper> { void update_crc32c_batch(uint32_t* __restrict hashes, const uint8_t* __restrict null_map) const override; + void update_crc32c_batch_default_on_null(uint32_t* __restrict hashes, + const uint8_t* __restrict null_map) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; @@ -402,6 +405,7 @@ class ColumnVector final : public COWHelper> { protected: uint32_t _zlib_crc32_hash(uint32_t hash, size_t idx) const; + uint32_t _crc32c_hash_value(uint32_t hash, const value_type& value) const; uint32_t _crc32c_hash(uint32_t hash, size_t idx) const; Container data; }; diff --git a/be/test/core/column/column_nullable_test.cpp b/be/test/core/column/column_nullable_test.cpp index 799cc6a9826059..01e9e05b79334b 100644 --- a/be/test/core/column/column_nullable_test.cpp +++ b/be/test/core/column/column_nullable_test.cpp @@ -21,15 +21,22 @@ #include #include +#include +#include + #include "common/status.h" +#include "core/block/block.h" +#include "core/column/column_decimal.h" #include "core/column/column_nullable_test.h" #include "core/column/predicate_column.h" #include "core/data_type/data_type.h" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/define_primitive_type.h" #include "core/field.h" #include "core/types.h" #include "testutil/column_helper.h" +#include "util/hash_util.hpp" namespace doris { @@ -131,6 +138,221 @@ TEST(ColumnNullableTest, SharedCreatePreservesImmutableSubcolumns) { EXPECT_EQ(null_map_alias->size(), 1); } +TEST(ColumnNullableTest, UpdateCrc32cBatchKeepsBlockInsertable) { + auto nested = ColumnInt32::create(); + nested->insert_value(1); + nested->insert_value(2); + nested->insert_value(3); + nested->insert_value(4); + + auto null_map = ColumnUInt8::create(); + null_map->insert_value(0); + null_map->insert_value(1); + null_map->insert_value(0); + null_map->insert_value(1); + + ColumnPtr nullable = ColumnNullable::create(std::move(nested), std::move(null_map)); + const auto& nullable_ref = assert_cast(*nullable); + const auto* nested_before_hash = nullable_ref.get_nested_column_ptr().get(); + + auto nullable_type = std::make_shared(std::make_shared()); + Block block({ColumnWithTypeAndName(nullable, nullable_type, "v")}); + ASSERT_TRUE(block.check_type_and_column().ok()); + + std::vector hashes(block.rows(), 0); + nullable->update_crc32c_batch(hashes.data(), nullptr); + + ASSERT_TRUE(block.check_type_and_column().ok()); + + const auto& nullable_after_hash = + assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(nullable_after_hash.get_nested_column_ptr().get(), nested_before_hash); + + auto dst_block = block.clone_empty(); + auto mutable_block = MutableBlock::create_unique(std::move(dst_block)); + std::vector indices = {0, 1, 2, 3}; + ASSERT_TRUE( + mutable_block->add_rows(&block, indices.data(), indices.data() + indices.size()).ok()); + EXPECT_EQ(mutable_block->rows(), block.rows()); +} + +TEST(ColumnNullableTest, UpdateCrc32cBatchDoesNotMutateSharedNestedColumn) { + auto nested_mut = ColumnInt32::create(); + nested_mut->insert_value(10); + nested_mut->insert_value(20); + nested_mut->insert_value(30); + nested_mut->insert_value(40); + ColumnPtr nested = std::move(nested_mut); + ColumnPtr nested_alias = nested; + + auto null_map_mut = ColumnUInt8::create(); + null_map_mut->insert_value(0); + null_map_mut->insert_value(1); + null_map_mut->insert_value(0); + null_map_mut->insert_value(1); + ColumnPtr null_map = std::move(null_map_mut); + + ColumnPtr nullable = ColumnNullable::create(nested, null_map); + const auto& nullable_ref = assert_cast(*nullable); + EXPECT_EQ(nullable_ref.get_nested_column_ptr().get(), nested_alias.get()); + + auto expected_nested = nested->clone_resized(nested->size()); + expected_nested->replace_column_null_data( + assert_cast(*null_map).get_data().data()); + std::vector expected_hashes(nullable->size(), 0); + expected_nested->update_crc32c_batch(expected_hashes.data(), nullptr); + + std::vector hashes(nullable->size(), 0); + nullable->update_crc32c_batch(hashes.data(), nullptr); + EXPECT_EQ(hashes, expected_hashes); + + const auto& nullable_after_hash = assert_cast(*nullable); + EXPECT_EQ(nullable_after_hash.get_nested_column_ptr().get(), nested_alias.get()); + EXPECT_EQ(assert_cast(*nested_alias).get_data()[1], 20); + EXPECT_EQ( + assert_cast(nullable_after_hash.get_nested_column()).get_data()[1], + 20); + + auto nullable_type = std::make_shared(std::make_shared()); + Block block({ColumnWithTypeAndName(nullable, nullable_type, "v")}); + ASSERT_TRUE(block.check_type_and_column().ok()); + + auto dst_block = block.clone_empty(); + auto mutable_block = MutableBlock::create_unique(std::move(dst_block)); + std::vector indices = {3, 2, 1, 0}; + ASSERT_TRUE( + mutable_block->add_rows(&block, indices.data(), indices.data() + indices.size()).ok()); + EXPECT_EQ(mutable_block->rows(), block.rows()); +} + +TEST(ColumnNullableTest, UpdateCrc32cBatchHashesNullAsNestedDefaultForWideType) { + auto nested_mut = ColumnInt64::create(); + nested_mut->insert_value(10); + nested_mut->insert_value(20); + nested_mut->insert_value(30); + nested_mut->insert_value(40); + ColumnPtr nested = std::move(nested_mut); + + auto null_map_mut = ColumnUInt8::create(); + null_map_mut->insert_value(0); + null_map_mut->insert_value(1); + null_map_mut->insert_value(0); + null_map_mut->insert_value(1); + ColumnPtr null_map = std::move(null_map_mut); + + ColumnPtr nullable = ColumnNullable::create(nested, null_map); + + auto expected_nested = nested->clone_resized(nested->size()); + expected_nested->replace_column_null_data( + assert_cast(*null_map).get_data().data()); + constexpr uint32_t seed = 0xDEADBEEF; + std::vector expected_hashes(nullable->size(), seed); + expected_nested->update_crc32c_batch(expected_hashes.data(), nullptr); + + std::vector hashes(nullable->size(), seed); + nullable->update_crc32c_batch(hashes.data(), nullptr); + + EXPECT_EQ(hashes, expected_hashes); + EXPECT_NE(hashes[1], HashUtil::crc32c_null(seed)); + EXPECT_NE(hashes[3], HashUtil::crc32c_null(seed)); +} + +TEST(ColumnNullableTest, UpdateCrc32cBatchHashesNullAsDecimalDefault) { + auto nested_mut = ColumnDecimal64::create(0, 2); + nested_mut->insert_value(Decimal64(1010)); + nested_mut->insert_value(Decimal64(2020)); + nested_mut->insert_value(Decimal64(3030)); + nested_mut->insert_value(Decimal64(4040)); + ColumnPtr nested = std::move(nested_mut); + + auto null_map_mut = ColumnUInt8::create(); + null_map_mut->insert_value(0); + null_map_mut->insert_value(1); + null_map_mut->insert_value(0); + null_map_mut->insert_value(1); + ColumnPtr null_map = std::move(null_map_mut); + + ColumnPtr nullable = ColumnNullable::create(nested, null_map); + + auto expected_nested = nested->clone_resized(nested->size()); + expected_nested->replace_column_null_data( + assert_cast(*null_map).get_data().data()); + constexpr uint32_t seed = 0xDEADBEEF; + std::vector expected_hashes(nullable->size(), seed); + expected_nested->update_crc32c_batch(expected_hashes.data(), nullptr); + + std::vector hashes(nullable->size(), seed); + nullable->update_crc32c_batch(hashes.data(), nullptr); + + EXPECT_EQ(hashes, expected_hashes); + EXPECT_NE(hashes[1], HashUtil::crc32c_null(seed)); + EXPECT_NE(hashes[3], HashUtil::crc32c_null(seed)); +} + +TEST(ColumnNullableTest, ConcurrentUpdateCrc32cBatchAndInsertIndicesFrom) { + constexpr int rows = 4096; + constexpr int hash_threads = 4; + constexpr int insert_threads = 4; + constexpr int iterations = 2000; + + auto nested = ColumnInt32::create(); + auto null_map = ColumnUInt8::create(); + for (int i = 0; i < rows; ++i) { + nested->insert_value(i); + null_map->insert_value(i % 3 == 0); + } + + ColumnPtr nullable = ColumnNullable::create(std::move(nested), std::move(null_map)); + std::vector indices(rows); + for (uint32_t i = 0; i < rows; ++i) { + indices[i] = rows - i - 1; + } + + std::atomic start = false; + std::atomic stop = false; + std::atomic failures = 0; + std::vector threads; + + for (int t = 0; t < hash_threads; ++t) { + threads.emplace_back([&] { + while (!start.load(std::memory_order_acquire)) { + } + std::vector hashes(rows); + for (int i = 0; i < iterations && !stop.load(std::memory_order_relaxed); ++i) { + std::ranges::fill(hashes, 0); + nullable->update_crc32c_batch(hashes.data(), nullptr); + } + }); + } + + for (int t = 0; t < insert_threads; ++t) { + threads.emplace_back([&] { + while (!start.load(std::memory_order_acquire)) { + } + for (int i = 0; i < iterations && !stop.load(std::memory_order_relaxed); ++i) { + auto dst = ColumnNullable::create(ColumnInt32::create(), ColumnUInt8::create()); + try { + dst->insert_indices_from(*nullable, indices.data(), indices.data() + rows); + if (dst->size() != rows) { + failures.fetch_add(1, std::memory_order_relaxed); + stop.store(true, std::memory_order_relaxed); + } + } catch (...) { + failures.fetch_add(1, std::memory_order_relaxed); + stop.store(true, std::memory_order_relaxed); + } + } + }); + } + + start.store(true, std::memory_order_release); + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(failures.load(std::memory_order_relaxed), 0); +} + TEST(ColumnNullableTest, append_data_by_selector) { auto srt_column = ColumnHelper::create_nullable_column( {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},