Draft
Changes from all commits (50 commits)
470fbe9 Scan API (gatesn, Feb 10, 2026)
f965615 Scan API (gatesn, Feb 10, 2026)
ae39e1a Scan API (gatesn, Feb 10, 2026)
f41ccff Scan API (gatesn, Feb 10, 2026)
23510e4 Scan API (gatesn, Feb 10, 2026)
e925792 Scan API (gatesn, Feb 10, 2026)
f8a7543 Scan API (gatesn, Feb 10, 2026)
d05997f Scan API (gatesn, Feb 10, 2026)
5f6123a Scan API (gatesn, Feb 10, 2026)
c5e60a4 Scan API (gatesn, Feb 11, 2026)
2d6c816 DataFusion streaming (gatesn, Feb 11, 2026)
5a7e047 DataFusion streaming (gatesn, Feb 11, 2026)
07cb0d3 DataFusion streaming (gatesn, Feb 11, 2026)
b19d03b DataFusion streaming (gatesn, Feb 11, 2026)
899675e Merge branch 'develop' into ngates/scan-api (gatesn, Feb 11, 2026)
811acd5 DataFusion streaming (gatesn, Feb 11, 2026)
bbbaa62 DataFusion streaming (gatesn, Feb 12, 2026)
717ebab DataFusion streaming (gatesn, Feb 12, 2026)
0a9b542 DataFusion streaming (gatesn, Feb 12, 2026)
25cac35 DataFusion streaming (gatesn, Feb 12, 2026)
367baaa DataFusion streaming (gatesn, Feb 12, 2026)
3b7d0ae DataFusion streaming (gatesn, Feb 12, 2026)
aa46b99 Merge branch 'develop' into ngates/scan-api (gatesn, Feb 12, 2026)
ac06b32 DataFusion streaming (gatesn, Feb 12, 2026)
270b242 DataFusion streaming (gatesn, Feb 13, 2026)
6aa5a0b DataFusion streaming (gatesn, Feb 13, 2026)
7aaf8b6 DataFusion streaming (gatesn, Feb 13, 2026)
669cedb Scan API (gatesn, Feb 13, 2026)
392f263 Scan API (gatesn, Feb 13, 2026)
a45bf71 Scan API (gatesn, Feb 13, 2026)
4aff18a Scan API (gatesn, Feb 13, 2026)
fa7a431 Scan API (gatesn, Feb 13, 2026)
fe6fbec Scan API (gatesn, Feb 13, 2026)
ccc13c3 Scan API (gatesn, Feb 13, 2026)
07562d6 Scan API (gatesn, Feb 13, 2026)
7737131 Scan API (gatesn, Feb 14, 2026)
b6e1142 Scan API (gatesn, Feb 14, 2026)
35eeec7 Configure DuckDB threads (gatesn, Feb 14, 2026)
1da2a28 Configure DuckDB threads (gatesn, Feb 15, 2026)
c05641b Configure DuckDB threads (gatesn, Feb 15, 2026)
5d59027 Configure DuckDB threads (gatesn, Feb 15, 2026)
d2aa4fb Configure DuckDB threads (gatesn, Feb 15, 2026)
e1330dd Configure DuckDB threads (gatesn, Feb 15, 2026)
d396f02 Configure DuckDB threads (gatesn, Feb 15, 2026)
9e54416 Configure DuckDB threads (gatesn, Feb 15, 2026)
8b2ca17 Configure DuckDB threads (gatesn, Feb 15, 2026)
68ec238 Configure DuckDB threads (gatesn, Feb 15, 2026)
d7ea77b Configure DuckDB threads (gatesn, Feb 16, 2026)
6f030fe Configure DuckDB threads (gatesn, Feb 16, 2026)
dfb275c Configure DuckDB threads (gatesn, Feb 16, 2026)
2 changes: 2 additions & 0 deletions .github/workflows/sql-benchmarks.yml
@@ -187,6 +187,7 @@ jobs:
if: matrix.remote_storage == null || github.event.pull_request.head.repo.fork == true
shell: bash
env:
VORTEX_USE_SCAN_API: "1"
Contributor comment: should this be "0", or should the old one be removed?
OTEL_SERVICE_NAME: "vortex-bench"
OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf"
OTEL_EXPORTER_OTLP_ENDPOINT: "${{ (inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false) && secrets.OTEL_EXPORTER_OTLP_ENDPOINT || '' }}"
@@ -200,6 +201,7 @@ jobs:
if: matrix.remote_storage != null && (inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false)
shell: bash
env:
VORTEX_USE_SCAN_API: "1"
AWS_REGION: "eu-west-1"
OTEL_SERVICE_NAME: "vortex-bench"
OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf"
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

7 changes: 5 additions & 2 deletions bench-orchestrator/bench_orchestrator/comparison/analyzer.py
@@ -271,8 +271,11 @@ def compare_runs(
# Pivot to get (query, engine, format) as rows, runs as columns
pivot = combined.pivot_table(index=["query", "engine", "format"], columns="run", values="value", aggfunc="mean")

# Deduplicate labels while preserving order (two runs can share a label).
unique_labels = list(dict.fromkeys(labels))

# Reorder columns to match input order
pivot = pivot[[label for label in labels if label in pivot.columns]]
pivot = pivot[[label for label in unique_labels if label in pivot.columns]]

# Compute ratios relative to baseline
if baseline_label in pivot.columns:
@@ -283,4 +286,4 @@ def compare_runs(
else:
result = pivot

return PivotComparison(df=result.reset_index(), baseline=baseline_label, columns=labels)
return PivotComparison(df=result.reset_index(), baseline=baseline_label, columns=unique_labels)
10 changes: 7 additions & 3 deletions bench-orchestrator/bench_orchestrator/storage/store.py
@@ -96,9 +96,13 @@ def write_result(self, result: QueryResult) -> None:
self._result_count += 1

def write_raw_json(self, json_line: str) -> None:
"""Write a raw JSON line directly (from benchmark binary output)."""
if self._results_file:
self._results_file.write(json_line.strip() + "\n")
"""Write a raw JSON line directly (from benchmark binary output).

Non-JSON lines (e.g. DuckDB ASCII table output) are silently skipped.
"""
line = json_line.strip()
if self._results_file and line.startswith("{"):
self._results_file.write(line + "\n")
self._results_file.flush()
self._result_count += 1

2 changes: 2 additions & 0 deletions benchmarks/datafusion-bench/Cargo.toml
@@ -17,6 +17,7 @@ publish = false
[dependencies]
anyhow = { workspace = true }
arrow-ipc.workspace = true
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
custom-labels = { workspace = true }
datafusion = { workspace = true, features = [
@@ -36,6 +37,7 @@ opentelemetry_sdk.workspace = true
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }
vortex = { workspace = true, features = ["object_store", "files", "tokio"] }
vortex-bench = { workspace = true }
vortex-cuda = { workspace = true, optional = true }
vortex-datafusion = { workspace = true }
57 changes: 57 additions & 0 deletions benchmarks/datafusion-bench/src/main.rs
@@ -26,13 +26,15 @@ use datafusion_physical_plan::collect;
use futures::StreamExt;
use parking_lot::Mutex;
use tokio::fs::File;
use vortex::scan::api::DataSourceRef;
use vortex_bench::Benchmark;
use vortex_bench::BenchmarkArg;
use vortex_bench::CompactionStrategy;
use vortex_bench::Engine;
use vortex_bench::Format;
use vortex_bench::Opt;
use vortex_bench::Opts;
use vortex_bench::SESSION;
use vortex_bench::conversions::convert_parquet_directory_to_vortex;
use vortex_bench::create_benchmark;
use vortex_bench::create_output_writer;
@@ -220,13 +222,20 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

fn use_scan_api() -> bool {
std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1")
}

async fn register_benchmark_tables<B: Benchmark + ?Sized>(
session: &SessionContext,
benchmark: &B,
format: Format,
) -> anyhow::Result<()> {
match format {
Format::Arrow => register_arrow_tables(session, benchmark).await,
_ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => {
register_v2_tables(session, benchmark, format).await
}
_ => {
let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;
let file_format = format_to_df_format(format);
@@ -265,6 +274,54 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
}
}

/// Register tables using the V2 `VortexTable` + `MultiFileDataSource` path.
async fn register_v2_tables<B: Benchmark + ?Sized>(
session: &SessionContext,
benchmark: &B,
format: Format,
) -> anyhow::Result<()> {
use vortex::file::filesystem::object_store::ObjectStoreFileSystem;
use vortex::file::multi::MultiFileDataSource;
use vortex::io::session::RuntimeSessionExt;
use vortex::scan::api::DataSource as _;
use vortex_datafusion::v2::VortexTable;

let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;

for table in benchmark.table_specs().iter() {
let pattern = benchmark.pattern(table.name, format);
let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern.clone())?;
let store = session
.state()
.runtime_env()
.object_store(table_url.object_store())?;

let fs: Arc<dyn vortex::file::filesystem::FileSystem> =
Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle()));
let base_prefix = benchmark_base.path().trim_start_matches('/').to_string();
let fs = fs.with_prefix(base_prefix);

let glob_pattern = match &pattern {
Some(p) => p.as_str().to_string(),
None => format!("*.{}", format.ext()),
};

let multi_ds = MultiFileDataSource::new(SESSION.clone())
.with_filesystem(fs)
.with_glob_url(glob_pattern)
.build()
.await?;

let arrow_schema = Arc::new(multi_ds.dtype().to_arrow_schema()?);
let data_source: DataSourceRef = Arc::new(multi_ds);

let table_provider = Arc::new(VortexTable::new(data_source, SESSION.clone(), arrow_schema));
session.register_table(table.name, table_provider)?;
}

Ok(())
}

/// Load Arrow IPC files into in-memory DataFusion tables.
async fn register_arrow_tables<B: Benchmark + ?Sized>(
session: &SessionContext,
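A usage sketch (not part of the diff): once `register_v2_tables` has run, the resulting `VortexTable` providers resolve like any other registered DataFusion table. `lineitem` below is an assumed table name; `SessionContext::sql` and `DataFrame::show` are standard DataFusion calls.

    // Hypothetical follow-up query against a table registered above.
    let df = session.sql("SELECT COUNT(*) FROM lineitem").await?;
    df.show().await?;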
31 changes: 31 additions & 0 deletions benchmarks/duckdb-bench/src/main.rs
@@ -68,6 +68,10 @@ struct Args {

#[arg(long = "opt", value_delimiter = ',', value_parser = value_parser!(Opt))]
options: Vec<Opt>,

/// Print EXPLAIN output for each query instead of running benchmarks.
#[arg(long, default_value_t = false)]
explain: bool,
}

fn main() -> anyhow::Result<()> {
@@ -122,6 +126,33 @@ fn main() -> anyhow::Result<()> {
})?;
}

if args.explain {
for format in &args.formats {
let ctx = DuckClient::new(
&*benchmark,
*format,
args.delete_duckdb_database,
args.threads,
)?;
ctx.register_tables(&*benchmark, *format)?;

for (query_idx, query) in &filtered_queries {
println!("=== Q{query_idx} [{format}] ===");
println!("{query}");
println!();
let result = ctx.connection.query(&format!("EXPLAIN {query}"))?;
for chunk in result {
let chunk_str =
String::try_from(&chunk).unwrap_or_else(|_| "<error>".to_string());
println!("{chunk_str}");
}
println!();
}
}

return Ok(());
}

let mut runner = SqlBenchmarkRunner::new(
&*benchmark,
Engine::DuckDB,
7 changes: 3 additions & 4 deletions docs/developer-guide/integrations/duckdb.md
@@ -17,10 +17,9 @@ concurrently during the global initialization phase, with a concurrency limit pr
the number of DuckDB worker threads to keep the I/O pipeline saturated without overwhelming
the system.

A `MultiScan` stream manages the set of active file scans. It prioritises completing in-progress
scans before opening new files, ensuring that DuckDB's execution threads always have data to
consume while background I/O proceeds. File footers are cached to avoid redundant parsing when
the same file appears in multiple queries.
Active file scans are driven concurrently via `try_flatten_unordered`, ensuring that DuckDB's
execution threads always have data to consume while background I/O proceeds. File footers are
cached to avoid redundant parsing when the same file appears in multiple queries.

## Threading Model

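A minimal sketch of the `try_flatten_unordered` pattern the updated doc describes, using only the `futures` crate; the file list, string "batches", and `drive_scans` function are stand-ins rather than the actual Vortex scan types:

    use futures::stream::{self, StreamExt, TryStreamExt};

    // Each file opens into a stream of batches. `buffered` bounds how many
    // opens run at once, and `try_flatten_unordered` polls up to `limit`
    // active scans concurrently, yielding batches as they become ready
    // rather than file-by-file.
    async fn drive_scans(files: Vec<&str>, limit: usize) -> anyhow::Result<()> {
        stream::iter(files)
            .map(|path| async move {
                // Stand-in for opening a Vortex file scan.
                Ok::<_, anyhow::Error>(stream::iter(vec![Ok::<String, anyhow::Error>(
                    format!("batch from {path}"),
                )]))
            })
            .buffered(limit)
            .try_flatten_unordered(limit)
            .try_for_each(|batch| async move {
                println!("{batch}");
                Ok(())
            })
            .await
    }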
2 changes: 2 additions & 0 deletions vortex-array/src/expr/stats/precision.rs
@@ -21,6 +21,8 @@ use crate::expr::stats::precision::Precision::Inexact;
This is statistic specific: for max this will be an upper bound, meaning that the actual max
in an array is guaranteed to be less than or equal to the inexact value, but equal to the exact
value.
///
// TODO(ngates): should we model Unknown as a variant of Precision? Or have Option<Precision<T>>?
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Precision<T> {
Exact(T),
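A sketch of the design space the new TODO gestures at; neither shape below is the current Vortex API, and the names are illustrative only:

    // Option A: fold "unknown" into the enum, so every statistics lookup
    // returns a Precision and callers match on three cases.
    enum PrecisionWithUnknown<T> {
        Exact(T),
        Inexact(T),
        Unknown,
    }

    // Option B: keep Precision two-valued and push absence to the call
    // site, so "no statistic available" stays distinct from the bound.
    enum TwoValuedPrecision<T> {
        Exact(T),
        Inexact(T),
    }
    type MaybeStat<T> = Option<TwoValuedPrecision<T>>;

Option B preserves the invariant that a `Precision` always carries a value, at the cost of nested matching wherever statistics are consumed.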
1 change: 0 additions & 1 deletion vortex-datafusion/Cargo.toml
@@ -31,7 +31,6 @@ datafusion-physical-plan = { workspace = true }
datafusion-pruning = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
moka = { workspace = true, features = ["future"] }
object_store = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
tokio-stream = { workspace = true }
70 changes: 70 additions & 0 deletions vortex-datafusion/public-api.lock
@@ -18,6 +18,74 @@ pub type vortex_datafusion::metrics::VortexMetricsFinder::Error = core::convert:

pub fn vortex_datafusion::metrics::VortexMetricsFinder::pre_visit(&mut self, plan: &dyn datafusion_physical_plan::execution_plan::ExecutionPlan) -> core::result::Result<bool, Self::Error>

pub mod vortex_datafusion::v2

pub struct vortex_datafusion::v2::DataFusionFooterCache

impl vortex_datafusion::v2::DataFusionFooterCache

pub fn vortex_datafusion::v2::DataFusionFooterCache::new(cache: alloc::sync::Arc<dyn datafusion_execution::cache::cache_manager::FileMetadataCache>) -> Self

impl vortex_file::footer::cache::FooterCache for vortex_datafusion::v2::DataFusionFooterCache

pub fn vortex_datafusion::v2::DataFusionFooterCache::get(&self, key: &str) -> core::option::Option<vortex_file::footer::Footer>

pub fn vortex_datafusion::v2::DataFusionFooterCache::put(&self, key: &str, footer: vortex_file::footer::Footer)

pub struct vortex_datafusion::v2::VortexScanSource

impl core::fmt::Debug for vortex_datafusion::v2::VortexScanSource

pub fn vortex_datafusion::v2::VortexScanSource::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl datafusion_datasource::source::DataSource for vortex_datafusion::v2::VortexScanSource

pub fn vortex_datafusion::v2::VortexScanSource::as_any(&self) -> &dyn core::any::Any

pub fn vortex_datafusion::v2::VortexScanSource::eq_properties(&self) -> datafusion_physical_expr::equivalence::properties::EquivalenceProperties

pub fn vortex_datafusion::v2::VortexScanSource::fetch(&self) -> core::option::Option<usize>

pub fn vortex_datafusion::v2::VortexScanSource::fmt_as(&self, _t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_datafusion::v2::VortexScanSource::open(&self, partition: usize, _context: alloc::sync::Arc<datafusion_execution::task::TaskContext>) -> datafusion_common::error::Result<datafusion_execution::stream::SendableRecordBatchStream>

pub fn vortex_datafusion::v2::VortexScanSource::output_partitioning(&self) -> datafusion_physical_expr::partitioning::Partitioning

pub fn vortex_datafusion::v2::VortexScanSource::partition_statistics(&self, partition: core::option::Option<usize>) -> datafusion_common::error::Result<datafusion_common::stats::Statistics>

pub fn vortex_datafusion::v2::VortexScanSource::repartitioned(&self, target_partitions: usize, _repartition_file_min_size: usize, _output_ordering: core::option::Option<datafusion_physical_expr_common::sort_expr::LexOrdering>) -> datafusion_common::error::Result<core::option::Option<alloc::sync::Arc<dyn datafusion_datasource::source::DataSource>>>

pub fn vortex_datafusion::v2::VortexScanSource::try_pushdown_filters(&self, filters: alloc::vec::Vec<alloc::sync::Arc<dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr>>, _config: &datafusion_common::config::ConfigOptions) -> datafusion_common::error::Result<datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation<alloc::sync::Arc<dyn datafusion_datasource::source::DataSource>>>

pub fn vortex_datafusion::v2::VortexScanSource::try_swapping_with_projection(&self, projection: &datafusion_physical_expr::projection::ProjectionExprs) -> datafusion_common::error::Result<core::option::Option<alloc::sync::Arc<dyn datafusion_datasource::source::DataSource>>>

pub fn vortex_datafusion::v2::VortexScanSource::with_fetch(&self, limit: core::option::Option<usize>) -> core::option::Option<alloc::sync::Arc<dyn datafusion_datasource::source::DataSource>>

pub struct vortex_datafusion::v2::VortexTable

impl vortex_datafusion::v2::VortexTable

pub fn vortex_datafusion::v2::VortexTable::new(data_source: vortex_scan::api::DataSourceRef, session: vortex_session::VortexSession, arrow_schema: arrow_schema::schema::SchemaRef) -> Self

impl core::fmt::Debug for vortex_datafusion::v2::VortexTable

pub fn vortex_datafusion::v2::VortexTable::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl datafusion_catalog::table::TableProvider for vortex_datafusion::v2::VortexTable

pub fn vortex_datafusion::v2::VortexTable::as_any(&self) -> &dyn core::any::Any

pub fn vortex_datafusion::v2::VortexTable::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec<usize>>, filters: &'life3 [datafusion_expr::expr::Expr], limit: core::option::Option<usize>) -> core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output = datafusion_common::error::Result<alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait

pub fn vortex_datafusion::v2::VortexTable::schema(&self) -> arrow_schema::schema::SchemaRef

pub fn vortex_datafusion::v2::VortexTable::statistics(&self) -> core::option::Option<datafusion_common::stats::Statistics>

pub fn vortex_datafusion::v2::VortexTable::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result<alloc::vec::Vec<datafusion_expr::table_source::TableProviderFilterPushDown>>

pub fn vortex_datafusion::v2::VortexTable::table_type(&self) -> datafusion_expr::table_source::TableType

pub struct vortex_datafusion::DefaultVortexReaderFactory

impl vortex_datafusion::DefaultVortexReaderFactory
@@ -38,6 +106,8 @@ impl vortex_datafusion::VortexAccessPlan

pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder<A>(&self, scan_builder: vortex_scan::scan_builder::ScanBuilder<A>) -> vortex_scan::scan_builder::ScanBuilder<A> where A: 'static + core::marker::Send

pub fn vortex_datafusion::VortexAccessPlan::selection(&self) -> core::option::Option<&vortex_scan::selection::Selection>

impl vortex_datafusion::VortexAccessPlan

pub fn vortex_datafusion::VortexAccessPlan::with_selection(self, selection: vortex_scan::selection::Selection) -> Self
4 changes: 3 additions & 1 deletion vortex-datafusion/src/convert/exprs.rs
@@ -383,7 +383,9 @@ fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) ->
} else if expr.downcast_ref::<df_expr::CastExpr>().is_some()
|| expr.downcast_ref::<df_expr::CastColumnExpr>().is_some()
{
true
// Cast can only be pushed into Vortex if it is lossless.
// TODO(ngates): check this, and return accordingly.
false
} else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
can_be_pushed_down_impl(is_null.arg(), schema)
} else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
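One hedged shape the new TODO's lossless check could take; `cast_is_lossless` and its allow-list are assumptions for illustration, not the planned implementation:

    use arrow_schema::DataType;

    // Permit pushdown only for casts that cannot lose information, e.g.
    // widening integer and float casts. Anything else stays in DataFusion.
    fn cast_is_lossless(from: &DataType, to: &DataType) -> bool {
        use DataType::*;
        matches!(
            (from, to),
            (Int8, Int16 | Int32 | Int64)
                | (Int16, Int32 | Int64)
                | (Int32, Int64)
                | (UInt8, UInt16 | UInt32 | UInt64)
                | (UInt16, UInt32 | UInt64)
                | (UInt32, UInt64)
                | (Float32, Float64)
        )
    }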
1 change: 1 addition & 0 deletions vortex-datafusion/src/lib.rs
@@ -10,6 +10,7 @@ use vortex::expr::stats::Precision;

mod convert;
mod persistent;
pub mod v2;

#[cfg(test)]
mod tests;