diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index b2f8ec173b4..d85b004ab41 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -116,6 +116,7 @@ check_then_add_sources_compile_flag ( src/Interpreters/JoinV2/HashJoinBuild.cpp src/Interpreters/JoinV2/HashJoinProbe.cpp src/Interpreters/JoinV2/SemiJoinProbe.cpp + src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.cpp src/IO/Compression/EncodingUtil.cpp src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoinV2.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoinV2.cpp index cfb5ccab1e0..08a691336e8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoinV2.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoinV2.cpp @@ -214,12 +214,12 @@ bool PhysicalJoinV2::isSupported(const tipb::Join & join) case Anti: case LeftOuterSemi: case LeftOuterAnti: + case RightOuter: + case RightSemi: + case RightAnti: if (!tiflash_join.getBuildJoinKeys().empty()) return true; break; - //case RightOuter: - //case RightSemi: - //case RightAnti: default: } return false; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoinV2Probe.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoinV2Probe.cpp index 73292e8a27b..2dcb2f93e83 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoinV2Probe.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoinV2Probe.cpp @@ -36,6 +36,7 @@ void PhysicalJoinV2Probe::buildPipelineExecGroupImpl( builder.appendTransformOp( std::make_unique(exec_context, log->identifier(), join_ptr, probe_index++)); }); + exec_context.addOneTimeFuture(join_ptr->getWaitProbeFinishFuture()); join_ptr.reset(); } } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp index 3f5ddcc5143..116739fcd15 100644 --- a/dbms/src/Flash/tests/gtest_join_executor.cpp +++ b/dbms/src/Flash/tests/gtest_join_executor.cpp @@ -3443,6 +3443,8 @@ try Field(static_cast(threshold))); ASSERT_COLUMNS_EQ_UR(ref, executeStreams(request, original_max_streams)) << "left_table_name = " << left_table_name << ", right_table_name = " << right_table_name; + if (cfg.enable_join_v2) + break; } WRAP_FOR_JOIN_TEST_END } @@ -3483,6 +3485,8 @@ try << "left_table_name = " << left_table_name << ", right_exchange_receiver_concurrency = " << exchange_concurrency << ", join_probe_cache_columns_threshold = " << threshold; + if (cfg.enable_join_v2) + break; } WRAP_FOR_JOIN_TEST_END } @@ -4795,6 +4799,7 @@ try auto request = context.scan("right_semi_family", "t") .join(context.scan("right_semi_family", "s"), type, {col("a")}, {}, {}, {}, {}, 0, false, 0) .build(context); + WRAP_FOR_JOIN_TEST_BEGIN executeAndAssertColumnsEqual(request, res); auto request_column_prune = context.scan("right_semi_family", "t") @@ -4802,6 +4807,7 @@ try .aggregation({Count(lit(static_cast(1)))}, {}) .build(context); ASSERT_COLUMNS_EQ_UR(genScalarCountResults(res), executeStreams(request_column_prune, 2)); + WRAP_FOR_JOIN_TEST_END } /// One join key(t.a = s.a) + other condition(t.c < s.c). diff --git a/dbms/src/Interpreters/JoinV2/HashJoin.cpp b/dbms/src/Interpreters/JoinV2/HashJoin.cpp index 305be3a2c99..c499e7a658a 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoin.cpp +++ b/dbms/src/Interpreters/JoinV2/HashJoin.cpp @@ -133,6 +133,7 @@ HashJoin::HashJoin( , log(Logger::get(join_req_id)) , has_other_condition(non_equal_conditions.other_cond_expr != nullptr) , output_columns(output_columns_) + , wait_probe_finished_future(std::make_shared(NotifyType::WAIT_ON_JOIN_PROBE_FINISH)) { RUNTIME_ASSERT(key_names_left.size() == key_names_right.size()); output_block = Block(output_columns); @@ -198,19 +199,27 @@ void HashJoin::initRowLayoutAndHashJoinMethod() /// Move all raw required join key column to the end of the join key. Names new_key_names_left, new_key_names_right; BoolVec raw_required_key_flag(keys_size); + bool is_right_semi_join = isRightSemiFamily(kind); + auto check_key_required = [&](const String & name) -> bool { + // If it's right semi/anti join, the key is required if and only if it's needed for other condition + if (is_right_semi_join) + return required_columns_names_set_for_other_condition.contains(name); + else + return right_sample_block_pruned.has(name); + }; for (size_t i = 0; i < keys_size; ++i) { bool is_raw_required = false; if (key_columns[i].column_ptr->valuesHaveFixedSize()) { - if (right_sample_block_pruned.has(key_names_right[i])) + if (check_key_required(key_names_right[i])) is_raw_required = true; } else { if (canAsColumnString(key_columns[i].column_ptr) && getStringCollatorKind(collators) == StringCollatorKind::StringBinary - && right_sample_block_pruned.has(key_names_right[i])) + && check_key_required(key_names_right[i])) { is_raw_required = true; } @@ -261,6 +270,7 @@ void HashJoin::initRowLayoutAndHashJoinMethod() if (c.column->valuesHaveFixedSize()) { row_layout.other_column_fixed_size += c.column->sizeOfValueIfFixed(); + row_layout.other_column_for_other_condition_fixed_size += c.column->sizeOfValueIfFixed(); row_layout.other_column_indexes.push_back({i, true}); } else @@ -319,7 +329,7 @@ void HashJoin::initBuild(const Block & sample_block, size_t build_concurrency_) build_workers_data.resize(build_concurrency); for (size_t i = 0; i < build_concurrency; ++i) build_workers_data[i].key_getter = createHashJoinKeyGetter(method, collators); - for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT + 1; ++i) + for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT; ++i) multi_row_containers.emplace_back(std::make_unique()); build_initialized = true; @@ -415,18 +425,28 @@ void HashJoin::initProbe(const Block & sample_block, size_t probe_concurrency_) active_probe_worker = probe_concurrency; probe_workers_data.resize(probe_concurrency); + if (needScanBuildSideAfterProbe()) + join_build_scanner_after_probe = std::make_unique(this); + probe_initialized = true; } bool HashJoin::finishOneBuildRow(size_t stream_index) { auto & wd = build_workers_data[stream_index]; + if (wd.non_joined_block.rows() > 0) + { + non_joined_blocks.insertNonFullBlock(std::move(wd.non_joined_block)); + wd.non_joined_block = {}; + } LOG_DEBUG( log, - "{} insert block to row containers cost {}ms, row count {}, padding size {}({:.2f}% of all size {})", + "{} insert block to row containers cost {}ms, row count {}, non-joined count {}, padding size {}({:.2f}% of " + "all size {})", stream_index, wd.build_time, wd.row_count, + wd.non_joined_row_count, wd.padding_size, 100.0 * wd.padding_size / wd.all_size, wd.all_size); @@ -455,11 +475,22 @@ bool HashJoin::finishOneProbe(size_t stream_index) if (active_probe_worker.fetch_sub(1) == 1) { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_probe); + wait_probe_finished_future->finish(); return true; } return false; } +bool HashJoin::isAllProbeFinished() +{ + if (active_probe_worker > 0) + { + setNotifyFuture(wait_probe_finished_future.get()); + return false; + } + return true; +} + void HashJoin::workAfterBuildRowFinish() { size_t all_build_row_count = 0; @@ -478,13 +509,15 @@ void HashJoin::workAfterBuildRowFinish() enable_tagged_pointer, false); - /// Conservative threshold: trigger late materialization when lm_row_size average >= 16 bytes. - constexpr size_t trigger_lm_row_size_threshold = 16; bool late_materialization = false; size_t avg_lm_row_size = 0; - if (has_other_condition - && row_layout.other_column_count_for_other_condition < row_layout.other_column_indexes.size()) + if (shouldCheckLateMaterialization()) { + // Calculate the average row size of late materialization rows. + // If the average row size is greater than or equal to the threshold, enable late materialization. + // Otherwise, disable it. + // Note: this is a conservative threshold, enable late materialization when lm_row_size average >= 16 bytes. + constexpr size_t trigger_lm_row_size_threshold = 16; size_t total_lm_row_size = 0; size_t total_lm_row_count = 0; for (size_t i = 0; i < build_concurrency; ++i) @@ -556,19 +589,14 @@ void HashJoin::buildRowFromBlock(const Block & b, size_t stream_index) assertBlocksHaveEqualStructure(block, right_sample_block_pruned, "Join Build"); - bool check_lm_row_size = has_other_condition - && row_layout.other_column_count_for_other_condition < row_layout.other_column_indexes.size(); - insertBlockToRowContainers( - method, - needRecordNotInsertRows(kind), + JoinBuildHelper::insertBlockToRowContainers( + this, block, rows, key_columns, null_map, - row_layout, - multi_row_containers, build_workers_data[stream_index], - check_lm_row_size); + shouldCheckLateMaterialization()); build_workers_data[stream_index].build_time += watch.elapsedMilliseconds(); } @@ -576,21 +604,20 @@ void HashJoin::buildRowFromBlock(const Block & b, size_t stream_index) bool HashJoin::buildPointerTable(size_t stream_index) { bool is_end; + size_t max_build_size = 2 * settings.max_block_size; switch (method) { -#define M(METHOD) \ - case HashJoinKeyMethod::METHOD: \ - using KeyGetterType##METHOD = HashJoinKeyGetterForType; \ - if constexpr (KeyGetterType##METHOD::Type::joinKeyCompareHashFirst()) \ - is_end = pointer_table.build( \ - build_workers_data[stream_index], \ - multi_row_containers, \ - settings.max_block_size); \ - else \ - is_end = pointer_table.build( \ - build_workers_data[stream_index], \ - multi_row_containers, \ - settings.max_block_size); \ +#define M(METHOD) \ + case HashJoinKeyMethod::METHOD: \ + using KeyGetterType##METHOD = HashJoinKeyGetterForType; \ + if constexpr (KeyGetterType##METHOD::Type::joinKeyCompareHashFirst()) \ + is_end = pointer_table.build( \ + build_workers_data[stream_index], \ + multi_row_containers, \ + max_build_size); \ + else \ + is_end \ + = pointer_table.build(build_workers_data[stream_index], multi_row_containers, max_build_size); \ break; APPLY_FOR_HASH_JOIN_VARIANTS(M) #undef M @@ -665,6 +692,19 @@ Block HashJoin::probeLastResultBlock(size_t stream_index) return {}; } +bool HashJoin::needScanBuildSideAfterProbe() const +{ + return isRightOuterJoin(kind) || isRightSemiFamily(kind); +} + +Block HashJoin::scanBuildSideAfterProbe(size_t stream_index) +{ + auto & wd = probe_workers_data[stream_index]; + Stopwatch all_watch; + SCOPE_EXIT({ probe_workers_data[stream_index].scan_build_side_time += all_watch.elapsedFromLastTime(); }); + return join_build_scanner_after_probe->scan(wd); +} + void HashJoin::removeUselessColumn(Block & block) const { const NameSet & probe_output_name_set = has_other_condition diff --git a/dbms/src/Interpreters/JoinV2/HashJoin.h b/dbms/src/Interpreters/JoinV2/HashJoin.h index e28292e6a61..426971571ca 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoin.h +++ b/dbms/src/Interpreters/JoinV2/HashJoin.h @@ -19,8 +19,11 @@ #include #include #include +#include #include +#include #include +#include #include #include #include @@ -54,6 +57,8 @@ class HashJoin bool finishOneBuildRow(size_t stream_index); /// Return true if it is the last probe worker. bool finishOneProbe(size_t stream_index); + /// Return true if all probe work has finished. + bool isAllProbeFinished(); void buildRowFromBlock(const Block & block, size_t stream_index); bool buildPointerTable(size_t stream_index); @@ -61,6 +66,9 @@ class HashJoin Block probeBlock(JoinProbeContext & ctx, size_t stream_index); Block probeLastResultBlock(size_t stream_index); + bool needScanBuildSideAfterProbe() const; + Block scanBuildSideAfterProbe(size_t stream_index); + void removeUselessColumn(Block & block) const; /// Block's schema must be all_sample_block_pruned. Block removeUselessColumnForOutput(const Block & block) const; @@ -76,14 +84,25 @@ class HashJoin const JoinProfileInfoPtr & getProfileInfo() const { return profile_info; } + const OneTimeNotifyFuturePtr & getWaitProbeFinishFuture() { return wait_probe_finished_future; } + private: void initRowLayoutAndHashJoinMethod(); void workAfterBuildRowFinish(); + bool shouldCheckLateMaterialization() const + { + bool is_any_semi_join = isSemiFamily(kind) || isLeftOuterSemiFamily(kind) || isRightSemiFamily(kind); + return has_other_condition && !is_any_semi_join + && row_layout.other_column_count_for_other_condition < row_layout.other_column_indexes.size(); + } + private: + friend JoinBuildHelper; friend JoinProbeHelper; friend SemiJoinProbeHelper; + friend JoinBuildScannerAfterProbe; static const DataTypePtr match_helper_type; @@ -140,6 +159,8 @@ class HashJoin /// Row containers std::vector> multi_row_containers; + /// Non-joined blocks + NonJoinedBlocks non_joined_blocks; /// Build row phase size_t build_concurrency = 0; @@ -152,8 +173,11 @@ class HashJoin size_t probe_concurrency = 0; std::vector probe_workers_data; std::atomic active_probe_worker = 0; + OneTimeNotifyFuturePtr wait_probe_finished_future; std::unique_ptr join_probe_helper; std::unique_ptr semi_join_probe_helper; + /// Probe scan build side + std::unique_ptr join_build_scanner_after_probe; const JoinProfileInfoPtr profile_info = std::make_shared(); diff --git a/dbms/src/Interpreters/JoinV2/HashJoinBuild.cpp b/dbms/src/Interpreters/JoinV2/HashJoinBuild.cpp index 337b24e0500..34855920b22 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoinBuild.cpp +++ b/dbms/src/Interpreters/JoinV2/HashJoinBuild.cpp @@ -13,8 +13,10 @@ // limitations under the License. #include +#include #include + namespace DB { namespace ErrorCodes @@ -24,13 +26,12 @@ extern const int UNKNOWN_SET_DATA_VARIANT; template -void NO_INLINE insertBlockToRowContainersTypeImpl( +void NO_INLINE JoinBuildHelper::insertBlockToRowContainersImpl( + HashJoin * join, Block & block, size_t rows, const ColumnRawPtrs & key_columns, ConstNullMapPtr null_map, - const HashJoinRowLayout & row_layout, - std::vector> & multi_row_containers, JoinBuildWorkerData & wd, bool check_lm_row_size) { @@ -39,20 +40,45 @@ void NO_INLINE insertBlockToRowContainersTypeImpl( using HashValueType = typename KeyGetter::HashValueType; static_assert(sizeof(HashValueType) <= sizeof(decltype(wd.hashes)::value_type)); + const auto kind = join->kind; + const auto & row_layout = join->row_layout; + const auto & settings = join->settings; + auto & multi_row_containers = join->multi_row_containers; + auto & non_joined_blocks = join->non_joined_blocks; + auto & key_getter = *static_cast(wd.key_getter.get()); key_getter.reset(key_columns, row_layout.raw_key_column_indexes.size()); + RUNTIME_CHECK(multi_row_containers.size() == JOIN_BUILD_PARTITION_COUNT); wd.row_sizes.clear(); - wd.row_sizes.resize_fill(rows, row_layout.other_column_fixed_size); + bool is_right_semi_family = isRightSemiFamily(kind); + if (is_right_semi_family) + { + wd.row_sizes.resize_fill(rows, row_layout.other_column_for_other_condition_fixed_size); + wd.right_semi_selector.clear(); + wd.right_semi_selector.reserve(rows); + if constexpr (has_null_map) + { + wd.right_semi_selective.clear(); + wd.right_semi_selective.reserve(rows); + } + } + else + { + wd.row_sizes.resize_fill(rows, row_layout.other_column_fixed_size); + } + if constexpr (has_null_map && need_record_null_rows) + { + wd.non_joined_offsets.clear(); + wd.non_joined_offsets.reserve(rows); + } wd.hashes.resize(rows); - /// The last partition is used to hold rows with null join key. - constexpr size_t part_count = JOIN_BUILD_PARTITION_COUNT + 1; wd.partition_row_sizes.clear(); - wd.partition_row_sizes.resize_fill_zero(part_count); + wd.partition_row_sizes.resize_fill_zero(JOIN_BUILD_PARTITION_COUNT); wd.partition_row_count.clear(); - wd.partition_row_count.resize_fill_zero(part_count); + wd.partition_row_count.resize_fill_zero(JOIN_BUILD_PARTITION_COUNT); wd.partition_last_row_index.clear(); - wd.partition_last_row_index.resize_fill(part_count, -1); + wd.partition_last_row_index.resize_fill(JOIN_BUILD_PARTITION_COUNT, -1); if (check_lm_row_size) { @@ -66,29 +92,38 @@ void NO_INLINE insertBlockToRowContainersTypeImpl( } } - for (const auto & [index, is_fixed_size] : row_layout.other_column_indexes) + size_t other_column_count; + if (is_right_semi_family) + other_column_count = row_layout.other_column_count_for_other_condition; + else + other_column_count = row_layout.other_column_indexes.size(); + for (size_t i = 0; i < other_column_count; ++i) { + size_t index = row_layout.other_column_indexes[i].first; + bool is_fixed_size = row_layout.other_column_indexes[i].second; if (!is_fixed_size) block.getByPosition(index).column->countSerializeByteSize(wd.row_sizes); } for (size_t i = 0; i < rows; ++i) { - if (has_null_map && (*null_map)[i]) + if constexpr (has_null_map) { - if constexpr (need_record_null_rows) + if ((*null_map)[i]) { - //TODO - //wd.row_sizes[i] += sizeof(RowPtr); - //wd.row_sizes[i] = alignRowSize(wd.row_sizes[i]); - //wd.partition_row_sizes[part_count - 1] += wd.row_sizes[i]; - //++wd.partition_row_count[part_count - 1]; + if constexpr (need_record_null_rows) + wd.non_joined_offsets.push_back(i); + continue; } - continue; + if (is_right_semi_family) + wd.right_semi_selective.push_back(i); } + const auto & key = key_getter.getJoinKeyWithBuffer(i); wd.hashes[i] = static_cast(Hash()(key)); size_t part_num = getJoinBuildPartitionNum(wd.hashes[i]); + if (is_right_semi_family) + wd.right_semi_selector.push_back(part_num); size_t ptr_and_key_size = sizeof(RowPtr) + key_getter.getJoinKeyByteSize(key); if constexpr (KeyGetterType::joinKeyCompareHashFirst()) @@ -126,12 +161,13 @@ void NO_INLINE insertBlockToRowContainersTypeImpl( ++wd.partition_row_count[part_num]; } - std::vector partition_column_row(part_count); - for (size_t i = 0; i < part_count; ++i) + std::array partition_row_container; + size_t row_count = 0; + for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT; ++i) { if (wd.partition_row_count[i] > 0) { - auto & container = partition_column_row[i]; + auto & container = partition_row_container[i]; container.data.resize(wd.partition_row_sizes[i], CPU_CACHE_LINE_SIZE); wd.enable_tagged_pointer &= isRowPtrTagZero(container.data.data()); wd.enable_tagged_pointer &= isRowPtrTagZero(container.data.data() + wd.partition_row_sizes[i]); @@ -143,13 +179,12 @@ void NO_INLINE insertBlockToRowContainersTypeImpl( container.hashes.reserve(wd.partition_row_count[i]); wd.partition_row_sizes[i] = 0; - if (i != JOIN_BUILD_PARTITION_COUNT) - { - // Do not add the count of null rows - wd.row_count += wd.partition_row_count[i]; - } + row_count += wd.partition_row_count[i]; } } + RUNTIME_CHECK(row_count <= rows); + wd.row_count += row_count; + wd.non_joined_row_count += rows - row_count; constexpr size_t step = 256; wd.row_ptrs.reserve(step); @@ -163,23 +198,16 @@ void NO_INLINE insertBlockToRowContainersTypeImpl( { if (has_null_map && (*null_map)[j]) { - if constexpr (need_record_null_rows) - { - //TODO - } - else - { - wd.row_ptrs.push_back(nullptr); - } + wd.row_ptrs.push_back(nullptr); continue; } size_t part_num = getJoinBuildPartitionNum(wd.hashes[j]); - wd.row_ptrs.push_back(partition_column_row[part_num].data.data() + wd.partition_row_sizes[part_num]); + wd.row_ptrs.push_back(partition_row_container[part_num].data.data() + wd.partition_row_sizes[part_num]); auto & ptr = wd.row_ptrs.back(); assert((reinterpret_cast(ptr) & (ROW_ALIGN - 1)) == 0); wd.partition_row_sizes[part_num] += wd.row_sizes[j]; - partition_column_row[part_num].offsets.push_back(wd.partition_row_sizes[part_num]); + partition_row_container[part_num].offsets.push_back(wd.partition_row_sizes[part_num]); unalignedStore(ptr, nullptr); ptr += sizeof(RowPtr); @@ -191,105 +219,159 @@ void NO_INLINE insertBlockToRowContainersTypeImpl( } else { - partition_column_row[part_num].hashes.push_back(wd.hashes[j]); + partition_row_container[part_num].hashes.push_back(wd.hashes[j]); } const auto & key = key_getter.getJoinKeyWithBuffer(j); key_getter.serializeJoinKey(key, ptr); ptr += key_getter.getJoinKeyByteSize(key); } - for (const auto & [index, _] : row_layout.other_column_indexes) + for (size_t i = 0; i < other_column_count; ++i) { - if constexpr (has_null_map && !need_record_null_rows) + size_t index = row_layout.other_column_indexes[i].first; + if constexpr (has_null_map) block.getByPosition(index).column->serializeToPos(wd.row_ptrs, start, end - start, true); else block.getByPosition(index).column->serializeToPos(wd.row_ptrs, start, end - start, false); } } - for (size_t i = 0; i < part_count; ++i) + if (is_right_semi_family) { - if (wd.partition_row_count[i] > 0) - multi_row_containers[i]->insert(std::move(partition_column_row[i]), wd.partition_row_count[i]); + IColumn::ScatterColumns scatter_columns(JOIN_BUILD_PARTITION_COUNT); + for (size_t i = row_layout.other_column_count_for_other_condition; i < row_layout.other_column_indexes.size(); + ++i) + { + size_t index = row_layout.other_column_indexes[i].first; + auto & column_data = block.getByPosition(index); + size_t column_memory = column_data.column->byteSize(); + for (size_t j = 0; j < JOIN_BUILD_PARTITION_COUNT; ++j) + { + auto new_column_data = column_data.cloneEmpty(); + if (wd.partition_row_count[j] > 0) + { + size_t memory_hint = 1.2 * column_memory * wd.partition_row_count[j] / rows; + new_column_data.column->assumeMutable()->reserveWithTotalMemoryHint( + wd.partition_row_count[j], + memory_hint); + } + scatter_columns[j] = new_column_data.column->assumeMutable(); + partition_row_container[j].other_column_block.insert(std::move(new_column_data)); + } + if constexpr (has_null_map) + column_data.column->scatterTo(scatter_columns, wd.right_semi_selector, wd.right_semi_selective); + else + column_data.column->scatterTo(scatter_columns, wd.right_semi_selector); + } } -} -template -void insertBlockToRowContainersType( - bool need_record_null_rows, - Block & block, - size_t rows, - const ColumnRawPtrs & key_columns, - ConstNullMapPtr null_map, - const HashJoinRowLayout & row_layout, - std::vector> & multi_row_containers, - JoinBuildWorkerData & worker_data, - bool check_lm_row_size) -{ -#define CALL(has_null_map, need_record_null_rows) \ - insertBlockToRowContainersTypeImpl( \ - block, \ - rows, \ - key_columns, \ - null_map, \ - row_layout, \ - multi_row_containers, \ - worker_data, \ - check_lm_row_size); - - if (null_map) + if constexpr (has_null_map && need_record_null_rows) { - if (need_record_null_rows) + if (!wd.non_joined_offsets.empty()) { - CALL(true, true); - } - else - { - CALL(true, false); + join->initOutputBlock(wd.non_joined_block); + RUNTIME_CHECK(wd.non_joined_block.rows() < settings.max_block_size); + size_t columns = wd.non_joined_block.columns(); + auto fill_block = [&](size_t offset_start, size_t length) { + for (size_t i = 0; i < columns; ++i) + { + auto des_mut_column = wd.non_joined_block.getByPosition(i).column->assumeMutable(); + const auto & name = wd.non_joined_block.getByPosition(i).name; + if (!block.has(name)) + { + // If block does not have this column, this column should be nullable and from the left side + RUNTIME_CHECK_MSG( + des_mut_column->isColumnNullable(), + "Column with name {} is not nullable", + name); + auto & nullable_column = static_cast(*des_mut_column); + nullable_column.insertManyDefaults(length); + continue; + } + auto & src_column = block.getByName(name).column; + des_mut_column->insertSelectiveRangeFrom(*src_column, wd.non_joined_offsets, offset_start, length); + } + }; + size_t offset_start = 0; + size_t offset_size = wd.non_joined_offsets.size(); + while (true) + { + size_t remaining_size = settings.max_block_size - wd.non_joined_block.rows(); + if (remaining_size > offset_size - offset_start) + { + fill_block(offset_start, offset_size - offset_start); + break; + } + fill_block(offset_start, remaining_size); + offset_start += remaining_size; + non_joined_blocks.insertFullBlock(std::move(wd.non_joined_block)); + wd.non_joined_block = {}; + if (offset_start >= offset_size) + break; + join->initOutputBlock(wd.non_joined_block); + } } } - else + + for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT; ++i) { - CALL(false, false); + if (wd.partition_row_count[i] > 0) + multi_row_containers[i]->insert(std::move(partition_row_container[i]), wd.partition_row_count[i]); } -#undef CALL } -void insertBlockToRowContainers( - HashJoinKeyMethod method, - bool need_record_null_rows, +void JoinBuildHelper::insertBlockToRowContainers( + HashJoin * join, Block & block, size_t rows, const ColumnRawPtrs & key_columns, ConstNullMapPtr null_map, - const HashJoinRowLayout & row_layout, - std::vector> & multi_row_containers, - JoinBuildWorkerData & worker_data, + JoinBuildWorkerData & wd, bool check_lm_row_size) { - switch (method) +#define CALL2(KeyGetter, has_null_map, need_record_null_rows) \ + { \ + insertBlockToRowContainersImpl( \ + join, \ + block, \ + rows, \ + key_columns, \ + null_map, \ + wd, \ + check_lm_row_size); \ + } + +#define CALL1(KeyGetter) \ + { \ + bool need_record_null_rows = needRecordNotInsertRows(join->kind); \ + if (null_map) \ + { \ + if (need_record_null_rows) \ + CALL2(KeyGetter, true, true) \ + else \ + CALL2(KeyGetter, true, false) \ + } \ + else \ + CALL2(KeyGetter, false, false) \ + } + + switch (join->method) { #define M(METHOD) \ case HashJoinKeyMethod::METHOD: \ using KeyGetterType##METHOD = HashJoinKeyGetterForType; \ - insertBlockToRowContainersType( \ - need_record_null_rows, \ - block, \ - rows, \ - key_columns, \ - null_map, \ - row_layout, \ - multi_row_containers, \ - worker_data, \ - check_lm_row_size); \ + CALL1(KeyGetterType##METHOD) \ break; APPLY_FOR_HASH_JOIN_VARIANTS(M) #undef M default: throw Exception( - fmt::format("Unknown JOIN keys variant {}.", magic_enum::enum_name(method)), + fmt::format("Unknown JOIN keys variant {}.", magic_enum::enum_name(join->method)), ErrorCodes::UNKNOWN_SET_DATA_VARIANT); } + +#undef CALL1 +#undef CALL2 } } // namespace DB diff --git a/dbms/src/Interpreters/JoinV2/HashJoinBuild.h b/dbms/src/Interpreters/JoinV2/HashJoinBuild.h index 906ca0841a6..5b8e18ce7b3 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoinBuild.h +++ b/dbms/src/Interpreters/JoinV2/HashJoinBuild.h @@ -43,15 +43,18 @@ inline size_t getJoinBuildPartitionNum(HashValueType hash) struct alignas(CPU_CACHE_LINE_SIZE) JoinBuildWorkerData { std::unique_ptr> key_getter; - /// Count of not-null rows size_t row_count = 0; - - RowPtr null_rows_list_head = nullptr; + size_t non_joined_row_count = 0; PaddedPODArray row_sizes; PaddedPODArray hashes; RowPtrs row_ptrs; + IColumn::Selector right_semi_selector; + BlockSelective right_semi_selective; + Block non_joined_block; + IColumn::Offsets non_joined_offsets; + PaddedPODArray partition_row_sizes; PaddedPODArray partition_row_count; PaddedPODArray partition_last_row_index; @@ -61,7 +64,7 @@ struct alignas(CPU_CACHE_LINE_SIZE) JoinBuildWorkerData size_t build_pointer_table_time = 0; size_t build_pointer_table_size = 0; - ssize_t build_pointer_table_iter = -1; + ssize_t current_build_table_index = -1; size_t padding_size = 0; size_t all_size = 0; @@ -73,17 +76,30 @@ struct alignas(CPU_CACHE_LINE_SIZE) JoinBuildWorkerData size_t lm_row_count = 0; }; -void insertBlockToRowContainers( - HashJoinKeyMethod method, - bool need_record_null_rows, - Block & block, - size_t rows, - const ColumnRawPtrs & key_columns, - ConstNullMapPtr null_map, - const HashJoinRowLayout & row_layout, - std::vector> & multi_row_containers, - JoinBuildWorkerData & worker_data, - bool check_lm_row_size); +class HashJoin; +class JoinBuildHelper +{ +public: + static void insertBlockToRowContainers( + HashJoin * join, + Block & block, + size_t rows, + const ColumnRawPtrs & key_columns, + ConstNullMapPtr null_map, + JoinBuildWorkerData & wd, + bool check_lm_row_size); + +private: + template + static void NO_INLINE insertBlockToRowContainersImpl( + HashJoin * join, + Block & block, + size_t rows, + const ColumnRawPtrs & key_columns, + ConstNullMapPtr null_map, + JoinBuildWorkerData & wd, + bool check_lm_row_size); +}; } // namespace DB diff --git a/dbms/src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.cpp b/dbms/src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.cpp new file mode 100644 index 00000000000..7eb19b9ef42 --- /dev/null +++ b/dbms/src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.cpp @@ -0,0 +1,408 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "Core/Block.h" + +namespace DB +{ + +using enum ASTTableJoin::Kind; + +JoinBuildScannerAfterProbe::JoinBuildScannerAfterProbe(HashJoin * join) + : join(join) +{ + join_key_getter = createHashJoinKeyGetter(join->method, join->collators); + ColumnRawPtrs key_columns + = extractAndMaterializeKeyColumns(join->right_sample_block, materialized_key_columns, join->key_names_right); + ColumnPtr null_map_holder; + ConstNullMapPtr null_map{}; + extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); + resetHashJoinKeyGetter(join->method, join_key_getter, key_columns, join->row_layout); + + size_t left_columns = join->left_sample_block_pruned.columns(); + + auto kind = join->kind; + bool need_row_data; + bool need_other_block_data; + if (kind == RightOuter) + { + need_row_data = true; + need_other_block_data = false; + } + else + { + need_row_data = false; + for (auto [column_index, is_nullable] : join->row_layout.raw_key_column_indexes) + { + auto output_index = join->output_column_indexes.at(left_columns + column_index); + need_row_data |= output_index >= 0; + if (need_row_data) + break; + } + for (size_t i = 0; i < join->row_layout.other_column_count_for_other_condition; ++i) + { + size_t column_index = join->row_layout.other_column_indexes[i].first; + auto output_index = join->output_column_indexes.at(left_columns + column_index); + need_row_data |= output_index >= 0; + if (need_row_data) + break; + } + + need_other_block_data + = join->row_layout.other_column_indexes.size() > join->row_layout.other_column_count_for_other_condition; + + // The output data should not be empty + RUNTIME_CHECK(need_row_data || need_other_block_data); + } + +#define SET_FUNC_PTR(KeyGetter, JoinType, need_row_data, need_other_block_data) \ + { \ + scan_func_ptr \ + = &JoinBuildScannerAfterProbe::scanImpl; \ + } + +#define CALL2(KeyGetter, JoinType) \ + { \ + if (need_row_data && need_other_block_data) \ + SET_FUNC_PTR(KeyGetter, JoinType, true, true) \ + else if (need_row_data) \ + SET_FUNC_PTR(KeyGetter, JoinType, true, false) \ + else \ + SET_FUNC_PTR(KeyGetter, JoinType, false, true) \ + } + +#define CALL(KeyGetter) \ + { \ + if (kind == RightOuter) \ + SET_FUNC_PTR(KeyGetter, RightOuter, true, false) \ + else if (kind == RightSemi) \ + CALL2(KeyGetter, RightSemi) \ + else if (kind == RightAnti) \ + CALL2(KeyGetter, RightAnti) \ + else \ + throw Exception( \ + fmt::format( \ + "Logical error: unknown combination of JOIN {} during scanning build side", \ + magic_enum::enum_name(kind)), \ + ErrorCodes::LOGICAL_ERROR); \ + } + + switch (join->method) + { +#define M(METHOD) \ + case HashJoinKeyMethod::METHOD: \ + using KeyGetterType##METHOD = HashJoinKeyGetterForType; \ + CALL(KeyGetterType##METHOD); \ + break; + APPLY_FOR_HASH_JOIN_VARIANTS(M) +#undef M + + default: + throw Exception( + fmt::format("Unknown JOIN keys variant {} during scanning build side", magic_enum::enum_name(join->method)), + ErrorCodes::UNKNOWN_SET_DATA_VARIANT); + } + +#undef CALL +#undef CALL2 +#undef SET_FUNC_PTR +} + +Block JoinBuildScannerAfterProbe::scan(JoinProbeWorkerData & wd) +{ + return (this->*scan_func_ptr)(wd); +} + +template +Block JoinBuildScannerAfterProbe::scanImpl(JoinProbeWorkerData & wd) +{ + static_assert(need_row_data || need_other_block_data); + + if (wd.is_scan_end) + return {}; + + using KeyGetterType = typename KeyGetter::Type; + using HashValueType = typename KeyGetter::HashValueType; + + auto & non_joined_blocks = join->non_joined_blocks; + Block * full_block = non_joined_blocks.getNextFullBlock(); + if (full_block != nullptr) + return *full_block; + + const size_t max_block_size = join->settings.max_block_size; + while (true) + { + Block * non_joined_non_full_block = non_joined_blocks.getNextNonFullBlock(); + if (non_joined_non_full_block == nullptr) + break; + RUNTIME_CHECK(non_joined_non_full_block->columns() == join->output_block_after_finalize.columns()); + size_t rows = non_joined_non_full_block->rows(); + if (rows >= max_block_size / 2) + return *non_joined_non_full_block; + + if (wd.non_joined_non_full_blocks_rows + rows > max_block_size) + { + Block res_block = vstackBlocks(std::move(wd.non_joined_non_full_blocks)); + wd.non_joined_non_full_blocks.clear(); + wd.non_joined_non_full_blocks.emplace_back(); + wd.non_joined_non_full_blocks.back().swap(*non_joined_non_full_block); + wd.non_joined_non_full_blocks_rows = rows; + return res_block; + } + + wd.non_joined_non_full_blocks.emplace_back(); + wd.non_joined_non_full_blocks.back().swap(*non_joined_non_full_block); + wd.non_joined_non_full_blocks_rows += rows; + } + if (wd.non_joined_non_full_blocks_rows > 0) + { + Block res_block = vstackBlocks(std::move(wd.non_joined_non_full_blocks)); + wd.non_joined_non_full_blocks.clear(); + wd.non_joined_non_full_blocks_rows = 0; + return res_block; + } + + const auto & multi_row_containers = join->multi_row_containers; + const size_t left_columns = join->left_sample_block_pruned.columns(); + auto & key_getter = *static_cast(join_key_getter.get()); + constexpr size_t key_offset + = sizeof(RowPtr) + (KeyGetterType::joinKeyCompareHashFirst() ? sizeof(HashValueType) : 0); + + size_t scan_size = 0; + RowContainer * container = wd.current_container; + size_t index = wd.current_container_index; + wd.selective_offsets.clear(); + wd.selective_offsets.reserve(max_block_size); + constexpr size_t insert_batch_max_size = 256; + wd.insert_batch.clear(); + wd.insert_batch.reserve(insert_batch_max_size); + + join->initOutputBlock(wd.scan_result_block); + size_t output_columns = wd.scan_result_block.columns(); + size_t result_block_rows = wd.scan_result_block.rows(); + + do + { + if (container == nullptr) + { + if (wd.current_scan_table_index != -1) + container = multi_row_containers[wd.current_scan_table_index]->getScanNext(); + if (container == nullptr) + { + std::unique_lock lock(scan_build_lock); + for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT; ++i) + { + scan_build_index = (scan_build_index + i) % JOIN_BUILD_PARTITION_COUNT; + container = multi_row_containers[scan_build_index]->getScanNext(); + if (container != nullptr) + { + wd.current_scan_table_index = scan_build_index; + scan_build_index = (scan_build_index + 1) % JOIN_BUILD_PARTITION_COUNT; + break; + } + } + } + if unlikely (container == nullptr) + { + wd.is_scan_end = true; + break; + } + } + size_t rows = container->size(); + size_t original_index = index; + while (index < rows) + { + RowPtr ptr = container->getRowPtr(index); + bool need_output; + if constexpr (kind == RightSemi) + need_output = hasRowPtrMatchedFlag(ptr); + else + need_output = !hasRowPtrMatchedFlag(ptr); + if (need_output) + { + if constexpr (need_row_data) + { + const auto & key = key_getter.deserializeJoinKey(ptr + key_offset); + size_t required_offset = key_offset + key_getter.getRequiredKeyOffset(key); + wd.insert_batch.push_back(ptr + required_offset); + if unlikely (wd.insert_batch.size() >= insert_batch_max_size) + flushInsertBatch(wd); + } + if constexpr (need_other_block_data) + { + wd.selective_offsets.push_back(index); + } + ++result_block_rows; + if unlikely (result_block_rows >= max_block_size) + { + ++index; + break; + } + } + ++index; + } + + if constexpr (need_other_block_data) + { + size_t other_columns = join->row_layout.other_column_indexes.size() + - join->row_layout.other_column_count_for_other_condition; + RUNTIME_CHECK(container->other_column_block.columns() == other_columns); + for (size_t i = 0; i < other_columns; ++i) + { + size_t column_index + = join->row_layout.other_column_indexes[join->row_layout.other_column_count_for_other_condition + i] + .first; + auto output_index = join->output_column_indexes.at(left_columns + column_index); + // These columns must be in the final output schema otherwise they should be pruned + RUNTIME_CHECK(output_index >= 0); + auto & src_column = container->other_column_block.safeGetByPosition(i); + auto & des_column = wd.scan_result_block.safeGetByPosition(output_index); + des_column.column->assumeMutable()->insertSelectiveFrom(*src_column.column, wd.selective_offsets); + } + wd.selective_offsets.clear(); + } + + scan_size += index - original_index; + + if (index >= rows) + { + container = nullptr; + index = 0; + } + + if unlikely (result_block_rows >= max_block_size) + break; + } while (scan_size < 2 * max_block_size); + + flushInsertBatch(wd); + fillNullMapWithZero(wd); + + wd.current_container = container; + wd.current_container_index = index; + + if constexpr (kind == RightOuter) + { + for (size_t i = 0; i < output_columns; ++i) + { + auto des_mut_column = wd.scan_result_block.getByPosition(i).column->assumeMutable(); + size_t current_rows = des_mut_column->size(); + if (current_rows < result_block_rows) + { + // This column should be nullable and from the left side + RUNTIME_CHECK_MSG( + des_mut_column->isColumnNullable(), + "Column with name {} is not nullable", + wd.scan_result_block.getByPosition(i).name); + auto & nullable_column = static_cast(*des_mut_column); + nullable_column.insertManyDefaults(result_block_rows - current_rows); + } + } + } + + if unlikely (wd.is_scan_end || result_block_rows >= max_block_size) + { + if (result_block_rows == 0) + return {}; + + Block res_block; + res_block.swap(wd.scan_result_block); + return res_block; + } + return join->output_block_after_finalize; +} + +template +void JoinBuildScannerAfterProbe::flushInsertBatch(JoinProbeWorkerData & wd) const +{ + const size_t left_columns = join->left_sample_block_pruned.columns(); + for (auto [column_index, is_nullable] : join->row_layout.raw_key_column_indexes) + { + auto output_index = join->output_column_indexes.at(left_columns + column_index); + if (output_index < 0) + { + join->right_sample_block_pruned.safeGetByPosition(column_index) + .column->deserializeAndAdvancePos(wd.insert_batch); + continue; + } + auto & des_column = wd.scan_result_block.safeGetByPosition(output_index); + IColumn * column = des_column.column->assumeMutable().get(); + if (is_nullable) + column = &static_cast(*column).getNestedColumn(); + column->deserializeAndInsertFromPos(wd.insert_batch, true); + if constexpr (last_flush) + column->flushNTAlignBuffer(); + } + + size_t other_column_count; + if (join->kind == RightOuter) + other_column_count = join->row_layout.other_column_indexes.size(); + else + other_column_count = join->row_layout.other_column_count_for_other_condition; + + const size_t invalid_start_offset = other_column_count; + size_t advance_start_offset = invalid_start_offset; + for (size_t i = 0; i < other_column_count; ++i) + { + size_t column_index = join->row_layout.other_column_indexes[i].first; + auto output_index = join->output_column_indexes.at(left_columns + column_index); + if (output_index < 0) + { + advance_start_offset = std::min(advance_start_offset, i); + continue; + } + if (advance_start_offset != invalid_start_offset) + { + while (advance_start_offset < i) + { + size_t column_index = join->row_layout.other_column_indexes[advance_start_offset].first; + join->right_sample_block_pruned.safeGetByPosition(column_index) + .column->deserializeAndAdvancePos(wd.insert_batch); + ++advance_start_offset; + } + advance_start_offset = invalid_start_offset; + } + auto & des_column = wd.scan_result_block.safeGetByPosition(output_index); + des_column.column->assumeMutable()->deserializeAndInsertFromPos(wd.insert_batch, true); + if constexpr (last_flush) + des_column.column->assumeMutable()->flushNTAlignBuffer(); + } + + wd.insert_batch.clear(); +} + +void JoinBuildScannerAfterProbe::fillNullMapWithZero(JoinProbeWorkerData & wd) const +{ + size_t left_columns = join->left_sample_block_pruned.columns(); + for (auto [column_index, is_nullable] : join->row_layout.raw_key_column_indexes) + { + auto output_index = join->output_column_indexes.at(left_columns + column_index); + if (!is_nullable || output_index < 0) + continue; + + auto des_mut_column = wd.scan_result_block.safeGetByPosition(output_index).column->assumeMutable(); + RUNTIME_CHECK(des_mut_column->isColumnNullable()); + auto & nullable_column = static_cast(*des_mut_column); + size_t data_size = nullable_column.getNestedColumn().size(); + size_t nullmap_size = nullable_column.getNullMapColumn().size(); + RUNTIME_CHECK(nullmap_size <= data_size); + nullable_column.getNullMapColumn().getData().resize_fill_zero(data_size); + } +} + +} // namespace DB diff --git a/dbms/src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.h b/dbms/src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.h new file mode 100644 index 00000000000..4c1d10e8058 --- /dev/null +++ b/dbms/src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.h @@ -0,0 +1,68 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ + +class HashJoin; +class JoinBuildScannerAfterProbe +{ +public: + explicit JoinBuildScannerAfterProbe(HashJoin * join); + + Block scan(JoinProbeWorkerData & wd); + +private: + template + Block scanImpl(JoinProbeWorkerData & wd); + + template + void ALWAYS_INLINE + insertRowToBatch(JoinProbeWorkerData & wd, RowPtr row_ptr, size_t index, size_t insert_batch_max_size) const + { + if constexpr (need_row_data) + { + wd.insert_batch.push_back(row_ptr); + if unlikely (wd.insert_batch.size() >= insert_batch_max_size) + flushInsertBatch(wd); + } + if constexpr (need_other_block_data) + { + wd.selective_offsets.push_back(index); + } + } + + template + void flushInsertBatch(JoinProbeWorkerData & wd) const; + + void fillNullMapWithZero(JoinProbeWorkerData & wd) const; + +private: + using FuncType = Block (JoinBuildScannerAfterProbe::*)(JoinProbeWorkerData &); + FuncType scan_func_ptr = nullptr; + + HashJoin * join; + std::mutex scan_build_lock; + size_t scan_build_index = 0; + /// Used for deserializing join key and getting required key offset + std::unique_ptr> join_key_getter; + Columns materialized_key_columns; +}; + + +} // namespace DB diff --git a/dbms/src/Interpreters/JoinV2/HashJoinPointerTable.cpp b/dbms/src/Interpreters/JoinV2/HashJoinPointerTable.cpp index 1f785793ba0..a58f3e69222 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoinPointerTable.cpp +++ b/dbms/src/Interpreters/JoinV2/HashJoinPointerTable.cpp @@ -88,11 +88,11 @@ bool HashJoinPointerTable::buildImpl( Stopwatch watch; size_t build_size = 0; bool is_end = false; - while (true) + do { RowContainer * container = nullptr; - if (wd.build_pointer_table_iter != -1) - container = multi_row_containers[wd.build_pointer_table_iter]->getNext(); + if (wd.current_build_table_index != -1) + container = multi_row_containers[wd.current_build_table_index]->getNext(); if (container == nullptr) { { @@ -103,7 +103,7 @@ bool HashJoinPointerTable::buildImpl( container = multi_row_containers[build_table_index]->getNext(); if (container != nullptr) { - wd.build_pointer_table_iter = build_table_index; + wd.current_build_table_index = build_table_index; build_table_index = (build_table_index + 1) % JOIN_BUILD_PARTITION_COUNT; break; } @@ -116,9 +116,9 @@ bool HashJoinPointerTable::buildImpl( break; } } - size_t size = container->size(); - build_size += size; - for (size_t i = 0; i < size; ++i) + size_t rows = container->size(); + build_size += rows; + for (size_t i = 0; i < rows; ++i) { RowPtr row_ptr = container->getRowPtr(i); assert((reinterpret_cast(row_ptr) & (ROW_ALIGN - 1)) == 0); @@ -149,10 +149,7 @@ bool HashJoinPointerTable::buildImpl( if (old_head != nullptr) unalignedStore(row_ptr, old_head); } - - if (build_size >= max_build_size) - break; - } + } while (build_size < max_build_size * 2); wd.build_pointer_table_size += build_size; wd.build_pointer_table_time += watch.elapsedMilliseconds(); return is_end; diff --git a/dbms/src/Interpreters/JoinV2/HashJoinProbe.cpp b/dbms/src/Interpreters/JoinV2/HashJoinProbe.cpp index b3c71247b9e..b9e9ae2d2d0 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoinProbe.cpp +++ b/dbms/src/Interpreters/JoinV2/HashJoinProbe.cpp @@ -149,19 +149,21 @@ void JoinProbeContext::prepareForHashProbe( is_prepared = true; } -template +template void JoinProbeHelperUtil::flushInsertBatch(JoinProbeWorkerData & wd, MutableColumns & added_columns) const { for (auto [column_index, is_nullable] : row_layout.raw_key_column_indexes) { IColumn * column = added_columns[column_index].get(); if (is_nullable) - column = &static_cast(*added_columns[column_index]).getNestedColumn(); + column = &static_cast(*column).getNestedColumn(); column->deserializeAndInsertFromPos(wd.insert_batch, true); + if constexpr (last_flush) + column->flushNTAlignBuffer(); } size_t add_size; - if constexpr (late_materialization) + if constexpr (late_materialization || is_right_semi_join) add_size = row_layout.other_column_count_for_other_condition; else add_size = row_layout.other_column_indexes.size(); @@ -169,42 +171,22 @@ void JoinProbeHelperUtil::flushInsertBatch(JoinProbeWorkerData & wd, MutableColu { size_t column_index = row_layout.other_column_indexes[i].first; added_columns[column_index]->deserializeAndInsertFromPos(wd.insert_batch, true); - } - if constexpr (late_materialization) - wd.row_ptrs_for_lm.insert(wd.insert_batch.begin(), wd.insert_batch.end()); - - if constexpr (last_flush) - { - for (auto [column_index, is_nullable] : row_layout.raw_key_column_indexes) - { - IColumn * column = added_columns[column_index].get(); - if (is_nullable) - column = &static_cast(*added_columns[column_index]).getNestedColumn(); - column->flushNTAlignBuffer(); - } - - size_t add_size; - if constexpr (late_materialization) - add_size = row_layout.other_column_count_for_other_condition; - else - add_size = row_layout.other_column_indexes.size(); - for (size_t i = 0; i < add_size; ++i) - { - size_t column_index = row_layout.other_column_indexes[i].first; + if constexpr (last_flush) added_columns[column_index]->flushNTAlignBuffer(); - } } + if constexpr (late_materialization && !is_right_semi_join) + wd.row_ptrs_for_lm.insert(wd.insert_batch.begin(), wd.insert_batch.end()); wd.insert_batch.clear(); } -template void JoinProbeHelperUtil::fillNullMapWithZero(MutableColumns & added_columns) const { for (auto [column_index, is_nullable] : row_layout.raw_key_column_indexes) { if (is_nullable) { + RUNTIME_CHECK(added_columns[column_index]->isColumnNullable()); auto & nullable_column = static_cast(*added_columns[column_index]); size_t data_size = nullable_column.getNestedColumn().size(); size_t nullmap_size = nullable_column.getNullMapColumn().size(); @@ -233,7 +215,7 @@ struct JoinProbeAdder { ++current_offset; wd.selective_offsets.push_back(idx); - helper.insertRowToBatch(wd, added_columns, row_ptr + ptr_offset); + helper.insertRowToBatch(wd, added_columns, row_ptr + ptr_offset); return current_offset >= helper.settings.max_block_size; } @@ -245,8 +227,8 @@ struct JoinProbeAdder static void flush(JoinProbeHelper & helper, JoinProbeWorkerData & wd, MutableColumns & added_columns) { - helper.flushInsertBatch(wd, added_columns); - helper.fillNullMapWithZero(added_columns); + helper.flushInsertBatch(wd, added_columns); + helper.fillNullMapWithZero(added_columns); } }; @@ -269,7 +251,7 @@ struct JoinProbeAdder { ++current_offset; wd.selective_offsets.push_back(idx); - helper.insertRowToBatch(wd, added_columns, row_ptr + ptr_offset); + helper.insertRowToBatch(wd, added_columns, row_ptr + ptr_offset); return current_offset >= helper.settings.max_block_size; } @@ -291,8 +273,8 @@ struct JoinProbeAdder static void flush(JoinProbeHelper & helper, JoinProbeWorkerData & wd, MutableColumns & added_columns) { - helper.flushInsertBatch(wd, added_columns); - helper.fillNullMapWithZero(added_columns); + helper.flushInsertBatch(wd, added_columns); + helper.fillNullMapWithZero(added_columns); if constexpr (!has_other_condition) { @@ -436,12 +418,128 @@ struct JoinProbeAdder static void flush(JoinProbeHelper &, JoinProbeWorkerData &, MutableColumns &) {} }; +template +struct JoinProbeAdder +{ + static constexpr bool need_matched = true; + static constexpr bool need_not_matched = false; + static constexpr bool break_on_first_match = false; + + static bool ALWAYS_INLINE addMatched( + JoinProbeHelper & helper, + JoinProbeContext &, + JoinProbeWorkerData & wd, + MutableColumns & added_columns, + size_t idx, + size_t & current_offset, + RowPtr row_ptr, + size_t ptr_offset) + { + if constexpr (has_other_condition) + { + wd.right_join_row_ptrs.push_back(hasRowPtrMatchedFlag(row_ptr) ? nullptr : row_ptr); + } + else + { + setRowPtrMatchedFlag(row_ptr); + } + + ++current_offset; + wd.selective_offsets.push_back(idx); + helper.insertRowToBatch(wd, added_columns, row_ptr + ptr_offset); + return current_offset >= helper.settings.max_block_size; + } + + static bool ALWAYS_INLINE + addNotMatched(JoinProbeHelper &, JoinProbeContext &, JoinProbeWorkerData &, size_t, size_t &) + { + return false; + } + + static void flush(JoinProbeHelper & helper, JoinProbeWorkerData & wd, MutableColumns & added_columns) + { + helper.flushInsertBatch(wd, added_columns); + helper.fillNullMapWithZero(added_columns); + } +}; + +template + requires(kind == RightSemi || kind == RightAnti) +struct JoinProbeAdder +{ + static constexpr bool need_matched = true; + static constexpr bool need_not_matched = false; + static constexpr bool break_on_first_match = false; + + static bool ALWAYS_INLINE addMatched( + JoinProbeHelper &, + JoinProbeContext &, + JoinProbeWorkerData &, + MutableColumns &, + size_t, + size_t &, + RowPtr row_ptr, + size_t) + { + setRowPtrMatchedFlag(row_ptr); + return false; + } + + static bool ALWAYS_INLINE + addNotMatched(JoinProbeHelper &, JoinProbeContext &, JoinProbeWorkerData &, size_t, size_t &) + { + return false; + } + + static void flush(JoinProbeHelper &, JoinProbeWorkerData &, MutableColumns &) {} +}; + +template + requires(kind == RightSemi || kind == RightAnti) +struct JoinProbeAdder +{ + static constexpr bool need_matched = true; + static constexpr bool need_not_matched = false; + static constexpr bool break_on_first_match = false; + + static bool ALWAYS_INLINE addMatched( + JoinProbeHelper & helper, + JoinProbeContext &, + JoinProbeWorkerData & wd, + MutableColumns & added_columns, + size_t idx, + size_t & current_offset, + RowPtr row_ptr, + size_t ptr_offset) + { + if (hasRowPtrMatchedFlag(row_ptr)) + return false; + ++current_offset; + wd.selective_offsets.push_back(idx); + wd.right_join_row_ptrs.push_back(row_ptr); + helper.insertRowToBatch(wd, added_columns, row_ptr + ptr_offset); + return current_offset >= helper.settings.max_block_size; + } + + static bool ALWAYS_INLINE + addNotMatched(JoinProbeHelper &, JoinProbeContext &, JoinProbeWorkerData &, size_t, size_t &) + { + return false; + } + + static void flush(JoinProbeHelper & helper, JoinProbeWorkerData & wd, MutableColumns & added_columns) + { + helper.flushInsertBatch(wd, added_columns); + helper.fillNullMapWithZero(added_columns); + } +}; + JoinProbeHelper::JoinProbeHelper(const HashJoin * join, bool late_materialization) : JoinProbeHelperUtil(join->settings, join->row_layout) , join(join) , pointer_table(join->pointer_table) { -#define CALL3(KeyGetter, JoinType, has_other_condition, late_materialization, tagged_pointer) \ +#define SET_FUNC_PTR(KeyGetter, JoinType, has_other_condition, late_materialization, tagged_pointer) \ { \ func_ptr_has_null \ = &JoinProbeHelper:: \ @@ -451,12 +549,12 @@ JoinProbeHelper::JoinProbeHelper(const HashJoin * join, bool late_materializatio probeImpl; \ } -#define CALL2(KeyGetter, JoinType, has_other_condition, late_materialization) \ - { \ - if (pointer_table.enableTaggedPointer()) \ - CALL3(KeyGetter, JoinType, has_other_condition, late_materialization, true) \ - else \ - CALL3(KeyGetter, JoinType, has_other_condition, late_materialization, false) \ +#define CALL2(KeyGetter, JoinType, has_other_condition, late_materialization) \ + { \ + if (pointer_table.enableTaggedPointer()) \ + SET_FUNC_PTR(KeyGetter, JoinType, has_other_condition, late_materialization, true) \ + else \ + SET_FUNC_PTR(KeyGetter, JoinType, has_other_condition, late_materialization, false) \ } #define CALL1(KeyGetter, JoinType) \ @@ -480,6 +578,8 @@ JoinProbeHelper::JoinProbeHelper(const HashJoin * join, bool late_materializatio CALL1(KeyGetter, Inner) \ else if (kind == LeftOuter) \ CALL1(KeyGetter, LeftOuter) \ + else if (kind == RightOuter) \ + CALL1(KeyGetter, RightOuter) \ else if (kind == Semi && !has_other_condition) \ CALL2(KeyGetter, Semi, false, false) \ else if (kind == Anti && !has_other_condition) \ @@ -488,6 +588,14 @@ JoinProbeHelper::JoinProbeHelper(const HashJoin * join, bool late_materializatio CALL2(KeyGetter, LeftOuterSemi, false, false) \ else if (kind == LeftOuterAnti && !has_other_condition) \ CALL2(KeyGetter, LeftOuterAnti, false, false) \ + else if (kind == RightSemi && has_other_condition) \ + CALL2(KeyGetter, RightSemi, true, false) \ + else if (kind == RightSemi && !has_other_condition) \ + CALL2(KeyGetter, RightSemi, false, false) \ + else if (kind == RightAnti && has_other_condition) \ + CALL2(KeyGetter, RightAnti, true, false) \ + else if (kind == RightAnti && !has_other_condition) \ + CALL2(KeyGetter, RightAnti, false, false) \ else \ throw Exception( \ fmt::format("Logical error: unknown combination of JOIN {}", magic_enum::enum_name(kind)), \ @@ -509,10 +617,11 @@ JoinProbeHelper::JoinProbeHelper(const HashJoin * join, bool late_materializatio fmt::format("Unknown JOIN keys variant {}.", magic_enum::enum_name(join->method)), ErrorCodes::UNKNOWN_SET_DATA_VARIANT); } + #undef CALL #undef CALL1 #undef CALL2 -#undef CALL3 +#undef SET_FUNC_PTR } Block JoinProbeHelper::probe(JoinProbeContext & ctx, JoinProbeWorkerData & wd) @@ -556,6 +665,11 @@ Block JoinProbeHelper::probeImpl(JoinProbeContext & ctx, JoinProbeWorkerData & w wd.row_ptrs_for_lm.clear(); wd.row_ptrs_for_lm.reserve(settings.max_block_size); } + if constexpr (kind == RightSemi || kind == RightAnti) + { + wd.right_join_row_ptrs.clear(); + wd.right_join_row_ptrs.reserve(settings.max_block_size); + } size_t left_columns = join->left_sample_block_pruned.columns(); size_t right_columns = join->right_sample_block_pruned.columns(); @@ -595,7 +709,9 @@ Block JoinProbeHelper::probeImpl(JoinProbeContext & ctx, JoinProbeWorkerData & w for (size_t i = 0; i < right_columns; ++i) wd.result_block.safeGetByPosition(left_columns + i).column = std::move(added_columns[i]); - if constexpr (kind == Inner || kind == LeftOuter || kind == Semi || kind == Anti) + if constexpr ( + kind == Inner || kind == LeftOuter || kind == RightOuter || kind == Semi || kind == Anti || kind == RightSemi + || kind == RightAnti) { if (wd.selective_offsets.empty()) return join->output_block_after_finalize; @@ -723,10 +839,18 @@ void JoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDa const auto & key2 = key_getter.deserializeJoinKey(ptr + key_offset); bool key_is_equal = joinKeyIsEqual(key_getter, key, key2, hash, ptr); collision += !key_is_equal; + if constexpr (Adder::need_not_matched) + is_matched |= key_is_equal; if (key_is_equal) { - if constexpr (Adder::need_not_matched) - is_matched = true; + if constexpr ((kind == RightSemi || kind == RightAnti) && !has_other_condition) + { + if (hasRowPtrMatchedFlag(ptr)) + { + ptr = nullptr; + break; + } + } if constexpr (Adder::need_matched) { @@ -755,13 +879,13 @@ void JoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDa } } - ptr = getNextRowPtr(ptr); + ptr = getNextRowPtr(ptr); if (ptr == nullptr) break; } if unlikely (ptr != nullptr) { - ptr = getNextRowPtr(ptr); + ptr = getNextRowPtr(ptr); if (ptr == nullptr) ++idx; break; @@ -864,7 +988,7 @@ void JoinProbeHelper::probeFillColumnsPrefetch( if (state->stage == ProbePrefetchStage::FindNext) { RowPtr ptr = state->ptr; - RowPtr next_ptr = getNextRowPtr(ptr); + RowPtr next_ptr = getNextRowPtr(ptr); state->ptr = next_ptr; const auto & key2 = key_getter.deserializeJoinKey(ptr + key_offset); @@ -872,6 +996,14 @@ void JoinProbeHelper::probeFillColumnsPrefetch( collision += !key_is_equal; if constexpr (Adder::need_not_matched) state->is_matched |= key_is_equal; + if constexpr ((kind == RightSemi || kind == RightAnti) && !has_other_condition) + { + if (key_is_equal && hasRowPtrMatchedFlag(ptr)) + { + next_ptr = nullptr; + key_is_equal = false; + } + } if (key_is_equal) { if constexpr (Adder::need_matched) @@ -1085,6 +1217,29 @@ Block JoinProbeHelper::handleOtherConditions( ctx.rows_not_matched[idx] &= !is_matched; } } + else if (kind == RightOuter) + { + RUNTIME_CHECK(wd.right_join_row_ptrs.size() == rows); + RUNTIME_CHECK(wd.filter.size() == rows); + for (size_t i = 0; i < rows; ++i) + { + bool is_matched = wd.filter[i]; + if (is_matched && wd.right_join_row_ptrs[i]) + setRowPtrMatchedFlag(wd.right_join_row_ptrs[i]); + } + } + else if (isRightSemiFamily(kind)) + { + RUNTIME_CHECK(wd.right_join_row_ptrs.size() == rows); + RUNTIME_CHECK(wd.filter.size() == rows); + for (size_t i = 0; i < rows; ++i) + { + bool is_matched = wd.filter[i]; + if (is_matched) + setRowPtrMatchedFlag(wd.right_join_row_ptrs[i]); + } + return output_block_after_finalize; + } join->initOutputBlock(wd.result_block_for_other_condition); @@ -1387,4 +1542,9 @@ Block JoinProbeHelper::genResultBlockForLeftOuterSemi(JoinProbeContext & ctx) return res_block; } +// SemiJoinProbe.cpp calls this function +template void DB::JoinProbeHelperUtil::flushInsertBatch( + DB::JoinProbeWorkerData & wd, + DB::MutableColumns & added_columns) const; + } // namespace DB diff --git a/dbms/src/Interpreters/JoinV2/HashJoinProbe.h b/dbms/src/Interpreters/JoinV2/HashJoinProbe.h index 698f1dba409..289c13cde88 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoinProbe.h +++ b/dbms/src/Interpreters/JoinV2/HashJoinProbe.h @@ -100,12 +100,25 @@ struct alignas(CPU_CACHE_LINE_SIZE) JoinProbeWorkerData /// For late materialization RowPtrs row_ptrs_for_lm; RowPtrs filter_row_ptrs_for_lm; + /// For right outer/semi/anti join with other conditions + RowPtrs right_join_row_ptrs; /// Schema: HashJoin::all_sample_block_pruned Block result_block; /// Schema: HashJoin::output_block_after_finalize Block result_block_for_other_condition; + /// Scan build side + ssize_t current_scan_table_index = -1; + RowContainer * current_container = nullptr; + size_t current_container_index = 0; + /// Schema: HashJoin::output_block_after_finalize + Block scan_result_block; + /// Accumulate non-joined non-full blocks and output them once they approach the max block size + Blocks non_joined_non_full_blocks; + size_t non_joined_non_full_blocks_rows = 0; + bool is_scan_end = false; + /// Metrics size_t probe_handle_rows = 0; size_t probe_time = 0; @@ -113,6 +126,7 @@ struct alignas(CPU_CACHE_LINE_SIZE) JoinProbeWorkerData size_t replicate_time = 0; size_t other_condition_time = 0; size_t collision = 0; + size_t scan_build_side_time = 0; }; class JoinProbeHelperUtil @@ -140,18 +154,17 @@ class JoinProbeHelperUtil return key_getter.joinKeyIsEqual(key1, key2); } - template + template void ALWAYS_INLINE insertRowToBatch(JoinProbeWorkerData & wd, MutableColumns & added_columns, RowPtr row_ptr) const { wd.insert_batch.push_back(row_ptr); if unlikely (wd.insert_batch.size() >= settings.probe_insert_batch_size) - flushInsertBatch(wd, added_columns); + flushInsertBatch(wd, added_columns); } - template + template void flushInsertBatch(JoinProbeWorkerData & wd, MutableColumns & added_columns) const; - template void fillNullMapWithZero(MutableColumns & added_columns) const; protected: diff --git a/dbms/src/Interpreters/JoinV2/HashJoinRowLayout.h b/dbms/src/Interpreters/JoinV2/HashJoinRowLayout.h index 16d1c640082..7263ef6add4 100644 --- a/dbms/src/Interpreters/JoinV2/HashJoinRowLayout.h +++ b/dbms/src/Interpreters/JoinV2/HashJoinRowLayout.h @@ -15,9 +15,13 @@ #pragma once #include +#include +#include #include #include +#include +#include #include namespace DB @@ -41,12 +45,16 @@ struct HashJoinRowLayout size_t key_column_fixed_size = 0; size_t other_column_fixed_size = 0; + size_t other_column_for_other_condition_fixed_size = 0; }; using RowPtr = char *; using RowPtrs = PaddedPODArray; -constexpr size_t ROW_ALIGN = 4; +static_assert(alignof(std::atomic) == alignof(uintptr_t)); +constexpr UInt8 ROW_ALIGN = alignof(uintptr_t); +static_assert((ROW_ALIGN & (ROW_ALIGN - 1)) == 0); +static_assert(ROW_ALIGN >= 4 && ROW_ALIGN < UINT8_MAX); constexpr size_t ROW_PTR_TAG_BITS = 16; constexpr size_t ROW_PTR_TAG_MASK = (1 << ROW_PTR_TAG_BITS) - 1; @@ -54,9 +62,42 @@ constexpr size_t ROW_PTR_TAG_SHIFT = 8 * sizeof(RowPtr) - ROW_PTR_TAG_BITS; static_assert(sizeof(RowPtr) == sizeof(uintptr_t)); static_assert(sizeof(RowPtr) == 8); +template inline RowPtr getNextRowPtr(RowPtr ptr) { - return unalignedLoad(ptr); + using enum ASTTableJoin::Kind; + if constexpr (kind == RightOuter || kind == RightSemi || kind == RightAnti) + { + auto next = reinterpret_cast *>(ptr)->load(std::memory_order_relaxed); + return reinterpret_cast(next & (~static_cast(ROW_ALIGN - 1))); + } + return *reinterpret_cast(ptr); +} + +inline UInt8 getRowPtrFlag(RowPtr ptr) +{ + return reinterpret_cast *>(ptr)->load(std::memory_order_relaxed) + & static_cast(ROW_ALIGN - 1); +} + +inline bool hasRowPtrMatchedFlag(RowPtr ptr) +{ + return getRowPtrFlag(ptr) & 0x01; +} + +inline void setRowPtrMatchedFlag(RowPtr ptr) +{ + reinterpret_cast *>(ptr)->fetch_or(0x01, std::memory_order_relaxed); +} + +inline bool hasRowPtrNullFlag(RowPtr ptr) +{ + return getRowPtrFlag(ptr) & 0x10; +} + +inline void setRowPtrNullFlag(RowPtr ptr) +{ + reinterpret_cast *>(ptr)->fetch_or(0x10, std::memory_order_relaxed); } inline UInt16 getRowPtrTag(RowPtr ptr) @@ -88,6 +129,12 @@ struct RowContainer PaddedPODArray data; PaddedPODArray offsets; PaddedPODArray hashes; + /// Only used for right semi/anti join + /// Stores the other columns that are not used for other conditions. + /// The schema corresponds to the entries in `HashJoinRowLayout::other_column_indexes` + /// after the first `other_column_count_for_other_condition` elements. + /// These indexes refer to columns in `HashJoin::right_sample_block_pruned`. + Block other_column_block; size_t size() const { return offsets.size(); } @@ -95,15 +142,9 @@ struct RowContainer UInt64 getHash(ssize_t row) { return hashes[row]; } }; -struct alignas(CPU_CACHE_LINE_SIZE) MultipleRowContainer +class alignas(CPU_CACHE_LINE_SIZE) MultipleRowContainer { - std::mutex mu; - std::vector column_rows; - size_t all_row_count = 0; - - size_t build_table_index = 0; - size_t scan_table_index = 0; - +public: void insert(RowContainer && row_container, size_t count) { std::unique_lock lock(mu); @@ -113,19 +154,96 @@ struct alignas(CPU_CACHE_LINE_SIZE) MultipleRowContainer RowContainer * getNext() { + if (build_table_done.load(std::memory_order_relaxed)) + return nullptr; std::unique_lock lock(mu); if (build_table_index >= column_rows.size()) + { + build_table_done.store(true, std::memory_order_relaxed); return nullptr; + } return &column_rows[build_table_index++]; } RowContainer * getScanNext() { + if (scan_table_done.load(std::memory_order_relaxed)) + return nullptr; std::unique_lock lock(mu); if (scan_table_index >= column_rows.size()) + { + scan_table_done.store(true, std::memory_order_relaxed); return nullptr; + } return &column_rows[scan_table_index++]; } + +private: + std::mutex mu; + std::vector column_rows; + size_t all_row_count = 0; + + size_t build_table_index = 0; + size_t scan_table_index = 0; + + std::atomic_bool build_table_done = false; + std::atomic_bool scan_table_done = false; +}; + +class NonJoinedBlocks +{ +public: + void insertFullBlock(Block && block) + { + std::unique_lock lock(mu); + full_blocks.push_back(block); + } + + void insertNonFullBlock(Block && block) + { + std::unique_lock lock(mu); + non_full_blocks.push_back(block); + } + + Block * getNextFullBlock() + { + if (scan_full_blocks_done.load(std::memory_order_acquire)) + return nullptr; + std::unique_lock lock(mu); + if (scan_full_blocks_index >= full_blocks.size()) + { + scan_full_blocks_done.store(true, std::memory_order_release); + return nullptr; + } + return &full_blocks[scan_full_blocks_index++]; + } + + Block * getNextNonFullBlock() + { + if (scan_non_full_blocks_done.load(std::memory_order_acquire)) + return nullptr; + std::unique_lock lock(mu); + if (scan_non_full_blocks_index >= non_full_blocks.size()) + { + scan_non_full_blocks_done.store(true, std::memory_order_release); + return nullptr; + } + return &non_full_blocks[scan_non_full_blocks_index++]; + } + +private: + std::mutex mu; + /// Schema: HashJoin::output_block_after_finalize + /// Each block's size is equal to max_block_size + std::vector full_blocks; + /// Each block's size is less than max_block_size + std::vector non_full_blocks; + + size_t scan_full_blocks_index = 0; + size_t scan_non_full_blocks_index = 0; + + std::atomic_bool scan_full_blocks_done = false; + std::atomic_bool scan_non_full_blocks_done = false; }; } // namespace DB diff --git a/dbms/src/Interpreters/JoinV2/SemiJoinProbe.cpp b/dbms/src/Interpreters/JoinV2/SemiJoinProbe.cpp index 02f9cb8f797..c5c6b416790 100644 --- a/dbms/src/Interpreters/JoinV2/SemiJoinProbe.cpp +++ b/dbms/src/Interpreters/JoinV2/SemiJoinProbe.cpp @@ -261,15 +261,12 @@ SemiJoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDat RUNTIME_CHECK(probe_list->slotCapacity() == ctx.rows); size_t current_offset = 0; size_t collision = 0; - size_t key_offset = sizeof(RowPtr); - if constexpr (KeyGetterType::joinKeyCompareHashFirst()) - { - key_offset += sizeof(HashValueType); - } + constexpr size_t key_offset + = sizeof(RowPtr) + (KeyGetterType::joinKeyCompareHashFirst() ? sizeof(HashValueType) : 0); SCOPE_EXIT({ - flushInsertBatch(wd, added_columns); - fillNullMapWithZero(added_columns); + flushInsertBatch(wd, added_columns); + fillNullMapWithZero(added_columns); wd.collision += collision; }); @@ -291,15 +288,18 @@ SemiJoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDat if (key_is_equal) { wd.selective_offsets.push_back(idx); - insertRowToBatch(wd, added_columns, ptr + key_offset + key_getter.getRequiredKeyOffset(key2)); + insertRowToBatch( + wd, + added_columns, + ptr + key_offset + key_getter.getRequiredKeyOffset(key2)); if unlikely (current_offset >= end_offset) { - ptr = getNextRowPtr(ptr); + ptr = getNextRowPtr(ptr); break; } } - ptr = getNextRowPtr(ptr); + ptr = getNextRowPtr(ptr); } if (prev_offset == current_offset) { @@ -362,15 +362,18 @@ SemiJoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDat if (key_is_equal) { wd.selective_offsets.push_back(idx); - insertRowToBatch(wd, added_columns, ptr + key_offset + key_getter.getRequiredKeyOffset(key2)); + insertRowToBatch( + wd, + added_columns, + ptr + key_offset + key_getter.getRequiredKeyOffset(key2)); if unlikely (current_offset >= end_offset) { - ptr = getNextRowPtr(ptr); + ptr = getNextRowPtr(ptr); break; } } - ptr = getNextRowPtr(ptr); + ptr = getNextRowPtr(ptr); if (ptr == nullptr) break; } @@ -474,7 +477,7 @@ void NO_INLINE SemiJoinProbeHelper::probeFillColumnsPrefetch( if (state->stage == ProbePrefetchStage::FindNext) { RowPtr ptr = state->ptr; - RowPtr next_ptr = getNextRowPtr(ptr); + RowPtr next_ptr = getNextRowPtr(ptr); state->ptr = next_ptr; const auto & key2 = key_getter.deserializeJoinKey(ptr + key_offset); @@ -487,7 +490,10 @@ void NO_INLINE SemiJoinProbeHelper::probeFillColumnsPrefetch( if (key_is_equal) { wd.selective_offsets.push_back(state->index); - insertRowToBatch(wd, added_columns, ptr + key_offset + key_getter.getRequiredKeyOffset(key2)); + insertRowToBatch( + wd, + added_columns, + ptr + key_offset + key_getter.getRequiredKeyOffset(key2)); if unlikely (current_offset >= settings.max_block_size) { probe_list->at(state->index).build_row_ptr = next_ptr; @@ -649,8 +655,8 @@ void NO_INLINE SemiJoinProbeHelper::probeFillColumnsPrefetch( } } - flushInsertBatch(wd, added_columns); - fillNullMapWithZero(added_columns); + flushInsertBatch(wd, added_columns); + fillNullMapWithZero(added_columns); ctx.current_row_idx = idx; ctx.prefetch_active_states = active_states; diff --git a/dbms/src/Operators/HashJoinV2ProbeTransformOp.cpp b/dbms/src/Operators/HashJoinV2ProbeTransformOp.cpp index efd8ce06bbc..6c279ce5788 100644 --- a/dbms/src/Operators/HashJoinV2ProbeTransformOp.cpp +++ b/dbms/src/Operators/HashJoinV2ProbeTransformOp.cpp @@ -50,37 +50,60 @@ void HashJoinV2ProbeTransformOp::operateSuffixImpl() OperatorStatus HashJoinV2ProbeTransformOp::onOutput(Block & block) { - assert(!probe_context.isAllFinished()); - block = join_ptr->probeBlock(probe_context, op_index); - size_t rows = block.rows(); - joined_rows += rows; - return OperatorStatus::HAS_OUTPUT; + while (true) + { + switch (status) + { + case ProbeStatus::PROBE: + if unlikely (probe_context.isAllFinished()) + { + join_ptr->finishOneProbe(op_index); + status + = join_ptr->needScanBuildSideAfterProbe() ? ProbeStatus::WAIT_PROBE_FINISH : ProbeStatus::FINISHED; + block = join_ptr->probeLastResultBlock(op_index); + if (block) + return OperatorStatus::HAS_OUTPUT; + break; + } + block = join_ptr->probeBlock(probe_context, op_index); + joined_rows += block.rows(); + return OperatorStatus::HAS_OUTPUT; + case ProbeStatus::WAIT_PROBE_FINISH: + if (join_ptr->isAllProbeFinished()) + { + status = ProbeStatus::SCAN_BUILD_SIDE; + break; + } + return OperatorStatus::WAIT_FOR_NOTIFY; + case ProbeStatus::SCAN_BUILD_SIDE: + block = join_ptr->scanBuildSideAfterProbe(op_index); + scan_hash_map_rows += block.rows(); + if unlikely (!block) + status = ProbeStatus::FINISHED; + return OperatorStatus::HAS_OUTPUT; + case ProbeStatus::FINISHED: + block = {}; + return OperatorStatus::HAS_OUTPUT; + } + } } OperatorStatus HashJoinV2ProbeTransformOp::transformImpl(Block & block) { + assert(status == ProbeStatus::PROBE); assert(probe_context.isAllFinished()); - if unlikely (!block) + if likely (block) { - join_ptr->finishOneProbe(op_index); - probe_context.input_is_finished = true; - block = join_ptr->probeLastResultBlock(op_index); - return OperatorStatus::HAS_OUTPUT; + if unlikely (block.rows() == 0) + return OperatorStatus::NEED_INPUT; + probe_context.resetBlock(block); } - if (block.rows() == 0) - return OperatorStatus::NEED_INPUT; - probe_context.resetBlock(block); return onOutput(block); } OperatorStatus HashJoinV2ProbeTransformOp::tryOutputImpl(Block & block) { - if unlikely (probe_context.input_is_finished) - { - block = {}; - return OperatorStatus::HAS_OUTPUT; - } - if (probe_context.isAllFinished()) + if (status == ProbeStatus::PROBE && probe_context.isAllFinished()) return OperatorStatus::NEED_INPUT; return onOutput(block); } diff --git a/dbms/src/Operators/HashJoinV2ProbeTransformOp.h b/dbms/src/Operators/HashJoinV2ProbeTransformOp.h index b78872a6ae6..6e19e6e0fc1 100644 --- a/dbms/src/Operators/HashJoinV2ProbeTransformOp.h +++ b/dbms/src/Operators/HashJoinV2ProbeTransformOp.h @@ -51,5 +51,14 @@ class HashJoinV2ProbeTransformOp : public TransformOp size_t joined_rows = 0; size_t scan_hash_map_rows = 0; + + enum class ProbeStatus + { + PROBE, + WAIT_PROBE_FINISH, + SCAN_BUILD_SIDE, + FINISHED, + }; + ProbeStatus status = ProbeStatus::PROBE; }; } // namespace DB