Skip to content

Commit c3181a6

Browse files
committed
Split tpch tests
1 parent 83efd9e commit c3181a6

File tree

6 files changed

+2686
-2318
lines changed

6 files changed

+2686
-2318
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
steps:
3939
- uses: actions/checkout@v4
4040
- uses: ./.github/actions/setup
41-
- run: cargo test --features tpch --test tpch_validation_test
41+
- run: cargo test --features tpch --test 'tpch_*'
4242

4343
format-check:
4444
runs-on: ubuntu-latest

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/.idea
22
/target
33
/benchmarks/data/
4-
testdata/tpch/data/
4+
testdata/tpch/*
5+
!testdata/tpch/queries

tests/tpch_correctness_test.rs

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#[cfg(all(feature = "integration", feature = "tpch", test))]
2+
mod tests {
3+
use datafusion::physical_plan::execute_stream;
4+
use datafusion::prelude::SessionContext;
5+
use datafusion_distributed::DefaultSessionBuilder;
6+
use datafusion_distributed::test_utils::localhost::start_localhost_context;
7+
use datafusion_distributed::test_utils::tpch;
8+
use futures::TryStreamExt;
9+
use std::error::Error;
10+
use std::fmt::Display;
11+
use std::fs;
12+
use std::path::Path;
13+
use tokio::sync::OnceCell;
14+
15+
// Value written to `execution.target_partitions` on every context used by run_tpch_query.
const PARTITIONS: usize = 6;
16+
// Scale factor handed to ensure_tpch_data; it is also embedded in the data directory name.
const TPCH_SCALE_FACTOR: f64 = 1.0;
17+
// `parts` argument handed to ensure_tpch_data and forwarded to the TPC-H data generator
// (presumably the number of files generated per table - TODO confirm).
const TPCH_DATA_PARTS: i32 = 16;
18+
19+
#[tokio::test]
20+
async fn test_tpch_1() -> Result<(), Box<dyn Error>> {
21+
test_tpch_query(1).await
22+
}
23+
24+
#[tokio::test]
25+
async fn test_tpch_2() -> Result<(), Box<dyn Error>> {
26+
test_tpch_query(2).await
27+
}
28+
29+
#[tokio::test]
30+
async fn test_tpch_3() -> Result<(), Box<dyn Error>> {
31+
test_tpch_query(3).await
32+
}
33+
34+
#[tokio::test]
35+
async fn test_tpch_4() -> Result<(), Box<dyn Error>> {
36+
test_tpch_query(4).await
37+
}
38+
39+
#[tokio::test]
40+
async fn test_tpch_5() -> Result<(), Box<dyn Error>> {
41+
test_tpch_query(5).await
42+
}
43+
44+
#[tokio::test]
45+
async fn test_tpch_6() -> Result<(), Box<dyn Error>> {
46+
test_tpch_query(6).await
47+
}
48+
49+
#[tokio::test]
50+
async fn test_tpch_7() -> Result<(), Box<dyn Error>> {
51+
test_tpch_query(7).await
52+
}
53+
54+
#[tokio::test]
55+
async fn test_tpch_8() -> Result<(), Box<dyn Error>> {
56+
test_tpch_query(8).await
57+
}
58+
59+
#[tokio::test]
60+
async fn test_tpch_9() -> Result<(), Box<dyn Error>> {
61+
test_tpch_query(9).await
62+
}
63+
64+
#[tokio::test]
65+
async fn test_tpch_10() -> Result<(), Box<dyn Error>> {
66+
test_tpch_query(10).await
67+
}
68+
69+
#[tokio::test]
70+
async fn test_tpch_11() -> Result<(), Box<dyn Error>> {
71+
test_tpch_query(11).await
72+
}
73+
74+
#[tokio::test]
75+
async fn test_tpch_12() -> Result<(), Box<dyn Error>> {
76+
test_tpch_query(12).await
77+
}
78+
79+
#[tokio::test]
80+
async fn test_tpch_13() -> Result<(), Box<dyn Error>> {
81+
test_tpch_query(13).await
82+
}
83+
84+
#[tokio::test]
85+
async fn test_tpch_14() -> Result<(), Box<dyn Error>> {
86+
test_tpch_query(14).await
87+
}
88+
89+
#[tokio::test]
90+
async fn test_tpch_15() -> Result<(), Box<dyn Error>> {
91+
test_tpch_query(15).await
92+
}
93+
94+
#[tokio::test]
95+
async fn test_tpch_16() -> Result<(), Box<dyn Error>> {
96+
test_tpch_query(16).await
97+
}
98+
99+
#[tokio::test]
100+
async fn test_tpch_17() -> Result<(), Box<dyn Error>> {
101+
test_tpch_query(17).await
102+
}
103+
104+
#[tokio::test]
105+
async fn test_tpch_18() -> Result<(), Box<dyn Error>> {
106+
test_tpch_query(18).await
107+
}
108+
109+
#[tokio::test]
110+
async fn test_tpch_19() -> Result<(), Box<dyn Error>> {
111+
test_tpch_query(19).await
112+
}
113+
114+
#[tokio::test]
115+
async fn test_tpch_20() -> Result<(), Box<dyn Error>> {
116+
test_tpch_query(20).await
117+
}
118+
119+
#[tokio::test]
120+
async fn test_tpch_21() -> Result<(), Box<dyn Error>> {
121+
test_tpch_query(21).await
122+
}
123+
124+
#[tokio::test]
125+
async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
126+
test_tpch_query(22).await
127+
}
128+
129+
async fn test_tpch_query(query_id: u8) -> Result<(), Box<dyn Error>> {
130+
let (ctx, _guard) = start_localhost_context(4, DefaultSessionBuilder).await;
131+
let results_d = run_tpch_query(ctx, query_id).await?;
132+
let results_s = run_tpch_query(SessionContext::new(), query_id).await?;
133+
134+
assert_eq!(
135+
results_d.to_string(),
136+
results_s.to_string(),
137+
"Query {query_id} results differ between executions",
138+
);
139+
Ok(())
140+
}
141+
142+
// test_non_distributed_consistency runs each TPC-H query twice - once in a distributed manner
143+
// and once in a non-distributed manner. For each query, it asserts that the results are identical.
144+
async fn run_tpch_query(
145+
ctx: SessionContext,
146+
query_id: u8,
147+
) -> Result<impl Display, Box<dyn Error>> {
148+
let data_dir = ensure_tpch_data(TPCH_SCALE_FACTOR, TPCH_DATA_PARTS).await;
149+
let sql = get_test_tpch_query(query_id);
150+
ctx.state_ref()
151+
.write()
152+
.config_mut()
153+
.options_mut()
154+
.execution
155+
.target_partitions = PARTITIONS;
156+
157+
// Register tables for first context
158+
for table_name in [
159+
"lineitem", "orders", "part", "partsupp", "customer", "nation", "region", "supplier",
160+
] {
161+
let query_path = data_dir.join(table_name);
162+
ctx.register_parquet(
163+
table_name,
164+
query_path.to_string_lossy().as_ref(),
165+
datafusion::prelude::ParquetReadOptions::default(),
166+
)
167+
.await?;
168+
}
169+
170+
// Query 15 has three queries in it, one creating the view, the second
171+
// executing, which we want to capture the output of, and the third
172+
// tearing down the view
173+
let stream = if query_id == 15 {
174+
let queries: Vec<&str> = sql
175+
.split(';')
176+
.map(str::trim)
177+
.filter(|s| !s.is_empty())
178+
.collect();
179+
180+
ctx.sql(queries[0]).await?.collect().await?;
181+
let df = ctx.sql(queries[1]).await?;
182+
let plan = df.create_physical_plan().await?;
183+
let stream = execute_stream(plan.clone(), ctx.task_ctx())?;
184+
ctx.sql(queries[2]).await?.collect().await?;
185+
186+
stream
187+
} else {
188+
let df = ctx.sql(&sql).await?;
189+
let plan = df.create_physical_plan().await?;
190+
execute_stream(plan.clone(), ctx.task_ctx())?
191+
};
192+
193+
let batches = stream.try_collect::<Vec<_>>().await?;
194+
195+
Ok(arrow::util::pretty::pretty_format_batches(&batches)?)
196+
}
197+
198+
pub fn get_test_tpch_query(num: u8) -> String {
199+
let queries_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/queries");
200+
tpch::tpch_query_from_dir(&queries_dir, num)
201+
}
202+
203+
// OnceCell to ensure TPCH tables are generated only once for tests
204+
static INIT_TEST_TPCH_TABLES: OnceCell<()> = OnceCell::const_new();
205+
206+
// ensure_tpch_data initializes the TPCH data on disk.
207+
pub async fn ensure_tpch_data(sf: f64, parts: i32) -> std::path::PathBuf {
208+
let data_dir =
209+
Path::new(env!("CARGO_MANIFEST_DIR")).join(format!("testdata/tpch/correctness_sf{sf}"));
210+
INIT_TEST_TPCH_TABLES
211+
.get_or_init(|| async {
212+
if !fs::exists(&data_dir).unwrap() {
213+
tpch::generate_tpch_data(&data_dir, sf, parts);
214+
}
215+
})
216+
.await;
217+
data_dir
218+
}
219+
}

0 commit comments

Comments
 (0)