2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file: diff not rendered by default.)

2 changes: 1 addition & 1 deletion vortex-bench/src/datasets/taxi_data.rs
@@ -48,7 +48,7 @@ pub async fn fetch_taxi_data() -> Result<ArrayRef> {
     let vortex_data = taxi_data_vortex().await?;
     Ok(SESSION
         .open_options()
-        .open(vortex_data)
+        .open_path(vortex_data)
         .await?
         .scan()?
         .into_array_stream()?
2 changes: 1 addition & 1 deletion vortex-bench/src/datasets/tpch_l_comment.rs
@@ -73,7 +73,7 @@ impl Dataset for TPCHLCommentChunked {
                 .to_string_lossy()
                 .as_ref(),
         )? {
-            let file = SESSION.open_options().open(path?).await?;
+            let file = SESSION.open_options().open_path(path?).await?;
             let file_chunks: Vec<_> = file
                 .scan()?
                 .with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable))
2 changes: 1 addition & 1 deletion vortex-bench/src/downloadable_dataset.rs
@@ -79,7 +79,7 @@ impl Dataset for DownloadableDataset {
 
         Ok(SESSION
             .open_options()
-            .open(vortex.as_path())
+            .open_path(vortex.as_path())
             .await?
             .scan()?
             .into_array_stream()?
2 changes: 1 addition & 1 deletion vortex-bench/src/public_bi.rs
@@ -467,7 +467,7 @@ impl Dataset for PBIBenchmark {
 
         Ok(SESSION
             .open_options()
-            .open(path.as_path())
+            .open_path(path.as_path())
             .await?
             .scan()?
             .into_array_stream()?
2 changes: 1 addition & 1 deletion vortex-bench/src/random_access/take.rs
@@ -134,7 +134,7 @@ impl RandomAccessor for ParquetRandomAccessor {
 async fn take_vortex(reader: impl AsRef<Path>, indices: Buffer<u64>) -> anyhow::Result<ArrayRef> {
     let array = SESSION
         .open_options()
-        .open(reader.as_ref())
+        .open_path(reader.as_ref())
         .await?
         .scan()?
         .with_row_indices(indices)
2 changes: 1 addition & 1 deletion vortex-cxx/src/read.rs
@@ -46,7 +46,7 @@ impl VortexFile {
     /// File operations - using blocking operations for simplicity
     /// TODO(xinyu): object store (see vortex-ffi)
     pub(crate) fn open_file(path: &str) -> Result<Box<VortexFile>> {
-        let file = RUNTIME.block_on(SESSION.open_options().open(std::path::Path::new(path)))?;
+        let file = RUNTIME.block_on(SESSION.open_options().open_path(std::path::Path::new(path)))?;
         Ok(Box::new(VortexFile { inner: file }))
     }
 
2 changes: 1 addition & 1 deletion vortex-duckdb/src/scan.rs
@@ -222,7 +222,7 @@ async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult<VortexFile>
         .to_file_path()
         .map_err(|_| vortex_err!("Invalid file URL: {url}"))?;
 
-        options.open(path).await
+        options.open_path(path).await
     }
 }
 
8 changes: 4 additions & 4 deletions vortex-file/Cargo.toml
@@ -21,16 +21,16 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 flatbuffers = { workspace = true }
 futures = { workspace = true, features = ["std", "async-await"] }
-# Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration
-getrandom_v03 = { workspace = true }
+getrandom_v03 = { workspace = true } # Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration
 itertools = { workspace = true }
 kanal = { workspace = true }
 object_store = { workspace = true, optional = true }
 oneshot.workspace = true
 parking_lot = { workspace = true }
 pin-project-lite = { workspace = true }
 tokio = { workspace = true, features = ["rt"], optional = true }
 tracing = { workspace = true }
-# Needed to pickup the "js" feature for wasm targets from the workspace configuration
-uuid = { workspace = true }
+uuid = { workspace = true } # Needed to pickup the "js" feature for wasm targets from the workspace configuration
 vortex-alp = { workspace = true }
 vortex-array = { workspace = true }
 vortex-buffer = { workspace = true }
1 change: 1 addition & 0 deletions vortex-file/src/lib.rs
@@ -95,6 +95,7 @@ mod file;
 mod footer;
 mod open;
 mod pruning;
+mod read;
 pub mod segments;
 mod strategy;
 #[cfg(test)]
52 changes: 33 additions & 19 deletions vortex-file/src/open.rs
@@ -14,7 +14,6 @@ use vortex_error::VortexExpect;
 use vortex_error::VortexResult;
 use vortex_io::InstrumentedReadAt;
 use vortex_io::VortexReadAt;
-use vortex_io::file::IntoReadSource;
 use vortex_io::session::RuntimeSessionExt;
 use vortex_layout::segments::NoOpSegmentCache;
 use vortex_layout::segments::SegmentCache;
@@ -139,10 +138,17 @@ impl VortexOpenOptions {
     /// out-of-the-box performance. The underlying I/O system will continue to be optimised for
     /// different file systems and object stores so we encourage users to use this method
     /// whenever possible and file issues if they encounter problems.
-    pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
+    pub async fn open(self, source: Arc<dyn VortexReadAt>) -> VortexResult<VortexFile> {
Inline review thread on this line:

@gatesn (Contributor), Jan 15, 2026:
    Are you sure you don't want `open<R: TryInto<VortexReadAt>>(read: R)` and `open_read_at(Arc<dyn VortexReadAt>)`? It should help reduce the public API diff.

@AdamGS (Contributor, Author), Jan 15, 2026:
    It's going to be a while before we release again; mind if I do it in a follow-up PR? There are a few things I want to try here, but I'd love to merge this so I can move on to the Python thing that came up.

@AdamGS (Contributor, Author):
    It also shouldn't be `TryInto`: if it has some issue getting started, it'll just fail.

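For readers skimming the thread, here is a rough sketch of the shape gatesn is proposing. It is purely illustrative, not code from this PR or the library; it reuses the `VortexOpenOptions`, `VortexReadAt`, `VortexResult`, and `VortexFile` names from the diff and elides the body of the low-level method.

```rust
// Hypothetical sketch of the reviewer's suggestion, not part of this PR:
// a generic `open` over anything convertible into a reader, plus an explicit
// `open_read_at` for callers that already hold an Arc<dyn VortexReadAt>.
impl VortexOpenOptions {
    pub async fn open<R>(self, read: R) -> VortexResult<VortexFile>
    where
        R: Into<Arc<dyn VortexReadAt>>, // plain Into, per AdamGS's note that TryInto isn't needed
    {
        self.open_read_at(read.into()).await
    }

    pub async fn open_read_at(self, read: Arc<dyn VortexReadAt>) -> VortexResult<VortexFile> {
        // ...same work the now-private open_read does in this PR...
        unimplemented!()
    }
}
```
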
+        self.open_read(source).await
+    }
+
+    /// Open a Vortex file from a filesystem path.
+    #[cfg(not(target_arch = "wasm32"))]
+    pub async fn open_path(self, path: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
+        use vortex_io::file::std_file::FileReadAdapter;
         let handle = self.session.handle();
-        let metrics = self.metrics.clone();
-        self.open_read_at(handle.open_read(source, metrics)?).await
+        let source = Arc::new(FileReadAdapter::open(path, handle)?);
+        self.open(source).await
     }
 
     /// Open a Vortex file from an in-memory buffer.
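For orientation, a hedged usage sketch of the reworked entry points. The method and type names (`open`, `open_path`, `VortexOpenOptions`, `VortexFile`, `VortexReadAt`, `VortexResult`) are taken from this diff; the helper functions and the `vortex_file` import path are assumptions for illustration only.

```rust
use std::sync::Arc;

use vortex_error::VortexResult;
use vortex_file::{VortexFile, VortexOpenOptions}; // assumed re-export locations
use vortex_io::VortexReadAt;

/// Path-based open (non-wasm targets): the file read source is built internally.
async fn open_from_path(
    options: VortexOpenOptions,
    path: &std::path::Path,
) -> VortexResult<VortexFile> {
    options.open_path(path).await
}

/// Reader-based open: `open` now takes the reader directly, while the old public
/// `open_read_at` has become the private `open_read`.
async fn open_from_reader(
    options: VortexOpenOptions,
    reader: Arc<dyn VortexReadAt>,
) -> VortexResult<VortexFile> {
    options.open(reader).await
}
```
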
@@ -151,14 +157,14 @@ impl VortexOpenOptions {
         block_on(
             self.with_initial_read_size(0)
                 .without_segment_cache()
-                .open_read_at(buffer.into()),
+                .open_read(buffer.into()),
         )
     }
 
     /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
     ///
     /// This is a low-level API and we strongly recommend using [`VortexOpenOptions::open`].
-    pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
+    async fn open_read<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
         let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
 
         let footer = if let Some(footer) = self.footer {
@@ -175,10 +181,12 @@
             self.metrics.clone(),
         ));
 
-        // Create a segment source backed by the VortexReadAt implementation.
-        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
+        // Create a segment source backed by the VortexRead implementation.
+        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::open(
             footer.segment_map().clone(),
             read,
+            self.session.handle(),
+            self.metrics.clone(),
         )));
 
         // Wrap up the segment source to first resolve segments from the initial read cache.
@@ -272,13 +280,15 @@ impl VortexOpenOptions {
         object_store: &Arc<dyn object_store::ObjectStore>,
         path: &str,
     ) -> VortexResult<VortexFile> {
-        use vortex_io::file::object_store::ObjectStoreReadSource;
+        use vortex_io::file::object_store::ObjectStoreSource;
 
-        self.open(ObjectStoreReadSource::new(
+        let handle = self.session.handle();
+        let source = Arc::new(ObjectStoreSource::new(
             object_store.clone(),
             path.into(),
-        ))
-        .await
+            handle,
+        ));
+        self.open(source).await
     }
 }
 
@@ -299,14 +309,18 @@ mod tests {
     use super::*;
     use crate::WriteOptionsSessionExt;
 
-    // Define CountingReadAt struct
-    struct CountingReadAt<R> {
+    // Define CountingRead struct
+    struct CountingRead<R> {
         inner: R,
         total_read: Arc<AtomicUsize>,
         first_read_len: Arc<AtomicUsize>,
     }
 
-    impl<R: VortexReadAt> VortexReadAt for CountingReadAt<R> {
+    impl<R: VortexReadAt> VortexReadAt for CountingRead<R> {
+        fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
+            self.inner.size()
+        }
+
         fn read_at(
             &self,
             offset: u64,
@@ -323,8 +337,8 @@ mod tests {
             self.inner.read_at(offset, length, alignment)
         }
 
-        fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
-            self.inner.size()
+        fn concurrency(&self) -> usize {
+            self.inner.concurrency()
         }
     }
 
@@ -364,14 +378,14 @@
 
         let total_read = Arc::new(AtomicUsize::new(0));
         let first_read_len = Arc::new(AtomicUsize::new(0));
-        let reader = CountingReadAt {
+        let reader = CountingRead {
             inner: buffer,
             total_read: total_read.clone(),
             first_read_len: first_read_len.clone(),
         };
 
         // Open the file
-        let _file = session.open_options().open_read_at(reader).await.unwrap();
+        let _file = session.open_options().open_read(reader).await.unwrap();
 
         // Assert that we read approximately the postscript size, not 1MB
         let first = first_read_len.load(Ordering::Relaxed);
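A closing note on the test: since `open_read` loses its `pub` in this PR, callers outside `vortex-file` would route a wrapper like `CountingRead` through the public `open` instead. A hedged fragment reusing the names from the test above:

```rust
// Hypothetical external-caller variant of the test above: coerce the wrapper to
// Arc<dyn VortexReadAt> and use the public `open` entry point.
let reader: Arc<dyn VortexReadAt> = Arc::new(CountingRead {
    inner: buffer,
    total_read: total_read.clone(),
    first_read_len: first_read_len.clone(),
});
let _file = session.open_options().open(reader).await.unwrap();
```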