Skip to content

Commit 36e1de7

Browse files
working
1 parent 681014d commit 36e1de7

File tree

6 files changed

+79
-32
lines changed

6 files changed

+79
-32
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ delegate = "0.13.4"
3535
dashmap = "6.1.0"
3636
prost = "0.13.5"
3737
rand = "0.8.5"
38+
rand_chacha = "0.3.1"
39+
base64 = "0.22"
3840
object_store = "0.12.3"
3941
bytes = "1.10.1"
4042
log = "0.4"

src/bin/fuzz.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use datafusion::error::Result;
44
use datafusion_distributed::test_utils::{
55
fuzz::{FuzzDB, FuzzConfig},
66
tpcds::{discover_tpcds_queries, register_available_tpcds_tables, generate_tpcds_data},
7+
rand::rng,
78
};
9+
use rand::Rng;
810
use datafusion::prelude::SessionContext;
911
use log::{debug, info, warn, error};
1012
use std::process;
1113
use std::time::Instant;
12-
use rand::Rng;
1314

1415
#[derive(Parser)]
1516
#[command(author, version, about, long_about = None)]
@@ -18,6 +19,10 @@ use rand::Rng;
1819
struct Cli {
// Plain `//` comments are used for the notes in this struct on purpose: with
// clap's derive API, `///` doc comments on fields are surfaced as help text.
1920
#[command(subcommand)]
2021
// The subcommand to run; `Commands` is declared outside this view.
command: Commands,
22+
23+
/// Base64-encoded seed for reproducible randomization
24+
// Optional long flag. `main` passes this value on to `run_workload`.
#[arg(long)]
25+
seed: Option<String>,
2126
}
2227

2328
/// Trait for defining workloads that can be fuzzed
@@ -124,7 +129,7 @@ async fn main() -> Result<()> {
124129
info!(" Workload: {}", workload.name());
125130
info!(" Scale factor: {}", scale_factor);
126131

127-
if let Err(e) = run_workload(workload).await {
132+
if let Err(e) = run_workload(workload, cli.seed).await {
128133
error!("❌ Fuzz testing failed: {}", e);
129134
process::exit(1);
130135
}
@@ -146,25 +151,30 @@ fn validate_tpcds_args(scale_factor: &str) -> std::result::Result<(), String> {
146151
Ok(())
147152
}
148153

149-
fn randomized_fuzz_config() -> FuzzConfig {
150-
let mut rng = rand::thread_rng();
154+
/// Builds a randomized [`FuzzConfig`] from a seeded RNG.
///
/// `seed` is forwarded to `rng`. The returned tuple pairs the generated
/// configuration with the base64 seed string that `rng` hands back, so the
/// caller can log it.
///
/// # Errors
///
/// Propagates any error returned by `rng`.
fn randomized_fuzz_config(seed: Option<String>) -> Result<(FuzzConfig, String)> {
    let (mut seeded_rng, seed_b64) = rng(seed)?;

    // The draws happen in exactly this order; reordering them would map the
    // same seed to a different configuration.
    let num_workers = seeded_rng.gen_range(2..=8);
    let files_per_task = seeded_rng.gen_range(1..=8);
    let cardinality_task_count_factor = seeded_rng.gen_range(1.0..=3.0);

    Ok((
        FuzzConfig {
            num_workers,
            files_per_task,
            cardinality_task_count_factor,
        },
        seed_b64,
    ))
}
158165

159166
/// Run a workload using the generic workload trait
160-
async fn run_workload<W>(workload: W) -> Result<()>
167+
async fn run_workload<W>(workload: W, seed: Option<String>) -> Result<()>
161168
where W: Workload {
162169
info!("📊 Running {} workload...", workload.name());
163170

164171
// Create FuzzDB with randomized session config
165172
info!("⚙️ Setting up distributed session with randomized configuration...");
166173

167-
let fuzz_db = match FuzzDB::new(randomized_fuzz_config(), W::setup).await {
174+
let (config, seed_b64) = randomized_fuzz_config(seed)?;
175+
info!("🎲 Using seed: {}", seed_b64);
176+
177+
let fuzz_db = match FuzzDB::new(config, W::setup).await {
168178
Ok(db) => db,
169179
Err(e) => {
170180
return Err(datafusion::error::DataFusionError::Execution(format!(

src/test_utils/fuzz.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -315,31 +315,6 @@ mod tests {
315315
use std::sync::Arc;
316316
use datafusion::logical_expr::LogicalPlanBuilder;
317317

318-
#[tokio::test]
319-
async fn test_session_config_default() {
320-
let config = SessionConfig::default();
321-
assert_eq!(config.num_workers, 4);
322-
assert_eq!(config.tasks_per_file, 4);
323-
assert_eq!(config.cardinality_task_count_factor, 4);
324-
assert_eq!(config.target_partitions, 8);
325-
}
326-
327-
#[tokio::test]
328-
async fn test_randomize_session_config() {
329-
let config1 = randomize_session_config();
330-
let config2 = randomize_session_config();
331-
332-
// Basic bounds checking
333-
assert!(config1.num_workers >= 2 && config1.num_workers <= 8);
334-
assert!(config1.tasks_per_file >= 1 && config1.tasks_per_file <= 8);
335-
assert!(config1.cardinality_task_count_factor >= 1 && config1.cardinality_task_count_factor <= 8);
336-
assert!(config1.target_partitions >= 4 && config1.target_partitions <= 16);
337-
338-
// Configs should be different (with high probability)
339-
// Note: There's a small chance they could be equal due to randomness
340-
println!("Config 1: {:?}", config1);
341-
println!("Config 2: {:?}", config2);
342-
}
343318

344319
#[tokio::test]
345320
async fn test_records_equal_as_sets_empty() {

src/test_utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod metrics;
66
pub mod mock_exec;
77
pub mod parquet;
88
pub mod plans;
9+
pub mod rand;
910
pub mod session_context;
1011
pub mod tpcds;
1112
pub mod tpch;

src/test_utils/rand.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use rand::{SeedableRng, RngCore};
2+
use rand_chacha::ChaCha8Rng;
3+
use base64::{Engine as _, engine::general_purpose::STANDARD};
4+
use datafusion::error::{DataFusionError, Result};
5+
6+
/// Create a seeded ChaCha8 RNG from a base64 string or generate a new random seed
7+
pub fn rng(seed: Option<String>) -> Result<(ChaCha8Rng, String)> {
8+
let (rng, seed_b64) = if let Some(seed_str) = seed {
9+
// Use provided seed
10+
let seed_bytes = STANDARD.decode(&seed_str).map_err(|e| {
11+
DataFusionError::Execution(format!("Invalid base64 seed: {}", e))
12+
})?;
13+
14+
if seed_bytes.len() != 32 {
15+
return Err(DataFusionError::Execution(
16+
"Seed must be 32 bytes (256 bits) when base64-decoded".to_string()
17+
));
18+
}
19+
20+
let mut seed_array = [0u8; 32];
21+
seed_array.copy_from_slice(&seed_bytes);
22+
let rng = ChaCha8Rng::from_seed(seed_array);
23+
24+
(rng, seed_str)
25+
} else {
26+
// Generate random seed
27+
let mut seed = [0u8; 32];
28+
rand::thread_rng().fill_bytes(&mut seed);
29+
let seed_b64 = STANDARD.encode(&seed);
30+
let rng = ChaCha8Rng::from_seed(seed);
31+
32+
(rng, seed_b64)
33+
};
34+
35+
Ok((rng, seed_b64))
36+
}
37+
38+
39+
#[cfg(test)]
mod tests {
    use super::*;

    /// Two RNGs built from the same base64 seed must produce the same
    /// sequence, and the seed string must be handed back unchanged.
    #[test]
    fn test_rng_with_seed() {
        let seed_b64 = STANDARD.encode([42u8; 32]);

        let (mut rng1, echoed1) = rng(Some(seed_b64.clone())).unwrap();
        let (mut rng2, echoed2) = rng(Some(seed_b64.clone())).unwrap();

        assert_eq!(echoed1, seed_b64);
        assert_eq!(echoed2, seed_b64);

        // Both RNGs should generate the same sequence
        assert_eq!(rng1.next_u64(), rng2.next_u64());
        assert_eq!(rng1.next_u64(), rng2.next_u64());
    }

    /// A seed generated on the `None` path must reproduce the same stream
    /// when fed back in.
    #[test]
    fn test_rng_generated_seed_round_trips() {
        let (mut original, seed_b64) = rng(None).unwrap();
        let (mut replayed, _) = rng(Some(seed_b64)).unwrap();

        assert_eq!(original.next_u64(), replayed.next_u64());
        assert_eq!(original.next_u64(), replayed.next_u64());
    }

    /// Input that is not valid base64 is reported as an error.
    #[test]
    fn test_rng_rejects_invalid_base64() {
        assert!(rng(Some("not base64!".to_string())).is_err());
    }

    /// Valid base64 that decodes to something other than 32 bytes is rejected.
    #[test]
    fn test_rng_rejects_wrong_length() {
        let short = STANDARD.encode([0u8; 16]);
        assert!(rng(Some(short)).is_err());
    }
}

0 commit comments

Comments
 (0)