diff --git a/.github/workflows/sql-benchmarks.yml b/.github/workflows/sql-benchmarks.yml index f98e7ef87d4..318014ee752 100644 --- a/.github/workflows/sql-benchmarks.yml +++ b/.github/workflows/sql-benchmarks.yml @@ -187,6 +187,7 @@ jobs: if: matrix.remote_storage == null || github.event.pull_request.head.repo.fork == true shell: bash env: + VORTEX_USE_SCAN_API: "1" OTEL_SERVICE_NAME: "vortex-bench" OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf" OTEL_EXPORTER_OTLP_ENDPOINT: "${{ (inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false) && secrets.OTEL_EXPORTER_OTLP_ENDPOINT || '' }}" @@ -200,6 +201,7 @@ jobs: if: matrix.remote_storage != null && (inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false) shell: bash env: + VORTEX_USE_SCAN_API: "1" AWS_REGION: "eu-west-1" OTEL_SERVICE_NAME: "vortex-bench" OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf" diff --git a/Cargo.lock b/Cargo.lock index ff4e930bf21..afc8687c626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1904,6 +1904,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arrow-ipc", + "async-trait", "clap", "custom-labels", "datafusion 52.1.0", @@ -1919,6 +1920,7 @@ dependencies = [ "parking_lot", "tokio", "url", + "vortex", "vortex-bench", "vortex-cuda", "vortex-datafusion", @@ -10169,7 +10171,6 @@ dependencies = [ "futures", "insta", "itertools 0.14.0", - "moka", "object_store", "rstest", "tempfile", @@ -10253,6 +10254,7 @@ dependencies = [ "anyhow", "async-compat", "async-fs", + "async-trait", "bindgen", "bitvec", "cbindgen", @@ -10349,14 +10351,17 @@ dependencies = [ "flatbuffers", "futures", "getrandom 0.3.4", + "glob", "itertools 0.14.0", "kanal", + "moka", "object_store", "oneshot", "parking_lot", "pin-project-lite", "tokio", "tracing", + "url", "uuid", "vortex-alp", "vortex-array", @@ -10374,6 +10379,7 @@ dependencies = [ "vortex-fsst", "vortex-io", "vortex-layout", + "vortex-mask", "vortex-metrics", "vortex-pco", "vortex-runend", diff --git a/bench-orchestrator/bench_orchestrator/comparison/analyzer.py b/bench-orchestrator/bench_orchestrator/comparison/analyzer.py index 5fa05c40d4a..7ac62f188ff 100644 --- a/bench-orchestrator/bench_orchestrator/comparison/analyzer.py +++ b/bench-orchestrator/bench_orchestrator/comparison/analyzer.py @@ -271,8 +271,11 @@ def compare_runs( # Pivot to get (query, engine, format) as rows, runs as columns pivot = combined.pivot_table(index=["query", "engine", "format"], columns="run", values="value", aggfunc="mean") + # Deduplicate labels while preserving order (two runs can share a label). 
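+        # A small illustration of the idiom: dict.fromkeys(["a", "b", "a"]) keeps the first
+        # occurrence of each key in insertion order, so list(...) yields ["a", "b"].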
+ unique_labels = list(dict.fromkeys(labels)) + # Reorder columns to match input order - pivot = pivot[[label for label in labels if label in pivot.columns]] + pivot = pivot[[label for label in unique_labels if label in pivot.columns]] # Compute ratios relative to baseline if baseline_label in pivot.columns: @@ -283,4 +286,4 @@ def compare_runs( else: result = pivot - return PivotComparison(df=result.reset_index(), baseline=baseline_label, columns=labels) + return PivotComparison(df=result.reset_index(), baseline=baseline_label, columns=unique_labels) diff --git a/bench-orchestrator/bench_orchestrator/storage/store.py b/bench-orchestrator/bench_orchestrator/storage/store.py index 96847ac5df2..1703af1ad6f 100644 --- a/bench-orchestrator/bench_orchestrator/storage/store.py +++ b/bench-orchestrator/bench_orchestrator/storage/store.py @@ -96,9 +96,13 @@ def write_result(self, result: QueryResult) -> None: self._result_count += 1 def write_raw_json(self, json_line: str) -> None: - """Write a raw JSON line directly (from benchmark binary output).""" - if self._results_file: - self._results_file.write(json_line.strip() + "\n") + """Write a raw JSON line directly (from benchmark binary output). + + Non-JSON lines (e.g. DuckDB ASCII table output) are silently skipped. + """ + line = json_line.strip() + if self._results_file and line.startswith("{"): + self._results_file.write(line + "\n") self._results_file.flush() self._result_count += 1 diff --git a/benchmarks/datafusion-bench/Cargo.toml b/benchmarks/datafusion-bench/Cargo.toml index 7ac28193188..2c3ddc4bce5 100644 --- a/benchmarks/datafusion-bench/Cargo.toml +++ b/benchmarks/datafusion-bench/Cargo.toml @@ -17,6 +17,7 @@ publish = false [dependencies] anyhow = { workspace = true } arrow-ipc.workspace = true +async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } custom-labels = { workspace = true } datafusion = { workspace = true, features = [ @@ -36,6 +37,7 @@ opentelemetry_sdk.workspace = true parking_lot = { workspace = true } tokio = { workspace = true, features = ["full"] } url = { workspace = true } +vortex = { workspace = true, features = ["object_store", "files", "tokio"] } vortex-bench = { workspace = true } vortex-cuda = { workspace = true, optional = true } vortex-datafusion = { workspace = true } diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index 4ee078dd2dd..1801f616889 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -26,6 +26,7 @@ use datafusion_physical_plan::collect; use futures::StreamExt; use parking_lot::Mutex; use tokio::fs::File; +use vortex::scan::api::DataSourceRef; use vortex_bench::Benchmark; use vortex_bench::BenchmarkArg; use vortex_bench::CompactionStrategy; @@ -33,6 +34,7 @@ use vortex_bench::Engine; use vortex_bench::Format; use vortex_bench::Opt; use vortex_bench::Opts; +use vortex_bench::SESSION; use vortex_bench::conversions::convert_parquet_directory_to_vortex; use vortex_bench::create_benchmark; use vortex_bench::create_output_writer; @@ -220,6 +222,10 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +fn use_scan_api() -> bool { + std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1") +} + async fn register_benchmark_tables( session: &SessionContext, benchmark: &B, @@ -227,6 +233,9 @@ async fn register_benchmark_tables( ) -> anyhow::Result<()> { match format { Format::Arrow => register_arrow_tables(session, benchmark).await, + _ if use_scan_api() && matches!(format, 
Format::OnDiskVortex | Format::VortexCompact) => { + register_v2_tables(session, benchmark, format).await + } _ => { let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; let file_format = format_to_df_format(format); @@ -265,6 +274,54 @@ async fn register_benchmark_tables( } } +/// Register tables using the V2 `VortexTable` + `MultiFileDataSource` path. +async fn register_v2_tables( + session: &SessionContext, + benchmark: &B, + format: Format, +) -> anyhow::Result<()> { + use vortex::file::filesystem::object_store::ObjectStoreFileSystem; + use vortex::file::multi::MultiFileDataSource; + use vortex::io::session::RuntimeSessionExt; + use vortex::scan::api::DataSource as _; + use vortex_datafusion::v2::VortexTable; + + let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; + + for table in benchmark.table_specs().iter() { + let pattern = benchmark.pattern(table.name, format); + let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern.clone())?; + let store = session + .state() + .runtime_env() + .object_store(table_url.object_store())?; + + let fs: Arc = + Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle())); + let base_prefix = benchmark_base.path().trim_start_matches('/').to_string(); + let fs = fs.with_prefix(base_prefix); + + let glob_pattern = match &pattern { + Some(p) => p.as_str().to_string(), + None => format!("*.{}", format.ext()), + }; + + let multi_ds = MultiFileDataSource::new(SESSION.clone()) + .with_filesystem(fs) + .with_glob_url(glob_pattern) + .build() + .await?; + + let arrow_schema = Arc::new(multi_ds.dtype().to_arrow_schema()?); + let data_source: DataSourceRef = Arc::new(multi_ds); + + let table_provider = Arc::new(VortexTable::new(data_source, SESSION.clone(), arrow_schema)); + session.register_table(table.name, table_provider)?; + } + + Ok(()) +} + /// Load Arrow IPC files into in-memory DataFusion tables. async fn register_arrow_tables( session: &SessionContext, diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 5da04c2950c..e0a69d232db 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -68,6 +68,10 @@ struct Args { #[arg(long = "opt", value_delimiter = ',', value_parser = value_parser!(Opt))] options: Vec, + + /// Print EXPLAIN output for each query instead of running benchmarks. 
+ #[arg(long, default_value_t = false)] + explain: bool, } fn main() -> anyhow::Result<()> { @@ -122,6 +126,33 @@ fn main() -> anyhow::Result<()> { })?; } + if args.explain { + for format in &args.formats { + let ctx = DuckClient::new( + &*benchmark, + *format, + args.delete_duckdb_database, + args.threads, + )?; + ctx.register_tables(&*benchmark, *format)?; + + for (query_idx, query) in &filtered_queries { + println!("=== Q{query_idx} [{format}] ==="); + println!("{query}"); + println!(); + let result = ctx.connection.query(&format!("EXPLAIN {query}"))?; + for chunk in result { + let chunk_str = + String::try_from(&chunk).unwrap_or_else(|_| "".to_string()); + println!("{chunk_str}"); + } + println!(); + } + } + + return Ok(()); + } + let mut runner = SqlBenchmarkRunner::new( &*benchmark, Engine::DuckDB, diff --git a/docs/developer-guide/integrations/duckdb.md b/docs/developer-guide/integrations/duckdb.md index 398da841854..060f5fca973 100644 --- a/docs/developer-guide/integrations/duckdb.md +++ b/docs/developer-guide/integrations/duckdb.md @@ -17,10 +17,9 @@ concurrently during the global initialization phase, with a concurrency limit pr the number of DuckDB worker threads to keep the I/O pipeline saturated without overwhelming the system. -A `MultiScan` stream manages the set of active file scans. It prioritises completing in-progress -scans before opening new files, ensuring that DuckDB's execution threads always have data to -consume while background I/O proceeds. File footers are cached to avoid redundant parsing when -the same file appears in multiple queries. +Active file scans are driven concurrently via `try_flatten_unordered`, ensuring that DuckDB's +execution threads always have data to consume while background I/O proceeds. File footers are +cached to avoid redundant parsing when the same file appears in multiple queries. ## Threading Model diff --git a/vortex-array/src/expr/stats/precision.rs b/vortex-array/src/expr/stats/precision.rs index 7f3e13b3c48..2d2ec4b979e 100644 --- a/vortex-array/src/expr/stats/precision.rs +++ b/vortex-array/src/expr/stats/precision.rs @@ -21,6 +21,8 @@ use crate::expr::stats::precision::Precision::Inexact; /// This is statistic specific, for max this will be an upper bound. Meaning that the actual max /// in an array is guaranteed to be less than or equal to the inexact value, but equal to the exact /// value. +/// +// TODO(ngates): should we model Unknown as a variant of Precision? Or have Option>? 
#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Precision { Exact(T), diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index e50363a016d..8a9583105fc 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -31,7 +31,6 @@ datafusion-physical-plan = { workspace = true } datafusion-pruning = { workspace = true } futures = { workspace = true } itertools = { workspace = true } -moka = { workspace = true, features = ["future"] } object_store = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tokio-stream = { workspace = true } diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index f916d99e4e0..4d5f6ee6b72 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -18,6 +18,74 @@ pub type vortex_datafusion::metrics::VortexMetricsFinder::Error = core::convert: pub fn vortex_datafusion::metrics::VortexMetricsFinder::pre_visit(&mut self, plan: &dyn datafusion_physical_plan::execution_plan::ExecutionPlan) -> core::result::Result +pub mod vortex_datafusion::v2 + +pub struct vortex_datafusion::v2::DataFusionFooterCache + +impl vortex_datafusion::v2::DataFusionFooterCache + +pub fn vortex_datafusion::v2::DataFusionFooterCache::new(cache: alloc::sync::Arc) -> Self + +impl vortex_file::footer::cache::FooterCache for vortex_datafusion::v2::DataFusionFooterCache + +pub fn vortex_datafusion::v2::DataFusionFooterCache::get(&self, key: &str) -> core::option::Option + +pub fn vortex_datafusion::v2::DataFusionFooterCache::put(&self, key: &str, footer: vortex_file::footer::Footer) + +pub struct vortex_datafusion::v2::VortexScanSource + +impl core::fmt::Debug for vortex_datafusion::v2::VortexScanSource + +pub fn vortex_datafusion::v2::VortexScanSource::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl datafusion_datasource::source::DataSource for vortex_datafusion::v2::VortexScanSource + +pub fn vortex_datafusion::v2::VortexScanSource::as_any(&self) -> &dyn core::any::Any + +pub fn vortex_datafusion::v2::VortexScanSource::eq_properties(&self) -> datafusion_physical_expr::equivalence::properties::EquivalenceProperties + +pub fn vortex_datafusion::v2::VortexScanSource::fetch(&self) -> core::option::Option + +pub fn vortex_datafusion::v2::VortexScanSource::fmt_as(&self, _t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub fn vortex_datafusion::v2::VortexScanSource::open(&self, partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result + +pub fn vortex_datafusion::v2::VortexScanSource::output_partitioning(&self) -> datafusion_physical_expr::partitioning::Partitioning + +pub fn vortex_datafusion::v2::VortexScanSource::partition_statistics(&self, partition: core::option::Option) -> datafusion_common::error::Result + +pub fn vortex_datafusion::v2::VortexScanSource::repartitioned(&self, target_partitions: usize, _repartition_file_min_size: usize, _output_ordering: core::option::Option) -> datafusion_common::error::Result>> + +pub fn vortex_datafusion::v2::VortexScanSource::try_pushdown_filters(&self, filters: alloc::vec::Vec>, _config: &datafusion_common::config::ConfigOptions) -> datafusion_common::error::Result>> + +pub fn vortex_datafusion::v2::VortexScanSource::try_swapping_with_projection(&self, projection: &datafusion_physical_expr::projection::ProjectionExprs) -> datafusion_common::error::Result>> + +pub fn 
vortex_datafusion::v2::VortexScanSource::with_fetch(&self, limit: core::option::Option) -> core::option::Option> + +pub struct vortex_datafusion::v2::VortexTable + +impl vortex_datafusion::v2::VortexTable + +pub fn vortex_datafusion::v2::VortexTable::new(data_source: vortex_scan::api::DataSourceRef, session: vortex_session::VortexSession, arrow_schema: arrow_schema::schema::SchemaRef) -> Self + +impl core::fmt::Debug for vortex_datafusion::v2::VortexTable + +pub fn vortex_datafusion::v2::VortexTable::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl datafusion_catalog::table::TableProvider for vortex_datafusion::v2::VortexTable + +pub fn vortex_datafusion::v2::VortexTable::as_any(&self) -> &dyn core::any::Any + +pub fn vortex_datafusion::v2::VortexTable::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec>, filters: &'life3 [datafusion_expr::expr::Expr], limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait + +pub fn vortex_datafusion::v2::VortexTable::schema(&self) -> arrow_schema::schema::SchemaRef + +pub fn vortex_datafusion::v2::VortexTable::statistics(&self) -> core::option::Option + +pub fn vortex_datafusion::v2::VortexTable::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> + +pub fn vortex_datafusion::v2::VortexTable::table_type(&self) -> datafusion_expr::table_source::TableType + pub struct vortex_datafusion::DefaultVortexReaderFactory impl vortex_datafusion::DefaultVortexReaderFactory @@ -38,6 +106,8 @@ impl vortex_datafusion::VortexAccessPlan pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder(&self, scan_builder: vortex_scan::scan_builder::ScanBuilder) -> vortex_scan::scan_builder::ScanBuilder where A: 'static + core::marker::Send +pub fn vortex_datafusion::VortexAccessPlan::selection(&self) -> core::option::Option<&vortex_scan::selection::Selection> + impl vortex_datafusion::VortexAccessPlan pub fn vortex_datafusion::VortexAccessPlan::with_selection(self, selection: vortex_scan::selection::Selection) -> Self diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index a3e8e9c21d2..2358e80b748 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -383,7 +383,9 @@ fn can_be_pushed_down_impl(df_expr: &Arc, schema: &Schema) -> } else if expr.downcast_ref::().is_some() || expr.downcast_ref::().is_some() { - true + // Cast can only be pushed into Vortex if it is lossless. + // TODO(ngates): check this, and return accordingly. 
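+            // Until then, note that e.g. Cast(Int64 -> Int32) can overflow and Cast(Utf8 -> Date32)
+            // can fail to parse, so a blanket `true` here could push an incorrect cast into the scan.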
+ false } else if let Some(is_null) = expr.downcast_ref::() { can_be_pushed_down_impl(is_null.arg(), schema) } else if let Some(is_not_null) = expr.downcast_ref::() { diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index f6de47a95c1..133cee13c20 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -10,6 +10,7 @@ use vortex::expr::stats::Precision; mod convert; mod persistent; +pub mod v2; #[cfg(test)] mod tests; diff --git a/vortex-datafusion/src/persistent/access_plan.rs b/vortex-datafusion/src/persistent/access_plan.rs index 2a523ce8e16..9cf4a5824d6 100644 --- a/vortex-datafusion/src/persistent/access_plan.rs +++ b/vortex-datafusion/src/persistent/access_plan.rs @@ -23,6 +23,11 @@ impl VortexAccessPlan { } impl VortexAccessPlan { + /// Returns the selection, if one was set. + pub fn selection(&self) -> Option<&Selection> { + self.selection.as_ref() + } + /// Apply the plan to the scan's builder. pub fn apply_to_builder(&self, mut scan_builder: ScanBuilder) -> ScanBuilder where diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 4bbd1e79f56..ebdac020a7f 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -1,33 +1,24 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::mem::size_of; - -use datafusion_common::ScalarValue; use datafusion_execution::cache::cache_manager::FileMetadata; -use vortex::expr::stats::Precision; -use vortex::expr::stats::Stat; use vortex::file::Footer; -use vortex::file::SegmentSpec; use vortex::file::VortexFile; -use vortex::layout::Layout; -use vortex::layout::segments::SegmentId; /// Cached Vortex file metadata for use with DataFusion's [`FileMetadataCache`]. pub struct CachedVortexMetadata { footer: Footer, - memory_size: usize, } impl CachedVortexMetadata { /// Create a new cached metadata entry from a VortexFile. pub fn new(vortex_file: &VortexFile) -> Self { - let footer = vortex_file.footer(); - let memory_size = estimate_footer_size(footer); - Self { - footer: footer.clone(), - memory_size, - } + Self::from_footer(vortex_file.footer().clone()) + } + + /// Create a new cached metadata entry from a [`Footer`]. + pub fn from_footer(footer: Footer) -> Self { + Self { footer } } /// Get the cached footer. @@ -42,7 +33,11 @@ impl FileMetadata for CachedVortexMetadata { } fn memory_size(&self) -> usize { - self.memory_size + self.footer + .approx_byte_size() + // 64KB is not an insane estimate... + // We just want to avoid returning zero and _never_ being evicted from the cache. + .unwrap_or(1024 * 64) } #[allow(clippy::disallowed_types)] @@ -50,34 +45,3 @@ impl FileMetadata for CachedVortexMetadata { Default::default() } } - -/// Approximate the in-memory size of a footer. 
-fn estimate_footer_size(footer: &Footer) -> usize { - let segments_size = footer.segment_map().len() * size_of::(); - let stats_size = footer - .statistics() - .map(|file_statistics| { - file_statistics - .stats_sets() - .iter() - .map(|s| { - s.iter().count() * (size_of::() + size_of::>()) - }) - .sum::() - }) - .unwrap_or(0); - - let layout_size = footer - .layout() - .depth_first_traversal() - .filter_map(|l| l.ok().map(|l| layout_size(l.as_ref()))) - .sum::(); - - segments_size + stats_size + layout_size -} - -fn layout_size(layout: &dyn Layout) -> usize { - size_of_val(layout.dtype()) - + layout.metadata().len() - + layout.segment_ids().len() * size_of::() -} diff --git a/vortex-datafusion/src/v2/mod.rs b/vortex-datafusion/src/v2/mod.rs new file mode 100644 index 00000000000..1e385ca3c85 --- /dev/null +++ b/vortex-datafusion/src/v2/mod.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! An experimental implementation of the Vortex Scan API for DataFusion. +//! +//! This integration directly implements `TableProvider` + `ExecutionPlan`, bypassing DataFusion's +//! `FileFormat` abstraction. + +mod source; +mod table; + +pub use source::VortexDataSource; +pub use table::VortexTable; diff --git a/vortex-datafusion/src/v2/source.rs b/vortex-datafusion/src/v2/source.rs new file mode 100644 index 00000000000..4a03a01c575 --- /dev/null +++ b/vortex-datafusion/src/v2/source.rs @@ -0,0 +1,507 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`VortexDataSource`] implements DataFusion's [`DataSource`] trait, deferring scan construction +//! to [`DataSource::open`] so that pushed-down filters and limits are included in the +//! [`ScanRequest`]. A single DataFusion partition is used; Vortex handles internal parallelism +//! by driving splits concurrently via [`TryStreamExt::try_flatten_unordered`]. 
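+//!
+//! A rough, uncompiled usage sketch (the variable names and projection indices are
+//! illustrative, not part of the API):
+//!
+//! ```ignore
+//! let source = VortexDataSource::builder(data_source, session)
+//!     .with_some_projection(Some(vec![0, 2]))
+//!     .build()
+//!     .await?;
+//! let plan = DataSourceExec::from_data_source(source);
+//! ```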
+ +use std::any::Any; +use std::fmt; +use std::fmt::Formatter; +use std::num::NonZero; +use std::num::NonZeroUsize; +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Schema; +use arrow_schema::SchemaRef; +use datafusion_common::ColumnStatistics; +use datafusion_common::DataFusionError; +use datafusion_common::Result as DFResult; +use datafusion_common::Statistics; +use datafusion_common::stats::Precision as DFPrecision; +use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_datasource::source::DataSource; +use datafusion_execution::SendableRecordBatchStream; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::DisplayFormatType; +use datafusion_physical_plan::expressions as df_expr; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; +use datafusion_physical_plan::filter_pushdown::PushedDown; +use datafusion_physical_plan::stream::RecordBatchStreamAdapter; +use futures::StreamExt; +use futures::TryStreamExt; +use vortex::array::VortexSessionExecute; +use vortex::array::arrow::ArrowArrayExecutor; +use vortex::dtype::Nullability; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::expr::Expression; +use vortex::expr::and as vx_and; +use vortex::expr::get_item; +use vortex::expr::pack; +use vortex::expr::root; +use vortex::expr::stats::Precision; +use vortex::expr::transform::replace; +use vortex::io::session::RuntimeSessionExt; +use vortex::scan::api::DataSourceRef; +use vortex::scan::api::ScanRequest; +use vortex::session::VortexSession; + +use crate::convert::exprs::DefaultExpressionConvertor; +use crate::convert::exprs::ExpressionConvertor; +use crate::convert::exprs::make_vortex_predicate; + +/// A builder for a [`VortexDataSource`]. +pub struct VortexDataSourceBuilder { + data_source: DataSourceRef, + session: VortexSession, + + arrow_schema: Option, + projection: Option>, +} + +impl VortexDataSourceBuilder { + /// Manually configure an Arrow schema to use when reading from the Vortex source. + /// If not specified, the data source will infer an Arrow schema from the Vortex DType. + /// + /// Note that this schema is not validated against the Vortex DType so any errors will be + /// deferred until read time. + pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self { + self.arrow_schema = Some(arrow_schema); + self + } + + /// Configure an initial projection using top-level field indices. + pub fn with_projection(mut self, indices: Vec) -> Self { + self.projection = Some(indices); + self + } + + /// Configure an initial projection using top-level field indices. + pub fn with_some_projection(mut self, indices: Option>) -> Self { + self.projection = indices; + self + } + + /// Build the [`VortexDataSource`]. + /// + /// FIXME(ngates): Note that due to the DataFusion API, this function eagerly resolves + /// statistics for all projected columns. That said.. we only need to do this for aggregation + /// reductions. Any stats used for pruning are handled internally. We could possibly look + /// at the plan ourselves and decide whether there is any need for the stats? 
+ pub async fn build(self) -> VortexResult { + // The projection expression + let mut projection = root(); + + // Resolve the Arrow schema + let mut arrow_schema = match self.arrow_schema { + Some(schema) => schema, + None => { + let data_type = self.data_source.dtype().to_arrow_dtype()?; + let DataType::Struct(fields) = data_type else { + vortex_bail!("Expected a struct-like DataType, found {}", data_type); + }; + Arc::new(Schema::new(fields)) + } + }; + + let mut column_statistics = + vec![ColumnStatistics::new_unknown(); arrow_schema.fields.len()]; + + // Apply any selection and create a projection expression. + if let Some(indices) = self.projection { + let fields = indices.iter().map(|&i| { + let name = arrow_schema.field(i).name().clone(); + let expr = get_item(name.as_str(), root()); + (name, expr) + }); + + // Update the projection expression + projection = pack(fields, Nullability::NonNullable); + + // Update the arrow schema + arrow_schema = Arc::new(Schema::new( + indices + .iter() + .map(|&i| arrow_schema.field(i).clone()) + .collect::>(), + )); + + // Update the column statistics. + column_statistics = column_statistics.into_iter().take(indices.len()).collect(); + } + + Ok(VortexDataSource { + data_source: self.data_source, + session: self.session, + initial_schema: arrow_schema.clone(), + initial_projection: projection.clone(), + initial_column_stats: column_statistics, + final_projection: projection, + final_schema: arrow_schema, + filter: None, + limit: None, + num_partitions: std::thread::available_parallelism() + .unwrap_or(unsafe { NonZero::new_unchecked(1) }), + }) + } +} + +impl VortexDataSource { + /// Create a builder for a [`VortexDataSource`]. + pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder { + VortexDataSourceBuilder { + data_source, + session, + arrow_schema: None, + projection: None, + } + } +} + +/// A DataFusion [`DataSource`] that defers Vortex scan construction to [`open`](DataSource::open). +/// +/// Holds a [`DataSourceRef`] rather than pre-collected splits, so that filters and limits pushed +/// down by DataFusion's optimizer are included in the [`ScanRequest`]. A single DataFusion +/// partition is exposed; Vortex drives splits concurrently via +/// [`TryStreamExt::try_flatten_unordered`]. +#[derive(Clone)] +pub struct VortexDataSource { + /// The Vortex data source. + data_source: DataSourceRef, + /// Vortex session handle. + session: VortexSession, + + /// The initial Arrow schema of the scan. + initial_schema: SchemaRef, + initial_projection: Expression, + initial_column_stats: Vec, + + /// The projection expression pushed down by [`DataSource::try_swapping_with_projection`] + /// This projection has already been evaluated against the initial_projection. + final_projection: Expression, + final_schema: SchemaRef, + + /// An optional filter expression. + /// Populated by [`DataSource::try_pushdown_filters`] when DataFusion pushes filters down. + filter: Option, + /// An optional row limit populated by [`DataSource::with_fetch`]. + limit: Option, + + /// The requested partition count from DataFusion, populated by [`DataSource::repartitioned`]. + /// We use this as a hint for how many splits to execute concurrently in `open()`, but we + /// always declare to DataFusion that we only have a single partition so that we can + /// internally manage concurrency and fix the problem of partition skew. 
+ num_partitions: NonZeroUsize, +} + +impl fmt::Debug for VortexDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("VortexScanSource") + .field("schema", &self.final_schema) + .field("projection", &format!("{}", &self.final_projection)) + .field("filter", &self.filter.as_ref().map(|e| format!("{}", e))) + .field("limit", &self.limit) + .finish() + } +} + +impl DataSource for VortexDataSource { + fn open( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + // VortexScanSource always uses a single partition since Vortex handles parallelism + // and concurrency internally. + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "VortexScanSource: expected partition 0, got {partition}" + ))); + } + + // Build the scan request with pushed-down projection, filter, and limit. + // The projection is included so the scan can prune columns at the I/O level. + let scan_request = ScanRequest { + projection: Some(self.final_projection.clone()), + filter: self.filter.clone(), + limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)), + ..Default::default() + }; + + let scan = self + .data_source + .scan(scan_request) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let output_schema = self.final_schema.clone(); + let session = self.session.clone(); + + // Each split.execute() returns a lazy stream whose early polls do preparation + // work (expression resolution, layout traversal, first I/O spawns). We use + // try_flatten_unordered to poll multiple split streams concurrently so that + // the next split is already warm when the current one finishes. + let scan_streams = scan.splits().map(|split_result| { + let split = split_result?; + split.execute() + }); + + let handle = session.handle(); + let stream = scan_streams + .try_flatten_unordered(Some(self.num_partitions.get() * 2)) + .map(move |result| { + let session = session.clone(); + let schema = output_schema.clone(); + handle.spawn_cpu(move || { + let mut ctx = session.create_execution_ctx(); + result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx)) + }) + }) + .buffered(self.num_partitions.get()) + .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e)))); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.final_schema.clone(), + stream, + ))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, "VortexScanSource: projection={}", self.final_projection)?; + if let Some(ref filter) = self.filter { + write!(f, ", filter={filter}")?; + } + if let Some(limit) = self.limit { + write!(f, ", limit={limit}")?; + } + Ok(()) + } + + fn repartitioned( + &self, + target_partitions: usize, + _repartition_file_min_size: usize, + _output_ordering: Option, + ) -> DFResult>> { + // Vortex handles parallelism internally — always use a single partition. + let mut this = self.clone(); + this.num_partitions = NonZero::new(target_partitions) + .ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?; + Ok(Some(Arc::new(this))) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn eq_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new(self.final_schema.clone()) + } + + fn partition_statistics(&self, _partition: Option) -> DFResult { + // FIXME(ngates): this should be adjusted based on filters. 
See DuckDB for heuristics, + // and in the future, store the selectivity stats in the session. + let num_rows = estimate_to_df_precision(&self.data_source.row_count_estimate()); + + // FIXME(ngates): byte size should be adjusted for the initial projection... + let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size_estimate()); + + // FIXME(ngates): are column statistics after the projection expression? Or before? + // let column_statistics = + // vec![ColumnStatistics::new_unknown(); self.final_schema.fields().len()]; + let column_statistics = self.initial_column_stats.clone(); + + Ok(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) + } + + fn with_fetch(&self, limit: Option) -> Option> { + let mut this = self.clone(); + this.limit = limit; + Some(Arc::new(this)) + } + + fn fetch(&self) -> Option { + self.limit + } + + // Note that we're explicitly "swapping" the projection. That means everything we do must + // be computed over the original input schema, rather than the projected output schema. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExprs, + ) -> DFResult>> { + tracing::debug!( + "VortexScanSource: trying to swap with projection: {}", + projection + ); + + let convertor = DefaultExpressionConvertor::default(); + let input_schema = self.initial_schema.as_ref(); + + // Check if all expressions can be pushed down. If any cannot, bail out entirely + // since DataSource::try_swapping_with_projection replaces the ProjectionExec, + // requiring the output schema to match the projection output exactly. + for proj_expr in projection { + if !convertor.can_be_pushed_down(&proj_expr.expr, input_schema) + || has_decimal_binary(&proj_expr.expr, input_schema) + { + return Ok(None); + } + } + + tracing::debug!("Swapping DataFusion projection {:?}", projection); + + // Convert all projection expressions to Vortex. + let mut scan_columns: Vec<(String, Expression)> = Vec::new(); + let mut scan_fields: Vec = Vec::new(); + + for proj_expr in projection { + // We convert the expression, and then swap out the root node + // for the initial_projection. + let vx_expr = convertor.convert(proj_expr.expr.as_ref())?; + let vx_expr = replace(vx_expr, &root(), self.initial_projection.clone()); + + let dt = proj_expr.expr.data_type(input_schema)?; + let nullable = proj_expr.expr.nullable(input_schema)?; + + scan_fields.push(arrow_schema::Field::new(&proj_expr.alias, dt, nullable)); + scan_columns.push((proj_expr.alias.clone(), vx_expr)); + } + + let scan_projection = pack(scan_columns, Nullability::NonNullable); + let scan_output_schema = Arc::new(Schema::new(scan_fields)); + + let mut this = self.clone(); + this.final_projection = scan_projection; + this.final_schema = scan_output_schema; + Ok(Some(Arc::new(this))) + } + + // TODO(ngates): do these filters apply to the initial_schema, or the final_schema? + fn try_pushdown_filters( + &self, + filters: Vec>, + _config: &datafusion_common::config::ConfigOptions, + ) -> DFResult>> { + if filters.is_empty() { + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![], + )); + } + + let convertor = DefaultExpressionConvertor::default(); + let input_schema = self.initial_schema.as_ref(); + + // Classify each filter: pushable filters are passed into the ScanRequest in open(), + // so we can safely claim PushedDown::Yes for them. 
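+        // Filters classified as PushedDown::No are kept by the parent operator (typically a
+        // FilterExec) and re-evaluated above the scan, so correctness does not hinge on this split.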
+ let pushdown_results: Vec = filters + .iter() + .map(|expr| { + if convertor.can_be_pushed_down(expr, input_schema) + && !has_decimal_binary(expr, input_schema) + { + PushedDown::Yes + } else { + PushedDown::No + } + }) + .collect(); + + // If nothing can be pushed down, return early. + if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) { + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + pushdown_results, + )); + } + + // Collect the pushable filter expressions. + let pushable: Vec> = filters + .iter() + .zip(pushdown_results.iter()) + .filter_map(|(expr, pushed)| match pushed { + PushedDown::Yes => Some(expr.clone()), + PushedDown::No => None, + }) + .collect(); + + // Convert to Vortex conjunction. + let vortex_pred = make_vortex_predicate(&convertor, &pushable)?; + + // Combine with existing filter. + let new_filter = match (&self.filter, vortex_pred) { + (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)), + (Some(existing), None) => Some(existing.clone()), + (None, Some(new_pred)) => Some(new_pred), + (None, None) => None, + }; + + let mut this = self.clone(); + this.filter = new_filter; + Ok( + FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results) + .with_updated_node(Arc::new(this) as _), + ) + } +} + +/// Check if an expression tree contains decimal binary arithmetic that Vortex cannot handle. +/// +/// DataFusion assumes different decimal types can be coerced, but Vortex expects exact type +/// matches for binary operations. We avoid pushing these down. +fn has_decimal_binary(expr: &Arc, schema: &Schema) -> bool { + use datafusion_common::tree_node::TreeNode; + + let mut found = false; + drop(expr.apply(|node| { + if let Some(binary) = node.as_any().downcast_ref::() + && binary.op().is_numerical_operators() + && let (Ok(l), Ok(r)) = ( + binary.left().data_type(schema), + binary.right().data_type(schema), + ) + && is_decimal(&l) + && is_decimal(&r) + { + found = true; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + })); + found +} + +fn is_decimal(dt: &DataType) -> bool { + matches!( + dt, + DataType::Decimal32(_, _) + | DataType::Decimal64(_, _) + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) + ) +} + +/// Convert a Vortex [`Option`] to a DataFusion [`Precision`](DFPrecision). +fn estimate_to_df_precision(est: &Option>) -> DFPrecision { + match est { + Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)), + Some(Precision::Inexact(v)) => { + DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX)) + } + None => DFPrecision::Absent, + } +} diff --git a/vortex-datafusion/src/v2/table.rs b/vortex-datafusion/src/v2/table.rs new file mode 100644 index 00000000000..fdc39c0bd1d --- /dev/null +++ b/vortex-datafusion/src/v2/table.rs @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`VortexTable`] implements DataFusion's [`TableProvider`] trait, providing a direct +//! integration between a Vortex [`DataSource`] and DataFusion's query engine. 
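+//!
+//! A rough, uncompiled registration sketch (the `ctx` session context and table name are
+//! illustrative; the constructor mirrors the benchmark integration in this change):
+//!
+//! ```ignore
+//! let table = VortexTable::new(data_source, session.clone(), arrow_schema);
+//! ctx.register_table("lineitem", Arc::new(table))?;
+//! ```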
+ +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_catalog::Session; +use datafusion_catalog::TableProvider; +use datafusion_common::ColumnStatistics; +use datafusion_common::DataFusionError; +use datafusion_common::Result as DFResult; +use datafusion_common::Statistics; +use datafusion_common::stats::Precision; +use datafusion_datasource::source::DataSourceExec; +use datafusion_expr::Expr; +use datafusion_expr::TableType; +use datafusion_physical_plan::ExecutionPlan; +use vortex::scan::api::DataSourceRef; +use vortex::session::VortexSession; + +use crate::v2::source::VortexDataSource; + +/// A DataFusion [`TableProvider`] backed by a Vortex [`DataSourceRef`]. +/// +/// Passes the [`DataSourceRef`] to [`VortexDataSource`], which defers scan construction to +/// [`open`](datafusion_datasource::source::DataSource::open) so that pushed-down filters and +/// limits are included in the scan request. +pub struct VortexTable { + data_source: DataSourceRef, + session: VortexSession, + arrow_schema: SchemaRef, +} + +impl fmt::Debug for VortexTable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("VortexTable") + .field("schema", &self.arrow_schema) + .finish() + } +} + +impl VortexTable { + /// Creates a new [`VortexTable`] from a Vortex data source and session. + /// + /// The Arrow schema will be used to emit the correct column names and types to DataFusion. + /// The Vortex DType of the data source should be compatible with this Arrow schema. + pub fn new( + data_source: DataSourceRef, + session: VortexSession, + arrow_schema: SchemaRef, + ) -> Self { + Self { + data_source, + session, + arrow_schema, + } + } +} + +#[async_trait] +impl TableProvider for VortexTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.arrow_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + // Unlike filters and limit, we _do_ apply the projection at this stage since DataFusion's + // physical projection expression push-down is still in its early stages. In theory, we + // could also wait to apply the projection until we can push down over the physical plan. + projection: Option<&Vec>, + // We ignore push-down of logical filters since Vortex requires a physical + // expression (i.e. we require that coercion semantics have already been performed by the + // engine). Instead, DataFusion will push down filters through the physical plan via + // the VortexScanSource DataSource. + _filters: &[Expr], + // Similarly for limit, we wait until we can push down over the physical plan. + _limit: Option, + ) -> DFResult> { + let data_source = VortexDataSource::builder(self.data_source.clone(), self.session.clone()) + .with_arrow_schema(self.arrow_schema.clone()) + .with_some_projection(projection.cloned()) + .build() + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + Ok(DataSourceExec::from_data_source(data_source)) + } + + /// Returns statistics for the full table, before any projection. + /// To keep this reasonably cheap, we just return cardinality and byte size estimates. + /// We provide full statistics from the physical plan. 
+ fn statistics(&self) -> Option { + let num_rows = match self.data_source.row_count_estimate() { + Some(vortex::expr::stats::Precision::Exact(v)) => { + usize::try_from(v).map(Precision::Exact).unwrap_or_default() + } + _ => Precision::Absent, + }; + + let total_byte_size = match self.data_source.byte_size_estimate() { + Some(vortex::expr::stats::Precision::Exact(v)) => { + usize::try_from(v).map(Precision::Exact).unwrap_or_default() + } + _ => Precision::Absent, + }; + + let column_statistics = + vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()]; + + Some(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) + } +} diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml index d5613202e0e..06420c1e7d7 100644 --- a/vortex-duckdb/Cargo.toml +++ b/vortex-duckdb/Cargo.toml @@ -26,6 +26,7 @@ crate-type = ["staticlib", "cdylib", "rlib"] [dependencies] anyhow = { workspace = true } async-compat = { workspace = true } +async-trait = { workspace = true } async-fs = { workspace = true } bitvec = { workspace = true } custom-labels = { workspace = true } diff --git a/vortex-duckdb/src/duckdb/table_function/init.rs b/vortex-duckdb/src/duckdb/table_function/init.rs index c3cd5d0d013..a460d7c25b6 100644 --- a/vortex-duckdb/src/duckdb/table_function/init.rs +++ b/vortex-duckdb/src/duckdb/table_function/init.rs @@ -73,7 +73,7 @@ impl Debug for TableInitInput<'_, T> { .field("table_function", &std::any::type_name::()) .field("column_ids", &self.column_ids()) .field("projection_ids", &self.projection_ids()) - // .field("table_filter_set", &self.table_filter_set()) + .field("table_filter_set", &self.table_filter_set()) .finish() } } diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index 95a09cce68b..9789b2e1097 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -139,6 +139,7 @@ pub trait TableFunction: Sized + Debug { // TODO(ngates): there are many more callbacks that can be configured. } +#[derive(Debug)] pub enum Cardinality { /// Completely unknown cardinality. Unknown, diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index c5ccf235903..45ef34b542b 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -21,11 +21,13 @@ pub use crate::duckdb::Database; pub use crate::duckdb::LogicalType; pub use crate::duckdb::Value; use crate::scan::VortexTableFunction; +use crate::scan_api::VortexScanApiTableFunction; pub mod convert; pub mod duckdb; pub mod exporter; mod scan; +mod scan_api; mod utils; #[rustfmt::skip] @@ -47,8 +49,13 @@ static SESSION: LazyLock = /// Note: This also registers extension options. If you want to register options /// separately (e.g., before creating connections), call `register_extension_options` first. 
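+/// When the `VORTEX_USE_SCAN_API` environment variable is set to `"1"`, the experimental
+/// Scan API table function is registered under the same names in place of the default scan.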
pub fn register_table_functions(conn: &Connection) -> VortexResult<()> { - conn.register_table_function::(c"vortex_scan")?; - conn.register_table_function::(c"read_vortex")?; + if std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1") { + conn.register_table_function::(c"vortex_scan")?; + conn.register_table_function::(c"read_vortex")?; + } else { + conn.register_table_function::(c"vortex_scan")?; + conn.register_table_function::(c"read_vortex")?; + } conn.register_copy_function::(c"vortex", c"vortex") } diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index 6108fd5fcf2..ec74631ab56 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -5,21 +5,16 @@ use std::cmp::max; use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; -use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; use async_compat::Compat; use custom_labels::CURRENT_LABELSET; use futures::FutureExt; -use futures::Stream; use futures::StreamExt; +use futures::TryStreamExt; use futures::stream; -use futures::stream::BoxStream; -use futures::stream::SelectAll; use itertools::Itertools; use num_traits::AsPrimitive; use url::Url; @@ -62,6 +57,7 @@ use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; +use crate::duckdb::ObjectCacheRef; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; use crate::duckdb::VirtualColumnsResult; @@ -105,28 +101,25 @@ impl Debug for VortexBindData { } pub struct VortexGlobalData { - iterator: ThreadSafeIterator)>>, - batch_id: AtomicU64, - ctx: ExecutionCtx, + pub(crate) iterator: ThreadSafeIterator)>>, + pub(crate) batch_id: AtomicU64, + pub(crate) ctx: ExecutionCtx, } pub struct VortexLocalData { - iterator: ThreadSafeIterator)>>, - exporter: Option, + pub(crate) iterator: ThreadSafeIterator)>>, + pub(crate) exporter: Option, // The unique batch id the of the last chunk exported via scan() - batch_id: Option, + pub(crate) batch_id: Option, } #[derive(Debug)] pub struct VortexTableFunction; -/// Extracts the schema from a Vortex file. -fn extract_schema_from_vortex_file( - file: &VortexFile, +/// Extracts DuckDB column names and logical types from a Vortex struct DType. +pub(crate) fn extract_schema_from_dtype( + dtype: &vortex::dtype::DType, ) -> VortexResult<(Vec, Vec)> { - let dtype = file.dtype(); - - // For now, we assume the top-level type to be a struct. let struct_dtype = dtype .as_struct_fields_opt() .ok_or_else(|| vortex_err!("Vortex file must contain a struct array at the top level"))?; @@ -143,10 +136,20 @@ fn extract_schema_from_vortex_file( Ok((column_names, column_types)) } -/// Creates a projection expression based on the table initialization input. -fn extract_projection_expr(init: &TableInitInput) -> Expression { - let projection_ids = init.projection_ids().unwrap_or(&[]); - let column_ids = init.column_ids(); +/// Extracts the schema from a Vortex file. +fn extract_schema_from_vortex_file( + file: &VortexFile, +) -> VortexResult<(Vec, Vec)> { + extract_schema_from_dtype(file.dtype()) +} + +/// Creates a projection expression from raw projection/column IDs and column names. 
+pub(crate) fn extract_projection_expr_from( + projection_ids: Option<&[u64]>, + column_ids: &[u64], + column_names: &[String], +) -> Expression { + let projection_ids = projection_ids.unwrap_or(&[]); select( projection_ids @@ -157,8 +160,7 @@ fn extract_projection_expr(init: &TableInitInput) -> Expres val }) .map(|idx| { - init.bind_data() - .column_names + column_names .get(idx) .vortex_expect("prune idx in column names") }) @@ -168,28 +170,31 @@ fn extract_projection_expr(init: &TableInitInput) -> Expres ) } -/// Creates a table filter expression from the table filter set. -fn extract_table_filter_expr( - init: &TableInitInput, +/// Creates a projection expression based on the table initialization input. +fn extract_projection_expr(init: &TableInitInput) -> Expression { + extract_projection_expr_from( + init.projection_ids(), + init.column_ids(), + &init.bind_data().column_names, + ) +} + +/// Creates a table filter expression from raw components. +pub(crate) fn extract_table_filter_expr_from( + table_filter_set: Option, column_ids: &[u64], + column_names: &[String], + dtype: &vortex::dtype::DType, + additional_filter_exprs: &[Expression], ) -> VortexResult> { - let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() - { + let mut table_filter_exprs: HashSet = if let Some(filter) = table_filter_set { filter .into_iter() .map(|(idx, ex)| { let idx_u: usize = idx.as_(); let col_idx: usize = column_ids[idx_u].as_(); - let name = init - .bind_data() - .column_names - .get(col_idx) - .vortex_expect("exists"); - try_from_table_filter( - &ex, - &col(name.as_str()), - init.bind_data().first_file.dtype(), - ) + let name = column_names.get(col_idx).vortex_expect("exists"); + try_from_table_filter(&ex, &col(name.as_str()), dtype) }) .collect::>>>()? .unwrap_or_else(HashSet::new) @@ -197,10 +202,24 @@ fn extract_table_filter_expr( HashSet::new() }; - table_filter_exprs.extend(init.bind_data().filter_exprs.clone()); + table_filter_exprs.extend(additional_filter_exprs.iter().cloned()); Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) } +/// Creates a table filter expression from the table initialization input. +fn extract_table_filter_expr( + init: &TableInitInput, + column_ids: &[u64], +) -> VortexResult> { + extract_table_filter_expr_from( + init.table_filter_set(), + column_ids, + &init.bind_data().column_names, + init.bind_data().first_file.dtype(), + &init.bind_data().filter_exprs, + ) +} + /// Helper function to open a Vortex file from either a local or S3 URL async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult { if url.scheme() == "s3" { @@ -228,8 +247,8 @@ async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult VortexResult<()> { - loop { - if local_state.exporter.is_none() { - let Some(result) = local_state.iterator.next() else { - return Ok(()); - }; - - let (array_result, conversion_cache) = result?; - - let array_result = array_result.optimize_recursive()?; - let array_result = if let Some(array) = array_result.as_opt::() { - array.clone() - } else if let Some(array) = array_result.as_opt::() - && let Some(pack_options) = array.scalar_fn().as_opt::() - { - StructArray::new( - pack_options.names.clone(), - array.children(), - array.len(), - pack_options.nullability.into(), - ) - } else { - array_result - .execute::(&mut global_state.ctx)? 
- .into_struct() - }; - - local_state.exporter = Some(ArrayExporter::try_new( - &array_result, - &conversion_cache, - &mut global_state.ctx, - )?); - // Relaxed since there is no intra-instruction ordering required. - local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); - } - - let exporter = local_state - .exporter - .as_mut() - .vortex_expect("error: exporter missing"); - - let has_more_data = exporter.export(chunk)?; - - if !has_more_data { - // This exporter is fully consumed. - local_state.exporter = None; - local_state.batch_id = None; - } else { - break; - } - } - - assert!(!chunk.is_empty()); - - Ok(()) + scan_shared(local_state, global_state, chunk) } fn init_global(init_input: &TableInitInput) -> VortexResult { @@ -370,65 +336,18 @@ impl TableFunction for VortexTableFunction { let object_cache = client_context.object_cache(); let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) + .map(|v| v.get()) .unwrap_or(1); - - let handle = RUNTIME.handle(); - let first_file = bind_data.first_file.clone(); - let scan_streams = stream::iter(bind_data.file_urls.clone()) - .enumerate() - .map(move |(idx, url)| { - let first_file = first_file.clone(); - let filter_expr = filter_expr.clone(); - let projection_expr = projection_expr.clone(); - let conversion_cache = Arc::new(ConversionCache::new(idx as u64)); - let object_cache = object_cache; - - handle - .spawn(async move { - let vxf = if idx == 0 { - // The first path from `file_paths` is skipped as - // the first file was already opened during bind. - Ok(first_file) - } else { - let cache = FooterCache::new(object_cache); - let entry = cache.entry(url.as_ref()); - let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(url.clone(), options).await?; - entry.put_if_absent(|| file.footer().clone()); - VortexResult::Ok(file) - }?; - - if let Some(ref filter) = filter_expr - && vxf.can_prune(filter)? - { - return Ok(None); - }; - - let scan = vxf - .scan()? - .with_some_filter(filter_expr) - .with_projection(projection_expr) - .with_ordered(false) - .map(move |split| Ok((split, conversion_cache.clone()))) - .into_stream()? - .boxed(); - - Ok(Some(scan)) - }) - .boxed() - }) - // Open up to num_workers * 2 files concurrently so we always have one ready to go. - .buffer_unordered(num_workers * 2) - .filter_map(|result| async move { result.transpose() }); + let iterator = init_global_direct( + bind_data, + projection_expr, + filter_expr, + num_workers, + object_cache, + )?; Ok(VortexGlobalData { - iterator: RUNTIME.block_on_stream_thread_safe(move |_| MultiScan { - streams: scan_streams.boxed(), - streams_finished: false, - select_all: Default::default(), - max_concurrency: num_workers * 2, - }), + iterator, batch_id: AtomicU64::new(0), // TODO(joe): fetch this from somewhere??. ctx: ExecutionCtx::new(VortexSession::default()), @@ -522,71 +441,133 @@ impl TableFunction for VortexTableFunction { } } -struct MultiScan<'rt, T> { - // A stream-of-streams of scan results. - streams: BoxStream<'rt, VortexResult>>>, - streams_finished: bool, - // The SelectAll used to drive the inner streams. - select_all: SelectAll>>, - // The maximum number of streams to be driving concurrently. - max_concurrency: usize, -} +pub(crate) type ScanIterator = ThreadSafeIterator)>>; + +/// Shared scan logic used by both `VortexTableFunction` and `VortexScanApiTableFunction`. 
+/// +/// Pulls arrays from the thread-safe iterator, converts them to struct arrays, and exports +/// them into DuckDB data chunks. +pub(crate) fn scan_shared( + local_state: &mut VortexLocalData, + global_state: &mut VortexGlobalData, + chunk: &mut DataChunk, +) -> VortexResult<()> { + loop { + if local_state.exporter.is_none() { + let Some(result) = local_state.iterator.next() else { + return Ok(()); + }; -impl<'rt, T: 'rt> Stream for MultiScan<'rt, T> { - type Item = VortexResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = &mut *self; - - loop { - // First, try to pull from the SelectAll of active streams. - // This means we prefer to complete existing work before starting new work, unless it - // all returns Poll::Pending. - match this.select_all.poll_next_unpin(cx) { - Poll::Ready(None) => { - if this.streams_finished { - // All streams are done - return Poll::Ready(None); - } - } - Poll::Ready(Some(result)) => return Poll::Ready(Some(result)), - Poll::Pending => { - // None of the active streams are ready right now. - } - } - - // If all current streams returned `Poll::Pending`, then we try to fetch the next - // stream to drive. The idea here is to ensure our executors are always busy with - // CPU work by driving as many streams necessary to keep the I/O queues full. - if this.select_all.len() < this.max_concurrency { - match Pin::new(&mut this.streams).poll_next(cx) { - Poll::Ready(Some(Ok(stream))) => { - // Add the new stream to SelectAll, and continue the loop to poll it. - this.select_all.push(stream); - continue; - } - Poll::Ready(Some(Err(e))) => { - // Error opening one of the streams - return Poll::Ready(Some(Err(e))); - } - Poll::Ready(None) => { - // No more streams available from the source - this.streams_finished = true; - if this.select_all.is_empty() { - // No active streams, so we're done. - return Poll::Ready(None); - } - return Poll::Pending; - } - Poll::Pending => { - // Can't get more streams right now - return Poll::Pending; - } - } + let (array_result, conversion_cache) = result?; + + let array_result = array_result.optimize_recursive()?; + let array_result = if let Some(array) = array_result.as_opt::() { + array.clone() + } else if let Some(array) = array_result.as_opt::() + && let Some(pack_options) = array.scalar_fn().as_opt::() + { + StructArray::new( + pack_options.names.clone(), + array.children(), + array.len(), + pack_options.nullability.into(), + ) } else { - // We have enough active streams, so just wait for one of them to yield. - return Poll::Pending; - } + array_result + .execute::(&mut global_state.ctx)? + .into_struct() + }; + + local_state.exporter = Some(ArrayExporter::try_new( + &array_result, + &conversion_cache, + &mut global_state.ctx, + )?); + // Relaxed since there is no intra-instruction ordering required. + local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); + } + + let exporter = local_state + .exporter + .as_mut() + .vortex_expect("error: exporter missing"); + + let has_more_data = exporter.export(chunk)?; + + if !has_more_data { + // This exporter is fully consumed. + local_state.exporter = None; + local_state.batch_id = None; + } else { + break; } } + + assert!(!chunk.is_empty()); + + Ok(()) +} + +/// Direct ScanBuilder path (existing behavior): opens files lazily via spawned tasks, +/// creates per-file scan streams, and drives them concurrently via +/// [`TryStreamExt::try_flatten_unordered`]. 
+fn init_global_direct( + bind_data: &VortexBindData, + projection_expr: Expression, + filter_expr: Option, + num_workers: usize, + object_cache: ObjectCacheRef<'static>, +) -> VortexResult { + let handle = RUNTIME.handle(); + let first_file = bind_data.first_file.clone(); + let scan_streams = stream::iter(bind_data.file_urls.clone()) + .enumerate() + .map(move |(idx, url)| { + let first_file = first_file.clone(); + let filter_expr = filter_expr.clone(); + let projection_expr = projection_expr.clone(); + let conversion_cache = Arc::new(ConversionCache::new(idx as u64)); + let object_cache = object_cache; + + handle + .spawn(async move { + let vxf = if idx == 0 { + // The first path from `file_paths` is skipped as + // the first file was already opened during bind. + Ok(first_file) + } else { + let cache = FooterCache::new(object_cache); + let entry = cache.entry(url.as_ref()); + let options = entry.apply_to_file(SESSION.open_options()); + let file = open_file(url.clone(), options).await?; + entry.put_if_absent(|| file.footer().clone()); + VortexResult::Ok(file) + }?; + + if let Some(ref filter) = filter_expr + && vxf.can_prune(filter)? + { + return VortexResult::Ok(None); + }; + + let scan = vxf + .scan()? + .with_some_filter(filter_expr) + .with_projection(projection_expr) + .with_ordered(false) + .map(move |split| Ok((split, conversion_cache.clone()))) + .into_stream()? + .boxed(); + + VortexResult::Ok(Some(scan)) + }) + .boxed() + }) + // Open up to num_workers * 2 files concurrently so we always have one ready to go. + .buffer_unordered(num_workers * 2) + .filter_map(|result| async move { result.transpose() }); + + Ok(RUNTIME.block_on_stream_thread_safe(move |_| { + scan_streams.try_flatten_unordered(Some(num_workers * 2)) + })) } diff --git a/vortex-duckdb/src/scan_api.rs b/vortex-duckdb/src/scan_api.rs new file mode 100644 index 00000000000..72f39cdfba1 --- /dev/null +++ b/vortex-duckdb/src/scan_api.rs @@ -0,0 +1,317 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; + +use custom_labels::CURRENT_LABELSET; +use futures::StreamExt; +use futures::TryStreamExt; +use itertools::Itertools; +use vortex::VortexSessionDefault; +use vortex::array::ExecutionCtx; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::stats::Precision; +use vortex::file::filesystem::FileSystemRef; +use vortex::file::filesystem::object_store::ObjectStoreFileSystem; +use vortex::file::multi::MultiFileDataSource; +use vortex::io::runtime::BlockingRuntime; +use vortex::metrics::tracing::get_global_labels; +use vortex::scan::api::DataSourceRef; +use vortex::scan::api::ScanRequest; +use vortex::session::VortexSession; + +use crate::RUNTIME; +use crate::SESSION; +use crate::convert::try_from_bound_expression; +use crate::duckdb; +use crate::duckdb::BindInput; +use crate::duckdb::BindResult; +use crate::duckdb::Cardinality; +use crate::duckdb::ClientContext; +use crate::duckdb::DataChunk; +use crate::duckdb::LogicalType; +use crate::duckdb::TableFunction; +use crate::duckdb::TableInitInput; +use crate::duckdb::VirtualColumnsResult; +use crate::exporter::ConversionCache; +use crate::scan::EMPTY_COLUMN_IDX; +use crate::scan::EMPTY_COLUMN_NAME; +use crate::scan::VortexGlobalData; +use crate::scan::VortexLocalData; +use 
crate::scan::extract_projection_expr_from; +use crate::scan::extract_schema_from_dtype; +use crate::scan::extract_table_filter_expr_from; +use crate::scan::scan_shared; +use crate::utils::object_store::s3_store; + +/// Bind data for the scan API table function, holding a [`DataSourceRef`] instead of +/// per-file URLs. +pub struct VortexScanApiBindData { + data_source: DataSourceRef, + filter_exprs: Vec, + column_names: Vec, + column_types: Vec, +} + +impl Clone for VortexScanApiBindData { + fn clone(&self) -> Self { + Self { + data_source: self.data_source.clone(), + // filter_exprs are consumed once in `init_global`. + filter_exprs: vec![], + column_names: self.column_names.clone(), + column_types: self.column_types.clone(), + } + } +} + +impl Debug for VortexScanApiBindData { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("VortexScanApiBindData") + .field("column_names", &self.column_names) + .field("column_types", &self.column_types) + .field( + "filter_exprs", + &self + .filter_exprs + .iter() + .map(|e| e.to_string()) + .collect_vec(), + ) + .finish() + } +} + +/// Creates an S3-backed [`FileSystemRef`] and extracts the glob pattern from the URL path. +/// +/// Returns `None` for local paths (the builder handles local filesystem creation internally). +fn create_s3_filesystem_and_glob(url_glob: &str) -> VortexResult> { + if url_glob.starts_with("s3://") { + let url = url::Url::parse(url_glob)?; + let bucket = url + .host_str() + .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; + let store = s3_store(bucket)?; + let fs: FileSystemRef = Arc::new(ObjectStoreFileSystem::new(store, RUNTIME.handle())); + let glob_pattern = url + .path() + .strip_prefix('/') + .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))? + .to_string(); + Ok(Some((fs, glob_pattern))) + } else if url_glob.starts_with("gs://") { + vortex_bail!("GCS glob expansion not yet implemented") + } else { + Ok(None) + } +} + +#[derive(Debug)] +pub struct VortexScanApiTableFunction; + +impl TableFunction for VortexScanApiTableFunction { + type BindData = VortexScanApiBindData; + type GlobalState = VortexGlobalData; + type LocalState = VortexLocalData; + + const PROJECTION_PUSHDOWN: bool = true; + const FILTER_PUSHDOWN: bool = true; + const FILTER_PRUNE: bool = true; + + fn parameters() -> Vec { + vec![LogicalType::varchar()] + } + + fn bind( + _ctx: &ClientContext, + input: &BindInput, + result: &mut BindResult, + ) -> VortexResult { + let file_glob_string = input + .get_parameter(0) + .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; + + let glob_str = file_glob_string.as_ref().as_string(); + + let data_source: DataSourceRef = RUNTIME.block_on(async { + let mut builder = MultiFileDataSource::new(SESSION.clone()); + if let Some((fs, glob_pattern)) = create_s3_filesystem_and_glob(&glob_str)? 
{ + builder = builder.with_filesystem(fs).with_glob_url(glob_pattern); + } else { + builder = builder.with_glob_url(glob_str.to_string()); + } + let ds = builder.build().await?; + VortexResult::Ok(Arc::new(ds)) + })?; + + let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?; + + for (column_name, column_type) in column_names.iter().zip(&column_types) { + result.add_result_column(column_name, column_type); + } + + Ok(VortexScanApiBindData { + data_source, + filter_exprs: vec![], + column_names, + column_types, + }) + } + + fn scan( + _client_context: &ClientContext, + _bind_data: &Self::BindData, + local_state: &mut Self::LocalState, + global_state: &mut Self::GlobalState, + chunk: &mut DataChunk, + ) -> VortexResult<()> { + scan_shared(local_state, global_state, chunk) + } + + fn init_global(init_input: &TableInitInput) -> VortexResult { + let bind_data = init_input.bind_data(); + let projection_expr = extract_projection_expr_from( + init_input.projection_ids(), + init_input.column_ids(), + &bind_data.column_names, + ); + let filter_expr = extract_table_filter_expr_from( + init_input.table_filter_set(), + init_input.column_ids(), + &bind_data.column_names, + bind_data.data_source.dtype(), + &bind_data.filter_exprs, + )?; + + tracing::debug!( + "Global init Vortex scan_api SELECT {} WHERE {}", + &projection_expr, + filter_expr + .as_ref() + .map_or_else(|| "true".to_string(), |f| f.to_string()) + ); + + let request = ScanRequest { + projection: Some(projection_expr), + filter: filter_expr, + ..Default::default() + }; + + let scan = bind_data.data_source.scan(request)?; + let conversion_cache = Arc::new(ConversionCache::new(0)); + + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + // Each split.execute() returns a lazy stream whose early polls do preparation + // work (expression resolution, layout traversal, first I/O spawns). We use + // try_flatten_unordered to poll multiple split streams concurrently so that + // the next split is already warm when the current one finishes. + let scan_streams = scan.splits().map(move |split_result| { + let cache = conversion_cache.clone(); + let split = split_result?; + let s = split.execute()?; + VortexResult::Ok(s.map(move |r| Ok((r?, cache.clone()))).boxed()) + }); + + let iterator = RUNTIME.block_on_stream_thread_safe(move |_| { + scan_streams.try_flatten_unordered(Some(num_workers * 2)) + }); + + Ok(VortexGlobalData { + iterator, + batch_id: AtomicU64::new(0), + ctx: ExecutionCtx::new(VortexSession::default()), + }) + } + + fn init_local( + _init: &TableInitInput, + global: &mut Self::GlobalState, + ) -> VortexResult { + unsafe { + use custom_labels::sys; + + if sys::current().is_null() { + let ls = sys::new(0); + sys::replace(ls); + }; + } + + let global_labels = get_global_labels(); + + for (key, value) in global_labels { + CURRENT_LABELSET.set(key, value); + } + + Ok(VortexLocalData { + iterator: global.iterator.clone(), + exporter: None, + batch_id: None, + }) + } + + fn pushdown_complex_filter( + bind_data: &mut Self::BindData, + expr: &duckdb::Expression, + ) -> VortexResult { + tracing::debug!("Attempting to push down filter expression: {expr}"); + let Some(expr) = try_from_bound_expression(expr)? else { + return Ok(false); + }; + bind_data.filter_exprs.push(expr); + + // NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true` + // here to tell DuckDB we've handled the filter. 
However, DuckDB applies some crude + // cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that + // means by returning false, DuckDB runs an additional filter (a little bit of overhead) + // but tends to end up with a better query plan. + // If we plumb row count estimation into the layout tree, perhaps we could use zone maps + // etc. to return estimates. But this function is probably called too late anyway. Maybe + // we need our own cardinality heuristics. + Ok(false) + } + + fn cardinality(bind_data: &Self::BindData) -> Cardinality { + match bind_data.data_source.row_count_estimate() { + Some(Precision::Exact(v)) => Cardinality::Maximum(v), + Some(Precision::Inexact(v)) => Cardinality::Estimate(v), + None => Cardinality::Unknown, + } + } + + fn partition_data( + _bind_data: &Self::BindData, + _global_init_data: &mut Self::GlobalState, + local_init_data: &mut Self::LocalState, + ) -> VortexResult { + local_init_data + .batch_id + .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) + } + + fn to_string(bind_data: &Self::BindData) -> Option> { + let mut result = Vec::new(); + + result.push(("Function".to_string(), "Vortex Scan (scan API)".to_string())); + + if !bind_data.filter_exprs.is_empty() { + let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f)); + result.push(("Filters".to_string(), filters.join(" /\\\n"))); + } + + Some(result) + } + + fn virtual_columns(_bind_data: &Self::BindData, result: &mut VirtualColumnsResult) { + result.register(EMPTY_COLUMN_IDX, EMPTY_COLUMN_NAME, &LogicalType::bool()); + } +} diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index fc56334bae5..afddf47d01b 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -22,8 +22,10 @@ bytes = { workspace = true } flatbuffers = { workspace = true } futures = { workspace = true, features = ["std", "async-await"] } getrandom_v03 = { workspace = true } # Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration +glob = { workspace = true } itertools = { workspace = true } kanal = { workspace = true } +moka = { workspace = true, features = ["sync"] } object_store = { workspace = true, optional = true } oneshot.workspace = true parking_lot = { workspace = true } @@ -31,6 +33,7 @@ pin-project-lite = { workspace = true } tokio = { workspace = true, features = ["rt"], optional = true } tracing = { workspace = true } uuid = { workspace = true } # Needed to pickup the "js" feature for wasm targets from the workspace configuration +url = { workspace = true } vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-btrblocks = { workspace = true } @@ -46,6 +49,7 @@ vortex-flatbuffers = { workspace = true, features = ["file"] } vortex-fsst = { workspace = true } vortex-io = { workspace = true } vortex-layout = { workspace = true } +vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-pco = { workspace = true } vortex-runend = { workspace = true } @@ -62,6 +66,7 @@ vortex-zstd = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-io = { workspace = true, features = ["tokio"] } +vortex-layout = { workspace = true, features = ["_test-harness"] } vortex-scan = { workspace = true } [target.'cfg(unix)'.dev-dependencies] diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index b40c2b22a7a..31ab6ea76b9 100644 --- a/vortex-file/public-api.lock 
+++ b/vortex-file/public-api.lock @@ -1,5 +1,141 @@ pub mod vortex_file +pub mod vortex_file::filesystem + +pub mod vortex_file::filesystem::object_store + +pub struct vortex_file::filesystem::object_store::ObjectStoreFileSystem + +impl vortex_file::filesystem::object_store::ObjectStoreFileSystem + +pub fn vortex_file::filesystem::object_store::ObjectStoreFileSystem::new(store: alloc::sync::Arc, handle: vortex_io::runtime::handle::Handle) -> Self + +impl vortex_file::filesystem::FileSystem for vortex_file::filesystem::object_store::ObjectStoreFileSystem + +pub fn vortex_file::filesystem::object_store::ObjectStoreFileSystem::list(&self, prefix: core::option::Option<&str>) -> futures_core::stream::BoxStream<'_, vortex_error::VortexResult> + +pub fn vortex_file::filesystem::object_store::ObjectStoreFileSystem::open_read<'life0, 'life1, 'async_trait>(&'life0 self, path: &'life1 str) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + +pub struct vortex_file::filesystem::FileListing + +pub vortex_file::filesystem::FileListing::path: alloc::string::String + +pub vortex_file::filesystem::FileListing::size: core::option::Option + +impl core::clone::Clone for vortex_file::filesystem::FileListing + +pub fn vortex_file::filesystem::FileListing::clone(&self) -> vortex_file::filesystem::FileListing + +impl core::cmp::Eq for vortex_file::filesystem::FileListing + +impl core::cmp::Ord for vortex_file::filesystem::FileListing + +pub fn vortex_file::filesystem::FileListing::cmp(&self, other: &vortex_file::filesystem::FileListing) -> core::cmp::Ordering + +impl core::cmp::PartialEq for vortex_file::filesystem::FileListing + +pub fn vortex_file::filesystem::FileListing::eq(&self, other: &vortex_file::filesystem::FileListing) -> bool + +impl core::cmp::PartialOrd for vortex_file::filesystem::FileListing + +pub fn vortex_file::filesystem::FileListing::partial_cmp(&self, other: &vortex_file::filesystem::FileListing) -> core::option::Option + +impl core::fmt::Debug for vortex_file::filesystem::FileListing + +pub fn vortex_file::filesystem::FileListing::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::marker::StructuralPartialEq for vortex_file::filesystem::FileListing + +pub trait vortex_file::filesystem::FileSystem: core::marker::Send + core::marker::Sync + 'static + +pub fn vortex_file::filesystem::FileSystem::list(&self, prefix: core::option::Option<&str>) -> futures_core::stream::BoxStream<'_, vortex_error::VortexResult> + +pub fn vortex_file::filesystem::FileSystem::open_read<'life0, 'life1, 'async_trait>(&'life0 self, path: &'life1 str) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + +impl vortex_file::filesystem::FileSystem for vortex_file::filesystem::object_store::ObjectStoreFileSystem + +pub fn vortex_file::filesystem::object_store::ObjectStoreFileSystem::list(&self, prefix: core::option::Option<&str>) -> futures_core::stream::BoxStream<'_, vortex_error::VortexResult> + +pub fn vortex_file::filesystem::object_store::ObjectStoreFileSystem::open_read<'life0, 'life1, 'async_trait>(&'life0 self, path: &'life1 str) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + +pub type vortex_file::filesystem::FileSystemRef = alloc::sync::Arc + +pub mod vortex_file::multi + +pub use vortex_file::multi::Pattern + +pub enum 
vortex_file::multi::FileDiscovery + +pub vortex_file::multi::FileDiscovery::Glob(glob::Pattern) + +pub vortex_file::multi::FileDiscovery::ListAll + +pub vortex_file::multi::FileDiscovery::Paths(alloc::vec::Vec) + +impl core::fmt::Debug for vortex_file::multi::FileDiscovery + +pub fn vortex_file::multi::FileDiscovery::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub enum vortex_file::multi::SchemaResolution + +pub vortex_file::multi::SchemaResolution::Exact + +pub vortex_file::multi::SchemaResolution::Union + +impl core::clone::Clone for vortex_file::multi::SchemaResolution + +pub fn vortex_file::multi::SchemaResolution::clone(&self) -> vortex_file::multi::SchemaResolution + +impl core::default::Default for vortex_file::multi::SchemaResolution + +pub fn vortex_file::multi::SchemaResolution::default() -> vortex_file::multi::SchemaResolution + +impl core::fmt::Debug for vortex_file::multi::SchemaResolution + +pub fn vortex_file::multi::SchemaResolution::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_file::multi::MultiFileDataSource + +impl vortex_file::multi::MultiFileDataSource + +pub fn vortex_file::multi::MultiFileDataSource::builder(session: vortex_session::VortexSession, fs: vortex_file::filesystem::FileSystemRef) -> vortex_file::multi::MultiFileDataSourceBuilder + +impl vortex_file::multi::MultiFileDataSource + +pub fn vortex_file::multi::MultiFileDataSource::file_count(&self) -> usize + +impl vortex_scan::api::DataSource for vortex_file::multi::MultiFileDataSource + +pub fn vortex_file::multi::MultiFileDataSource::deserialize_split(&self, data: &[u8], session: &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_file::multi::MultiFileDataSource::dtype(&self) -> &vortex_dtype::dtype::DType + +pub fn vortex_file::multi::MultiFileDataSource::row_count_estimate(&self) -> vortex_scan::api::Estimate + +pub fn vortex_file::multi::MultiFileDataSource::scan(&self, scan_request: vortex_scan::api::ScanRequest) -> vortex_error::VortexResult + +pub struct vortex_file::multi::MultiFileDataSourceBuilder + +impl vortex_file::multi::MultiFileDataSourceBuilder + +pub async fn vortex_file::multi::MultiFileDataSourceBuilder::build(self) -> vortex_error::VortexResult + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_discovery(self, discovery: vortex_file::multi::FileDiscovery) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_dtype(self, dtype: vortex_dtype::dtype::DType) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_footer_cache(self, cache: vortex_file::FooterCacheRef) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_glob(self, pattern: glob::Pattern) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_open_options(self, f: impl core::ops::function::Fn(vortex_file::VortexOpenOptions) -> vortex_file::VortexOpenOptions + core::marker::Send + core::marker::Sync + 'static) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_paths(self, paths: alloc::vec::Vec) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_prefetch(self, prefetch: usize) -> Self + +pub fn vortex_file::multi::MultiFileDataSourceBuilder::with_schema_resolution(self, resolution: vortex_file::multi::SchemaResolution) -> Self + pub mod vortex_file::segments pub enum vortex_file::segments::ReadEvent @@ -48,6 +184,30 @@ impl vortex_file::segments::RequestMetrics pub fn 
vortex_file::segments::RequestMetrics::new(metrics_registry: &dyn vortex_metrics::MetricsRegistry, labels: alloc::vec::Vec) -> Self +pub mod vortex_file::v2 + +pub struct vortex_file::v2::FileStatsLayoutReader + +impl vortex_file::v2::FileStatsLayoutReader + +pub fn vortex_file::v2::FileStatsLayoutReader::new(child: vortex_layout::reader::LayoutReaderRef, file_stats: vortex_file::FileStatistics, session: vortex_session::VortexSession) -> Self + +impl vortex_layout::reader::LayoutReader for vortex_file::v2::FileStatsLayoutReader + +pub fn vortex_file::v2::FileStatsLayoutReader::dtype(&self) -> &vortex_dtype::dtype::DType + +pub fn vortex_file::v2::FileStatsLayoutReader::filter_evaluation(&self, row_range: &core::ops::range::Range, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult + +pub fn vortex_file::v2::FileStatsLayoutReader::name(&self) -> &alloc::sync::Arc + +pub fn vortex_file::v2::FileStatsLayoutReader::projection_evaluation(&self, row_range: &core::ops::range::Range, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult + +pub fn vortex_file::v2::FileStatsLayoutReader::pruning_evaluation(&self, row_range: &core::ops::range::Range, expr: &vortex_array::expr::expression::Expression, mask: vortex_mask::Mask) -> vortex_error::VortexResult + +pub fn vortex_file::v2::FileStatsLayoutReader::register_splits(&self, field_mask: &[vortex_dtype::field_mask::FieldMask], row_range: &core::ops::range::Range, splits: &mut alloc::collections::btree::set::BTreeSet) -> vortex_error::VortexResult<()> + +pub fn vortex_file::v2::FileStatsLayoutReader::row_count(&self) -> u64 + pub enum vortex_file::DeserializeStep pub vortex_file::DeserializeStep::Done(vortex_file::Footer) @@ -120,6 +280,8 @@ pub struct vortex_file::Footer impl vortex_file::Footer +pub fn vortex_file::Footer::approx_byte_size(&self) -> core::option::Option + pub fn vortex_file::Footer::deserializer(eof_buffer: vortex_buffer::ByteBuffer, session: vortex_session::VortexSession) -> vortex_file::FooterDeserializer pub fn vortex_file::Footer::dtype(&self) -> &vortex_dtype::dtype::DType @@ -172,6 +334,22 @@ pub fn vortex_file::FooterSerializer::with_exclude_dtype(self, exclude_dtype: bo pub fn vortex_file::FooterSerializer::with_offset(self, offset: u64) -> Self +pub struct vortex_file::InMemoryFooterCache + +impl vortex_file::InMemoryFooterCache + +pub fn vortex_file::InMemoryFooterCache::new() -> Self + +impl core::default::Default for vortex_file::InMemoryFooterCache + +pub fn vortex_file::InMemoryFooterCache::default() -> vortex_file::InMemoryFooterCache + +impl vortex_file::FooterCache for vortex_file::InMemoryFooterCache + +pub fn vortex_file::InMemoryFooterCache::get(&self, key: &str) -> core::option::Option + +pub fn vortex_file::InMemoryFooterCache::put(&self, key: &str, footer: vortex_file::Footer) + pub struct vortex_file::SegmentSpec pub vortex_file::SegmentSpec::alignment: vortex_buffer::alignment::Alignment @@ -338,6 +516,18 @@ pub const vortex_file::VORTEX_FILE_EXTENSION: &str pub static vortex_file::ALLOWED_ENCODINGS: std::sync::lazy_lock::LazyLock +pub trait vortex_file::FooterCache: core::marker::Send + core::marker::Sync + +pub fn vortex_file::FooterCache::get(&self, key: &str) -> core::option::Option + +pub fn vortex_file::FooterCache::put(&self, key: &str, footer: vortex_file::Footer) + +impl vortex_file::FooterCache for vortex_file::InMemoryFooterCache + +pub fn 
vortex_file::InMemoryFooterCache::get(&self, key: &str) -> core::option::Option + +pub fn vortex_file::InMemoryFooterCache::put(&self, key: &str, footer: vortex_file::Footer) + pub trait vortex_file::OpenOptionsSessionExt: vortex_array::session::ArraySessionExt + vortex_layout::session::LayoutSessionExt + vortex_io::session::RuntimeSessionExt pub fn vortex_file::OpenOptionsSessionExt::open_options(&self) -> vortex_file::VortexOpenOptions @@ -351,3 +541,5 @@ pub fn vortex_file::WriteOptionsSessionExt::write_options(&self) -> vortex_file: impl vortex_file::WriteOptionsSessionExt for S pub fn vortex_file::register_default_encodings(session: &mut vortex_session::VortexSession) + +pub type vortex_file::FooterCacheRef = alloc::sync::Arc diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index 370fd386dab..33f15b6a583 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -25,12 +25,15 @@ use vortex_layout::LayoutReader; use vortex_layout::segments::SegmentSource; use vortex_scan::ScanBuilder; use vortex_scan::SplitBy; +use vortex_scan::api::DataSourceRef; +use vortex_scan::layout::LayoutReaderDataSource; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; use crate::FileStatistics; use crate::footer::Footer; use crate::pruning::extract_relevant_file_stats_as_struct_row; +use crate::v2::FileStatsLayoutReader; /// Represents a Vortex file, providing access to its metadata and content. /// @@ -87,6 +90,25 @@ impl VortexFile { .new_reader("".into(), segment_source, &self.session) } + /// Create a [`DataSource`](vortex_scan::api::DataSource) from this file for scanning. + /// + /// Wraps the file's layout reader with [`FileStatsLayoutReader`] (when file-level + /// statistics are available) and [`LayoutReaderDataSource`]. + pub fn data_source(&self) -> VortexResult { + let mut reader = self.layout_reader()?; + if let Some(stats) = self.file_stats().cloned() { + reader = Arc::new(FileStatsLayoutReader::new( + reader, + stats, + self.session.clone(), + )); + } + Ok(Arc::new(LayoutReaderDataSource::new( + reader, + self.session.clone(), + ))) + } + /// Initiate a scan of the file, returning a builder for configuring the scan. pub fn scan(&self) -> VortexResult> { Ok(ScanBuilder::new( diff --git a/vortex-file/src/filesystem/mod.rs b/vortex-file/src/filesystem/mod.rs new file mode 100644 index 00000000000..0fd8aa9cc18 --- /dev/null +++ b/vortex-file/src/filesystem/mod.rs @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A filesystem abstraction for discovering and opening Vortex files. +//! +//! [`FileSystem`] provides a storage-agnostic interface for listing files under a prefix +//! and opening them for reading. Implementations can target local filesystems, object stores, +//! or any other storage backend. + +#[cfg(feature = "object_store")] +pub mod object_store; +mod prefix; + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use vortex_error::VortexResult; +use vortex_io::VortexReadAt; + +/// A file discovered during listing, with its path and optional size in bytes. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct FileListing { + /// The file path (relative to the filesystem root). + pub path: String, + /// The file size in bytes, if known from the listing metadata. + pub size: Option, +} + +/// A reference-counted handle to a file system. 
+pub type FileSystemRef = Arc; + +/// A storage-agnostic filesystem interface for discovering and reading Vortex files. +/// +/// Implementations handle the details of a particular storage backend (local disk, S3, GCS, etc.) +/// while consumers work through this uniform interface. +/// +/// # Future Work +/// +/// An `open_write` method will be added once [`VortexWrite`](vortex_io::VortexWrite) is +/// object-safe (it currently uses `impl Future` return types which prevent trait-object usage). +#[async_trait] +pub trait FileSystem: Send + Sync + 'static { + /// List files whose paths start with `prefix`. + /// + /// Returns a stream of [`FileListing`] entries. The stream may yield entries in any order; + /// callers should sort if deterministic ordering is required. + fn list(&self, prefix: Option<&str>) -> BoxStream<'_, VortexResult>; + + /// Open a file for reading at the given path. + async fn open_read(&self, path: &str) -> VortexResult>; +} diff --git a/vortex-file/src/filesystem/object_store.rs b/vortex-file/src/filesystem/object_store.rs new file mode 100644 index 00000000000..5a555e6da37 --- /dev/null +++ b/vortex-file/src/filesystem/object_store.rs @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`FileSystem`] implementation backed by an [`ObjectStore`]. + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::ObjectStore; +use object_store::path::Path; +use vortex_error::VortexResult; +use vortex_io::VortexReadAt; +use vortex_io::file::object_store::ObjectStoreSource; +use vortex_io::runtime::Handle; + +use crate::filesystem::FileListing; +use crate::filesystem::FileSystem; + +/// A [`FileSystem`] backed by an [`ObjectStore`]. +/// +// TODO(ngates): we could consider spawning a driver task inside this file system such that we can +// apply concurrency limits to the overall object store, rather than on a per-file basis. +pub struct ObjectStoreFileSystem { + store: Arc, + handle: Handle, +} + +impl ObjectStoreFileSystem { + /// Create a new filesystem backed by the given object store and runtime handle. + pub fn new(store: Arc, handle: Handle) -> Self { + Self { store, handle } + } +} + +#[async_trait] +impl FileSystem for ObjectStoreFileSystem { + fn list(&self, prefix: Option<&str>) -> BoxStream<'_, VortexResult> { + let path = prefix.map(Path::from); + self.store + .list(path.as_ref()) + .map(|result| { + result + .map(|meta| FileListing { + path: meta.location.to_string(), + size: Some(meta.size), + }) + .map_err(Into::into) + }) + .boxed() + } + + async fn open_read(&self, path: &str) -> VortexResult> { + Ok(Arc::new(ObjectStoreSource::new( + self.store.clone(), + path.into(), + self.handle.clone(), + ))) + } +} diff --git a/vortex-file/src/filesystem/prefix.rs b/vortex-file/src/filesystem/prefix.rs new file mode 100644 index 00000000000..d33dda62a21 --- /dev/null +++ b/vortex-file/src/filesystem/prefix.rs @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::stream::BoxStream; +use vortex_error::VortexResult; +use vortex_io::VortexReadAt; + +use crate::filesystem::FileListing; +use crate::filesystem::FileSystem; +use crate::filesystem::FileSystemRef; + +/// A [`FileSystem`] decorator that roots all operations under a given prefix. 
+/// +/// Paths returned from [`list`](FileSystem::list) are relative to the prefix, and paths +/// passed to [`open_read`](FileSystem::open_read) are automatically prefixed. +pub struct PrefixFileSystem { + inner: FileSystemRef, + prefix: String, +} + +impl PrefixFileSystem { + pub fn new(inner: FileSystemRef, prefix: String) -> Self { + // Normalize to always have a trailing slash for clean concatenation. + let prefix = format!("{}/", prefix.trim_matches('/')); + Self { inner, prefix } + } +} + +#[async_trait] +impl FileSystem for PrefixFileSystem { + fn list(&self, prefix: Option<&str>) -> BoxStream<'_, VortexResult> { + let full_prefix = match prefix { + Some(suffix) => format!("{}{}", self.prefix, suffix.trim_start_matches('/')), + None => self.prefix.clone(), + }; + + let strip_prefix = self.prefix.clone(); + self.inner + .list(Some(&full_prefix)) + .map(move |result| { + result.map(|mut listing| { + listing.path = listing + .path + .strip_prefix(&strip_prefix) + .unwrap_or(&listing.path) + .to_string(); + listing + }) + }) + .boxed() + } + + async fn open_read(&self, path: &str) -> VortexResult> { + self.inner + .open_read(&format!("{}{}", self.prefix, path.trim_start_matches('/'))) + .await + } +} + +impl dyn FileSystem + '_ { + /// Create a new filesystem that applies the given prefix to all operations on this filesystem. + pub fn with_prefix(self: Arc, prefix: String) -> FileSystemRef { + Arc::new(PrefixFileSystem::new(self, prefix)) + } +} diff --git a/vortex-file/src/footer/mod.rs b/vortex-file/src/footer/mod.rs index 78b0681dfb2..bc923c88d7a 100644 --- a/vortex-file/src/footer/mod.rs +++ b/vortex-file/src/footer/mod.rs @@ -48,6 +48,8 @@ pub struct Footer { statistics: Option, // The specific arrays used within the file, in the order they were registered. array_ctx: ArrayContext, + // The approximate size of the footer in bytes, used for caching and memory management. + approx_byte_size: Option, } impl Footer { @@ -62,9 +64,15 @@ impl Footer { segments, statistics, array_ctx, + approx_byte_size: None, } } + pub(crate) fn with_approx_byte_size(mut self, approx_byte_size: usize) -> Self { + self.approx_byte_size = Some(approx_byte_size); + self + } + /// Read the [`Footer`] from a flatbuffer. pub(crate) fn from_flatbuffer( footer_bytes: FlatBuffer, @@ -73,6 +81,8 @@ impl Footer { statistics: Option, session: &VortexSession, ) -> VortexResult { + let approx_byte_size = footer_bytes.len() + layout_bytes.len(); + let fb_footer = root::(&footer_bytes)?; // Create a LayoutContext from the registry. @@ -118,6 +128,7 @@ impl Footer { segments, statistics, array_ctx, + approx_byte_size: Some(approx_byte_size), }) } @@ -146,6 +157,11 @@ impl Footer { self.root_layout.row_count() } + /// Returns the approximate size of the footer in bytes, used for caching and memory management. + pub fn approx_byte_size(&self) -> Option { + self.approx_byte_size + } + /// Returns a serializer for this footer. 
pub fn into_serializer(self) -> FooterSerializer { FooterSerializer::new(self) diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 38fe554c664..df23af21bae 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -92,7 +92,9 @@ mod counting; mod file; +pub mod filesystem; mod footer; +pub mod multi; mod open; mod pruning; mod read; @@ -100,6 +102,7 @@ pub mod segments; mod strategy; #[cfg(test)] mod tests; +pub mod v2; mod writer; pub use file::*; diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs new file mode 100644 index 00000000000..ec1af6c3ddb --- /dev/null +++ b/vortex-file/src/multi/mod.rs @@ -0,0 +1,391 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Builder for constructing a [`MultiDataSource`] from multiple Vortex files. + +mod session; + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::TryStreamExt; +use session::MultiFileSessionExt; +use tracing::debug; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_scan::api::DataSource; +use vortex_scan::api::DataSourceRef; +use vortex_scan::multi::DataSourceFactory; +use vortex_scan::multi::MultiDataSource; +use vortex_session::VortexSession; + +use crate::OpenOptionsSessionExt; +use crate::VortexOpenOptions; +use crate::filesystem::FileListing; +use crate::filesystem::FileSystem; +use crate::filesystem::FileSystemRef; + +/// A builder that discovers multiple Vortex files from a glob pattern and constructs a +/// [`MultiDataSource`] to scan them as a single data source. +/// +/// The primary interface is [`with_glob_url`](Self::with_glob_url), which accepts a glob +/// pattern (optionally prefixed with `file://`). For non-local filesystems (S3, GCS, etc.), +/// callers must also provide a [`FileSystem`] via [`with_filesystem`](Self::with_filesystem). +/// +/// # Examples +/// +/// ```ignore +/// // Local files — filesystem is auto-created: +/// let ds = MultiFileDataSource::new(session) +/// .with_glob_url("/data/warehouse/*.vortex") +/// .build() +/// .await?; +/// +/// // S3 — caller provides the filesystem: +/// let ds = MultiFileDataSource::new(session) +/// .with_filesystem(s3_fs) +/// .with_glob_url("prefix/*.vortex") +/// .build() +/// .await?; +/// ``` +pub struct MultiFileDataSource { + session: VortexSession, + fs: Option, + glob_url: Option, + open_options_fn: Arc VortexOpenOptions + Send + Sync>, +} + +impl MultiFileDataSource { + /// Create a new [`MultiFileDataSource`] builder. + pub fn new(session: VortexSession) -> Self { + Self { + session, + fs: None, + glob_url: None, + open_options_fn: Arc::new(|opts| opts), + } + } + + /// Set the glob URL for file discovery. + /// + /// For local files, this can be a bare path (`/data/*.vortex`) or a `file://` URL. + /// For remote filesystems, this should be the glob pattern relative to the filesystem + /// root — the filesystem must be provided via [`with_filesystem`](Self::with_filesystem). + pub fn with_glob_url(mut self, glob_url: impl Into) -> Self { + self.glob_url = Some(glob_url.into()); + self + } + + /// Set the filesystem to use for file discovery and reading. + /// + /// Required for non-local URLs (S3, GCS, etc.). For `file://` or bare path URLs, + /// a local filesystem is created automatically if none is provided. + pub fn with_filesystem(mut self, fs: FileSystemRef) -> Self { + self.fs = Some(fs); + self + } + + /// Customize [`VortexOpenOptions`] applied to each file. 
+ /// + /// Use this to configure segment caches, metrics registries, or other per-file options. + pub fn with_open_options( + mut self, + f: impl Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync + 'static, + ) -> Self { + self.open_options_fn = Arc::new(f); + self + } + + /// Build the [`MultiDataSource`]. + /// + /// Discovers files via glob, opens the first file eagerly to determine the schema, + /// and creates lazy factories for the remaining files. + pub async fn build(mut self) -> VortexResult { + let glob_url = self + .glob_url + .take() + .ok_or_else(|| vortex_err!("MultiFileDataSource requires a glob URL"))?; + + let (fs, glob_pattern) = self.resolve_filesystem(&glob_url)?; + + let files: Vec = glob_files(fs.as_ref(), &glob_pattern)? + .try_collect() + .await?; + + if files.is_empty() { + vortex_bail!("No files matched the glob pattern '{}'", glob_url); + } + + let file_count = files.len(); + debug!(file_count, glob = %glob_url, "discovered files"); + + // Open first file eagerly for dtype. + let first_file = + open_file(&fs, &files[0], &self.session, self.open_options_fn.as_ref()).await?; + let first_ds = first_file.data_source()?; + + let factories: Vec> = files[1..] + .iter() + .map(|f| { + Arc::new(VortexFileFactory { + fs: fs.clone(), + file: f.clone(), + session: self.session.clone(), + open_options_fn: self.open_options_fn.clone(), + }) as Arc + }) + .collect(); + + let inner = MultiDataSource::lazy(first_ds, factories, &self.session); + + debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); + + Ok(inner) + } + + /// Resolve the filesystem from the builder configuration and glob URL. + fn resolve_filesystem(&self, glob_url: &str) -> VortexResult<(FileSystemRef, String)> { + if let Some(ref fs) = self.fs { + return Ok((fs.clone(), glob_url.to_string())); + } + + // Auto-create local filesystem for file:// or bare paths. + let glob_pattern = if let Some(stripped) = glob_url.strip_prefix("file://") { + stripped.to_string() + } else if glob_url.starts_with('/') + || glob_url.starts_with('.') + || !glob_url.contains("://") + { + glob_url.to_string() + } else { + vortex_bail!( + "A filesystem must be provided for non-local URLs. \ + Use .with_filesystem() for URL: {}", + glob_url + ); + }; + + let fs = create_local_filesystem(&self.session)?; + + // object_store paths are always relative (no leading `/`), so strip it from + // the glob pattern to ensure consistent matching with listed file paths. + let glob_pattern = glob_pattern + .strip_prefix('/') + .unwrap_or(&glob_pattern) + .to_string(); + + Ok((fs, glob_pattern)) + } +} + +/// Creates a local filesystem backed by `object_store::local::LocalFileSystem`. +// TODO(ngates): create a native file system without an object_store dependency. +// Turns out it's not a trivial change because we have always used object_store with its own +// coalescing and concurrency configs, so we need to re-tune for local disk. +#[cfg(feature = "object_store")] +fn create_local_filesystem(session: &VortexSession) -> VortexResult { + use vortex_io::session::RuntimeSessionExt; + + use crate::filesystem::object_store::ObjectStoreFileSystem; + + let store = Arc::new(object_store::local::LocalFileSystem::default()); + let fs: FileSystemRef = Arc::new(ObjectStoreFileSystem::new(store, session.handle())); + Ok(fs) +} + +#[cfg(not(feature = "object_store"))] +fn create_local_filesystem(_session: &VortexSession) -> VortexResult { + vortex_bail!( + "The 'object_store' feature is required for automatic local filesystem creation. 
\ + Either enable the feature or provide a filesystem via .with_filesystem()." + ); +} + +/// Open a single Vortex file, checking the session's footer cache. +async fn open_file( + fs: &FileSystemRef, + file: &FileListing, + session: &VortexSession, + open_options_fn: &(dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync), +) -> VortexResult { + debug!(path = %file.path, "opening vortex file"); + + // Build open options. The DashMap Ref from multi_file() must not live across an await, + // so we scope the cache lookup in a block. + let options = { + let mut options = open_options_fn(session.open_options()); + if let Some(size) = file.size { + options = options.with_file_size(size); + } + if let Some(footer) = session.multi_file().get_footer(&file.path) { + options = options.with_footer(footer); + } + options + }; + + let source = fs.open_read(&file.path).await?; + let vortex_file = options.open(source).await?; + + // Store footer in cache (scoped to avoid holding the Ref across subsequent code). + session + .multi_file() + .put_footer(&file.path, vortex_file.footer().clone()); + Ok(vortex_file) +} + +/// A [`DataSourceFactory`] that lazily opens a single Vortex file. +struct VortexFileFactory { + fs: FileSystemRef, + file: FileListing, + session: VortexSession, + open_options_fn: Arc VortexOpenOptions + Send + Sync>, +} + +#[async_trait] +impl DataSourceFactory for VortexFileFactory { + async fn open(&self) -> VortexResult> { + let file = open_file( + &self.fs, + &self.file, + &self.session, + self.open_options_fn.as_ref(), + ) + .await?; + Ok(Some(file.data_source()?)) + } +} + +// --------------------------------------------------------------------------- +// Internal glob helpers (moved from filesystem/glob.rs) +// --------------------------------------------------------------------------- + +/// Expand a glob pattern against a filesystem, returning matching files as a stream. +/// +/// Splits the pattern at the first glob character (`*`, `?`, or `[`) and uses the +/// preceding path prefix to narrow the `FileSystem::list()` call. The full glob pattern +/// is then applied as a filter over the returned entries. +fn glob_files<'a>( + fs: &'a dyn FileSystem, + pattern: &str, +) -> VortexResult>> { + validate_glob(pattern)?; + + let glob_pattern = glob::Pattern::new(pattern) + .map_err(|e| vortex_err!("Invalid glob pattern '{}': {}", pattern, e))?; + + let prefix = list_prefix(pattern); + let listing_prefix = if prefix.is_empty() { + None + } else { + Some(prefix.trim_end_matches('/')) + }; + + debug!(?listing_prefix, pattern, "expanding glob"); + + let stream = fs + .list(listing_prefix) + .try_filter(move |listing| { + let matches = glob_pattern.matches(&listing.path); + async move { matches } + }) + .into_stream(); + + Ok(Box::pin(stream)) +} + +/// Returns the list prefix for a path pattern containing glob characters. +/// +/// Finds the first glob character and returns everything up to and including the last `/` +/// before it. For example, `data/2023/*/logs/*.log` returns `data/2023/`. +fn list_prefix(pattern: &str) -> &str { + let glob_pos = pattern.find(['*', '?', '[']).unwrap_or(pattern.len()); + match pattern[..glob_pos].rfind('/') { + Some(slash_pos) => &pattern[..=slash_pos], + None => "", + } +} + +/// Validates that a glob pattern does not contain escaped glob characters. 
+fn validate_glob(pattern: &str) -> VortexResult<()> { + for escape_pattern in ["\\*", "\\?", "\\["] { + if pattern.contains(escape_pattern) { + vortex_bail!( + "Escaped glob characters are not allowed in patterns. Found '{}' in: {}", + escape_pattern, + pattern + ); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_list_prefix_with_wildcard_in_filename() { + assert_eq!(list_prefix("folder/file*.txt"), "folder/"); + } + + #[test] + fn test_list_prefix_with_wildcard_in_directory() { + assert_eq!(list_prefix("folder/*/file.txt"), "folder/"); + } + + #[test] + fn test_list_prefix_nested_directories() { + assert_eq!(list_prefix("data/2023/*/logs/*.log"), "data/2023/"); + } + + #[test] + fn test_list_prefix_wildcard_at_root() { + assert_eq!(list_prefix("*.txt"), ""); + } + + #[test] + fn test_list_prefix_no_wildcards() { + assert_eq!( + list_prefix("folder/subfolder/file.txt"), + "folder/subfolder/" + ); + } + + #[test] + fn test_list_prefix_question_mark() { + assert_eq!(list_prefix("folder/file?.txt"), "folder/"); + } + + #[test] + fn test_list_prefix_bracket() { + assert_eq!(list_prefix("folder/file[abc].txt"), "folder/"); + } + + #[test] + fn test_list_prefix_empty() { + assert_eq!(list_prefix(""), ""); + } + + #[test] + fn test_validate_glob_valid() -> VortexResult<()> { + validate_glob("path/*.txt")?; + validate_glob("path/to/**/*.vortex")?; + Ok(()) + } + + #[test] + fn test_validate_glob_escaped_asterisk() { + assert!(validate_glob("path\\*.txt").is_err()); + } + + #[test] + fn test_validate_glob_escaped_question() { + assert!(validate_glob("path\\?.txt").is_err()); + } + + #[test] + fn test_validate_glob_escaped_bracket() { + assert!(validate_glob("path\\[test].txt").is_err()); + } +} diff --git a/vortex-file/src/multi/session.rs b/vortex-file/src/multi/session.rs new file mode 100644 index 00000000000..769e01eda21 --- /dev/null +++ b/vortex-file/src/multi/session.rs @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session extension for multi-file scanning, providing a shared footer cache. + +use std::fmt; +use std::fmt::Debug; + +use vortex_session::SessionExt; + +use crate::footer::Footer; + +/// Session state for multi-file scanning. +/// +/// Provides a shared, in-memory footer cache so that repeated scans over the same files +/// avoid redundant footer I/O. The cache is bounded by entry count and lives as long as +/// the [`VortexSession`](vortex_session::VortexSession). +/// +/// # Future Work +/// +/// Consider generalizing this cache into [`VortexOpenOptions`](crate::VortexOpenOptions) so +/// that single-file opens also benefit from session-level footer caching. +pub(super) struct MultiFileSession { + footer_cache: moka::sync::Cache, +} + +impl Default for MultiFileSession { + fn default() -> Self { + Self { + footer_cache: moka::sync::Cache::builder().max_capacity(10_000).build(), + } + } +} + +impl Debug for MultiFileSession { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MultiFileSession") + .field("footer_cache_entry_count", &self.footer_cache.entry_count()) + .finish() + } +} + +impl MultiFileSession { + /// Retrieve a cached footer for the given file path. + pub fn get_footer(&self, path: &str) -> Option