Skip to content

Commit 3260143

Browse files
Add fuzzing infrastructure for distributed DataFusion
This change introduces a new `fuzz` binary that enables fuzzing of distributed DataFusion against a localhost cluster. The fuzzing framework is designed to be extensible, allowing us to define custom workloads and oracles.

This change defines a TPC-DS workload which uses the DuckDB data generator and the 99 TPC-DS queries from their GitHub repo. It also defines one oracle, the `SingleNodeOracle`, which validates the set of rows produced by distributed DataFusion against the set of rows produced by single-node DataFusion (if a query errors in both, then this is not counted as a failure).

You can run this using the following command:

```
RUST_LOG=info cargo run --features integration --bin fuzz -- tpcds --force-regenerate
```

Fuzzing also uses randomized cluster configurations derived from a deterministic seed.

Next steps:
- Add an ordering oracle to validate ORDER BY correctness
  - Idea: inspect the ordering properties in the logical plan and assert them on the RecordBatches
- Observability
  - Log stats on queries that were invalid (i.e. failed to execute on single-node DataFusion)
  - Log localhost cluster ports
- Add a metrics oracle to validate the `output_rows` metric (ensuring metrics are working correctly)
- Set up a nightly GitHub Actions workflow to run the fuzzer automatically
- Ensure that the data is available to be downloaded so we can reproduce any failures locally
1 parent 731ad2a commit 3260143

File tree

208 files changed

+11139
-4
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

208 files changed

+11139
-4
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/.idea
22
/target
33
/benchmarks/data/
4-
testdata/tpch/data/
4+
testdata/tpch/data/
5+
testdata/tpcds/data/

Cargo.lock

Lines changed: 54 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ name = "datafusion-distributed"
1010
version = "0.1.0"
1111
edition = "2024"
1212

13+
[[bin]]
14+
name = "fuzz"
15+
path = "src/bin/fuzz.rs"
16+
required-features = ["integration"]
17+
1318
[dependencies]
1419
chrono = { version = "0.4.42" }
1520
datafusion = { workspace = true }
@@ -30,8 +35,12 @@ delegate = "0.13.4"
3035
dashmap = "6.1.0"
3136
prost = "0.13.5"
3237
rand = "0.8.5"
38+
rand_chacha = "0.3.1"
39+
base64 = "0.22"
3340
object_store = "0.12.3"
3441
bytes = "1.10.1"
42+
log = "0.4"
43+
env_logger = "0.11"
3544

3645
# integration_tests deps
3746
insta = { version = "1.43.1", features = ["filters"], optional = true }
@@ -42,6 +51,7 @@ arrow = { version = "56.1.0", optional = true }
4251
tokio-stream = { version = "0.1.17", optional = true }
4352
hyper-util = { version = "0.1.16", optional = true }
4453
pin-project = "1.1.10"
54+
clap = { version = "4.5", features = ["derive"] }
4555

4656
[features]
4757
integration = [

src/bin/fuzz.rs

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
use arrow::util::pretty::pretty_format_batches;
2+
use async_trait::async_trait;
3+
use clap::{Parser, Subcommand};
4+
use datafusion::error::Result;
5+
use datafusion::prelude::SessionContext;
6+
use datafusion_distributed::test_utils::{
7+
fuzz::{FuzzConfig, FuzzDB},
8+
rand::rng,
9+
tpcds::{generate_tpcds_data, queries, register_tables},
10+
};
11+
use log::{Level, debug, error, info, log_enabled};
12+
use rand::Rng;
13+
use std::process;
14+
15+
#[derive(Parser)]
16+
#[command(author, version, about, long_about = None)]
17+
#[command(name = "fuzz")]
18+
#[command(about = "Fuzz test distributed datafusion with various workloads")]
19+
struct Cli {
20+
#[command(subcommand)]
21+
command: Commands,
22+
23+
/// Base64-encoded seed for random number generator. This seed is used to
24+
/// randomize configuration options and not used to generate data.
25+
#[arg(long)]
26+
seed: Option<String>,
27+
}
28+
29+
/// Trait for defining fuzz workloads
30+
#[async_trait]
31+
trait Workload {
32+
/// Do any necessary setup for the workload such as registering tables
33+
async fn setup(ctx: SessionContext) -> Result<()>;
34+
35+
/// Iterator of (query_name, query_sql) pairs
36+
async fn queries(&self) -> Result<Box<dyn Iterator<Item = (String, String)> + Send>>;
37+
38+
/// Get workload name for logging
39+
fn name(&self) -> &'static str;
40+
}
41+
42+
#[derive(Subcommand)]
43+
enum Commands {
44+
/// Run TPC-DS benchmark queries
45+
Tpcds {
46+
/// Scale factor for TPCDS data generation
47+
#[arg(short, long, default_value = "0.01")]
48+
scale_factor: String,
49+
50+
/// Generate data even if it already exists
51+
#[arg(long, default_value_t = false)]
52+
force_regenerate: bool,
53+
54+
/// Run only specific queries (comma-separated list, e.g., "q1,q5,q10")
55+
#[arg(long)]
56+
queries: Option<String>,
57+
},
58+
}
59+
60+
/// TPC-DS workload implementation
61+
#[derive(Clone)]
62+
struct TpcdsWorkload {
63+
queries: Option<String>,
64+
}
65+
66+
impl TpcdsWorkload {
67+
fn try_new(
68+
scale_factor: String,
69+
force_regenerate: bool,
70+
queries: Option<String>,
71+
) -> Result<Self> {
72+
if force_regenerate {
73+
info!(
74+
"Generating TPC-DS data with scale factor {}...",
75+
scale_factor
76+
);
77+
generate_tpcds_data(&scale_factor)?;
78+
info!("Done");
79+
} else {
80+
info!("Using existing TPC-DS data");
81+
}
82+
83+
Ok(Self { queries })
84+
}
85+
}
86+
87+
#[async_trait]
88+
impl Workload for TpcdsWorkload {
89+
async fn setup(ctx: SessionContext) -> Result<()> {
90+
info!("Registering TPC-DS tables...");
91+
let registered_tables = register_tables(&ctx).await?;
92+
info!("Done");
93+
debug!("Registered tables: {:?}", registered_tables);
94+
Ok(())
95+
}
96+
97+
async fn queries(&self) -> Result<Box<dyn Iterator<Item = (String, String)> + Send>> {
98+
let queries_vec = queries(self.queries.as_deref())?;
99+
Ok(Box::new(queries_vec.into_iter()))
100+
}
101+
102+
fn name(&self) -> &'static str {
103+
"TPC-DS"
104+
}
105+
}
106+
107+
#[tokio::main]
108+
async fn main() -> Result<()> {
109+
env_logger::init();
110+
let cli = Cli::parse();
111+
112+
match cli.command {
113+
Commands::Tpcds {
114+
scale_factor,
115+
force_regenerate,
116+
queries,
117+
} => {
118+
// Validate arguments
119+
if let Err(e) = validate_tpcds_args(&scale_factor) {
120+
error!("Error: {}", e);
121+
process::exit(1);
122+
}
123+
124+
info!("🌀 Starting Distributed DataFusion Fuzz Test");
125+
126+
let workload = TpcdsWorkload::try_new(scale_factor.clone(), force_regenerate, queries)?;
127+
128+
if let Err(e) = run_workload(workload, cli.seed).await {
129+
error!("❌ Fuzz testing failed: {}", e);
130+
process::exit(1);
131+
}
132+
133+
info!("✅ All fuzz tests passed!");
134+
}
135+
}
136+
137+
Ok(())
138+
}
139+
140+
/// Validate TPC-DS command line arguments
141+
fn validate_tpcds_args(scale_factor: &str) -> std::result::Result<(), String> {
142+
// Validate scale factor is numeric (decimal allowed)
143+
if scale_factor.parse::<f64>().is_err() {
144+
return Err(format!(
145+
"Scale factor '{}' is not a valid number",
146+
scale_factor
147+
));
148+
}
149+
150+
Ok(())
151+
}
152+
153+
fn randomized_fuzz_config(seed: Option<String>) -> Result<(FuzzConfig, String)> {
154+
let (mut rng, seed_b64) = rng(seed)?;
155+
156+
let config = FuzzConfig {
157+
num_workers: rng.gen_range(2..=8),
158+
files_per_task: rng.gen_range(1..=8),
159+
cardinality_task_count_factor: rng.gen_range(1.0..=3.0),
160+
};
161+
162+
Ok((config, seed_b64))
163+
}
164+
165+
/// Run a workload using the generic workload trait
166+
async fn run_workload<W>(workload: W, seed: Option<String>) -> Result<()>
167+
where
168+
W: Workload,
169+
{
170+
info!("Starting workload: {}", workload.name());
171+
172+
let (config, seed_b64) = randomized_fuzz_config(seed)?;
173+
info!("Fuzz config: {}", config);
174+
info!("Seed: {}", seed_b64);
175+
176+
let fuzz_db = match FuzzDB::new(config, W::setup).await {
177+
Ok(db) => db,
178+
Err(e) => {
179+
return Err(datafusion::error::DataFusionError::Execution(format!(
180+
"Failed to create FuzzDB: {}",
181+
e
182+
)));
183+
}
184+
};
185+
186+
// Get queries from workload
187+
let queries_iter = workload.queries().await?;
188+
189+
// Run each query
190+
let mut successful = 0;
191+
let mut failed = 0;
192+
193+
for (query_name, query_sql) in queries_iter {
194+
debug!("Executing fuzz query: {}", query_sql.trim());
195+
196+
match fuzz_db.run(&query_sql).await {
197+
Ok(results) => {
198+
successful += 1;
199+
info!("✅ {} completed successfully", query_name);
200+
if log_enabled!(Level::Debug) {
201+
match results {
202+
Some(batches) => {
203+
debug!("{}", pretty_format_batches(&batches)?,);
204+
}
205+
None => {
206+
debug!("No results (query errored expectedly)");
207+
}
208+
}
209+
}
210+
}
211+
Err(e) => {
212+
failed += 1;
213+
error!("❌ {} failed: {}", query_name, e);
214+
}
215+
}
216+
}
217+
218+
if failed > 0 {
219+
return Err(datafusion::error::DataFusionError::Execution(format!(
220+
"{} out of {} queries failed",
221+
failed,
222+
successful + failed
223+
)));
224+
}
225+
226+
Ok(())
227+
}

0 commit comments

Comments
 (0)