From 96ab7422e7b510c6f18b3ed4e57f327e38e8bbe7 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 13 Jan 2026 14:31:56 +0000 Subject: [PATCH 1/7] Trying to refactor some IO traits to improve extendability Signed-off-by: Adam Gutglick --- vortex-bench/src/datasets/taxi_data.rs | 2 +- vortex-bench/src/datasets/tpch_l_comment.rs | 2 +- vortex-bench/src/downloadable_dataset.rs | 2 +- vortex-bench/src/public_bi.rs | 2 +- vortex-bench/src/random_access/take.rs | 2 +- vortex-cxx/src/read.rs | 2 +- vortex-duckdb/src/scan.rs | 2 +- vortex-file/src/open.rs | 53 ++-- vortex-file/src/segments/source.rs | 6 +- vortex-file/tests/test_write_table.rs | 2 +- vortex-io/src/file/buffer.rs | 70 +----- vortex-io/src/file/driver.rs | 22 +- vortex-io/src/file/mod.rs | 2 +- vortex-io/src/file/object_store.rs | 211 ++++++++-------- vortex-io/src/file/read/mod.rs | 25 +- vortex-io/src/file/read/source.rs | 54 ---- vortex-io/src/file/std_file.rs | 107 ++++---- vortex-io/src/read.rs | 262 ++++++++++++-------- vortex-io/src/runtime/handle.rs | 27 +- vortex-io/src/runtime/mod.rs | 6 +- vortex-io/src/runtime/smol.rs | 2 +- vortex-io/src/runtime/tests.rs | 106 ++++---- vortex-io/src/runtime/tokio.rs | 5 +- vortex-python/src/file.rs | 2 +- vortex-tui/src/browse/app.rs | 2 +- vortex-tui/src/inspect.rs | 2 +- vortex-tui/src/segments.rs | 4 +- vortex-tui/src/tree.rs | 4 +- vortex/examples/tracing_vortex.rs | 4 +- vortex/src/lib.rs | 6 +- 30 files changed, 439 insertions(+), 559 deletions(-) delete mode 100644 vortex-io/src/file/read/source.rs diff --git a/vortex-bench/src/datasets/taxi_data.rs b/vortex-bench/src/datasets/taxi_data.rs index fc71b8c4879..74db55437fa 100644 --- a/vortex-bench/src/datasets/taxi_data.rs +++ b/vortex-bench/src/datasets/taxi_data.rs @@ -48,7 +48,7 @@ pub async fn fetch_taxi_data() -> Result { let vortex_data = taxi_data_vortex().await?; Ok(SESSION .open_options() - .open(vortex_data) + .open_path(vortex_data) .await? .scan()? .into_array_stream()? diff --git a/vortex-bench/src/datasets/tpch_l_comment.rs b/vortex-bench/src/datasets/tpch_l_comment.rs index c46872bd421..0a1da7aea8a 100644 --- a/vortex-bench/src/datasets/tpch_l_comment.rs +++ b/vortex-bench/src/datasets/tpch_l_comment.rs @@ -73,7 +73,7 @@ impl Dataset for TPCHLCommentChunked { .to_string_lossy() .as_ref(), )? { - let file = SESSION.open_options().open(path?).await?; + let file = SESSION.open_options().open_path(path?).await?; let file_chunks: Vec<_> = file .scan()? .with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable)) diff --git a/vortex-bench/src/downloadable_dataset.rs b/vortex-bench/src/downloadable_dataset.rs index 00ccc30e3c0..712dcec6049 100644 --- a/vortex-bench/src/downloadable_dataset.rs +++ b/vortex-bench/src/downloadable_dataset.rs @@ -79,7 +79,7 @@ impl Dataset for DownloadableDataset { Ok(SESSION .open_options() - .open(vortex.as_path()) + .open_path(vortex.as_path()) .await? .scan()? .into_array_stream()? diff --git a/vortex-bench/src/public_bi.rs b/vortex-bench/src/public_bi.rs index 65b90551c50..ca1f8df1471 100644 --- a/vortex-bench/src/public_bi.rs +++ b/vortex-bench/src/public_bi.rs @@ -467,7 +467,7 @@ impl Dataset for PBIBenchmark { Ok(SESSION .open_options() - .open(path.as_path()) + .open_path(path.as_path()) .await? .scan()? .into_array_stream()? diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs index a1f531c01ad..1f251c2d8db 100644 --- a/vortex-bench/src/random_access/take.rs +++ b/vortex-bench/src/random_access/take.rs @@ -134,7 +134,7 @@ impl RandomAccessor for ParquetRandomAccessor { async fn take_vortex(reader: impl AsRef, indices: Buffer) -> anyhow::Result { let array = SESSION .open_options() - .open(reader.as_ref()) + .open_path(reader.as_ref()) .await? .scan()? .with_row_indices(indices) diff --git a/vortex-cxx/src/read.rs b/vortex-cxx/src/read.rs index a3968b9f68d..3fda15d6dda 100644 --- a/vortex-cxx/src/read.rs +++ b/vortex-cxx/src/read.rs @@ -46,7 +46,7 @@ impl VortexFile { /// File operations - using blocking operations for simplicity /// TODO(xinyu): object store (see vortex-ffi) pub(crate) fn open_file(path: &str) -> Result> { - let file = RUNTIME.block_on(SESSION.open_options().open(std::path::Path::new(path)))?; + let file = RUNTIME.block_on(SESSION.open_options().open_path(std::path::Path::new(path)))?; Ok(Box::new(VortexFile { inner: file })) } diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index cbcd247d39b..f48ce764fc5 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -222,7 +222,7 @@ async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult(self, source: S) -> VortexResult { + pub async fn open(self, source: VortexReadRef) -> VortexResult { let handle = self.session.handle(); let metrics = self.metrics.clone(); - self.open_read_at(handle.open_read(source, metrics)?).await + self.open_read_at(handle.open_read(source, metrics)).await + } + + /// Open a Vortex file from a filesystem path. + #[cfg(not(target_arch = "wasm32"))] + pub async fn open_path(self, path: impl AsRef) -> VortexResult { + use vortex_io::file::std_file::FileSource; + let handle = self.session.handle(); + let source: VortexReadRef = Arc::new(FileSource::open(path, handle)?); + self.open(source).await } /// Open a Vortex file from an in-memory buffer. @@ -155,11 +164,11 @@ impl VortexOpenOptions { ) } - /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation. + /// An API for opening a [`VortexFile`] using any [`VortexRead`] implementation. /// /// This is a low-level API and we strongly recommend using [`VortexOpenOptions::open`]. - pub async fn open_read_at(self, read: R) -> VortexResult { - let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics)); + pub async fn open_read_at(self, read: R) -> VortexResult { + let read = Arc::new(InstrumentedRead::new(Arc::new(read), &self.metrics)); let footer = if let Some(footer) = self.footer { footer @@ -195,7 +204,7 @@ impl VortexOpenOptions { }) } - async fn read_footer(&self, read: Arc) -> VortexResult