2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -38,7 +38,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup
- run: cargo test --features tpch --test tpch_validation_test
- run: cargo test --features tpch --test 'tpch_*'

format-check:
runs-on: ubuntu-latest
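The single tpch_validation_test target is replaced by a quoted glob: quoting 'tpch_*' keeps the shell from expanding it, so cargo's own target matching (the --test flag accepts common Unix glob patterns) runs every integration test whose name starts with tpch_, i.e. the existing tpch_validation_test plus the tpch_correctness_test added in this PR.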
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
/.idea
/target
/benchmarks/data/
testdata/tpch/data/
testdata/tpch/*
!testdata/tpch/queries
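These entries replace the narrower testdata/tpch/data/ rule: testdata/tpch/* ignores everything generated under that directory (including the correctness_sf1 data produced by the new test), while the !testdata/tpch/queries negation keeps the checked-in query files tracked.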
25 changes: 24 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -32,6 +32,7 @@ prost = "0.13.5"
rand = "0.8.5"
object_store = "0.12.3"
bytes = "1.10.1"
pin-project = "1.1.10"

# integration_tests deps
insta = { version = "1.43.1", features = ["filters"], optional = true }
@@ -41,7 +42,7 @@ parquet = { version = "56.1.0", optional = true }
arrow = { version = "56.1.0", optional = true }
tokio-stream = { version = "0.1.17", optional = true }
hyper-util = { version = "0.1.16", optional = true }
pin-project = "1.1.10"
pretty_assertions = { version = "1.4", optional = true }

[features]
integration = [
@@ -52,6 +53,7 @@ integration = [
"arrow",
"tokio-stream",
"hyper-util",
"pretty_assertions"
]

tpch = ["integration"]
@@ -65,3 +67,4 @@ parquet = "56.1.0"
arrow = "56.1.0"
tokio-stream = "0.1.17"
hyper-util = "0.1.16"
pretty_assertions = "1.4"
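pin-project was already a non-optional dependency; it only moves out of the integration-test dependency block into the regular dependency list. pretty_assertions is added as an optional dependency pulled in through the integration feature (which the tpch feature enables) and is also pinned in the block at the bottom of the manifest (presumably dev-dependencies), so normal builds are unaffected.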
221 changes: 221 additions & 0 deletions tests/tpch_correctness_test.rs
@@ -0,0 +1,221 @@
#[cfg(all(feature = "integration", feature = "tpch", test))]
mod tests {
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion_distributed::DefaultSessionBuilder;
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::test_utils::tpch;
use futures::TryStreamExt;
use std::error::Error;
use std::fmt::Display;
use std::fs;
use std::path::Path;
use tokio::sync::OnceCell;

const PARTITIONS: usize = 6;
const TPCH_SCALE_FACTOR: f64 = 1.0;
const TPCH_DATA_PARTS: i32 = 16;

#[tokio::test]
async fn test_tpch_1() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(1)).await
}

#[tokio::test]
async fn test_tpch_2() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(2)).await
}

#[tokio::test]
async fn test_tpch_3() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(3)).await
}

#[tokio::test]
async fn test_tpch_4() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(4)).await
}

#[tokio::test]
async fn test_tpch_5() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(5)).await
}

#[tokio::test]
async fn test_tpch_6() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(6)).await
}

#[tokio::test]
async fn test_tpch_7() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(7)).await
}

#[tokio::test]
async fn test_tpch_8() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(8)).await
}

#[tokio::test]
async fn test_tpch_9() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(9)).await
}

#[tokio::test]
async fn test_tpch_10() -> Result<(), Box<dyn Error>> {
let sql = get_test_tpch_query(10);
// This query can return non-deterministic results if two entries happen to have exactly
// the same revenue. At small scale factors this effectively never happens, but at larger
// ones it becomes likely. The extra ordering key below makes the result deterministic.
let sql = sql.replace("revenue desc", "revenue, c_acctbal desc");
test_tpch_query(sql).await
}

#[tokio::test]
async fn test_tpch_11() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(11)).await
}

#[tokio::test]
async fn test_tpch_12() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(12)).await
}

#[tokio::test]
async fn test_tpch_13() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(13)).await
}

#[tokio::test]
async fn test_tpch_14() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(14)).await
}

#[tokio::test]
async fn test_tpch_15() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(15)).await
}

#[tokio::test]
async fn test_tpch_16() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(16)).await
}

#[tokio::test]
async fn test_tpch_17() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(17)).await
}

#[tokio::test]
async fn test_tpch_18() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(18)).await
}

#[tokio::test]
async fn test_tpch_19() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(19)).await
}

#[tokio::test]
async fn test_tpch_20() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(20)).await
}

#[tokio::test]
async fn test_tpch_21() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(21)).await
}

#[tokio::test]
async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
test_tpch_query(get_test_tpch_query(22)).await
}

// test_tpch_query runs the given TPC-H query twice - once in a distributed manner and
// once in a non-distributed manner - and asserts that the results are identical.
async fn test_tpch_query(sql: String) -> Result<(), Box<dyn Error>> {
let (ctx, _guard) = start_localhost_context(4, DefaultSessionBuilder).await;

let results_d = run_tpch_query(ctx, sql.clone()).await?;
let results_s = run_tpch_query(SessionContext::new(), sql).await?;

pretty_assertions::assert_eq!(results_d.to_string(), results_s.to_string());
Ok(())
}

// run_tpch_query registers the TPC-H tables on the given context, executes the SQL, and
// returns the pretty-formatted result so the caller can compare distributed and
// single-node output.
async fn run_tpch_query(
ctx: SessionContext,
sql: String,
) -> Result<impl Display, Box<dyn Error>> {
let data_dir = ensure_tpch_data(TPCH_SCALE_FACTOR, TPCH_DATA_PARTS).await;
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.execution
.target_partitions = PARTITIONS;

// Register the TPC-H tables on the provided context
for table_name in [
"lineitem", "orders", "part", "partsupp", "customer", "nation", "region", "supplier",
] {
let query_path = data_dir.join(table_name);
ctx.register_parquet(
table_name,
query_path.to_string_lossy().as_ref(),
datafusion::prelude::ParquetReadOptions::default(),
)
.await?;
}

// Query 15 consists of three statements: the first creates a view, the second is the
// actual query whose output we want to capture, and the third drops the view.
let stream = if sql.starts_with("create view") {
let queries: Vec<&str> = sql
.split(';')
.map(str::trim)
.filter(|s| !s.is_empty())
.collect();

ctx.sql(queries[0]).await?.collect().await?;
let df = ctx.sql(queries[1]).await?;
let plan = df.create_physical_plan().await?;
let stream = execute_stream(plan.clone(), ctx.task_ctx())?;
ctx.sql(queries[2]).await?.collect().await?;

stream
} else {
let df = ctx.sql(&sql).await?;
let plan = df.create_physical_plan().await?;
execute_stream(plan.clone(), ctx.task_ctx())?
};

let batches = stream.try_collect::<Vec<_>>().await?;

Ok(arrow::util::pretty::pretty_format_batches(&batches)?)
}

pub fn get_test_tpch_query(num: u8) -> String {
let queries_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/queries");
tpch::tpch_query_from_dir(&queries_dir, num)
}

// OnceCell to ensure TPCH tables are generated only once for tests
static INIT_TEST_TPCH_TABLES: OnceCell<()> = OnceCell::const_new();

// ensure_tpch_data initializes the TPCH data on disk.
pub async fn ensure_tpch_data(sf: f64, parts: i32) -> std::path::PathBuf {
let data_dir =
Path::new(env!("CARGO_MANIFEST_DIR")).join(format!("testdata/tpch/correctness_sf{sf}"));
INIT_TEST_TPCH_TABLES
.get_or_init(|| async {
if !fs::exists(&data_dir).unwrap() {
tpch::generate_tpch_data(&data_dir, sf, parts);
}
})
.await;
data_dir
}
}
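To try one of these locally, an invocation along the lines of cargo test --features tpch --test tpch_correctness_test test_tpch_3 should work. The first run generates the scale-factor-1 dataset under testdata/tpch/correctness_sf1 through the OnceCell-guarded ensure_tpch_data, which can take a while; subsequent runs reuse the data on disk.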