Skip to content

Commit 14c9b7f

Browse files
working
1 parent 58d7715 commit 14c9b7f

File tree

2 files changed

+28
-20
lines changed

2 files changed

+28
-20
lines changed

src/bin/fuzz.rs

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ enum Commands {
5151
}
5252

5353
/// TPC-DS workload implementation
54+
#[derive(Clone)]
5455
struct TpcdsWorkload {
5556
scale_factor: String,
5657
force_regenerate: bool,
@@ -69,7 +70,7 @@ impl TpcdsWorkload {
6970

7071
#[async_trait]
7172
impl Workload for TpcdsWorkload {
72-
async fn setup(&self, ctx: &SessionContext) -> Result<Vec<String>> {
73+
async fn setup(&self, ctx: &SessionContext) -> Result<()> {
7374
// Register tables
7475
info!("🔧 Registering TPC-DS tables...");
7576
let (registered_tables, missing_tables) = register_available_tpcds_tables(ctx, None).await?;
@@ -120,7 +121,7 @@ async fn main() -> Result<()> {
120121
info!(" Workload: {}", workload.name());
121122
info!(" Scale factor: {}", scale_factor);
122123

123-
if let Err(e) = run_workload(Box::new(workload)).await {
124+
if let Err(e) = run_workload(workload).await {
124125
error!("❌ Fuzz testing failed: {}", e);
125126
process::exit(1);
126127
}
@@ -143,16 +144,20 @@ fn validate_tpcds_args(scale_factor: &str) -> std::result::Result<(), String> {
143144
}
144145

145146
/// Run a workload using the generic workload trait
146-
async fn run_workload(workload: Box<dyn Workload + Send + Sync>) -> Result<()> {
147+
async fn run_workload(workload: TpcdsWorkload) -> Result<()> {
147148
info!("📊 Running {} workload...", workload.name());
148149

149150
// Create FuzzDB with randomized session config
150151
info!("⚙️ Setting up distributed session with randomized configuration...");
151152

152-
let fuzz_db = match FuzzDB::new(|ctx| {
153-
let ctx = ctx.clone();
154-
let workload = &workload;
155-
workload.setup(&ctx)
153+
let fuzz_db = match FuzzDB::new({
154+
let workload_clone = workload.clone();
155+
move |ctx| {
156+
let workload_inner = workload_clone.clone();
157+
async move {
158+
workload_inner.setup(&ctx).await
159+
}
160+
}
156161
}).await {
157162
Ok(db) => db,
158163
Err(e) => {

src/test_utils/fuzz.rs

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ use datafusion::{
99
logical_expr::LogicalPlan,
1010
};
1111
use rand::Rng;
12-
use std::collections::HashSet;
1312

1413
/// Fuzzing database with distributed session context and helper functions
1514
pub struct FuzzDB {
@@ -42,8 +41,11 @@ impl Default for SessionConfig {
4241
}
4342

4443
impl FuzzDB {
45-
/// Create a new FuzzDB with randomized session parameters
46-
pub async fn new() -> Result<Self>
44+
/// Create a new FuzzDB with randomized session parameters and setup function
45+
pub async fn new<F, Fut>(setup: F) -> Result<Self>
46+
where
47+
F: Fn(SessionContext) -> Fut + Send + Sync,
48+
Fut: std::future::Future<Output = Result<()>> + Send
4749
{
4850
let config = randomize_session_config();
4951
create_db(config, setup).await
@@ -54,12 +56,12 @@ impl FuzzDB {
5456

5557
// Execute on distributed context
5658
let df = self.distributed_ctx.sql(query).await?;
57-
let logical_plan = df.logical_plan().clone();
59+
let _logical_plan = df.logical_plan().clone();
5860
let results = df.collect().await?;
5961

6062
// Run oracles
61-
let single_node_oracle = SingleNodeOracle::new(&self.single_node_ctx);
62-
let ordering_oracle = OrderingOracle::new();
63+
let _single_node_oracle = SingleNodeOracle::new(&self.single_node_ctx);
64+
let _ordering_oracle = OrderingOracle::new();
6365

6466
// Validate with SingleNodeOracle
6567
// single_node_oracle.validate(&self.distributed_ctx, query, &results).await?;
@@ -87,7 +89,10 @@ fn randomize_session_config() -> SessionConfig {
8789
}
8890

8991
/// Create distributed session context with specified configuration
90-
async fn create_db<F, Fut>(config: SessionConfig) -> Result<FuzzDB>
92+
async fn create_db<F, Fut>(config: SessionConfig, setup: F) -> Result<FuzzDB>
93+
where
94+
F: Fn(SessionContext) -> Fut + Send + Sync,
95+
Fut: std::future::Future<Output = Result<()>> + Send
9196
{
9297
println!("Creating FuzzDB with {} workers", config.num_workers);
9398

@@ -114,8 +119,8 @@ async fn create_db<F, Fut>(config: SessionConfig) -> Result<FuzzDB>
114119
let single_node_ctx = SessionContext::new();
115120

116121
// Call the setup function to register tables on both contexts
117-
setup(&distributed_ctx).await?;
118-
setup(&single_node_ctx).await?;
122+
setup(distributed_ctx.clone()).await?;
123+
setup(single_node_ctx.clone()).await?;
119124

120125
// Log worker configuration
121126
println!("Session configuration:");
@@ -297,8 +302,6 @@ fn records_equal_as_sets(left: &[RecordBatch], right: &[RecordBatch]) -> bool {
297302

298303
/// Perform detailed comparison of record batches
299304
fn detailed_batch_comparison(left: &[RecordBatch], right: &[RecordBatch]) -> bool {
300-
use std::collections::HashSet;
301-
302305
// Convert both sides to sets of string representations of rows
303306
let left_rows = batch_rows_to_strings(left);
304307
let right_rows = batch_rows_to_strings(right);
@@ -307,8 +310,8 @@ fn detailed_batch_comparison(left: &[RecordBatch], right: &[RecordBatch]) -> boo
307310
return false;
308311
}
309312

310-
let left_set: HashSet<_> = left_rows.into_iter().collect();
311-
let right_set: HashSet<_> = right_rows.into_iter().collect();
313+
let left_set: std::collections::HashSet<_> = left_rows.into_iter().collect();
314+
let right_set: std::collections::HashSet<_> = right_rows.into_iter().collect();
312315

313316
if left_set != right_set {
314317
println!("Row content differs between result sets");

0 commit comments

Comments
 (0)