Skip to content

Commit d44a8e7

Browse files
committed
x
1 parent be7ccb0 commit d44a8e7

File tree

18 files changed

+775
-482
lines changed

18 files changed

+775
-482
lines changed

src/query/service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#![allow(clippy::diverging_sub_expression)]
4242
#![allow(clippy::arc_with_non_send_sync)]
4343
#![feature(debug_closure_helpers)]
44+
#![feature(stmt_expr_attributes)]
4445

4546
extern crate core;
4647

src/query/service/src/physical_plans/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub use physical_exchange::Exchange;
8282
pub use physical_exchange_sink::ExchangeSink;
8383
pub use physical_exchange_source::ExchangeSource;
8484
pub use physical_filter::Filter;
85-
pub use physical_hash_join::HashJoin;
85+
pub use physical_hash_join::*;
8686
pub use physical_limit::Limit;
8787
pub use physical_materialized_cte::*;
8888
pub use physical_multi_table_insert::*;

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 78 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ use databend_common_pipeline::core::Pipe;
3535
use databend_common_pipeline::core::PipeItem;
3636
use databend_common_pipeline::core::ProcessorPtr;
3737
use databend_common_sql::optimizer::ir::SExpr;
38+
use databend_common_sql::plans::FunctionCall;
3839
use databend_common_sql::plans::Join;
40+
use databend_common_sql::plans::JoinEquiCondition;
3941
use databend_common_sql::plans::JoinType;
4042
use databend_common_sql::ColumnEntry;
4143
use databend_common_sql::ColumnSet;
@@ -52,6 +54,7 @@ use crate::physical_plans::format::PhysicalFormat;
5254
use crate::physical_plans::physical_plan::IPhysicalPlan;
5355
use crate::physical_plans::physical_plan::PhysicalPlan;
5456
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
57+
use crate::physical_plans::resolve_scalar;
5558
use crate::physical_plans::runtime_filter::build_runtime_filter;
5659
use crate::physical_plans::Exchange;
5760
use crate::physical_plans::PhysicalPlanBuilder;
@@ -99,6 +102,12 @@ type MergedFieldsResult = (
99102
Vec<(usize, (bool, bool))>,
100103
);
101104

105+
/// Filter description carried by a [`HashJoin`] plan in its `nested_loop_filter` field.
///
/// Built by `PhysicalPlanBuilder::build_nested_loop_filter_info` from the logical join.
/// Derives `Serialize`/`Deserialize` like the `HashJoin` plan that embeds it.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
106+
pub struct NestedLoopFilterInfo {
107+
/// The join's non-equi conditions, followed by one `eq(left, right)` expression per
/// equi condition; each is resolved against the merged schema by `resolve_scalar`.
pub predicates: Vec<RemoteExpr>,
108+
/// NOTE(review): the builder in this file always sets this to an empty vector; the
/// intended meaning (presumably column indexes to keep) is not visible here — confirm.
pub projection: Vec<usize>,
109+
}
110+
102111
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
103112
pub struct HashJoin {
104113
pub meta: PhysicalPlanMeta,
@@ -140,6 +149,7 @@ pub struct HashJoin {
140149

141150
pub runtime_filter: PhysicalRuntimeFilters,
142151
pub broadcast_id: Option<u32>,
152+
pub nested_loop_filter: NestedLoopFilterInfo,
143153
}
144154

145155
#[typetag::serde]
@@ -261,6 +271,7 @@ impl IPhysicalPlan for HashJoin {
261271
build_side_cache_info: self.build_side_cache_info.clone(),
262272
runtime_filter: self.runtime_filter.clone(),
263273
broadcast_id: self.broadcast_id,
274+
nested_loop_filter: self.nested_loop_filter.clone(),
264275
})
265276
}
266277

@@ -1184,80 +1195,23 @@ impl PhysicalPlanBuilder {
11841195
.collect::<Result<_>>()
11851196
}
11861197

1187-
/// Creates a HashJoin physical plan
1188-
///
1189-
/// # Arguments
1190-
/// * `join` - Join operation
1191-
/// * `probe_side` - Probe side physical plan
1192-
/// * `build_side` - Build side physical plan
1193-
/// * `is_broadcast` - Whether this is a broadcast join
1194-
/// * `projections` - Column projections
1195-
/// * `probe_projections` - Probe side projections
1196-
/// * `build_projections` - Build side projections
1197-
/// * `left_join_conditions` - Left join conditions
1198-
/// * `right_join_conditions` - Right join conditions
1199-
/// * `is_null_equal` - Null equality flags
1200-
/// * `non_equi_conditions` - Non-equi conditions
1201-
/// * `probe_to_build` - Probe to build mapping
1202-
/// * `output_schema` - Output schema
1203-
/// * `build_side_cache_info` - Build side cache info
1204-
/// * `runtime_filter` - Runtime filter
1205-
/// * `stat_info` - Statistics info
1206-
///
1207-
/// # Returns
1208-
/// * `Result<PhysicalPlan>` - The HashJoin physical plan
1209-
#[allow(clippy::too_many_arguments)]
1210-
fn create_hash_join(
1198+
fn build_nested_loop_filter_info(
12111199
&self,
1212-
s_expr: &SExpr,
12131200
join: &Join,
1214-
probe_side: PhysicalPlan,
1215-
build_side: PhysicalPlan,
1216-
projections: ColumnSet,
1217-
probe_projections: ColumnSet,
1218-
build_projections: ColumnSet,
1219-
left_join_conditions: Vec<RemoteExpr>,
1220-
right_join_conditions: Vec<RemoteExpr>,
1221-
is_null_equal: Vec<bool>,
1222-
non_equi_conditions: Vec<RemoteExpr>,
1223-
probe_to_build: Vec<(usize, (bool, bool))>,
1224-
output_schema: DataSchemaRef,
1225-
build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
1226-
runtime_filter: PhysicalRuntimeFilters,
1227-
stat_info: PlanStatsInfo,
1228-
) -> Result<PhysicalPlan> {
1229-
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
1230-
let broadcast_id = if build_side_data_distribution
1231-
.as_ref()
1232-
.is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_)))
1233-
{
1234-
Some(self.ctx.get_next_broadcast_id())
1235-
} else {
1236-
None
1237-
};
1238-
Ok(PhysicalPlan::new(HashJoin {
1239-
projections,
1240-
build_projections,
1241-
probe_projections,
1242-
build: build_side,
1243-
probe: probe_side,
1244-
join_type: join.join_type,
1245-
build_keys: right_join_conditions,
1246-
probe_keys: left_join_conditions,
1247-
is_null_equal,
1248-
non_equi_conditions,
1249-
marker_index: join.marker_index,
1250-
meta: PhysicalPlanMeta::new("HashJoin"),
1251-
from_correlated_subquery: join.from_correlated_subquery,
1252-
probe_to_build,
1253-
output_schema,
1254-
need_hold_hash_table: join.need_hold_hash_table,
1255-
stat_info: Some(stat_info),
1256-
single_to_inner: join.single_to_inner,
1257-
build_side_cache_info,
1258-
runtime_filter,
1259-
broadcast_id,
1260-
}))
1201+
merged_schema: &DataSchemaRef,
1202+
) -> Result<NestedLoopFilterInfo> {
1203+
let predicates = join
1204+
.non_equi_conditions
1205+
.iter()
1206+
.map(|c| Ok(c.clone()))
1207+
.chain(join.equi_conditions.iter().map(condition_to_expr))
1208+
.map(|scalar| resolve_scalar(&scalar?, merged_schema))
1209+
.collect::<Result<_>>()?;
1210+
1211+
Ok(NestedLoopFilterInfo {
1212+
predicates,
1213+
projection: vec![],
1214+
})
12611215
}
12621216

12631217
pub async fn build_hash_join(
@@ -1332,6 +1286,8 @@ impl PhysicalPlanBuilder {
13321286
// Step 10: Process non-equi conditions
13331287
let non_equi_conditions = self.process_non_equi_conditions(join, &merged_schema)?;
13341288

1289+
let nested_loop_filter = self.build_nested_loop_filter_info(join, &merged_schema)?;
1290+
13351291
// Step 11: Build runtime filter
13361292
let runtime_filter = build_runtime_filter(
13371293
self.ctx.clone(),
@@ -1345,23 +1301,63 @@ impl PhysicalPlanBuilder {
13451301
.await?;
13461302

13471303
// Step 12: Create and return the HashJoin
1348-
self.create_hash_join(
1349-
s_expr,
1350-
join,
1351-
probe_side,
1352-
build_side,
1304+
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
1305+
let broadcast_id = if build_side_data_distribution
1306+
.as_ref()
1307+
.is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_)))
1308+
{
1309+
Some(self.ctx.get_next_broadcast_id())
1310+
} else {
1311+
None
1312+
};
1313+
Ok(PhysicalPlan::new(HashJoin {
13531314
projections,
1354-
probe_projections,
13551315
build_projections,
1356-
left_join_conditions,
1357-
right_join_conditions,
1316+
probe_projections,
1317+
build: build_side,
1318+
probe: probe_side,
1319+
join_type: join.join_type,
1320+
build_keys: right_join_conditions,
1321+
probe_keys: left_join_conditions,
13581322
is_null_equal,
13591323
non_equi_conditions,
1324+
marker_index: join.marker_index,
1325+
meta: PhysicalPlanMeta::new("HashJoin"),
1326+
from_correlated_subquery: join.from_correlated_subquery,
13601327
probe_to_build,
13611328
output_schema,
1329+
need_hold_hash_table: join.need_hold_hash_table,
1330+
stat_info: Some(stat_info),
1331+
single_to_inner: join.single_to_inner,
13621332
build_side_cache_info,
13631333
runtime_filter,
1364-
stat_info,
1365-
)
1334+
broadcast_id,
1335+
nested_loop_filter,
1336+
}))
1337+
}
1338+
}
1339+
1340+
/// Rewrites one equi-join condition as an `eq(left, right)` scalar function call.
///
/// When exactly one side is `Nullable(T)` and the other side is plain `T`, the
/// non-nullable side is passed through `unify_to_data_type` with the nullable type so
/// both arguments end up with the same type. In every other case the two sides are
/// used unchanged.
///
/// # Errors
/// Propagates any error returned by `data_type()` on either side of the condition.
///
/// NOTE(review): the call is always plain `eq`; no null-equal flag is consulted here,
/// although `HashJoin` carries a separate `is_null_equal` vector. Confirm that
/// null-safe equality conditions are either never routed through this helper or are
/// handled by the consumer of the resulting predicates.
fn condition_to_expr(condition: &JoinEquiCondition) -> Result<ScalarExpr> {
1341+
let left_type = condition.left.data_type()?;
1342+
let right_type = condition.right.data_type()?;
1343+
1344+
// Align the argument types when the sides differ only by nullability.
let arguments = match (&left_type, &right_type) {
1345+
// Left is Nullable(T), right is T: lift the right side to the left side's type.
(DataType::Nullable(left), right) if **left == *right => vec![
1346+
condition.left.clone(),
1347+
condition.right.clone().unify_to_data_type(&left_type),
1348+
],
1349+
// Left is T, right is Nullable(T): lift the left side to the right side's type.
(left, DataType::Nullable(right)) if *left == **right => vec![
1350+
condition.left.clone().unify_to_data_type(&right_type),
1351+
condition.right.clone(),
1352+
],
1353+
// Same type on both sides, or types that differ by more than nullability:
// pass both through untouched.
_ => vec![condition.left.clone(), condition.right.clone()],
1354+
};
1355+
1356+
// The span of the left expression is reused as the span of the whole call.
Ok(FunctionCall {
1357+
span: condition.left.span(),
1358+
func_name: "eq".to_string(),
1359+
params: vec![],
1360+
arguments,
13661361
}
1362+
.into())
13671363
}

src/query/service/src/physical_plans/physical_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ enum PhysicalJoinType {
4343
}
4444

4545
// Choose physical join type by join conditions
46-
fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<PhysicalJoinType> {
46+
fn physical_join(join: &Join, s_expr: &SExpr, _settings: &Settings) -> Result<PhysicalJoinType> {
4747
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
4848
return Err(ErrorCode::SemanticError(
4949
"ANY JOIN only supports equality-based hash joins",
@@ -53,7 +53,7 @@ fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<Phy
5353
let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child());
5454
let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child());
5555
let right_stat_info = right_rel_expr.derive_cardinality()?;
56-
let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()?;
56+
let nested_loop_join_threshold = 0;
5757
if matches!(join.join_type, JoinType::Inner | JoinType::Cross)
5858
&& (right_stat_info
5959
.statistics

src/query/service/src/physical_plans/physical_nested_loop_join.rs

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16-
use std::sync::Arc;
1716

1817
use databend_common_exception::Result;
1918
use databend_common_expression::DataSchemaRef;
2019
use databend_common_expression::DataSchemaRefExt;
2120
use databend_common_expression::RemoteExpr;
2221
use databend_common_functions::BUILTIN_FUNCTIONS;
22+
use databend_common_pipeline::core::InputPort;
23+
use databend_common_pipeline::core::OutputPort;
24+
use databend_common_pipeline::core::Pipe;
25+
use databend_common_pipeline::core::PipeItem;
2326
use databend_common_pipeline::core::ProcessorPtr;
24-
use databend_common_pipeline::sinks::Sinker;
2527
use databend_common_sql::executor::cast_expr_to_non_null_boolean;
2628
use databend_common_sql::optimizer::ir::SExpr;
2729
use databend_common_sql::plans::JoinType;
@@ -38,8 +40,7 @@ use super::PhysicalPlan;
3840
use super::PhysicalPlanBuilder;
3941
use super::PhysicalPlanMeta;
4042
use crate::pipelines::processors::transforms::LoopJoinState;
41-
use crate::pipelines::processors::transforms::TransformLoopJoinLeft;
42-
use crate::pipelines::processors::transforms::TransformLoopJoinRight;
43+
use crate::pipelines::processors::transforms::TransformLoopJoin;
4344
use crate::pipelines::PipelineBuilder;
4445

4546
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -110,25 +111,56 @@ impl IPhysicalPlan for NestedLoopJoin {
110111
}
111112

112113
fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
113-
let state = Arc::new(LoopJoinState::new(builder.ctx.clone(), self));
114-
self.build_right(state.clone(), builder)?;
115-
self.build_left(state, builder)
116-
}
117-
}
114+
// Build right side (build side)
115+
let right_side_builder = builder.create_sub_pipeline_builder();
116+
let mut right_res = right_side_builder.finalize(&self.right)?;
117+
let mut build_sinks = right_res.main_pipeline.take_sinks();
118+
builder
119+
.pipelines
120+
.push(right_res.main_pipeline.finalize(None));
121+
builder.pipelines.extend(right_res.sources_pipelines);
118122

119-
impl NestedLoopJoin {
120-
fn build_left(&self, state: Arc<LoopJoinState>, builder: &mut PipelineBuilder) -> Result<()> {
123+
// Build left side (probe side)
121124
self.left.build_pipeline(builder)?;
122125

123-
let max_threads = builder.settings.get_max_threads()? as usize;
124-
builder.main_pipeline.try_resize(max_threads)?;
125-
builder.main_pipeline.add_transform(|input, output| {
126-
Ok(ProcessorPtr::create(TransformLoopJoinLeft::create(
127-
input,
128-
output,
129-
state.clone(),
130-
)))
131-
})?;
126+
let output_len = std::cmp::max(build_sinks.len(), builder.main_pipeline.output_len());
127+
builder.main_pipeline.resize(output_len, false)?;
128+
let probe_sinks = builder.main_pipeline.take_sinks();
129+
130+
if output_len != build_sinks.len() {
131+
builder.main_pipeline.extend_sinks(build_sinks);
132+
builder.main_pipeline.resize(output_len, false)?;
133+
build_sinks = builder.main_pipeline.take_sinks();
134+
}
135+
136+
debug_assert_eq!(build_sinks.len(), probe_sinks.len());
137+
138+
let join_pipe_items = build_sinks
139+
.into_iter()
140+
.zip(probe_sinks)
141+
.map(|(build_sink, probe_sink)| {
142+
builder.main_pipeline.extend_sinks([build_sink, probe_sink]);
143+
144+
let build_input = InputPort::create();
145+
let probe_input = InputPort::create();
146+
let joined_output = OutputPort::create();
147+
148+
let join_state = LoopJoinState::new(builder.ctx.clone(), self);
149+
let loop_join = ProcessorPtr::create(TransformLoopJoin::create(
150+
build_input.clone(),
151+
probe_input.clone(),
152+
joined_output.clone(),
153+
Box::new(join_state),
154+
));
155+
156+
PipeItem::create(loop_join, vec![build_input, probe_input], vec![
157+
joined_output,
158+
])
159+
})
160+
.collect();
161+
162+
let join_pipe = Pipe::create(output_len * 2, output_len, join_pipe_items);
163+
builder.main_pipeline.add_pipe(join_pipe);
132164

133165
match self.conditions.len() {
134166
0 => Ok(()),
@@ -148,24 +180,6 @@ impl NestedLoopJoin {
148180
}
149181
}
150182
}
151-
152-
fn build_right(&self, state: Arc<LoopJoinState>, builder: &mut PipelineBuilder) -> Result<()> {
153-
let right_side_builder = builder.create_sub_pipeline_builder();
154-
155-
let mut right_res = right_side_builder.finalize(&self.right)?;
156-
right_res.main_pipeline.add_sink(|input| {
157-
Ok(ProcessorPtr::create(Sinker::create(
158-
input,
159-
TransformLoopJoinRight::create(state.clone())?,
160-
)))
161-
})?;
162-
163-
builder
164-
.pipelines
165-
.push(right_res.main_pipeline.finalize(None));
166-
builder.pipelines.extend(right_res.sources_pipelines);
167-
Ok(())
168-
}
169183
}
170184

171185
impl PhysicalPlanBuilder {

0 commit comments

Comments
 (0)