Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3875,10 +3875,27 @@ impl WrappedSelectNode {
}
};

// Merge remapping from join subquery before rendering join condition:
// condition can reference columns from both sides of the join,
// and those columns can be remapped in respective inputs
if let Some(join_column_remapping) = join_column_remapping {
if let Some(column_remapping) = column_remapping.as_mut() {
column_remapping.extend(join_column_remapping);
} else {
column_remapping = Some(join_column_remapping);
}
};

let join_condition = if let Some(column_remapping) = &column_remapping {
column_remapping.remap(join_condition)?
} else {
join_condition.clone()
};

let (join_condition_sql, join_sql) = Self::generate_sql_for_expr(
join_sql,
generator.clone(),
join_condition.clone(),
join_condition,
None,
&HashMap::new(),
)?;
Expand All @@ -3889,13 +3906,6 @@ impl WrappedSelectNode {
let mapping = sql.add_values(new_values);
let join_sql_str = SqlQuery::remap_placeholders(&join_sql_str, &mapping)?;
let join_condition_sql = SqlQuery::remap_placeholders(&join_condition_sql, &mapping)?;
if let Some(join_column_remapping) = join_column_remapping {
if let Some(column_remapping) = column_remapping.as_mut() {
column_remapping.extend(join_column_remapping);
} else {
column_remapping = Some(join_column_remapping);
}
};

let aliased_join_sql = generator
.get_sql_templates()
Expand Down
8 changes: 7 additions & 1 deletion rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1321,10 +1321,16 @@ impl LanguageToLogicalPlanConverter {
let input = Arc::new(self.to_logical_plan(params[0])?);
let window_expr =
match_expr_list_node!(node_by_id, to_expr, params[1], WindowWindowExpr);
// Don't realias replaced columns inside window expressions: an alias
// inside e.g. ORDER BY would change the name of the whole window
// expression, while plans above reference it by the original name.
// Replacing a qualified column with its flat name keeps the rendered
// expression name intact, since `flat_name` of a qualified column
// matches the name of the unqualified flat column.
let window_expr = replace_qualified_col_with_flat_name_if_missing(
window_expr,
input.schema(),
true,
false,
)?;
let mut window_fields: Vec<DFField> =
exprlist_to_fields(window_expr.iter(), &input)?;
Expand Down
122 changes: 122 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,128 @@ ORDER BY 1,2,3,4
);
}

/// Regression test for grouped-grouped join SQL push down (Tableau-style query).
/// Join condition columns must be remapped to generated aliases of the joined
/// subqueries — otherwise the generated ON clause references the original column
/// name (e.g. `"t0"."My Notes"`) which does not exist in the aliased subqueries.
#[tokio::test]
async fn test_wrapper_grouped_join_grouped_condition_remapping() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
SELECT "t0"."Customer Gender" AS "Customer Gender",
SUM("t2"."__measure__1") AS "sum:measure:ok"
FROM (
SELECT
customer_gender AS "Customer Gender",
notes AS "My Notes"
FROM KibanaSampleDataEcommerce
GROUP BY 1, 2
) "t0"
INNER JOIN (
SELECT
notes AS "My Notes",
AVG(avgPrice) AS "__measure__1"
FROM KibanaSampleDataEcommerce
GROUP BY 1
) "t2" ON ("t0"."My Notes" = "t2"."My Notes")
GROUP BY 1
;
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let _physical_plan = query_plan.as_physical_plan().await.unwrap();

let logical_plan = query_plan.as_logical_plan();
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
assert!(sql.contains("INNER JOIN ("));
assert!(
!sql.contains(r#""My Notes""#),
"join condition leaked original column name instead of remapped alias:\n{}",
sql
);
}

/// Regression test for window expression on top of a grouped join subquery
/// (Sigma-style query). Window expr rebase in converter must keep field names
/// consistent with column references in plans above.
#[tokio::test]
async fn test_wrapper_window_over_grouped_join() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
WITH
"qt_0" AS (
SELECT
"ta_1".content "ca_1",
DATE_TRUNC('month', "ta_2".order_date) "ca_2",
CASE
WHEN sum("ta_2"."sumPrice") IS NOT NULL THEN sum("ta_2"."sumPrice")
ELSE 0
END "ca_3"
FROM KibanaSampleDataEcommerce "ta_2"
JOIN Logs "ta_1"
ON "ta_2".__cubeJoinField = "ta_1".__cubeJoinField
GROUP BY
"ca_1",
"ca_2"
),
"qt_1" AS (
SELECT
RANK() OVER (
PARTITION BY DATE_TRUNC('month', "ta_3"."ca_2")
ORDER BY
DATE_TRUNC('month', "ta_3"."ca_2"),
CASE
WHEN sum("ta_3"."ca_3") IS NOT NULL THEN sum("ta_3"."ca_3")
ELSE 0
END DESC,
"ta_3"."ca_1"
) "ca_4",
DATE_TRUNC('month', "ta_3"."ca_2") "ca_5",
"ta_3"."ca_1" "ca_6",
CASE
WHEN sum("ta_3"."ca_3") IS NOT NULL THEN sum("ta_3"."ca_3")
ELSE 0
END "ca_7"
FROM "qt_0" "ta_3"
GROUP BY
"ca_5",
"ca_6"
)
SELECT
"ta_4"."ca_5" "ca_8",
"ta_4"."ca_6" "ca_9",
"ta_4"."ca_7" "ca_10"
FROM "qt_1" "ta_4"
WHERE "ta_4"."ca_4" <= 1
ORDER BY
"ca_8" ASC,
"ca_10" DESC,
"ca_9" ASC
;
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

/// Test that WrappedSelect(... limit=Some(0) ...) will render it correctly
#[tokio::test]
async fn test_wrapper_limit_zero() {
Expand Down
Loading