Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
4adb4cb
support late materialization for other condition
gengliqi Mar 6, 2025
cff7ccc
u
gengliqi Mar 22, 2025
02b3377
merge master
gengliqi Mar 22, 2025
733aa1e
u
gengliqi Mar 22, 2025
3d9507b
update tests
gengliqi Mar 24, 2025
44a76d1
update left-outer with other condition tests
gengliqi Mar 24, 2025
e4cbb15
u
gengliqi Mar 25, 2025
147136e
save
gengliqi Mar 25, 2025
823b875
u
gengliqi Mar 25, 2025
41bb1bc
u
gengliqi Mar 25, 2025
10b7e51
u
gengliqi Mar 25, 2025
5af7518
add deserializeAndAdvancePos interface
gengliqi Mar 26, 2025
4d53325
u
gengliqi Mar 26, 2025
61c5dae
Merge branch 'master' into refine-deserialize
gengliqi Mar 26, 2025
d84daeb
merge refine-deserialize
gengliqi Mar 26, 2025
772f6c2
fix
gengliqi Mar 26, 2025
a3ff6cc
Merge remote-tracking branch 'upstream/master' into refine-deserialize
gengliqi Mar 31, 2025
d029566
merge refine-deserialize
gengliqi Mar 31, 2025
8df3f0b
Merge remote-tracking branch 'upstream/master' into join-v2-late-mate…
gengliqi Apr 1, 2025
b4fbbe4
merge master
gengliqi Apr 4, 2025
933ed11
update tests
gengliqi Apr 4, 2025
b535cd7
update tests
gengliqi Apr 4, 2025
1945bf0
address comments
gengliqi Apr 7, 2025
270363f
address comments
gengliqi Apr 8, 2025
f0c0240
Merge remote-tracking branch 'upstream/master' into join-v2-late-mate…
gengliqi Apr 9, 2025
b70e60b
add unit tests for IColumn::serializeByteSize
gengliqi Apr 9, 2025
b1738d5
u
gengliqi Apr 9, 2025
43b8e0a
u
gengliqi Apr 10, 2025
62ca1fb
support (left outer) (anti) semi join with no other condition
gengliqi Apr 10, 2025
e959d02
u
gengliqi Apr 10, 2025
7364415
remove enable_pipeline for ExecutorTest::executeExecutor
gengliqi Apr 11, 2025
32fb643
refine some code
gengliqi Apr 11, 2025
1568c1a
u
gengliqi Apr 11, 2025
106634c
use safeGetByPosition in some places
gengliqi Apr 14, 2025
36e565b
tiny refine
gengliqi Apr 14, 2025
9227a0b
u
gengliqi Apr 14, 2025
a3ed1e2
u
gengliqi Apr 14, 2025
c8f79d1
Merge branch 'master' into join-v2-late-materialization
gengliqi Apr 14, 2025
994e8a6
u
gengliqi Apr 15, 2025
03f47ea
Merge remote-tracking branch 'upstream/master' into join-v2-late-mate…
gengliqi Apr 21, 2025
9ee3b81
use < 0 instead of == -1 for output_index
gengliqi Apr 21, 2025
f75eee8
add comments
gengliqi Apr 21, 2025
66dd0d7
address comments
gengliqi Apr 21, 2025
c6966d3
add comments
gengliqi Apr 21, 2025
338e3a0
u
gengliqi Apr 22, 2025
70c0e71
u
gengliqi Apr 22, 2025
f4d4137
rename
gengliqi Apr 22, 2025
80800b7
rename
gengliqi Apr 22, 2025
302fab5
u
gengliqi Apr 22, 2025
d7836d5
u
gengliqi Apr 22, 2025
1450e9e
add SemiJoinProbe
gengliqi Apr 22, 2025
6024b72
u
gengliqi Apr 23, 2025
2627cd5
tiny refine
gengliqi Apr 23, 2025
ff2e01e
u
gengliqi Apr 23, 2025
1fe45ce
merge join-v2-late-materialization & tiny update
gengliqi Apr 23, 2025
8f681c1
u
gengliqi Apr 23, 2025
23644d8
Merge remote-tracking branch 'origin/join-v2-late-materialization' in…
gengliqi Apr 23, 2025
97b2b70
fix
gengliqi Apr 23, 2025
e1c050f
u
gengliqi Apr 23, 2025
617e1b1
merge
gengliqi Apr 23, 2025
63688f9
u
gengliqi Apr 24, 2025
0e59dfc
merge
gengliqi Apr 24, 2025
9b1421c
fix
gengliqi Apr 24, 2025
44c0ce0
u
gengliqi Apr 24, 2025
b36e23e
merge
gengliqi Apr 24, 2025
fbb4e3f
u
gengliqi Apr 24, 2025
801d99c
Merge remote-tracking branch 'origin/join-v2-late-materialization' in…
gengliqi Apr 24, 2025
d1499bd
format
gengliqi Apr 24, 2025
418f7c4
u
gengliqi Apr 24, 2025
4ba216d
u
gengliqi Apr 28, 2025
68c3b57
u
gengliqi Apr 29, 2025
1bdeace
u
gengliqi Apr 29, 2025
537c123
u
gengliqi Apr 30, 2025
dd986a5
fix
gengliqi Apr 30, 2025
fcbb400
fix bug
gengliqi Apr 30, 2025
55f66a1
u
gengliqi May 1, 2025
a35c741
u
gengliqi May 6, 2025
ee4a8aa
Merge remote-tracking branch 'origin/join-v2-late-materialization' in…
gengliqi May 6, 2025
4ce4f23
merge master
gengliqi May 7, 2025
7c29983
fix
gengliqi May 7, 2025
efbf062
u
gengliqi May 7, 2025
7a808f8
fix
gengliqi May 7, 2025
74fb056
add test for SemiJoinProbeList
gengliqi May 15, 2025
3eebd15
format
gengliqi May 15, 2025
67d1b16
support right outer/semi/anti join
gengliqi Jun 6, 2025
d44c186
u
gengliqi Jun 6, 2025
f4b87a8
u
gengliqi Jun 6, 2025
12dfd49
u
gengliqi Jun 6, 2025
81eeb90
u
gengliqi Jun 6, 2025
af9c1e0
u
gengliqi Jun 7, 2025
f960707
fix
gengliqi Jun 7, 2025
750a788
consider a key is required only if it's needed for other condition in…
gengliqi Jun 9, 2025
36401b7
u
gengliqi Jun 12, 2025
2d41fc0
fix bugs
gengliqi Jun 23, 2025
2f8380e
u
gengliqi Jun 23, 2025
6b80fa5
Merge remote-tracking branch 'upstream/master' into join-v2-semi-join
gengliqi Jul 24, 2025
0d1e83d
fix tests
gengliqi Jul 30, 2025
acb2130
Merge branch 'join-v2-semi-join' into join-v2-right-semi
gengliqi Aug 1, 2025
57a6023
merge master
gengliqi Aug 1, 2025
8e75569
address comments
gengliqi Aug 7, 2025
c13d2d3
u
gengliqi Aug 7, 2025
3ee90e9
format
gengliqi Aug 7, 2025
89f6b37
optimize
gengliqi Aug 7, 2025
a76a9ca
Merge remote-tracking branch 'upstream/master' into join-v2-right-semi
gengliqi Aug 7, 2025
68d3e81
address comments
gengliqi Aug 8, 2025
dba86d6
Merge remote-tracking branch 'upstream/master' into join-v2-right-semi
gengliqi Nov 20, 2025
7dc1e62
address comments
gengliqi Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ check_then_add_sources_compile_flag (
src/Interpreters/JoinV2/HashJoinBuild.cpp
src/Interpreters/JoinV2/HashJoinProbe.cpp
src/Interpreters/JoinV2/SemiJoinProbe.cpp
src/Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.cpp
src/IO/Compression/EncodingUtil.cpp
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Planner/Plans/PhysicalJoinV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ bool PhysicalJoinV2::isSupported(const tipb::Join & join)
case Anti:
case LeftOuterSemi:
case LeftOuterAnti:
case RightOuter:
case RightSemi:
case RightAnti:
if (!tiflash_join.getBuildJoinKeys().empty())
return true;
break;
//case RightOuter:
//case RightSemi:
//case RightAnti:
default:
}
return false;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalJoinV2Probe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void PhysicalJoinV2Probe::buildPipelineExecGroupImpl(
builder.appendTransformOp(
std::make_unique<HashJoinV2ProbeTransformOp>(exec_context, log->identifier(), join_ptr, probe_index++));
});
exec_context.addOneTimeFuture(join_ptr->getWaitProbeFinishFuture());
join_ptr.reset();
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3443,6 +3443,8 @@ try
Field(static_cast<UInt64>(threshold)));
ASSERT_COLUMNS_EQ_UR(ref, executeStreams(request, original_max_streams))
<< "left_table_name = " << left_table_name << ", right_table_name = " << right_table_name;
if (cfg.enable_join_v2)
break;
}
WRAP_FOR_JOIN_TEST_END
}
Expand Down Expand Up @@ -3483,6 +3485,8 @@ try
<< "left_table_name = " << left_table_name
<< ", right_exchange_receiver_concurrency = " << exchange_concurrency
<< ", join_probe_cache_columns_threshold = " << threshold;
if (cfg.enable_join_v2)
break;
}
WRAP_FOR_JOIN_TEST_END
}
Expand Down Expand Up @@ -4795,13 +4799,15 @@ try
auto request = context.scan("right_semi_family", "t")
.join(context.scan("right_semi_family", "s"), type, {col("a")}, {}, {}, {}, {}, 0, false, 0)
.build(context);
WRAP_FOR_JOIN_TEST_BEGIN
executeAndAssertColumnsEqual(request, res);
auto request_column_prune
= context.scan("right_semi_family", "t")
.join(context.scan("right_semi_family", "s"), type, {col("a")}, {}, {}, {}, {}, 0, false, 0)
.aggregation({Count(lit(static_cast<UInt64>(1)))}, {})
.build(context);
ASSERT_COLUMNS_EQ_UR(genScalarCountResults(res), executeStreams(request_column_prune, 2));
WRAP_FOR_JOIN_TEST_END
}

/// One join key(t.a = s.a) + other condition(t.c < s.c).
Expand Down
98 changes: 69 additions & 29 deletions dbms/src/Interpreters/JoinV2/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ HashJoin::HashJoin(
, log(Logger::get(join_req_id))
, has_other_condition(non_equal_conditions.other_cond_expr != nullptr)
, output_columns(output_columns_)
, wait_probe_finished_future(std::make_shared<OneTimeNotifyFuture>(NotifyType::WAIT_ON_JOIN_PROBE_FINISH))
{
RUNTIME_ASSERT(key_names_left.size() == key_names_right.size());
output_block = Block(output_columns);
Expand Down Expand Up @@ -198,19 +199,27 @@ void HashJoin::initRowLayoutAndHashJoinMethod()
/// Move all raw required join key column to the end of the join key.
Names new_key_names_left, new_key_names_right;
BoolVec raw_required_key_flag(keys_size);
bool is_right_semi_join = isRightSemiFamily(kind);
auto check_key_required = [&](const String & name) -> bool {
// If it's right semi/anti join, the key is required if and only if it's needed for other condition
if (is_right_semi_join)
return required_columns_names_set_for_other_condition.contains(name);
else
return right_sample_block_pruned.has(name);
};
for (size_t i = 0; i < keys_size; ++i)
{
bool is_raw_required = false;
if (key_columns[i].column_ptr->valuesHaveFixedSize())
{
if (right_sample_block_pruned.has(key_names_right[i]))
if (check_key_required(key_names_right[i]))
is_raw_required = true;
}
else
{
if (canAsColumnString(key_columns[i].column_ptr)
&& getStringCollatorKind(collators) == StringCollatorKind::StringBinary
&& right_sample_block_pruned.has(key_names_right[i]))
&& check_key_required(key_names_right[i]))
{
is_raw_required = true;
}
Expand Down Expand Up @@ -261,6 +270,7 @@ void HashJoin::initRowLayoutAndHashJoinMethod()
if (c.column->valuesHaveFixedSize())
{
row_layout.other_column_fixed_size += c.column->sizeOfValueIfFixed();
row_layout.other_column_for_other_condition_fixed_size += c.column->sizeOfValueIfFixed();
row_layout.other_column_indexes.push_back({i, true});
}
else
Expand Down Expand Up @@ -319,7 +329,7 @@ void HashJoin::initBuild(const Block & sample_block, size_t build_concurrency_)
build_workers_data.resize(build_concurrency);
for (size_t i = 0; i < build_concurrency; ++i)
build_workers_data[i].key_getter = createHashJoinKeyGetter(method, collators);
for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT + 1; ++i)
for (size_t i = 0; i < JOIN_BUILD_PARTITION_COUNT; ++i)
multi_row_containers.emplace_back(std::make_unique<MultipleRowContainer>());

build_initialized = true;
Expand Down Expand Up @@ -415,18 +425,28 @@ void HashJoin::initProbe(const Block & sample_block, size_t probe_concurrency_)
active_probe_worker = probe_concurrency;
probe_workers_data.resize(probe_concurrency);

if (needScanBuildSideAfterProbe())
join_build_scanner_after_probe = std::make_unique<JoinBuildScannerAfterProbe>(this);

probe_initialized = true;
}

bool HashJoin::finishOneBuildRow(size_t stream_index)
{
auto & wd = build_workers_data[stream_index];
if (wd.non_joined_block.rows() > 0)
{
non_joined_blocks.insertNonFullBlock(std::move(wd.non_joined_block));
wd.non_joined_block = {};
}
LOG_DEBUG(
log,
"{} insert block to row containers cost {}ms, row count {}, padding size {}({:.2f}% of all size {})",
"{} insert block to row containers cost {}ms, row count {}, non-joined count {}, padding size {}({:.2f}% of "
"all size {})",
stream_index,
wd.build_time,
wd.row_count,
wd.non_joined_row_count,
wd.padding_size,
100.0 * wd.padding_size / wd.all_size,
wd.all_size);
Expand Down Expand Up @@ -455,11 +475,22 @@ bool HashJoin::finishOneProbe(size_t stream_index)
if (active_probe_worker.fetch_sub(1) == 1)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_probe);
wait_probe_finished_future->finish();
return true;
}
return false;
}

bool HashJoin::isAllProbeFinished()
{
if (active_probe_worker > 0)
{
setNotifyFuture(wait_probe_finished_future.get());
return false;
}
return true;
}

void HashJoin::workAfterBuildRowFinish()
{
size_t all_build_row_count = 0;
Expand All @@ -478,13 +509,15 @@ void HashJoin::workAfterBuildRowFinish()
enable_tagged_pointer,
false);

/// Conservative threshold: trigger late materialization when lm_row_size average >= 16 bytes.
constexpr size_t trigger_lm_row_size_threshold = 16;
bool late_materialization = false;
size_t avg_lm_row_size = 0;
if (has_other_condition
&& row_layout.other_column_count_for_other_condition < row_layout.other_column_indexes.size())
if (shouldCheckLateMaterialization())
{
// Calculate the average row size of late materialization rows.
// If the average row size is greater than or equal to the threshold, enable late materialization.
// Otherwise, disable it.
// Note: this is a conservative threshold, enable late materialization when lm_row_size average >= 16 bytes.
constexpr size_t trigger_lm_row_size_threshold = 16;
size_t total_lm_row_size = 0;
size_t total_lm_row_count = 0;
for (size_t i = 0; i < build_concurrency; ++i)
Expand Down Expand Up @@ -556,41 +589,35 @@ void HashJoin::buildRowFromBlock(const Block & b, size_t stream_index)

assertBlocksHaveEqualStructure(block, right_sample_block_pruned, "Join Build");

bool check_lm_row_size = has_other_condition
&& row_layout.other_column_count_for_other_condition < row_layout.other_column_indexes.size();
insertBlockToRowContainers(
method,
needRecordNotInsertRows(kind),
JoinBuildHelper::insertBlockToRowContainers(
this,
block,
rows,
key_columns,
null_map,
row_layout,
multi_row_containers,
build_workers_data[stream_index],
check_lm_row_size);
shouldCheckLateMaterialization());

build_workers_data[stream_index].build_time += watch.elapsedMilliseconds();
}

bool HashJoin::buildPointerTable(size_t stream_index)
{
bool is_end;
size_t max_build_size = 2 * settings.max_block_size;
switch (method)
{
#define M(METHOD) \
case HashJoinKeyMethod::METHOD: \
using KeyGetterType##METHOD = HashJoinKeyGetterForType<HashJoinKeyMethod::METHOD>; \
if constexpr (KeyGetterType##METHOD::Type::joinKeyCompareHashFirst()) \
is_end = pointer_table.build<KeyGetterType##METHOD::HashValueType>( \
build_workers_data[stream_index], \
multi_row_containers, \
settings.max_block_size); \
else \
is_end = pointer_table.build<void>( \
build_workers_data[stream_index], \
multi_row_containers, \
settings.max_block_size); \
#define M(METHOD) \
case HashJoinKeyMethod::METHOD: \
using KeyGetterType##METHOD = HashJoinKeyGetterForType<HashJoinKeyMethod::METHOD>; \
if constexpr (KeyGetterType##METHOD::Type::joinKeyCompareHashFirst()) \
is_end = pointer_table.build<KeyGetterType##METHOD::HashValueType>( \
build_workers_data[stream_index], \
multi_row_containers, \
max_build_size); \
else \
is_end \
= pointer_table.build<void>(build_workers_data[stream_index], multi_row_containers, max_build_size); \
break;
APPLY_FOR_HASH_JOIN_VARIANTS(M)
#undef M
Expand Down Expand Up @@ -665,6 +692,19 @@ Block HashJoin::probeLastResultBlock(size_t stream_index)
return {};
}

bool HashJoin::needScanBuildSideAfterProbe() const
{
return isRightOuterJoin(kind) || isRightSemiFamily(kind);
}

Block HashJoin::scanBuildSideAfterProbe(size_t stream_index)
{
auto & wd = probe_workers_data[stream_index];
Stopwatch all_watch;
SCOPE_EXIT({ probe_workers_data[stream_index].scan_build_side_time += all_watch.elapsedFromLastTime(); });
return join_build_scanner_after_probe->scan(wd);
}

void HashJoin::removeUselessColumn(Block & block) const
{
const NameSet & probe_output_name_set = has_other_condition
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Interpreters/JoinV2/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
#include <Core/Block.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Pipeline/Schedule/Tasks/OneTimeNotifyFuture.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/JoinUtils.h>
#include <Interpreters/JoinV2/HashJoinBuild.h>
#include <Interpreters/JoinV2/HashJoinBuildScannerAfterProbe.h>
#include <Interpreters/JoinV2/HashJoinKey.h>
#include <Interpreters/JoinV2/HashJoinPointerTable.h>
#include <Interpreters/JoinV2/HashJoinProbe.h>
Expand Down Expand Up @@ -54,13 +57,18 @@ class HashJoin
bool finishOneBuildRow(size_t stream_index);
/// Return true if it is the last probe worker.
bool finishOneProbe(size_t stream_index);
/// Return true if all probe work has finished.
bool isAllProbeFinished();

void buildRowFromBlock(const Block & block, size_t stream_index);
bool buildPointerTable(size_t stream_index);

Block probeBlock(JoinProbeContext & ctx, size_t stream_index);
Block probeLastResultBlock(size_t stream_index);

bool needScanBuildSideAfterProbe() const;
Block scanBuildSideAfterProbe(size_t stream_index);

void removeUselessColumn(Block & block) const;
/// Block's schema must be all_sample_block_pruned.
Block removeUselessColumnForOutput(const Block & block) const;
Expand All @@ -76,14 +84,25 @@ class HashJoin

const JoinProfileInfoPtr & getProfileInfo() const { return profile_info; }

const OneTimeNotifyFuturePtr & getWaitProbeFinishFuture() { return wait_probe_finished_future; }

private:
void initRowLayoutAndHashJoinMethod();

void workAfterBuildRowFinish();

bool shouldCheckLateMaterialization() const
{
bool is_any_semi_join = isSemiFamily(kind) || isLeftOuterSemiFamily(kind) || isRightSemiFamily(kind);
return has_other_condition && !is_any_semi_join
&& row_layout.other_column_count_for_other_condition < row_layout.other_column_indexes.size();
}

private:
friend JoinBuildHelper;
friend JoinProbeHelper;
friend SemiJoinProbeHelper;
friend JoinBuildScannerAfterProbe;

static const DataTypePtr match_helper_type;

Expand Down Expand Up @@ -140,6 +159,8 @@ class HashJoin

/// Row containers
std::vector<std::unique_ptr<MultipleRowContainer>> multi_row_containers;
/// Non-joined blocks
NonJoinedBlocks non_joined_blocks;

/// Build row phase
size_t build_concurrency = 0;
Expand All @@ -152,8 +173,11 @@ class HashJoin
size_t probe_concurrency = 0;
std::vector<JoinProbeWorkerData> probe_workers_data;
std::atomic<size_t> active_probe_worker = 0;
OneTimeNotifyFuturePtr wait_probe_finished_future;
std::unique_ptr<JoinProbeHelper> join_probe_helper;
std::unique_ptr<SemiJoinProbeHelper> semi_join_probe_helper;
/// Probe scan build side
std::unique_ptr<JoinBuildScannerAfterProbe> join_build_scanner_after_probe;

const JoinProfileInfoPtr profile_info = std::make_shared<JoinProfileInfo>();

Expand Down
Loading