From 553a6ea6e402507759be99ae258e4331b8333295 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Sun, 16 Nov 2025 15:23:53 -0500
Subject: [PATCH] wip

---
 Cargo.lock                 |  76 +++++-
 Cargo.toml                 |  14 +-
 src/bin/fuzz_sql_server.rs | 529 +++++++++++++++++++++++++++++++++++++
 src/test_utils/mod.rs      |   1 +
 4 files changed, 614 insertions(+), 6 deletions(-)
 create mode 100644 src/bin/fuzz_sql_server.rs

diff --git a/Cargo.lock b/Cargo.lock
index f71585ea..ef2e8455 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -264,14 +264,22 @@ version = "56.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8c8b0ba0784d56bc6266b79f5de7a24b47024e7b3a0045d2ad4df3d9b686099f"
 dependencies = [
+ "arrow-arith",
  "arrow-array",
  "arrow-buffer",
  "arrow-cast",
+ "arrow-data",
  "arrow-ipc",
+ "arrow-ord",
+ "arrow-row",
  "arrow-schema",
+ "arrow-select",
+ "arrow-string",
  "base64",
  "bytes",
  "futures",
+ "once_cell",
+ "paste",
  "prost",
  "prost-types",
  "tonic",
@@ -431,7 +439,7 @@ version = "0.2.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
 dependencies = [
- "hermit-abi",
+ "hermit-abi 0.1.19",
  "libc",
  "winapi",
 ]
@@ -1088,6 +1096,7 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "arrow-flight",
+ "arrow-schema",
  "arrow-select",
  "async-trait",
  "bytes",
@@ -1096,11 +1105,14 @@ dependencies = [
  "datafusion",
  "datafusion-proto",
  "delegate",
+ "env_logger 0.10.2",
  "futures",
  "http",
  "hyper-util",
  "insta",
  "itertools",
+ "log",
+ "mimalloc",
  "object_store",
  "parquet",
  "pin-project",
@@ -1127,7 +1139,7 @@ dependencies = [
  "datafusion",
  "datafusion-distributed",
  "datafusion-proto",
- "env_logger",
+ "env_logger 0.11.8",
  "futures",
  "log",
  "parquet",
@@ -1603,6 +1615,19 @@ dependencies = [
  "regex",
 ]
 
+[[package]]
+name = "env_logger"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
+dependencies = [
+ "humantime",
+ "is-terminal",
+ "log",
+ "regex",
+ "termcolor",
+]
+
 [[package]]
 name = "env_logger"
 version = "0.11.8"
@@ -1903,6 +1928,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "hermit-abi"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
+
 [[package]]
 name = "hex"
 version = "0.4.3"
@@ -2154,7 +2185,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5"
 dependencies = [
  "equivalent",
- "hashbrown 0.15.4",
+ "hashbrown 0.16.0",
 ]
@@ -2186,6 +2217,17 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "is-terminal"
+version = "0.4.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
+dependencies = [
+ "hermit-abi 0.5.2",
+ "libc",
+ "windows-sys 0.60.2",
+]
+
 [[package]]
 name = "is_terminal_polyfill"
 version = "1.70.1"
@@ -2339,6 +2381,16 @@ version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
 
+[[package]]
+name = "libmimalloc-sys"
+version = "0.1.44"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870"
+dependencies = [
+ "cc",
+ "libc",
+]
+
 [[package]]
 name = "libz-rs-sys"
version = "0.5.1" @@ -2418,6 +2470,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "mimalloc" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8" +dependencies = [ + "libmimalloc-sys", +] + [[package]] name = "mime" version = "0.3.17" @@ -3300,6 +3361,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "textwrap" version = "0.11.0" diff --git a/Cargo.toml b/Cargo.toml index 6b51920a..9c09aea6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,18 +4,25 @@ members = ["benchmarks"] [workspace.dependencies] datafusion = { version = "50.0.0", default-features = false } datafusion-proto = { version = "50.0.0" } +arrow-flight = { version = "56.1.0", features = ["flight-sql-experimental"] } [package] name = "datafusion-distributed" version = "0.1.0" edition = "2024" +[[bin]] +name = "fuzz_sql_server" +path = "src/bin/fuzz_sql_server.rs" + [dependencies] chrono = { version = "0.4.42" } datafusion = { workspace = true } datafusion-proto = { workspace = true } -arrow-flight = "56.1.0" +arrow-flight = { workspace = true } arrow-select = "56.1.0" +arrow-schema = "56.1.0" +arrow = "56.1.0" async-trait = "0.1.88" tokio = { version = "1.46.1", features = ["full"] } # Updated to 0.13.1 to match arrow-flight 56.1.0 @@ -38,10 +45,12 @@ insta = { version = "1.43.1", features = ["filters"], optional = true } tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true } tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true } parquet = { version = "56.1.0", optional = true } -arrow = { version = "56.1.0", optional = true } tokio-stream = { version = "0.1.17", optional = true } hyper-util = { version = "0.1.16", optional = true } pin-project = "1.1.10" +env_logger = "0.10" +log = "0.4" +mimalloc = "0.1" [features] integration = [ @@ -49,7 +58,6 @@ integration = [ "tpchgen", "tpchgen-arrow", "parquet", - "arrow", "tokio-stream", "hyper-util", ] diff --git a/src/bin/fuzz_sql_server.rs b/src/bin/fuzz_sql_server.rs new file mode 100644 index 00000000..cc870e52 --- /dev/null +++ b/src/bin/fuzz_sql_server.rs @@ -0,0 +1,529 @@ +use arrow::array::{ArrayRef, StringArray}; +use arrow::ipc::writer::IpcWriteOptions; +use arrow::record_batch::RecordBatch; +use arrow_flight::encode::FlightDataEncoderBuilder; +use arrow_flight::flight_descriptor::DescriptorType; +use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream}; +use arrow_flight::sql::{ + ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, + ActionCreatePreparedStatementResult, Any, CommandGetTables, CommandPreparedStatementQuery, + CommandPreparedStatementUpdate, ProstMessageExt, SqlInfo, +}; +use arrow_flight::{ + Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, + IpcMessage, SchemaAsIpc, Ticket, +}; +use arrow_flight::flight_service_client::FlightServiceClient; +use arrow_schema::{DataType, Field, Schema}; +use 
+use async_trait::async_trait;
+use dashmap::DashMap;
+use datafusion::common::DataFusionError;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::logical_expr::LogicalPlan;
+use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
+use datafusion_distributed::{
+    BoxCloneSyncChannel, ChannelResolver, DistributedExt,
+    DistributedPhysicalOptimizerRule, create_flight_client,
+};
+use futures::{Stream, StreamExt, TryStreamExt};
+use log::info;
+use mimalloc::MiMalloc;
+use prost::Message;
+use std::pin::Pin;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use tonic::metadata::MetadataValue;
+use tonic::transport::Server;
+use tonic::{Request, Response, Status, Streaming};
+use uuid::Uuid;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+macro_rules! status {
+    ($desc:expr, $err:expr) => {
+        Status::internal(format!("{}: {} at {}:{}", $desc, $err, file!(), line!()))
+    };
+}
+
+/// Adapted from https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/flight/flight_sql_server.rs
+/// Can be used as a remote DataFusion server that `JDBC` clients connect to.
+/// Supported SQL statements:
+///   CREATE
+///   INSERT
+///   SELECT
+///
+/// Only a single client is supported.
+/// For now, use `ctx` instead of `contexts` inside `FlightSqlServiceImpl`.
+///
+/// === Below is the original comment ===
+///
+/// This example shows how to wrap DataFusion with `FlightSqlService` to support connecting
+/// to a standalone DataFusion-based server with a JDBC client, using the open source "JDBC Driver
+/// for Arrow Flight SQL".
+///
+/// To install the JDBC driver in DBeaver for example, see these instructions:
+/// https://docs.dremio.com/software/client-applications/dbeaver/
+/// When configuring the driver, specify property "UseEncryption" = false
+///
+/// JDBC connection string: "jdbc:arrow-flight-sql://127.0.0.1:50051/"
+///
+/// Based heavily on Ballista's implementation: https://github.com/apache/datafusion-ballista/blob/main/ballista/scheduler/src/flight_sql.rs
+/// and the example in arrow-rs: https://github.com/apache/arrow-rs/blob/master/arrow-flight/examples/flight_sql_server.rs
+///
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    env_logger::init();
+    let addr = "0.0.0.0:50051".parse()?;
+    let resolver = MultiWorkerInMemoryChannelResolver::new(5);
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_distributed_channel_resolver(resolver)
+        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+        .with_distributed_network_shuffle_tasks(3)
+        .with_distributed_network_coalesce_tasks(3)
+        .with_config(SessionConfig::new().with_information_schema(true))
+        .build();
+    let session_ctx = SessionContext::from(state);
+    let service = FlightSqlServiceImpl {
+        contexts: Default::default(),
+        statements: Default::default(),
+        results: Default::default(),
+        ctx: Arc::new(Mutex::new(session_ctx)),
+    };
+    info!("Listening on {addr:?}");
+    let svc = FlightServiceServer::new(service);
+
+    Server::builder().add_service(svc).serve(addr).await?;
+
+    Ok(())
+}
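+
+// Hedged sketch (illustration only; nothing in the server calls it): how a Rust
+// client could talk to this service using arrow-flight's `FlightSqlServiceClient`,
+// which the `flight-sql-experimental` feature enabled above provides. The URL,
+// query, and credentials are assumptions; do_handshake accepts any user/password.
+#[allow(dead_code)]
+async fn example_client_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
+    use arrow_flight::sql::client::FlightSqlServiceClient;
+
+    let channel = tonic::transport::Channel::from_static("http://127.0.0.1:50051")
+        .connect()
+        .await?;
+    let mut client = FlightSqlServiceClient::new(channel);
+    // Mirrors do_handshake: the returned token is re-sent as a Bearer header,
+    // which get_ctx uses to look up the per-connection SessionContext.
+    client.handshake("user", "password").await?;
+    let info = client.execute("SELECT 1".to_string(), None).await?;
+    // Each endpoint carries an Any-encoded FetchResults ticket that
+    // do_get_fallback resolves to the cached record batches.
+    for endpoint in info.endpoint {
+        if let Some(ticket) = endpoint.ticket {
+            let batches: Vec<_> = client.do_get(ticket).await?.try_collect().await?;
+            info!("client fetched {} batches", batches.len());
+        }
+    }
+    Ok(())
+}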
+
+pub struct FlightSqlServiceImpl {
+    contexts: Arc<DashMap<String, Arc<SessionContext>>>,
+    statements: Arc<DashMap<String, LogicalPlan>>,
+    results: Arc<DashMap<String, Vec<RecordBatch>>>,
+    ctx: Arc<Mutex<SessionContext>>,
+}
+
+impl FlightSqlServiceImpl {
+    async fn create_ctx(&self) -> Result<String, Status> {
+        let uuid = Uuid::new_v4().hyphenated().to_string();
+        let session_config = SessionConfig::from_env()
+            .map_err(|e| Status::internal(format!("Error building plan: {e}")))?
+            .with_information_schema(true);
+
+        let resolver = MultiWorkerInMemoryChannelResolver::new(5);
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_distributed_channel_resolver(resolver)
+            .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+            .with_distributed_network_shuffle_tasks(3)
+            .with_distributed_network_coalesce_tasks(3)
+            .with_config(session_config)
+            .build();
+        let ctx = Arc::new(SessionContext::from(state));
+
+        self.contexts.insert(uuid.clone(), ctx);
+        Ok(uuid)
+    }
+
+    fn get_ctx<T>(&self, req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
+        // get the token from the authorization header on Request
+        let auth = req
+            .metadata()
+            .get("authorization")
+            .ok_or_else(|| Status::internal("No authorization header!"))?;
+        let str = auth
+            .to_str()
+            .map_err(|e| Status::internal(format!("Error parsing header: {e}")))?;
+        let authorization = str.to_string();
+        let bearer = "Bearer ";
+        if !authorization.starts_with(bearer) {
+            Err(Status::internal("Invalid auth header!"))?;
+        }
+        let auth = authorization[bearer.len()..].to_string();
+
+        if let Some(context) = self.contexts.get(&auth) {
+            Ok(context.clone())
+        } else {
+            Err(Status::internal(format!(
+                "Context handle not found: {auth}"
+            )))?
+        }
+    }
+
+    fn get_plan(&self, handle: &str) -> Result<LogicalPlan, Status> {
+        if let Some(plan) = self.statements.get(handle) {
+            Ok(plan.clone())
+        } else {
+            Err(Status::internal(format!("Plan handle not found: {handle}")))?
+        }
+    }
+
+    fn get_result(&self, handle: &str) -> Result<Vec<RecordBatch>, Status> {
+        if let Some(result) = self.results.get(handle) {
+            Ok(result.clone())
+        } else {
+            Err(Status::internal(format!(
+                "Request handle not found: {handle}"
+            )))?
+        }
+    }
+
+    async fn tables(&self, ctx: Arc<SessionContext>) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("catalog_name", DataType::Utf8, true),
+            Field::new("db_schema_name", DataType::Utf8, true),
+            Field::new("table_name", DataType::Utf8, false),
+            Field::new("table_type", DataType::Utf8, false),
+        ]));
+
+        let mut catalogs = vec![];
+        let mut schemas = vec![];
+        let mut names = vec![];
+        let mut types = vec![];
+        for catalog in ctx.catalog_names() {
+            let catalog_provider = ctx.catalog(&catalog).unwrap();
+            for schema in catalog_provider.schema_names() {
+                let schema_provider = catalog_provider.schema(&schema).unwrap();
+                for table in schema_provider.table_names() {
+                    let table_provider = schema_provider.table(&table).await.unwrap().unwrap();
+                    catalogs.push(catalog.clone());
+                    schemas.push(schema.clone());
+                    names.push(table.clone());
+                    types.push(table_provider.table_type().to_string())
+                }
+            }
+        }
+
+        RecordBatch::try_new(
+            schema,
+            [catalogs, schemas, names, types]
+                .into_iter()
+                .map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
+                .collect::<Vec<_>>(),
+        )
+        .unwrap()
+    }
+
+    fn remove_plan(&self, handle: &str) -> Result<(), Status> {
+        self.statements.remove(&handle.to_string());
+        Ok(())
+    }
+
+    fn remove_result(&self, handle: &str) -> Result<(), Status> {
+        self.results.remove(&handle.to_string());
+        Ok(())
+    }
+}
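+
+// Hedged sketch (illustration only; nothing in the server calls it): the handle
+// bookkeeping above in miniature. A UUID keys each entry, `get_*` clones the
+// stored value out of the map, and `remove_*` drops it when the prepared
+// statement is closed. The function name is hypothetical.
+#[allow(dead_code)]
+fn demo_handle_lifecycle() {
+    let results: DashMap<String, Vec<String>> = DashMap::new();
+    let handle = Uuid::new_v4().hyphenated().to_string();
+    results.insert(handle.clone(), vec!["batch-0".to_string()]);
+    // DashMap shards its locks internally, so concurrent RPC handlers can
+    // read and write entries without a global Mutex.
+    assert!(results.get(&handle).is_some());
+    results.remove(&handle);
+    assert!(results.get(&handle).is_none());
+}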
+
+#[tonic::async_trait]
+impl FlightSqlService for FlightSqlServiceImpl {
+    type FlightService = FlightSqlServiceImpl;
+
+    // This function is triggered when the JDBC client's `user` and `password`
+    // connection properties are set
+    async fn do_handshake(
+        &self,
+        _request: Request<Streaming<HandshakeRequest>>,
+    ) -> Result<
+        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
+        Status,
+    > {
+        info!("do_handshake");
+        if let Some(msg) = _request.metadata().get("create") {
+            // A new SQLancer round is starting, so reset the ctx
+            info!("Resetting ctx {:?}", msg);
+            let resolver = MultiWorkerInMemoryChannelResolver::new(5);
+            let state = SessionStateBuilder::new()
+                .with_default_features()
+                .with_distributed_channel_resolver(resolver)
+                .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+                .with_distributed_network_shuffle_tasks(3)
+                .with_distributed_network_coalesce_tasks(3)
+                .with_config(SessionConfig::new().with_information_schema(true))
+                .build();
+            let new_ctx = SessionContext::from(state);
+
+            let mut ctx_guard = self.ctx.lock().await; // Use `lock()` for the async Mutex
+            *ctx_guard = new_ctx;
+        }
+        // no authentication actually takes place here
+        // see Ballista implementation for example of basic auth
+        // in this case, we simply accept the connection and create a new SessionContext
+        // the SessionContext will be re-used within this same connection/session
+        let token = self.create_ctx().await?;
+
+        let result = HandshakeResponse {
+            protocol_version: 0,
+            payload: token.as_bytes().to_vec().into(),
+        };
+        let result = Ok(result);
+        let output = futures::stream::iter(vec![result]);
+        let str = format!("Bearer {token}");
+        let mut resp: Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>> =
+            Response::new(Box::pin(output));
+        let md = MetadataValue::try_from(str)
+            .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
+        resp.metadata_mut().insert("authorization", md);
+        Ok(resp)
+    }
+
+    async fn do_get_fallback(
+        &self,
+        _request: Request<Ticket>,
+        message: Any,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
+        if !message.is::<FetchResults>() {
+            Err(Status::unimplemented(format!(
+                "do_get: The defined request is invalid: {}",
+                message.type_url
+            )))?
+        }
+
+        let fr: FetchResults = message
+            .unpack()
+            .map_err(|e| Status::internal(format!("{e:?}")))?
+            .ok_or_else(|| Status::internal("Expected FetchResults but got None!"))?;
+
+        let handle = fr.handle;
+
+        info!("getting results for {handle}");
+        let result = self.get_result(&handle)?;
+        // if we get an empty result, create an empty schema
+        let (schema, batches) = match result.first() {
+            None => (Arc::new(Schema::empty()), vec![]),
+            Some(batch) => (batch.schema(), result.clone()),
+        };
+
+        let batch_stream = futures::stream::iter(batches).map(Ok);
+
+        let stream = FlightDataEncoderBuilder::new()
+            .with_schema(schema)
+            .build(batch_stream)
+            .map_err(Status::from);
+
+        Ok(Response::new(Box::pin(stream)))
+    }
+
+    async fn get_flight_info_prepared_statement(
+        &self,
+        cmd: CommandPreparedStatementQuery,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        info!("get_flight_info_prepared_statement {:?}", cmd);
+        let handle = std::str::from_utf8(&cmd.prepared_statement_handle)
+            .map_err(|e| status!("Unable to parse uuid", e))?;
+
+        //let ctx = self.get_ctx(&request)?;
+        let plan = self.get_plan(handle)?;
+
+        let ctx_guard = self.ctx.lock().await;
+        let state = (*ctx_guard).state();
+        let df = DataFrame::new(state, plan);
+        let result = df
+            .collect()
+            .await
+            .map_err(|e| status!("Error executing query", e))?;
+
+        // if we get an empty result, create an empty schema
+        let schema = match result.first() {
+            None => Schema::empty(),
+            Some(batch) => (*batch.schema()).clone(),
+        };
+
+        self.results.insert(handle.to_string(), result);
+
+        // if we had multiple endpoints to connect to, we could use this Location
+        // but in the case of standalone DataFusion, we don't
+        // let loc = Location {
+        //     uri: "grpc+tcp://127.0.0.1:50051".to_string(),
+        // };
+        let fetch = FetchResults {
+            handle: handle.to_string(),
+        };
+        let buf = fetch.as_any().encode_to_vec().into();
+        let ticket = Ticket { ticket: buf };
+
+        let info = FlightInfo::new()
+            // Encode the Arrow schema
+            .try_with_schema(&schema)
+            .expect("encoding failed")
+            .with_endpoint(FlightEndpoint::new().with_ticket(ticket))
+            .with_descriptor(FlightDescriptor {
+                r#type: DescriptorType::Cmd.into(),
+                cmd: Default::default(),
+                path: vec![],
+            });
+        let resp = Response::new(info);
+        Ok(resp)
+    }
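+
+    // Hedged note on the flow above: GetFlightInfo executes the distributed plan
+    // eagerly, caches the batches under the statement handle, and returns a single
+    // endpoint whose ticket is an Any-wrapped FetchResults { handle }. The JDBC
+    // driver then issues DoGet with that ticket, which lands in do_get_fallback
+    // above. A true multi-worker server would return one endpoint per Location.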
+
+    async fn get_flight_info_tables(
+        &self,
+        _query: CommandGetTables,
+        request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        info!("get_flight_info_tables");
+        let ctx = self.get_ctx(&request)?;
+        let data = self.tables(ctx).await;
+        let schema = data.schema();
+
+        let uuid = Uuid::new_v4().hyphenated().to_string();
+        self.results.insert(uuid.clone(), vec![data]);
+
+        let fetch = FetchResults { handle: uuid };
+        let buf = fetch.as_any().encode_to_vec().into();
+        let ticket = Ticket { ticket: buf };
+
+        let info = FlightInfo::new()
+            // Encode the Arrow schema
+            .try_with_schema(&schema)
+            .expect("encoding failed")
+            .with_endpoint(FlightEndpoint::new().with_ticket(ticket))
+            .with_descriptor(FlightDescriptor {
+                r#type: DescriptorType::Cmd.into(),
+                cmd: Default::default(),
+                path: vec![],
+            });
+        let resp = Response::new(info);
+        Ok(resp)
+    }
+
+    async fn do_put_prepared_statement_update(
+        &self,
+        handle: CommandPreparedStatementUpdate,
+        _request: Request<PeekableFlightDataStream>,
+    ) -> Result<i64, Status> {
+        info!("do_put_prepared_statement_update");
+        // statements like "CREATE TABLE.." or "SET datafusion.nnn.." call this function
+        // and we are required to return some row count here
+        let handle = std::str::from_utf8(&handle.prepared_statement_handle)
+            .map_err(|e| status!("Unable to parse uuid", e))?;
+
+        //let ctx = self.get_ctx(&request)?;
+        let plan = self.get_plan(handle)?;
+        //println!("do_put_prepared_statement_update plan is {:?}", plan);
+
+        let ctx_guard = self.ctx.lock().await;
+        let state = (*ctx_guard).state();
+        let df = DataFrame::new(state, plan);
+        df.collect()
+            .await
+            .map_err(|e| status!("Error executing query", e))?;
+
+        Ok(1)
+    }
+
+    async fn do_action_create_prepared_statement(
+        &self,
+        query: ActionCreatePreparedStatementRequest,
+        _request: Request<Action>,
+    ) -> Result<ActionCreatePreparedStatementResult, Status> {
+        let user_query = query.query.as_str();
+        info!("do_action_create_prepared_statement: {user_query}");
+
+        //let ctx = self.get_ctx(&request)?;
+
+        let ctx_guard = self.ctx.lock().await;
+        let plan = (*ctx_guard)
+            .sql(user_query)
+            .await
+            .and_then(|df| df.into_optimized_plan())
+            .map_err(|e| Status::internal(format!("Error building plan: {e}")))?;
+
+        info!("Plan is {:#?}", plan);
+
+        // store a copy of the plan, it will be used for execution
+        let plan_uuid = Uuid::new_v4().hyphenated().to_string();
+        self.statements.insert(plan_uuid.clone(), plan.clone());
+
+        let plan_schema = plan.schema();
+
+        let arrow_schema = (&**plan_schema).into();
+        let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default())
+            .try_into()
+            .map_err(|e| status!("Unable to serialize schema", e))?;
+        let IpcMessage(schema_bytes) = message;
+
+        let res = ActionCreatePreparedStatementResult {
+            prepared_statement_handle: plan_uuid.into(),
+            dataset_schema: schema_bytes,
+            parameter_schema: Default::default(),
+        };
+        info!("do_action_create_prepared_statement SUCCEEDED!");
+        Ok(res)
+    }
+
+    async fn do_action_close_prepared_statement(
+        &self,
+        handle: ActionClosePreparedStatementRequest,
+        _request: Request<Action>,
+    ) -> Result<(), Status> {
+        info!("do_action_close_prepared_statement");
+        let handle = std::str::from_utf8(&handle.prepared_statement_handle);
+        if let Ok(handle) = handle {
+            info!("do_action_close_prepared_statement: removing plan and results for {handle}");
+            let _ = self.remove_plan(handle);
+            let _ = self.remove_result(handle);
+        }
+        Ok(())
+    }
+
+    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
+}
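+
+// Hedged sketch (illustration only; nothing in the server calls it): the ticket
+// round-trip in isolation. The server packs a FetchResults into a protobuf Any
+// to build the ticket bytes, and do_get_fallback unpacks the same Any to recover
+// the handle. The function name is hypothetical.
+#[allow(dead_code)]
+fn demo_ticket_roundtrip() {
+    let fetch = FetchResults {
+        handle: "example-handle".to_string(),
+    };
+    let ticket = Ticket {
+        ticket: fetch.as_any().encode_to_vec().into(),
+    };
+    // Decode the raw bytes back into an Any; unpack checks the type_url before
+    // decoding the payload as a FetchResults.
+    let any = Any::decode(ticket.ticket).expect("ticket bytes decode as Any");
+    let decoded: FetchResults = any
+        .unpack()
+        .expect("Any payload decodes")
+        .expect("type_url matches FetchResults");
+    assert_eq!(decoded.handle, "example-handle");
+}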
info!("do_action_close_prepared_statement: removing plan and results for {handle}"); + let _ = self.remove_plan(handle); + let _ = self.remove_result(handle); + } + Ok(()) + } + + async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {} +} + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FetchResults { + #[prost(string, tag = "1")] + pub handle: ::prost::alloc::string::String, +} + +impl ProstMessageExt for FetchResults { + fn type_url() -> &'static str { + "type.googleapis.com/datafusion.example.com.sql.FetchResults" + } + + fn as_any(&self) -> Any { + Any { + type_url: FetchResults::type_url().to_string(), + value: ::prost::Message::encode_to_vec(self).into(), + } + } +} + +/// Simple [ChannelResolver] implementation that simulates multiple workers +/// by returning multiple URLs but connecting to the same endpoint. +#[derive(Clone)] +struct MultiWorkerInMemoryChannelResolver { + num_workers: usize, +} + +impl MultiWorkerInMemoryChannelResolver { + fn new(num_workers: usize) -> Self { + Self { num_workers } + } +} + +#[async_trait] +impl ChannelResolver for MultiWorkerInMemoryChannelResolver { + fn get_urls(&self) -> Result, DataFusionError> { + // Return multiple URLs representing our 5 workers + let urls: Result, _> = (0..self.num_workers) + .map(|i| url::Url::parse(&format!("http://localhost:5005{}", i))) + .collect(); + urls.map_err(|e| DataFusionError::External(Box::new(e))) + } + + async fn get_flight_client_for_url( + &self, + _url: &url::Url, + ) -> Result, DataFusionError> { + // For simplicity in the fuzz server, all workers connect to the same in-memory endpoint + // In a real distributed setup, each URL would connect to a different worker + let channel = tonic::transport::Channel::from_static("http://localhost:50051") + .connect_lazy(); + Ok(create_flight_client(BoxCloneSyncChannel::new(channel))) + } +} diff --git a/src/test_utils/mod.rs b/src/test_utils/mod.rs index a5b3becd..b13e44e7 100644 --- a/src/test_utils/mod.rs +++ b/src/test_utils/mod.rs @@ -7,3 +7,4 @@ pub mod parquet; pub mod plans; pub mod session_context; pub mod tpch; +pub mod flight_sql_server;