Skip to content

Commit a8952c7

Browse files
authored
Split TPCH tests in correctness, planning and explain analyze (#232)
* Split tpch tests * Fix correctness tests * Fix plan tests
1 parent ba3e1dc commit a8952c7

File tree

8 files changed

+2643
-2320
lines changed

8 files changed

+2643
-2320
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
steps:
3939
- uses: actions/checkout@v4
4040
- uses: ./.github/actions/setup
41-
- run: cargo test --features tpch --test tpch_validation_test
41+
- run: cargo test --features tpch --test 'tpch_*'
4242

4343
format-check:
4444
runs-on: ubuntu-latest

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/.idea
22
/target
33
/benchmarks/data/
4-
testdata/tpch/data/
4+
testdata/tpch/*
5+
!testdata/tpch/queries

Cargo.lock

Lines changed: 24 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ prost = "0.13.5"
3232
rand = "0.8.5"
3333
object_store = "0.12.3"
3434
bytes = "1.10.1"
35+
pin-project = "1.1.10"
3536

3637
# integration_tests deps
3738
insta = { version = "1.43.1", features = ["filters"], optional = true }
@@ -41,7 +42,7 @@ parquet = { version = "56.1.0", optional = true }
4142
arrow = { version = "56.1.0", optional = true }
4243
tokio-stream = { version = "0.1.17", optional = true }
4344
hyper-util = { version = "0.1.16", optional = true }
44-
pin-project = "1.1.10"
45+
pretty_assertions = { version = "1.4", optional = true }
4546

4647
[features]
4748
integration = [
@@ -52,6 +53,7 @@ integration = [
5253
"arrow",
5354
"tokio-stream",
5455
"hyper-util",
56+
"pretty_assertions"
5557
]
5658

5759
tpch = ["integration"]
@@ -65,3 +67,4 @@ parquet = "56.1.0"
6567
arrow = "56.1.0"
6668
tokio-stream = "0.1.17"
6769
hyper-util = "0.1.16"
70+
pretty_assertions = "1.4"

tests/tpch_correctness_test.rs

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
#[cfg(all(feature = "integration", feature = "tpch", test))]
mod tests {
    use datafusion::physical_plan::execute_stream;
    use datafusion::prelude::SessionContext;
    use datafusion_distributed::DefaultSessionBuilder;
    use datafusion_distributed::test_utils::localhost::start_localhost_context;
    use datafusion_distributed::test_utils::tpch;
    use futures::TryStreamExt;
    use std::error::Error;
    use std::fmt::Display;
    use std::fs;
    use std::path::Path;
    use tokio::sync::OnceCell;

    /// Value written to `execution.target_partitions` on every context used by these tests.
    const PARTITIONS: usize = 6;
    /// Scale factor handed to the TPC-H data generator.
    const TPCH_SCALE_FACTOR: f64 = 1.0;
    /// `parts` argument handed to the TPC-H data generator.
    const TPCH_DATA_PARTS: i32 = 16;

    /// Names under which the TPC-H tables are registered; each one is also the name of the
    /// entry inside the data directory that is registered as a Parquet source.
    const TPCH_TABLES: [&str; 8] = [
        "lineitem", "orders", "part", "partsupp", "customer", "nation", "region", "supplier",
    ];

    #[tokio::test]
    async fn test_tpch_1() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(1)).await
    }

    #[tokio::test]
    async fn test_tpch_2() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(2)).await
    }

    #[tokio::test]
    async fn test_tpch_3() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(3)).await
    }

    #[tokio::test]
    async fn test_tpch_4() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(4)).await
    }

    #[tokio::test]
    async fn test_tpch_5() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(5)).await
    }

    #[tokio::test]
    async fn test_tpch_6() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(6)).await
    }

    #[tokio::test]
    async fn test_tpch_7() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(7)).await
    }

    #[tokio::test]
    async fn test_tpch_8() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(8)).await
    }

    #[tokio::test]
    async fn test_tpch_9() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(9)).await
    }

    #[tokio::test]
    async fn test_tpch_10() -> Result<(), Box<dyn Error>> {
        let sql = get_test_tpch_query(10);
        // There is a chance that this query returns non-deterministic results if two entries
        // happen to have the exact same revenue. With small scales, this never happens, but with
        // bigger scales, this is very likely to happen.
        // The extra `c_acctbal` key is a tiebreaker only: `revenue` must keep its `desc`
        // direction, otherwise the query's primary ordering is silently flipped to ascending.
        let sql = sql.replace("revenue desc", "revenue desc, c_acctbal desc");
        test_tpch_query(sql).await
    }

    #[tokio::test]
    async fn test_tpch_11() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(11)).await
    }

    #[tokio::test]
    async fn test_tpch_12() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(12)).await
    }

    #[tokio::test]
    async fn test_tpch_13() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(13)).await
    }

    #[tokio::test]
    async fn test_tpch_14() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(14)).await
    }

    #[tokio::test]
    async fn test_tpch_15() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(15)).await
    }

    #[tokio::test]
    async fn test_tpch_16() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(16)).await
    }

    #[tokio::test]
    async fn test_tpch_17() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(17)).await
    }

    #[tokio::test]
    async fn test_tpch_18() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(18)).await
    }

    #[tokio::test]
    async fn test_tpch_19() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(19)).await
    }

    #[tokio::test]
    async fn test_tpch_20() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(20)).await
    }

    #[tokio::test]
    async fn test_tpch_21() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(21)).await
    }

    #[tokio::test]
    async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
        test_tpch_query(get_test_tpch_query(22)).await
    }

    /// Runs `sql` twice, once on the context returned by `start_localhost_context` and once
    /// on a plain `SessionContext::new()`, and asserts that both runs render to the same
    /// pretty-printed table.
    async fn test_tpch_query(sql: String) -> Result<(), Box<dyn Error>> {
        // `_guard` is held until the end of the function so whatever
        // `start_localhost_context` hands back alongside the context stays alive for both runs.
        let (ctx, _guard) = start_localhost_context(4, DefaultSessionBuilder).await;

        let results_d = run_tpch_query(ctx, sql.clone()).await?;
        let results_s = run_tpch_query(SessionContext::new(), sql).await?;

        pretty_assertions::assert_eq!(
            results_d.to_string(),
            results_s.to_string(),
            "distributed and single-node results differ"
        );
        Ok(())
    }

    /// Executes `sql` on `ctx` against the TPC-H Parquet tables and returns the collected
    /// record batches as a pretty-printed, `Display`-able table.
    ///
    /// Before running the query this sets `target_partitions` to [`PARTITIONS`] on `ctx` and
    /// registers every table in [`TPCH_TABLES`] from the data directory, generating the data
    /// first if it is not on disk yet.
    ///
    /// # Errors
    ///
    /// Returns an error if a table cannot be registered, if planning or execution fails, or
    /// if a `create view` script contains fewer than three statements.
    async fn run_tpch_query(
        ctx: SessionContext,
        sql: String,
    ) -> Result<impl Display, Box<dyn Error>> {
        let data_dir = ensure_tpch_data(TPCH_SCALE_FACTOR, TPCH_DATA_PARTS).await;
        ctx.state_ref()
            .write()
            .config_mut()
            .options_mut()
            .execution
            .target_partitions = PARTITIONS;

        // Register every TPC-H table on this context.
        for table_name in TPCH_TABLES {
            let table_path = data_dir.join(table_name);
            ctx.register_parquet(
                table_name,
                table_path.to_string_lossy().as_ref(),
                datafusion::prelude::ParquetReadOptions::default(),
            )
            .await?;
        }

        // Query 15 has three queries in it, one creating the view, the second
        // executing, which we want to capture the output of, and the third
        // tearing down the view
        let stream = if sql.starts_with("create view") {
            let statements: Vec<&str> = sql
                .split(';')
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .collect();

            // Fail with a descriptive error instead of an index-out-of-bounds panic when the
            // script is malformed. Trailing extra statements are ignored, as before.
            let [create_view, select, drop_view, ..] = statements.as_slice() else {
                return Err(format!(
                    "expected at least 3 statements in a `create view` script, found {}",
                    statements.len()
                )
                .into());
            };

            ctx.sql(create_view).await?.collect().await?;
            let df = ctx.sql(select).await?;
            let plan = df.create_physical_plan().await?;
            // The physical plan is built while the view exists; the stream itself is only
            // drained after the view has been dropped below.
            let stream = execute_stream(plan, ctx.task_ctx())?;
            ctx.sql(drop_view).await?.collect().await?;

            stream
        } else {
            let df = ctx.sql(&sql).await?;
            let plan = df.create_physical_plan().await?;
            execute_stream(plan, ctx.task_ctx())?
        };

        let batches = stream.try_collect::<Vec<_>>().await?;

        Ok(arrow::util::pretty::pretty_format_batches(&batches)?)
    }

    /// Loads TPC-H query number `num` from `testdata/tpch/queries` under the crate root.
    pub fn get_test_tpch_query(num: u8) -> String {
        let queries_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/queries");
        tpch::tpch_query_from_dir(&queries_dir, num)
    }

    // OnceCell to ensure TPCH tables are generated only once for tests
    static INIT_TEST_TPCH_TABLES: OnceCell<()> = OnceCell::const_new();

    /// Makes sure the TPC-H data set exists on disk and returns the directory holding it
    /// (`testdata/tpch/correctness_sf{sf}` under the crate root).
    ///
    /// Generation is skipped when the directory already exists, and is attempted at most once
    /// per test process.
    ///
    /// NOTE: the once-guard is not keyed by `sf`. Only the first call in a process can trigger
    /// generation, so calling this with several scale factors in the same process would leave
    /// the later directories ungenerated. All callers in this file pass the same constants.
    ///
    /// # Panics
    ///
    /// Panics if the existence of the data directory cannot be determined.
    pub async fn ensure_tpch_data(sf: f64, parts: i32) -> std::path::PathBuf {
        let data_dir =
            Path::new(env!("CARGO_MANIFEST_DIR")).join(format!("testdata/tpch/correctness_sf{sf}"));
        INIT_TEST_TPCH_TABLES
            .get_or_init(|| async {
                let already_generated = fs::exists(&data_dir)
                    .expect("failed to check whether the TPC-H data directory exists");
                if !already_generated {
                    tpch::generate_tpch_data(&data_dir, sf, parts);
                }
            })
            .await;
        data_dir
    }
}

0 commit comments

Comments
 (0)