diff --git a/Cargo.lock b/Cargo.lock index dea869a9b55..5e0136688fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10376,7 +10376,9 @@ dependencies = [ "itertools 0.14.0", "kanal", "object_store", + "oneshot", "parking_lot", + "pin-project-lite", "tokio", "tracing", "uuid", diff --git a/vortex-bench/src/datasets/taxi_data.rs b/vortex-bench/src/datasets/taxi_data.rs index fc71b8c4879..74db55437fa 100644 --- a/vortex-bench/src/datasets/taxi_data.rs +++ b/vortex-bench/src/datasets/taxi_data.rs @@ -48,7 +48,7 @@ pub async fn fetch_taxi_data() -> Result { let vortex_data = taxi_data_vortex().await?; Ok(SESSION .open_options() - .open(vortex_data) + .open_path(vortex_data) .await? .scan()? .into_array_stream()? diff --git a/vortex-bench/src/datasets/tpch_l_comment.rs b/vortex-bench/src/datasets/tpch_l_comment.rs index c46872bd421..0a1da7aea8a 100644 --- a/vortex-bench/src/datasets/tpch_l_comment.rs +++ b/vortex-bench/src/datasets/tpch_l_comment.rs @@ -73,7 +73,7 @@ impl Dataset for TPCHLCommentChunked { .to_string_lossy() .as_ref(), )? { - let file = SESSION.open_options().open(path?).await?; + let file = SESSION.open_options().open_path(path?).await?; let file_chunks: Vec<_> = file .scan()? .with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable)) diff --git a/vortex-bench/src/downloadable_dataset.rs b/vortex-bench/src/downloadable_dataset.rs index 00ccc30e3c0..712dcec6049 100644 --- a/vortex-bench/src/downloadable_dataset.rs +++ b/vortex-bench/src/downloadable_dataset.rs @@ -79,7 +79,7 @@ impl Dataset for DownloadableDataset { Ok(SESSION .open_options() - .open(vortex.as_path()) + .open_path(vortex.as_path()) .await? .scan()? .into_array_stream()? diff --git a/vortex-bench/src/public_bi.rs b/vortex-bench/src/public_bi.rs index 65b90551c50..ca1f8df1471 100644 --- a/vortex-bench/src/public_bi.rs +++ b/vortex-bench/src/public_bi.rs @@ -467,7 +467,7 @@ impl Dataset for PBIBenchmark { Ok(SESSION .open_options() - .open(path.as_path()) + .open_path(path.as_path()) .await? .scan()? .into_array_stream()? diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs index a1f531c01ad..1f251c2d8db 100644 --- a/vortex-bench/src/random_access/take.rs +++ b/vortex-bench/src/random_access/take.rs @@ -134,7 +134,7 @@ impl RandomAccessor for ParquetRandomAccessor { async fn take_vortex(reader: impl AsRef, indices: Buffer) -> anyhow::Result { let array = SESSION .open_options() - .open(reader.as_ref()) + .open_path(reader.as_ref()) .await? .scan()? .with_row_indices(indices) diff --git a/vortex-cxx/src/read.rs b/vortex-cxx/src/read.rs index a3968b9f68d..3fda15d6dda 100644 --- a/vortex-cxx/src/read.rs +++ b/vortex-cxx/src/read.rs @@ -46,7 +46,7 @@ impl VortexFile { /// File operations - using blocking operations for simplicity /// TODO(xinyu): object store (see vortex-ffi) pub(crate) fn open_file(path: &str) -> Result> { - let file = RUNTIME.block_on(SESSION.open_options().open(std::path::Path::new(path)))?; + let file = RUNTIME.block_on(SESSION.open_options().open_path(std::path::Path::new(path)))?; Ok(Box::new(VortexFile { inner: file })) } diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index cbcd247d39b..f48ce764fc5 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -222,7 +222,7 @@ async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult(self, source: S) -> VortexResult { + pub async fn open(self, source: Arc) -> VortexResult { + self.open_read(source).await + } + + /// Open a Vortex file from a filesystem path. + #[cfg(not(target_arch = "wasm32"))] + pub async fn open_path(self, path: impl AsRef) -> VortexResult { + use vortex_io::file::std_file::FileReadAdapter; let handle = self.session.handle(); - let metrics = self.metrics.clone(); - self.open_read_at(handle.open_read(source, metrics)?).await + let source = Arc::new(FileReadAdapter::open(path, handle)?); + self.open(source).await } /// Open a Vortex file from an in-memory buffer. @@ -151,14 +157,14 @@ impl VortexOpenOptions { block_on( self.with_initial_read_size(0) .without_segment_cache() - .open_read_at(buffer.into()), + .open_read(buffer.into()), ) } /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation. /// /// This is a low-level API and we strongly recommend using [`VortexOpenOptions::open`]. - pub async fn open_read_at(self, read: R) -> VortexResult { + async fn open_read(self, read: R) -> VortexResult { let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics)); let footer = if let Some(footer) = self.footer { @@ -175,10 +181,12 @@ impl VortexOpenOptions { self.metrics.clone(), )); - // Create a segment source backed by the VortexReadAt implementation. - let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new( + // Create a segment source backed by the VortexRead implementation. + let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::open( footer.segment_map().clone(), read, + self.session.handle(), + self.metrics.clone(), ))); // Wrap up the segment source to first resolve segments from the initial read cache. @@ -272,13 +280,15 @@ impl VortexOpenOptions { object_store: &Arc, path: &str, ) -> VortexResult { - use vortex_io::file::object_store::ObjectStoreReadSource; + use vortex_io::file::object_store::ObjectStoreSource; - self.open(ObjectStoreReadSource::new( + let handle = self.session.handle(); + let source = Arc::new(ObjectStoreSource::new( object_store.clone(), path.into(), - )) - .await + handle, + )); + self.open(source).await } } @@ -299,14 +309,18 @@ mod tests { use super::*; use crate::WriteOptionsSessionExt; - // Define CountingReadAt struct - struct CountingReadAt { + // Define CountingRead struct + struct CountingRead { inner: R, total_read: Arc, first_read_len: Arc, } - impl VortexReadAt for CountingReadAt { + impl VortexReadAt for CountingRead { + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.inner.size() + } + fn read_at( &self, offset: u64, @@ -323,8 +337,8 @@ mod tests { self.inner.read_at(offset, length, alignment) } - fn size(&self) -> BoxFuture<'static, VortexResult> { - self.inner.size() + fn concurrency(&self) -> usize { + self.inner.concurrency() } } @@ -364,14 +378,14 @@ mod tests { let total_read = Arc::new(AtomicUsize::new(0)); let first_read_len = Arc::new(AtomicUsize::new(0)); - let reader = CountingReadAt { + let reader = CountingRead { inner: buffer, total_read: total_read.clone(), first_read_len: first_read_len.clone(), }; // Open the file - let _file = session.open_options().open_read_at(reader).await.unwrap(); + let _file = session.open_options().open_read(reader).await.unwrap(); // Assert that we read approximately the postscript size, not 1MB let first = first_read_len.load(Ordering::Relaxed); diff --git a/vortex-io/src/file/driver.rs b/vortex-file/src/read/driver.rs similarity index 93% rename from vortex-io/src/file/driver.rs rename to vortex-file/src/read/driver.rs index 6d6519de60f..4ee92547a03 100644 --- a/vortex-io/src/file/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -1,22 +1,26 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors + use std::collections::BTreeMap; use std::collections::BTreeSet; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use futures::Stream; use pin_project_lite::pin_project; use vortex_error::VortexExpect; +use vortex_io::CoalesceConfig; +use vortex_metrics::Counter; +use vortex_metrics::Histogram; use vortex_metrics::VortexMetrics; -use crate::file::read::CoalesceWindow; -use crate::file::read::CoalescedRequest; -use crate::file::read::IoRequest; -use crate::file::read::ReadEvent; -use crate::file::read::ReadRequest; -use crate::file::read::RequestId; +use crate::read::ReadRequest; +use crate::read::RequestId; +use crate::read::request::CoalescedRequest; +use crate::read::request::IoRequest; +use crate::segments::ReadEvent; pin_project! { /// A stream that performs coalescing and prioritization of I/O requests. @@ -32,7 +36,7 @@ pin_project! { #[pin] events: S, inner_done: bool, - coalesce_window: Option, + coalesce_window: Option, state: State, } } @@ -42,7 +46,7 @@ impl IoRequestStream { // expanding the request by coalesce_distance, but stop if we hit max_read_size. pub(crate) fn new( events: S, - coalesce_window: Option, + coalesce_window: Option, metrics: VortexMetrics, ) -> Self where @@ -112,7 +116,23 @@ struct State { requests_by_offset: BTreeSet<(u64, RequestId)>, // Metrics for tracking I/O request patterns - metrics: VortexMetrics, + metrics: StateMetrics, +} + +struct StateMetrics { + individual_requests: Arc, + coalesced_requests: Arc, + num_requests_coalesced: Arc, +} + +impl StateMetrics { + fn new(registry: VortexMetrics) -> Self { + Self { + individual_requests: registry.counter("io.requests.individual"), + coalesced_requests: registry.counter("io.requests.coalesced"), + num_requests_coalesced: registry.histogram("io.requests.coalesced.num_coalesced"), + } + } } impl State { @@ -121,7 +141,7 @@ impl State { requests: BTreeMap::new(), polled_requests: BTreeMap::new(), requests_by_offset: BTreeSet::new(), - metrics, + metrics: StateMetrics::new(metrics), } } @@ -152,19 +172,19 @@ impl State { } /// Get the next request, if any. - fn next(&mut self, coalesce_window: Option<&CoalesceWindow>) -> Option { + fn next(&mut self, coalesce_window: Option<&CoalesceConfig>) -> Option { match coalesce_window { None => self.next_uncoalesced().map(|request| { - self.metrics.counter("io.requests.individual").inc(); + self.metrics.individual_requests.inc(); IoRequest::new_single(request) }), Some(window) => self.next_coalesced(window).map(|request| { match request.requests.len() { - 1 => self.metrics.counter("io.requests.individual").inc(), + 1 => self.metrics.individual_requests.inc(), num_requests => { - self.metrics.counter("io.requests.coalesced").inc(); + self.metrics.coalesced_requests.inc(); self.metrics - .histogram("io.requests.coalesced.num_coalesced") + .num_requests_coalesced .update(num_requests as i64); } }; @@ -186,7 +206,7 @@ impl State { None } - fn next_coalesced(&mut self, window: &CoalesceWindow) -> Option { + fn next_coalesced(&mut self, window: &CoalesceConfig) -> Option { // Find the next valid request in priority order let first_req = self.next_uncoalesced()?; @@ -295,9 +315,7 @@ mod tests { use vortex_error::VortexResult; use super::*; - use crate::file::IoRequestInner; - use crate::file::ReadEvent; - use crate::file::ReadRequest; + use crate::read::request::IoRequestInner; fn create_request( id: usize, @@ -319,7 +337,7 @@ mod tests { async fn collect_outputs( events: Vec, - coalesce_window: Option, + coalesce_window: Option, ) -> Vec { let event_stream = stream::iter(events); let metrics = VortexMetrics::default(); @@ -380,7 +398,7 @@ mod tests { let outputs = collect_outputs( events, - Some(CoalesceWindow { + Some(CoalesceConfig { distance: 0, max_size: 1024, }), @@ -411,7 +429,7 @@ mod tests { // Gap is 5, window is 6 - should coalesce let outputs = collect_outputs( events, - Some(CoalesceWindow { + Some(CoalesceConfig { distance: 6, max_size: 1024, }), @@ -510,7 +528,7 @@ mod tests { let outputs = collect_outputs( events, - Some(CoalesceWindow { + Some(CoalesceConfig { distance: 60, max_size: 1024, }), @@ -555,7 +573,7 @@ mod tests { let outputs = collect_outputs( events, - Some(CoalesceWindow { + Some(CoalesceConfig { distance: 5, max_size: 1024, }), @@ -587,7 +605,7 @@ mod tests { let metrics = VortexMetrics::default(); let io_stream = IoRequestStream::new( event_stream, - Some(CoalesceWindow { + Some(CoalesceConfig { distance: 5, max_size: 1024, }), diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs new file mode 100644 index 00000000000..a812b81f63b --- /dev/null +++ b/vortex-file/src/read/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod driver; +mod request; + +pub(crate) use driver::IoRequestStream; +pub(crate) use request::ReadRequest; +pub(crate) use request::RequestId; diff --git a/vortex-io/src/file/read/request.rs b/vortex-file/src/read/request.rs similarity index 85% rename from vortex-io/src/file/read/request.rs rename to vortex-file/src/read/request.rs index 1b0d7d89a93..695c597ed4e 100644 --- a/vortex-io/src/file/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -14,7 +14,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; /// An I/O request, either a single read or a coalesced set of reads. -pub struct IoRequest(IoRequestInner); +pub(crate) struct IoRequest(IoRequestInner); impl IoRequest { pub(crate) fn new_single(request: ReadRequest) -> Self { @@ -25,12 +25,6 @@ impl IoRequest { IoRequest(IoRequestInner::Coalesced(request)) } - // For debugging purposes. - #[cfg(test)] - pub(crate) fn inner(&self) -> &IoRequestInner { - &self.0 - } - /// Returns the starting offset of this request within the file. pub fn offset(&self) -> u64 { match &self.0 { @@ -39,25 +33,6 @@ impl IoRequest { } } - /// Returns the byte range this request within the file. - pub fn range(&self) -> Range { - match &self.0 { - IoRequestInner::Single(r) => { - r.offset - ..(r.offset + u64::try_from(r.length).vortex_expect("length too big for u64")) - } - IoRequestInner::Coalesced(r) => r.range.clone(), - } - } - - /// Returns true if this request has zero length. - pub fn is_empty(&self) -> bool { - match &self.0 { - IoRequestInner::Single(r) => r.length == 0, - IoRequestInner::Coalesced(r) => r.range.start == r.range.end, - } - } - /// Returns the length of this request in bytes. pub fn len(&self) -> usize { match &self.0 { @@ -75,15 +50,6 @@ impl IoRequest { } } - /// Returns true if all callbacks associated with this request have been dropped. - /// In other words, there is no one waiting for the result of this request. - pub fn is_canceled(&self) -> bool { - match &self.0 { - IoRequestInner::Single(req) => req.callback.is_closed(), - IoRequestInner::Coalesced(req) => req.requests.iter().all(|r| r.callback.is_closed()), - } - } - /// Resolves the request with the given result. pub fn resolve(self, result: VortexResult) { match self.0 { @@ -93,6 +59,25 @@ impl IoRequest { } } +// Testing functionality +#[cfg(test)] +impl IoRequest { + pub(crate) fn inner(&self) -> &IoRequestInner { + &self.0 + } + + /// Returns the byte range this request within the file. + pub(crate) fn range(&self) -> Range { + match &self.0 { + IoRequestInner::Single(r) => { + r.offset + ..(r.offset + u64::try_from(r.length).vortex_expect("length too big for u64")) + } + IoRequestInner::Coalesced(r) => r.range.clone(), + } + } +} + pub(crate) enum IoRequestInner { Single(ReadRequest), Coalesced(CoalescedRequest), @@ -100,7 +85,7 @@ pub(crate) enum IoRequestInner { pub(crate) type RequestId = usize; -pub(crate) struct ReadRequest { +pub struct ReadRequest { pub(crate) id: RequestId, pub(crate) offset: u64, pub(crate) length: usize, diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 58517505ce1..a75af2cc027 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -1,28 +1,107 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::task; +use std::task::Context; +use std::task::Poll; use futures::FutureExt; -use futures::TryFutureExt; +use futures::StreamExt; +use futures::channel::mpsc; use vortex_array::buffer::BufferHandle; -use vortex_error::VortexError; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::VortexReadAt; +use vortex_io::runtime::Handle; use vortex_layout::segments::SegmentFuture; use vortex_layout::segments::SegmentId; use vortex_layout::segments::SegmentSource; +use vortex_metrics::VortexMetrics; use crate::SegmentSpec; +use crate::read::IoRequestStream; +use crate::read::ReadRequest; +use crate::read::RequestId; +#[derive(Debug)] +pub enum ReadEvent { + Request(ReadRequest), + Polled(RequestId), + Dropped(RequestId), +} + +/// A [`SegmentSource`] for file-like IO. +/// ## Coalescing and Pre-fetching +/// +/// It is important to understand the semantics of the read futures returned by a [`FileSegmentSource`]. +/// Under the hood, each instance is backed by a stream that services read requests by +/// applying coalescing and concurrency constraints. +/// +/// Each read future has four states: +/// * `registered` - the read future has been created, but not yet polled. +/// * `requested` - the read future has been polled. +/// * `in-flight` - the read request has been sent to the underlying storage system. +/// * `resolved` - the read future has completed and resolved a result. +/// +/// When a read request is `registered`, it will not itself trigger any I/O, but is eligible to +/// be coalesced with other requests. +/// +/// If a read future is dropped, it will be canceled if possible. This depends on the current +/// state of the request, as well as whether the underlying storage system supports cancellation. +/// +/// I/O requests will be processed in the order they are `registered`, however coalescing may mean +/// other registered requests are lumped together into a single I/O operation. pub struct FileSegmentSource { segments: Arc<[SegmentSpec]>, - read: Arc, + /// A queue for sending read request events to the I/O stream. + events: mpsc::UnboundedSender, + /// The next read request ID. + next_id: Arc, } impl FileSegmentSource { - pub fn new(segments: Arc<[SegmentSpec]>, read: Arc) -> Self { - Self { segments, read } + pub fn open( + segments: Arc<[SegmentSpec]>, + source: Arc, + handle: Handle, + metrics: VortexMetrics, + ) -> Self { + let (send, recv) = mpsc::unbounded(); + + let coalesce_config = source.coalesce_config(); + let concurrency = source.concurrency(); + + let drive_fut = async move { + let stream = + IoRequestStream::new(StreamExt::boxed(recv), coalesce_config, metrics).boxed(); + + stream + .map(move |req| { + let source = source.clone(); + async move { + let result = source + .read_at(req.offset(), req.len(), req.alignment()) + .await; + req.resolve(result); + } + }) + .buffer_unordered(concurrency) + .collect::<()>() + .await + }; + + handle.spawn(drive_fut).detach(); + + Self { + segments, + events: send, + next_id: Arc::new(AtomicUsize::new(0)), + } } } @@ -31,10 +110,35 @@ impl SegmentSource for FileSegmentSource { // We eagerly create the read future here assuming the behaviour of [`FileRead`], where // coalescing becomes effective prior to the future being polled. let maybe_fut = self.segments.get(*id as usize).cloned().map(|spec| { - self.read - .clone() - .read_at(spec.offset, spec.length as usize, spec.alignment) - .map_err(VortexError::from) + let SegmentSpec { + offset, + length, + alignment, + } = spec; + + let (send, recv) = oneshot::channel(); + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let event = ReadEvent::Request(ReadRequest { + id, + offset, + length: length as usize, + alignment, + callback: send, + }); + + // If we fail to submit the event, we create a future that has failed. + if let Err(e) = self.events.unbounded_send(event) { + return async move { Err(vortex_err!("Failed to submit read request: {e}")) } + .boxed(); + } + + ReadFuture { + id, + recv, + polled: false, + events: self.events.clone(), + } + .boxed() }); async move { @@ -46,3 +150,41 @@ impl SegmentSource for FileSegmentSource { .boxed() } } + +/// A future that resolves a read request from a [`FileRead`]. +/// +/// See the documentation for [`FileRead`] for details on coalescing and pre-fetching. +/// If dropped, the read request will be canceled where possible. +struct ReadFuture { + id: usize, + recv: oneshot::Receiver>, + polled: bool, + events: mpsc::UnboundedSender, +} + +impl Future for ReadFuture { + type Output = VortexResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if !self.polled { + self.polled = true; + // Notify the I/O stream that this request has been polled. + if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) { + return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))); + } + } + + match task::ready!(self.recv.poll_unpin(cx)) { + Ok(result) => Poll::Ready(result), + Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))), + } + } +} + +impl Drop for ReadFuture { + fn drop(&mut self) { + // When the FileHandle is dropped, we can send a shutdown event to the I/O stream. + // If the I/O stream has already been dropped, this will fail silently. + drop(self.events.unbounded_send(ReadEvent::Dropped(self.id))); + } +} diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index ea105ce5810..388e52294e2 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -87,7 +87,7 @@ async fn test_file_roundtrip() { .expect("write"); let bytes = ByteBuffer::from(bytes); - let vxf = SESSION.open_options().open(bytes).await.expect("open"); + let vxf = SESSION.open_options().open_buffer(bytes).expect("open"); // Read the data back let stream = vxf diff --git a/vortex-io/src/file/buffer.rs b/vortex-io/src/file/buffer.rs deleted file mode 100644 index efd25686724..00000000000 --- a/vortex-io/src/file/buffer.rs +++ /dev/null @@ -1,71 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; -use std::sync::LazyLock; - -use futures::FutureExt; -use futures::StreamExt; -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use vortex_buffer::ByteBuffer; -use vortex_buffer::ByteBufferMut; -use vortex_error::VortexExpect; -use vortex_error::VortexResult; -use vortex_error::vortex_err; - -use crate::file::IoRequest; -use crate::file::read::CoalesceWindow; -use crate::file::read::IntoReadSource; -use crate::file::read::ReadSource; -use crate::file::read::ReadSourceRef; -use crate::runtime::Handle; - -impl IntoReadSource for ByteBuffer { - fn into_read_source(self, _handle: Handle) -> VortexResult { - Ok(Arc::new(self)) - } -} - -impl ReadSource for ByteBuffer { - fn uri(&self) -> &Arc { - static URI: LazyLock> = LazyLock::new(|| Arc::from(":in-memory:")); - &URI - } - - fn coalesce_window(&self) -> Option { - None - } - - fn size(&self) -> BoxFuture<'static, VortexResult> { - let len = self.len() as u64; - async move { Ok(len) }.boxed() - } - - fn drive_send( - self: Arc, - mut requests: BoxStream<'static, IoRequest>, - ) -> BoxFuture<'static, ()> { - let buffer = self; - async move { - while let Some(req) = requests.next().await { - let offset = usize::try_from(req.offset()) - .vortex_expect("In-memory buffer offset exceeds usize"); - let len = req.len(); - - let result = if offset + len > buffer.len() { - Err(vortex_err!("Read out of bounds")) - } else { - let mut slice = ByteBufferMut::with_capacity_aligned(len, req.alignment()); - unsafe { slice.set_len(len) }; - slice - .as_mut_slice() - .copy_from_slice(&buffer.as_slice()[offset..offset + len]); - Ok(slice.freeze()) - }; - req.resolve(result); - } - } - .boxed() - } -} diff --git a/vortex-io/src/file/mod.rs b/vortex-io/src/file/mod.rs index 645152b53c1..da2edbae604 100644 --- a/vortex-io/src/file/mod.rs +++ b/vortex-io/src/file/mod.rs @@ -1,13 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -mod buffer; -mod driver; #[cfg(feature = "object_store")] pub mod object_store; -mod read; #[cfg(not(target_arch = "wasm32"))] -pub(crate) mod std_file; - -pub(crate) use driver::*; -pub use read::*; +pub mod std_file; diff --git a/vortex-io/src/file/object_store.rs b/vortex-io/src/file/object_store.rs index bfeb47567d0..0d09cbdcd2b 100644 --- a/vortex-io/src/file/object_store.rs +++ b/vortex-io/src/file/object_store.rs @@ -8,87 +8,91 @@ use async_compat::Compat; use futures::FutureExt; use futures::StreamExt; use futures::future::BoxFuture; -use futures::stream::BoxStream; -use tracing::Instrument; +use object_store::GetOptions; +use object_store::GetRange; +use object_store::GetResultPayload; +use object_store::ObjectStore; +use object_store::path::Path as ObjectPath; +use vortex_buffer::Alignment; +use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_error::VortexError; use vortex_error::VortexResult; use vortex_error::vortex_ensure; -use crate::file::IoRequest; -use crate::file::read::CoalesceWindow; -use crate::file::read::IntoReadSource; -use crate::file::read::ReadSource; -use crate::file::read::ReadSourceRef; +use crate::CoalesceConfig; +use crate::VortexReadAt; #[cfg(not(target_arch = "wasm32"))] use crate::file::std_file::read_exact_at; use crate::runtime::Handle; -const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow { +const DEFAULT_COALESCING_CONFIG: CoalesceConfig = CoalesceConfig { distance: 1024 * 1024, // 1 MB max_size: 16 * 1024 * 1024, // 16 MB }; -const CONCURRENCY: usize = 192; // Number of concurrent requests to allow. -pub struct ObjectStoreReadSource { - store: Arc, - path: object_store::path::Path, +/// Default number of concurrent requests to allow. +const DEFAULT_CONCURRENCY: usize = 192; + +/// An object store backed I/O source. +pub struct ObjectStoreSource { + store: Arc, + path: ObjectPath, uri: Arc, + handle: Handle, concurrency: usize, - coalesce_window: Option, + coalesce_config: Option, } -impl ObjectStoreReadSource { - pub fn new(store: Arc, path: object_store::path::Path) -> Self { +impl ObjectStoreSource { + /// Create a new object store source. + pub fn new(store: Arc, path: ObjectPath, handle: Handle) -> Self { let uri = Arc::from(path.to_string()); Self { store, path, uri, - concurrency: CONCURRENCY, - coalesce_window: Some(COALESCING_WINDOW), + handle, + concurrency: DEFAULT_CONCURRENCY, + coalesce_config: Some(DEFAULT_COALESCING_CONFIG), } } + /// Set the concurrency for this source. pub fn with_concurrency(mut self, concurrency: usize) -> Self { self.concurrency = concurrency; self } - pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self { - self.coalesce_window = Some(window); + /// Set the coalesce config for this source. + pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self { + self.coalesce_config = Some(config); self } - pub fn with_some_coalesce_window(mut self, window: Option) -> Self { - self.coalesce_window = window; + /// Set an optional coalesce config for this source. + pub fn with_some_coalesce_config(mut self, config: Option) -> Self { + self.coalesce_config = config; self } } -impl IntoReadSource for ObjectStoreReadSource { - fn into_read_source(self, handle: Handle) -> VortexResult { - Ok(Arc::new(ObjectStoreIoSource { io: self, handle })) +impl VortexReadAt for ObjectStoreSource { + fn uri(&self) -> Option<&Arc> { + Some(&self.uri) } -} -struct ObjectStoreIoSource { - io: ObjectStoreReadSource, - handle: Handle, -} - -impl ReadSource for ObjectStoreIoSource { - fn uri(&self) -> &Arc { - &self.io.uri + fn coalesce_config(&self) -> Option { + self.coalesce_config } - fn coalesce_window(&self) -> Option { - self.io.coalesce_window + fn concurrency(&self) -> usize { + self.concurrency } fn size(&self) -> BoxFuture<'static, VortexResult> { - let store = self.io.store.clone(); - let path = self.io.path.clone(); + let store = self.store.clone(); + let path = self.path.clone(); Compat::new(async move { store .head(&path) @@ -99,79 +103,66 @@ impl ReadSource for ObjectStoreIoSource { .boxed() } - fn drive_send( - self: Arc, - requests: BoxStream<'static, IoRequest>, - ) -> BoxFuture<'static, ()> { - let self2 = self.clone(); - let concurrency = self.io.concurrency; - requests - .map(move |req| { - let handle = self.handle.clone(); - let store = self.io.store.clone(); - let path = self.io.path.clone(); - - let len = req.len(); - let range = req.range(); - let alignment = req.alignment(); - - let read = async move { - // Instead of calling `ObjectStore::get_range`, we expand the implementation and run it - // ourselves to avoid a second copy to align the buffer. Instead, we can write directly - // into the aligned buffer. - let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment); - - let response = store - .get_opts( - &path, - object_store::GetOptions { - range: Some(object_store::GetRange::Bounded(range.clone())), - ..Default::default() - }, - ) - .await?; - - let buffer = match response.payload { - object_store::GetResultPayload::File(file, _) => { - // SAFETY: We're setting the length to the exact size we're about to read. - // The read_exact_at call will either fill the entire buffer or return an error, - // ensuring no uninitialized memory is exposed. - unsafe { buffer.set_len(len) }; - - handle - .spawn_blocking(move || { - read_exact_at(&file, &mut buffer, range.start)?; - Ok::<_, io::Error>(buffer) - }) - .await - .map_err(io::Error::other)? - } - object_store::GetResultPayload::Stream(mut byte_stream) => { - while let Some(bytes) = byte_stream.next().await { - buffer.extend_from_slice(&bytes?); - } - - vortex_ensure!( - buffer.len() == len, - "Object store stream returned {} bytes but expected {} bytes (range: {:?})", - buffer.len(), - len, - range - ); - - buffer - } - }; - - Ok(buffer.freeze()) + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let store = self.store.clone(); + let path = self.path.clone(); + let handle = self.handle.clone(); + let range = offset..(offset + length as u64); + + Compat::new(async move { + let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); + + let response = store + .get_opts( + &path, + GetOptions { + range: Some(GetRange::Bounded(range.clone())), + ..Default::default() + }, + ) + .await?; + + let buffer = match response.payload { + #[cfg(not(target_arch = "wasm32"))] + GetResultPayload::File(file, _) => { + unsafe { buffer.set_len(length) }; + + handle + .spawn_blocking(move || { + read_exact_at(&file, &mut buffer, range.start)?; + Ok::<_, io::Error>(buffer) + }) + .await + .map_err(io::Error::other)? + } + #[cfg(target_arch = "wasm32")] + GetResultPayload::File(..) => { + unreachable!("File payload not supported on wasm32") + } + GetResultPayload::Stream(mut byte_stream) => { + while let Some(bytes) = byte_stream.next().await { + buffer.extend_from_slice(&bytes?); + } + + vortex_ensure!( + buffer.len() == length, + "Object store stream returned {} bytes but expected {} bytes (range: {:?})", + buffer.len(), + length, + range + ); + + buffer } - .in_current_span(); - - async move { req.resolve(Compat::new(read).await) } - }) - .map(move |f| self2.handle.spawn(f)) - .buffer_unordered(concurrency) - .collect::<()>() - .boxed() + }; + + Ok(buffer.freeze()) + }) + .boxed() } } diff --git a/vortex-io/src/file/read/mod.rs b/vortex-io/src/file/read/mod.rs deleted file mode 100644 index c82a29b4e74..00000000000 --- a/vortex-io/src/file/read/mod.rs +++ /dev/null @@ -1,183 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -mod request; -mod source; - -use std::fmt; -use std::fmt::Debug; -use std::fmt::Display; -use std::pin::Pin; -use std::sync::Arc; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; -use std::task::ready; - -use async_trait::async_trait; -use futures::FutureExt; -use futures::TryFutureExt; -use futures::channel::mpsc; -use futures::future::BoxFuture; -use futures::future::Shared; -pub use request::*; -pub use source::*; -use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; -use vortex_error::SharedVortexResult; -use vortex_error::VortexError; -use vortex_error::VortexResult; -use vortex_error::vortex_err; - -use crate::VortexReadAt; - -/// A handle to an open file that can be read using a Vortex runtime. -/// -/// ## Coalescing and Pre-fetching -/// -/// It is important to understand the semantics of the read futures returned by a [`FileRead`]. -/// Under the hood, each [`FileRead`] is backed by a stream that services read requests by -/// applying coalescing and concurrency constraints. -/// -/// Each read future has four states: -/// * `registered` - the read future has been created, but not yet polled. -/// * `requested` - the read future has been polled. -/// * `in-flight` - the read request has been sent to the underlying storage system. -/// * `resolved` - the read future has completed and resolved a result. -/// -/// When a read request is `registered`, it will not itself trigger any I/O, but is eligible to -/// be coalesced with other requests. -/// -/// If a read future is dropped, it will be canceled if possible. This depends on the current -/// state of the request, as well as whether the underlying storage system supports cancellation. -/// -/// I/O requests will be processed in the order they are `registered`, however coalescing may mean -/// other registered requests are lumped together into a single I/O operation. -#[derive(Clone)] -pub struct FileRead { - /// Human-readable descriptor for the file, typically its URI. - uri: Arc, - /// A shared future that resolves to the size of the file. - size: Shared>>, - /// A queue for sending read request events to the I/O stream. - events: mpsc::UnboundedSender, - /// The next read request ID. - next_id: Arc, -} - -impl Debug for FileRead { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FileHandle") - .field("uri", &self.uri) - .finish() - } -} - -impl Display for FileRead { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.uri) - } -} - -impl FileRead { - pub(crate) fn new( - uri: Arc, - size: BoxFuture<'static, VortexResult>, - send: mpsc::UnboundedSender, - ) -> Self { - Self { - uri, - size: size.map_err(Arc::new).boxed().shared(), - events: send, - next_id: Arc::new(AtomicUsize::new(0)), - } - } - - /// The URI of the file. - pub fn uri(&self) -> &Arc { - &self.uri - } -} - -/// A future that resolves a read request from a [`FileRead`]. -/// -/// See the documentation for [`FileRead`] for details on coalescing and pre-fetching. -/// If dropped, the read request will be canceled where possible. -struct ReadFuture { - id: usize, - recv: oneshot::Receiver>, - polled: bool, - events: mpsc::UnboundedSender, -} - -impl Future for ReadFuture { - type Output = VortexResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if !self.polled { - self.polled = true; - // Notify the I/O stream that this request has been polled. - if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) { - return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))); - } - } - - match ready!(self.recv.poll_unpin(cx)) { - Ok(result) => Poll::Ready(result), - Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))), - } - } -} - -impl Drop for ReadFuture { - fn drop(&mut self) { - // When the FileHandle is dropped, we can send a shutdown event to the I/O stream. - // If the I/O stream has already been dropped, this will fail silently. - drop(self.events.unbounded_send(ReadEvent::Dropped(self.id))); - } -} - -#[derive(Debug)] -pub(crate) enum ReadEvent { - Request(ReadRequest), - Polled(RequestId), - Dropped(RequestId), -} - -#[async_trait] -impl VortexReadAt for FileRead { - fn read_at( - &self, - offset: u64, - length: usize, - alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { - let (send, recv) = oneshot::channel(); - let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let event = ReadEvent::Request(ReadRequest { - id, - offset, - length, - alignment, - callback: send, - }); - - // If we fail to submit the event, we create a future that has failed. - if let Err(e) = self.events.unbounded_send(event) { - return async move { Err(vortex_err!("Failed to submit read request: {e}")) }.boxed(); - } - - ReadFuture { - id, - recv, - polled: false, - events: self.events.clone(), - } - .boxed() - } - - fn size(&self) -> BoxFuture<'static, VortexResult> { - self.size.clone().map_err(VortexError::from).boxed() - } -} diff --git a/vortex-io/src/file/read/source.rs b/vortex-io/src/file/read/source.rs deleted file mode 100644 index e1becd48849..00000000000 --- a/vortex-io/src/file/read/source.rs +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; - -use futures::FutureExt; -use futures::future::BoxFuture; -use futures::future::LocalBoxFuture; -use futures::stream::BoxStream; -use vortex_error::VortexResult; - -use crate::file::IoRequest; -use crate::runtime::Handle; - -/// A trait for types that can be opened as an `IoSource`. -pub trait IntoReadSource { - fn into_read_source(self, handle: Handle) -> VortexResult; -} - -pub type ReadSourceRef = Arc; - -/// An object-safe trait representing an open file-like I/O object for reading. -pub trait ReadSource: Send + Sync { - /// The URI of this source, for logging and debugging purposes. - fn uri(&self) -> &Arc; - - /// The coalescing window to use for this source, if any. - fn coalesce_window(&self) -> Option; - - /// Returns a shared future that resolves to the byte size of the underlying data source. - fn size(&self) -> BoxFuture<'static, VortexResult>; - - /// Drive a stream of I/O requests to completion. - fn drive_send( - self: Arc, - requests: BoxStream<'static, IoRequest>, - ) -> BoxFuture<'static, ()>; - - /// Drive a stream of I/O requests to completion on the local thread. - fn drive_local( - self: Arc, - requests: BoxStream<'static, IoRequest>, - ) -> LocalBoxFuture<'static, ()> { - self.drive_send(requests).boxed_local() - } -} - -#[derive(Clone, Copy, Debug)] -pub struct CoalesceWindow { - /// The maximum "empty" distance between two requests to consider them for coalescing. - pub distance: u64, - /// The maximum total size spanned by a coalesced request. - pub max_size: u64, -} diff --git a/vortex-io/src/file/std_file.rs b/vortex-io/src/file/std_file.rs index f1527f0ea71..56abd56eb60 100644 --- a/vortex-io/src/file/std_file.rs +++ b/vortex-io/src/file/std_file.rs @@ -11,22 +11,17 @@ use std::os::unix::fs::FileExt; #[cfg(windows)] use std::os::windows::fs::FileExt; use std::path::Path; -use std::path::PathBuf; use std::sync::Arc; use futures::FutureExt; -use futures::StreamExt; use futures::future::BoxFuture; -use futures::stream::BoxStream; +use vortex_buffer::Alignment; +use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; -use vortex_error::VortexError; use vortex_error::VortexResult; -use crate::file::CoalesceWindow; -use crate::file::IntoReadSource; -use crate::file::IoRequest; -use crate::file::ReadSource; -use crate::file::ReadSourceRef; +use crate::CoalesceConfig; +use crate::VortexReadAt; use crate::runtime::Handle; /// Read exactly `buffer.len()` bytes from `file` starting at `offset`. @@ -61,86 +56,70 @@ pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std: } } -const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow { +const COALESCING_CONFIG: CoalesceConfig = CoalesceConfig { // TODO(ngates): these numbers don't make sense if we're using spawn_blocking.. distance: 8 * 1024, // KB max_size: 8 * 1024, // KB }; const CONCURRENCY: usize = 32; -impl IntoReadSource for PathBuf { - fn into_read_source(self, handle: Handle) -> VortexResult { - self.as_path().into_read_source(handle) - } +/// An adapter type wrapping a [`File`] to implement [`VortexReadAt`]. +pub struct FileReadAdapter { + uri: Arc, + file: Arc, + handle: Handle, } -impl IntoReadSource for &Path { - fn into_read_source(self, handle: Handle) -> VortexResult { - let uri = self.to_string_lossy().to_string().into(); - let file = Arc::new(File::open(self)?); - Ok(Arc::new(FileIoSource { uri, file, handle })) +impl FileReadAdapter { + /// Open a file for reading. + pub fn open(path: impl AsRef, handle: Handle) -> VortexResult { + let path = path.as_ref(); + let uri = path.to_string_lossy().to_string().into(); + let file = Arc::new(File::open(path)?); + Ok(Self { uri, file, handle }) } } -impl IntoReadSource for &str { - fn into_read_source(self, handle: Handle) -> VortexResult { - Path::new(self).into_read_source(handle) +impl VortexReadAt for FileReadAdapter { + fn uri(&self) -> Option<&Arc> { + Some(&self.uri) } -} - -pub(crate) struct FileIoSource { - uri: Arc, - file: Arc, - handle: Handle, -} -impl ReadSource for FileIoSource { - fn uri(&self) -> &Arc { - &self.uri + fn coalesce_config(&self) -> Option { + Some(COALESCING_CONFIG) } - fn coalesce_window(&self) -> Option { - Some(COALESCING_WINDOW) + fn concurrency(&self) -> usize { + CONCURRENCY } fn size(&self) -> BoxFuture<'static, VortexResult> { let file = self.file.clone(); async move { - let metadata = file.metadata().map_err(VortexError::from)?; + let metadata = file.metadata()?; Ok(metadata.len()) } .boxed() } - fn drive_send( - self: Arc, - requests: BoxStream<'static, IoRequest>, - ) -> BoxFuture<'static, ()> { - requests - // Amortize the cost of spawn_blocking by batching available requests. - // Too much batching, and we reduce concurrency. - .ready_chunks(1) - .map(move |reqs| { - let file = self.file.clone(); - self.handle.spawn_blocking(move || { - for req in reqs { - let len = req.len(); - let offset = req.offset(); - let mut buffer = ByteBufferMut::with_capacity_aligned(len, req.alignment()); - unsafe { buffer.set_len(len) }; - - let buffer_res = read_exact_at(&file, &mut buffer, offset); - - req.resolve( - buffer_res - .map(|_| buffer.freeze()) - .map_err(VortexError::from), - ) - } + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let file = self.file.clone(); + let handle = self.handle.clone(); + async move { + handle + .spawn_blocking(move || { + let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); + unsafe { buffer.set_len(length) }; + read_exact_at(&file, &mut buffer, offset)?; + Ok(buffer.freeze()) }) - }) - .buffer_unordered(CONCURRENCY) - .collect::<()>() - .boxed() + .await + } + .boxed() } } diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index de8eccc1a4e..ed8ddecad39 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -3,7 +3,6 @@ use std::sync::Arc; -use async_trait::async_trait; use futures::FutureExt; use futures::future::BoxFuture; use vortex_buffer::Alignment; @@ -16,95 +15,118 @@ use vortex_metrics::Histogram; use vortex_metrics::Timer; use vortex_metrics::VortexMetrics; -/// The read trait used within Vortex. +/// Configuration for coalescing nearby I/O requests into single operations. +#[derive(Clone, Copy, Debug)] +pub struct CoalesceConfig { + /// The maximum "empty" distance between two requests to consider them for coalescing. + pub distance: u64, + /// The maximum total size spanned by a coalesced request. + pub max_size: u64, +} + +impl CoalesceConfig { + /// Creates a new coalesce configuration. + pub fn new(distance: u64, max_size: u64) -> Self { + Self { distance, max_size } + } + + /// Configuration appropriate for fast local storage (memory, NVMe). + pub fn local() -> Self { + Self::new(8 * 1024, 8 * 1024) // 8KB + } + + /// Configuration appropriate for object storage (S3, GCS, etc.). + pub fn object_storage() -> Self { + Self::new(1 << 20, 16 << 20) // 1MB distance, 16MB max + } +} + +/// The unified read trait for Vortex I/O sources. /// /// This trait provides async positional reads to underlying storage and is used by the vortex-file /// crate to read data from files or object stores. -/// -/// It behaves a little differently from a typical async read trait in order to provide us with -/// some nice additional semantics for use within Vortex. See the [`VortexReadAt::read_at`] method -/// for details. pub trait VortexReadAt: Send + Sync + 'static { - /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`]. - /// - /// If the reader does not have the requested number of bytes, the returned Future will complete - /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof]. - /// - /// This function returns a future with a `'static` lifetime. This allows us to define the - /// following semantics: - /// - /// This function returns a future with a `'static` lifetime, allowing us to define the - /// following semantics: - /// - /// * Creation of the future hints to the implementation that a read _may_ be required. - /// * Polling of the future indicates that the read _is now_ required. - /// * Dropping of the future indicates that the read is not required, and may be cancelled. + /// URI for debugging/logging. Returns `None` for anonymous sources. + fn uri(&self) -> Option<&Arc> { + None + } + + /// Configuration for merging nearby I/O requests into fewer, larger reads. + fn coalesce_config(&self) -> Option { + None + } + + /// Maximum number of concurrent I/O requests for that should be pulled from this source. /// - /// Implementations may choose to ignore these semantics, but they allow optimizations such as - /// coalescing and cancellation. See [`crate::file::FileRead`] for an example of such an - /// implementation. + /// This value is used to control how many [`VortexReadAt::read_at`] calls can + /// be in-flight simultaneously. Higher values allow more parallelism but consume + /// more resources (memory, file descriptors, network connections). /// - /// ## For Developers + /// Implementations should choose a value appropriate for their underlying storage + /// characteristics. Low-latency sources benefit less from high concurrency, while + /// high-latency sources (like remote storage) benefit significantly from issuing + /// many requests in parallel. + fn concurrency(&self) -> usize; + + /// Asynchronously get the number of bytes of the underlying source. + fn size(&self) -> BoxFuture<'static, VortexResult>; + + /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`]. /// - /// This trait is left unsealed to provide maximum flexibility for users of the Vortex, however - /// we strongly recommend using the [`crate::file::FileRead`] abstraction where possible as we - /// will continue to evolve and optimize its implementation for the best performance across - /// as many filesystems and platforms as possible. + /// If the reader does not have the requested number of bytes, the returned Future will complete + /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error. fn read_at( &self, offset: u64, length: usize, alignment: Alignment, ) -> BoxFuture<'static, VortexResult>; +} - /// Asynchronously get the number of bytes of the underlying file. - fn size(&self) -> BoxFuture<'static, VortexResult>; +impl VortexReadAt for Arc { + fn uri(&self) -> Option<&Arc> { + self.as_ref().uri() + } - // TODO(ngates): this is deprecated, but cannot yet be removed. - fn performance_hint(&self) -> PerformanceHint { - PerformanceHint::local() + fn coalesce_config(&self) -> Option { + self.as_ref().coalesce_config() } -} -#[derive(Debug, Clone)] -pub struct PerformanceHint { - coalescing_window: u64, - max_read: Option, -} + fn concurrency(&self) -> usize { + self.as_ref().concurrency() + } -impl PerformanceHint { - pub fn new(coalescing_window: u64, max_read: Option) -> Self { - Self { - coalescing_window, - max_read, - } + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.as_ref().size() } - /// Creates a new instance with a profile appropriate for fast local storage, like memory or files on NVMe devices. - pub fn local() -> Self { - // Coalesce ~8K page size, also ensures we span padding for adjacent segments. - Self::new(8192, Some(8192)) + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.as_ref().read_at(offset, length, alignment) } +} - pub fn object_storage() -> Self { - Self::new( - 1 << 20, // 1MB, - Some(8 << 20), // 8MB, - ) +impl VortexReadAt for Arc { + fn uri(&self) -> Option<&Arc> { + self.as_ref().uri() } - /// The maximum distance between two reads that should coalesced into a single operation. - pub fn coalescing_window(&self) -> u64 { - self.coalescing_window + fn coalesce_config(&self) -> Option { + self.as_ref().coalesce_config() } - /// Maximum number of bytes in a coalesced read. - pub fn max_read(&self) -> Option { - self.max_read + fn concurrency(&self) -> usize { + self.as_ref().concurrency() + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.as_ref().size() } -} -impl VortexReadAt for Arc { fn read_at( &self, offset: u64, @@ -114,16 +136,23 @@ impl VortexReadAt for Arc { self.as_ref().read_at(offset, length, alignment) } + // fn drive(self: Arc, requests: BoxStream<'static, IoRequest>) -> BoxFuture<'static, ()> { + // // Delegate to the inner implementation's drive + // let inner: Arc = Arc::clone(&self); + // inner.drive(requests) + // } +} + +impl VortexReadAt for ByteBuffer { fn size(&self) -> BoxFuture<'static, VortexResult> { - self.as_ref().size() + let length = self.len() as u64; + async move { Ok(length) }.boxed() } - fn performance_hint(&self) -> PerformanceHint { - self.as_ref().performance_hint() + fn concurrency(&self) -> usize { + 16 } -} -impl VortexReadAt for ByteBuffer { fn read_at( &self, offset: u64, @@ -147,17 +176,9 @@ impl VortexReadAt for ByteBuffer { } .boxed() } - - fn size(&self) -> BoxFuture<'static, VortexResult> { - let length = self.len() as u64; - async move { Ok(length) }.boxed() - } - - fn performance_hint(&self) -> PerformanceHint { - PerformanceHint::local() - } } +/// A wrapper that instruments a [`VortexReadAt`] with metrics. #[derive(Clone)] pub struct InstrumentedReadAt { read: Arc, @@ -177,10 +198,7 @@ impl InstrumentedReadAt { } } -impl Drop for InstrumentedReadAt -where - T: VortexReadAt, -{ +impl Drop for InstrumentedReadAt { #[allow(clippy::cognitive_complexity)] fn drop(&mut self) { let sizes = self.sizes.snapshot(); @@ -207,8 +225,23 @@ where } } -#[async_trait] impl VortexReadAt for InstrumentedReadAt { + fn uri(&self) -> Option<&Arc> { + self.read.uri() + } + + fn coalesce_config(&self) -> Option { + self.read.coalesce_config() + } + + fn concurrency(&self) -> usize { + self.read.concurrency() + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.read.size() + } + fn read_at( &self, offset: u64, @@ -228,15 +261,6 @@ impl VortexReadAt for InstrumentedReadAt { } .boxed() } - - #[inline] - fn size(&self) -> BoxFuture<'static, VortexResult> { - self.read.size() - } - - fn performance_hint(&self) -> PerformanceHint { - self.read.performance_hint() - } } #[cfg(test)] @@ -249,31 +273,17 @@ mod tests { use super::*; #[test] - fn test_performance_hint_local() { - let hint = PerformanceHint::local(); - assert_eq!(hint.coalescing_window(), 8192); - assert_eq!(hint.max_read(), Some(8192)); - } - - #[test] - fn test_performance_hint_object_storage() { - let hint = PerformanceHint::object_storage(); - assert_eq!(hint.coalescing_window(), 1 << 20); // 1MB - assert_eq!(hint.max_read(), Some(8 << 20)); // 8MB - } - - #[test] - fn test_performance_hint_custom() { - let hint = PerformanceHint::new(4096, Some(16384)); - assert_eq!(hint.coalescing_window(), 4096); - assert_eq!(hint.max_read(), Some(16384)); + fn test_coalesce_config_local() { + let config = CoalesceConfig::local(); + assert_eq!(config.distance, 8 * 1024); + assert_eq!(config.max_size, 8 * 1024); } #[test] - fn test_performance_hint_no_max() { - let hint = PerformanceHint::new(2048, None); - assert_eq!(hint.coalescing_window(), 2048); - assert_eq!(hint.max_read(), None); + fn test_coalesce_config_object_storage() { + let config = CoalesceConfig::object_storage(); + assert_eq!(config.distance, 1 << 20); // 1MB + assert_eq!(config.max_size, 16 << 20); // 16MB } #[tokio::test] diff --git a/vortex-io/src/runtime/handle.rs b/vortex-io/src/runtime/handle.rs index 530eca0cf96..08bf9862f4f 100644 --- a/vortex-io/src/runtime/handle.rs +++ b/vortex-io/src/runtime/handle.rs @@ -9,18 +9,10 @@ use std::task::Poll; use std::task::ready; use futures::FutureExt; -use futures::StreamExt; -use futures::channel::mpsc; -use vortex_error::VortexResult; use vortex_error::vortex_panic; -use vortex_metrics::VortexMetrics; -use crate::file::FileRead; -use crate::file::IntoReadSource; -use crate::file::IoRequestStream; use crate::runtime::AbortHandleRef; use crate::runtime::Executor; -use crate::runtime::IoTask; /// A handle to an active Vortex runtime. /// @@ -143,26 +135,6 @@ impl Handle { abort_handle: Some(abort_handle), } } - - /// Open a file for I/O on this runtime. - pub fn open_read( - &self, - source: S, - metrics: VortexMetrics, - ) -> VortexResult { - let source = source.into_read_source(self.clone())?; - - let (send, recv) = mpsc::unbounded(); - - let read = FileRead::new(source.uri().clone(), source.size(), send); - - let stream = - IoRequestStream::new(StreamExt::boxed(recv), source.coalesce_window(), metrics).boxed(); - - self.runtime().spawn_io(IoTask::new(source, stream)); - - Ok(read) - } } /// A handle to a spawned Task. diff --git a/vortex-io/src/runtime/mod.rs b/vortex-io/src/runtime/mod.rs index 939e9e6100f..8892a3dfe66 100644 --- a/vortex-io/src/runtime/mod.rs +++ b/vortex-io/src/runtime/mod.rs @@ -11,13 +11,8 @@ //! * Multi-threaded: work is driven on a pool of threads managed by Vortex. //! * Worker Pool: work is driven on a pool of threads provided by the caller. //! * Tokio: work is driven on a Tokio runtime provided by the caller. -//! use futures::future::BoxFuture; -use futures::stream::BoxStream; - -use crate::file::IoRequest; -use crate::file::ReadSourceRef; mod blocking; pub use blocking::*; @@ -59,12 +54,6 @@ pub(crate) trait Executor: Send + Sync { /// The returned `AbortHandle` may be used to optimistically cancel the task if it has not /// yet started executing. fn spawn_blocking(&self, task: Box) -> AbortHandleRef; - - /// Spawns an I/O task for execution on the runtime. - /// The runtime can choose to invoke the task's `Send` or `!Send` versions. - /// - /// Cancellation is implied by termination of the request stream. - fn spawn_io(&self, task: IoTask); } /// A handle that may be used to optimistically abort a spawned task. @@ -76,22 +65,3 @@ pub(crate) trait AbortHandle: Send + Sync { } pub(crate) type AbortHandleRef = Box; - -/// A task for driving I/O requests against a source. -/// -/// Instead of just spawning a future to process requests, we allow each runtime to decide how -/// spawn the driver for the request stream. This allows runtimes to shared, parallelize, further -/// spawn, or otherwise manage the I/O task as they see fit. -/// -// NOTE(ngates): We could in theory make IoSource support as_any if we wanted each runtime to implement the -// actual read logic themselves? Not sure yet... -pub(crate) struct IoTask { - pub(crate) source: ReadSourceRef, - pub(crate) stream: BoxStream<'static, IoRequest>, -} - -impl IoTask { - pub(crate) fn new(source: ReadSourceRef, stream: BoxStream<'static, IoRequest>) -> Self { - IoTask { source, stream } - } -} diff --git a/vortex-io/src/runtime/single.rs b/vortex-io/src/runtime/single.rs index 982b93c6013..ca27988a552 100644 --- a/vortex-io/src/runtime/single.rs +++ b/vortex-io/src/runtime/single.rs @@ -17,7 +17,6 @@ use crate::runtime::AbortHandleRef; use crate::runtime::BlockingRuntime; use crate::runtime::Executor; use crate::runtime::Handle; -use crate::runtime::IoTask; use crate::runtime::smol::SmolAbortHandle; /// A runtime that drives all work on the current thread. @@ -41,7 +40,6 @@ struct Sender { scheduling: kanal::Sender>, cpu: kanal::Sender>, blocking: kanal::Sender>, - io: kanal::Sender, } impl Sender { @@ -49,7 +47,6 @@ impl Sender { let (scheduling_send, scheduling_recv) = kanal::unbounded::(); let (cpu_send, cpu_recv) = kanal::unbounded::(); let (blocking_send, blocking_recv) = kanal::unbounded::(); - let (io_send, io_recv) = kanal::unbounded::(); // We pass weak references to the local execution into the async tasks such that the task's // reference doesn't keep the execution alive after the runtime is dropped. @@ -104,23 +101,10 @@ impl Sender { }) .detach(); - // Drive I/O tasks. - let weak_local2 = weak_local; - local - .spawn(async move { - while let Ok(task) = io_recv.as_async().recv().await { - if let Some(local) = weak_local2.upgrade() { - local.spawn(task.source.drive_local(task.stream)).detach(); - } - } - }) - .detach(); - Self { scheduling: scheduling_send, cpu: cpu_send, blocking: blocking_send, - io: io_send, } } } @@ -168,12 +152,6 @@ impl Executor for Sender { task: Mutex::new(recv), }) } - - fn spawn_io(&self, task: IoTask) { - if let Err(e) = self.io.send(task) { - vortex_panic!("Executor missing: {}", e); - } - } } impl BlockingRuntime for SingleThreadRuntime { diff --git a/vortex-io/src/runtime/smol.rs b/vortex-io/src/runtime/smol.rs index b55e86dba5e..ad789e3bc07 100644 --- a/vortex-io/src/runtime/smol.rs +++ b/vortex-io/src/runtime/smol.rs @@ -2,12 +2,10 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use futures::future::BoxFuture; -use tracing::Instrument; use crate::runtime::AbortHandle; use crate::runtime::AbortHandleRef; use crate::runtime::Executor; -use crate::runtime::IoTask; // NOTE(ngates): we implement this for a Weak reference to adhere to the constraint that this // trait should not hold strong references to the underlying runtime. @@ -24,10 +22,6 @@ impl Executor for smol::Executor<'static> { fn spawn_blocking(&self, task: Box) -> AbortHandleRef { SmolAbortHandle::new_handle(smol::unblock(task)) } - - fn spawn_io(&self, task: IoTask) { - smol::Executor::spawn(self, task.source.drive_send(task.stream).in_current_span()).detach() - } } /// An abort handle for a `smol::Task`. diff --git a/vortex-io/src/runtime/tests.rs b/vortex-io/src/runtime/tests.rs index 621f0adf481..10832633983 100644 --- a/vortex-io/src/runtime/tests.rs +++ b/vortex-io/src/runtime/tests.rs @@ -9,9 +9,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use futures::FutureExt; -use futures::StreamExt; use futures::future::BoxFuture; -use futures::stream::BoxStream; use tempfile::NamedTempFile; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; @@ -19,11 +17,7 @@ use vortex_buffer::ByteBufferMut; use vortex_error::VortexResult; use crate::VortexReadAt; -use crate::file::IntoReadSource; -use crate::file::IoRequest; -use crate::file::ReadSource; -use crate::file::ReadSourceRef; -use crate::runtime::Handle; +use crate::file::std_file::FileReadAdapter; use crate::runtime::single::block_on; use crate::runtime::tokio::TokioRuntime; @@ -38,10 +32,9 @@ const TEST_LEN: usize = 5; #[test] fn test_file_read_with_single_thread_runtime() { - let result = block_on(|handle| { + let result = block_on(|_handle| { async move { - let buffer = ByteBuffer::from(TEST_DATA.to_vec()); - let file_read = handle.open_read(buffer, Default::default()).unwrap(); + let file_read: Arc = Arc::new(ByteBuffer::from(TEST_DATA.to_vec())); // Read a slice let result = file_read @@ -69,9 +62,7 @@ fn test_file_read_with_single_thread_runtime() { #[tokio::test] async fn test_file_read_with_tokio_runtime() { - let handle = TokioRuntime::current(); - let buffer = ByteBuffer::from(TEST_DATA.to_vec()); - let file_read = handle.open_read(buffer, Default::default()).unwrap(); + let file_read: Arc = Arc::new(ByteBuffer::from(TEST_DATA.to_vec())); // Read a slice let result = file_read @@ -107,9 +98,8 @@ fn test_file_read_with_real_file_single_thread() { temp_file.flush().unwrap(); // Open and read the file - let file_read = handle - .open_read(temp_file.path(), Default::default()) - .unwrap(); + let file_read: Arc = + Arc::new(FileReadAdapter::open(temp_file.path(), handle.clone()).unwrap()); // Read a slice let result = file_read @@ -145,9 +135,8 @@ async fn test_file_read_with_real_file_tokio() { temp_file.flush().unwrap(); let handle = TokioRuntime::current(); - let file_read = handle - .open_read(temp_file.path(), Default::default()) - .unwrap(); + let file_read: Arc = + Arc::new(FileReadAdapter::open(temp_file.path(), handle.clone()).unwrap()); // Read a slice let result = file_read @@ -173,16 +162,14 @@ async fn test_file_read_with_real_file_tokio() { #[tokio::test] async fn test_concurrent_reads() { - let handle = TokioRuntime::current(); - let buffer = ByteBuffer::from(TEST_DATA.to_vec()); - let file_read = handle.open_read(buffer, Default::default()).unwrap(); + let read_at: Arc = Arc::new(ByteBuffer::from(TEST_DATA.to_vec())); // Issue multiple concurrent reads let futures = vec![ - file_read.read_at(0, 5, Alignment::new(1)), - file_read.read_at(5, 5, Alignment::new(1)), - file_read.read_at(10, 5, Alignment::new(1)), - file_read.read_at(15, 5, Alignment::new(1)), + read_at.read_at(0, 5, Alignment::new(1)), + read_at.read_at(5, 5, Alignment::new(1)), + read_at.read_at(10, 5, Alignment::new(1)), + read_at.read_at(15, 5, Alignment::new(1)), ]; let results = futures::future::join_all(futures).await; @@ -226,22 +213,16 @@ async fn test_handle_spawn_cpu() { } // ============================================================================ -// Test custom IoSource implementation +// Test custom VortexRead implementation // ============================================================================ -struct CountingIoSource { +struct CountingReadAt { data: ByteBuffer, read_count: Arc, } -impl ReadSource for CountingIoSource { - fn uri(&self) -> &Arc { - static URI: std::sync::LazyLock> = - std::sync::LazyLock::new(|| Arc::from("counting://test")); - &URI - } - - fn coalesce_window(&self) -> Option { +impl VortexReadAt for CountingReadAt { + fn uri(&self) -> Option<&Arc> { None } @@ -250,58 +231,49 @@ impl ReadSource for CountingIoSource { async move { Ok(len) }.boxed() } - fn drive_send( - self: Arc, - mut requests: BoxStream<'static, IoRequest>, - ) -> BoxFuture<'static, ()> { + fn concurrency(&self) -> usize { + 16 + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.read_count.fetch_add(1, Ordering::SeqCst); + let data = self.data.clone(); async move { - while let Some(req) = requests.next().await { - self.read_count.fetch_add(1, Ordering::SeqCst); - - let offset = req.offset() as usize; - let len = req.len(); - - let result = if offset + len > self.data.len() { - Err(vortex_error::vortex_err!("Read out of bounds")) - } else { - let mut buffer = ByteBufferMut::with_capacity_aligned(len, req.alignment()); - unsafe { buffer.set_len(len) }; - buffer - .as_mut_slice() - .copy_from_slice(&self.data.as_slice()[offset..offset + len]); - Ok(buffer.freeze()) - }; - req.resolve(result); + let start = offset as usize; + if start + length > data.len() { + return Err(vortex_error::vortex_err!("Read out of bounds")); } + let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); + unsafe { buffer.set_len(length) }; + buffer + .as_mut_slice() + .copy_from_slice(&data.as_slice()[start..start + length]); + Ok(buffer.freeze()) } .boxed() } } -impl IntoReadSource for CountingIoSource { - fn into_read_source(self, _handle: Handle) -> VortexResult { - Ok(Arc::new(self)) - } -} - #[tokio::test] -async fn test_custom_io_source() { - let handle = TokioRuntime::current(); +async fn test_custom_vortex_read() { let read_count = Arc::new(AtomicUsize::new(0)); - let source = CountingIoSource { + let read_at: Arc = Arc::new(CountingReadAt { data: ByteBuffer::from(TEST_DATA.to_vec()), read_count: read_count.clone(), - }; - - let file_read = handle.open_read(source, Default::default()).unwrap(); + }); // Perform several reads - file_read.read_at(0, 5, Alignment::new(1)).await.unwrap(); - file_read.read_at(5, 5, Alignment::new(1)).await.unwrap(); - file_read.read_at(10, 5, Alignment::new(1)).await.unwrap(); + read_at.read_at(0, 5, Alignment::new(1)).await.unwrap(); + read_at.read_at(5, 5, Alignment::new(1)).await.unwrap(); + read_at.read_at(10, 5, Alignment::new(1)).await.unwrap(); - // Check that our custom IoSource was called 3 times + // Check that our custom VortexRead was called 3 times assert_eq!(read_count.load(Ordering::SeqCst), 3); } @@ -311,16 +283,14 @@ async fn test_custom_io_source() { #[tokio::test] async fn test_read_out_of_bounds() { - let handle = TokioRuntime::current(); - let buffer = ByteBuffer::from(TEST_DATA.to_vec()); - let file_read = handle.open_read(buffer, Default::default()).unwrap(); + let reader: Arc = Arc::new(ByteBuffer::from(TEST_DATA.to_vec())); // Try to read beyond the buffer - let result = file_read.read_at(100, 10, Alignment::new(1)).await; + let result = reader.read_at(100, 10, Alignment::new(1)).await; assert!(result.is_err()); // Try to read with length that exceeds buffer - let result = file_read.read_at(40, 20, Alignment::new(1)).await; + let result = reader.read_at(40, 20, Alignment::new(1)).await; assert!(result.is_err()); } diff --git a/vortex-io/src/runtime/tokio.rs b/vortex-io/src/runtime/tokio.rs index 6bf12fd3b73..770fb794b0e 100644 --- a/vortex-io/src/runtime/tokio.rs +++ b/vortex-io/src/runtime/tokio.rs @@ -5,14 +5,12 @@ use std::sync::Arc; use std::sync::LazyLock; use futures::future::BoxFuture; -use tracing::Instrument; use crate::runtime::AbortHandle; use crate::runtime::AbortHandleRef; use crate::runtime::BlockingRuntime; use crate::runtime::Executor; use crate::runtime::Handle; -use crate::runtime::IoTask; /// A Vortex runtime that drives all work the enclosed Tokio runtime handle. pub struct TokioRuntime(Arc); @@ -55,10 +53,6 @@ impl Executor for tokio::runtime::Handle { fn spawn_blocking(&self, task: Box) -> AbortHandleRef { Box::new(tokio::runtime::Handle::spawn_blocking(self, task).abort_handle()) } - - fn spawn_io(&self, task: IoTask) { - tokio::runtime::Handle::spawn(self, task.source.drive_send(task.stream).in_current_span()); - } } /// A runtime implementation that grabs the current Tokio runtime handle on each call. @@ -84,11 +78,6 @@ impl Executor for CurrentTokioRuntime { .abort_handle(), ) } - - fn spawn_io(&self, task: IoTask) { - tokio::runtime::Handle::current() - .spawn(task.source.drive_send(task.stream).in_current_span()); - } } impl AbortHandle for tokio::task::AbortHandle { diff --git a/vortex-io/src/runtime/wasm.rs b/vortex-io/src/runtime/wasm.rs index 4cfa2416d47..9c2bfde9e89 100644 --- a/vortex-io/src/runtime/wasm.rs +++ b/vortex-io/src/runtime/wasm.rs @@ -11,7 +11,6 @@ use crate::runtime::AbortHandle; use crate::runtime::AbortHandleRef; use crate::runtime::Executor; use crate::runtime::Handle; -use crate::runtime::IoTask; /// A Vortex runtime that drives work in a WebAssembly environment. pub struct WasmRuntime; @@ -37,10 +36,6 @@ impl Executor for WasmRuntime { Box::new(NoOpAbortHandle) } - fn spawn_io(&self, task: IoTask) { - spawn_local(task.source.drive_local(task.stream)); - } - fn spawn_blocking(&self, task: Box) -> AbortHandleRef { spawn_local(async move { task() }); Box::new(NoOpAbortHandle) diff --git a/vortex-python/src/file.rs b/vortex-python/src/file.rs index a640a1dd9f1..1b3f1f94eb4 100644 --- a/vortex-python/src/file.rs +++ b/vortex-python/src/file.rs @@ -59,7 +59,7 @@ pub fn open(py: Python, path: &str, without_segment_cache: bool) -> PyResult AppState<'a> { session: &'a VortexSession, path: impl AsRef, ) -> VortexResult> { - let vxf = session.open_options().open(path.as_ref()).await?; + let vxf = session.open_options().open_path(path.as_ref()).await?; let cursor = LayoutCursor::new(vxf.footer().clone(), vxf.segment_source()); diff --git a/vortex-tui/src/inspect.rs b/vortex-tui/src/inspect.rs index 65ad6da1105..d2258e59a4d 100644 --- a/vortex-tui/src/inspect.rs +++ b/vortex-tui/src/inspect.rs @@ -444,7 +444,7 @@ impl<'a> VortexInspector<'a> { Ok(self .session .open_options() - .open(self.path.as_path()) + .open_path(self.path.as_path()) .await? .footer() .clone()) diff --git a/vortex-tui/src/segments.rs b/vortex-tui/src/segments.rs index 2a4bbc9ca1b..ac132280325 100644 --- a/vortex-tui/src/segments.rs +++ b/vortex-tui/src/segments.rs @@ -57,9 +57,9 @@ struct SegmentInfo { /// /// Returns an error if the file cannot be opened or read. pub async fn exec_segments(session: &VortexSession, args: SegmentsArgs) -> VortexResult<()> { - let vxf = session.open_options().open(args.file).await?; - + let vxf = session.open_options().open_path(args.file).await?; let footer = vxf.footer(); + let mut segment_tree = collect_segment_tree(footer.layout().as_ref(), footer.segment_map()); // Convert to output format diff --git a/vortex-tui/src/tree.rs b/vortex-tui/src/tree.rs index 32ef339d6b4..a8799afea1e 100644 --- a/vortex-tui/src/tree.rs +++ b/vortex-tui/src/tree.rs @@ -93,7 +93,7 @@ pub async fn exec_tree(session: &VortexSession, args: TreeArgs) -> VortexResult< async fn exec_array_tree(session: &VortexSession, file: &Path, _json: bool) -> VortexResult<()> { let full = session .open_options() - .open(file) + .open_path(file) .await? .scan()? .into_array_stream()? @@ -111,7 +111,7 @@ async fn exec_layout_tree( verbose: bool, json: bool, ) -> VortexResult<()> { - let vxf = session.open_options().open(file).await?; + let vxf = session.open_options().open_path(file).await?; let footer = vxf.footer(); if json { diff --git a/vortex/examples/tracing_vortex.rs b/vortex/examples/tracing_vortex.rs index a27730ac6cf..fb849fb4e5c 100644 --- a/vortex/examples/tracing_vortex.rs +++ b/vortex/examples/tracing_vortex.rs @@ -92,7 +92,7 @@ async fn main() -> Result<(), Box> { } /// Simulates application activity with various log levels and spans -#[expect(clippy::cognitive_complexity)] +#[allow(clippy::cognitive_complexity)] async fn simulate_application_activity(user_id: u32) { // Simulate HTTP request handling let request_span = span!( @@ -422,7 +422,7 @@ async fn read_trace_files( file_count += 1; // Read the file - let reader = session.open_options().open(path.clone()).await?; + let reader = session.open_options().open_path(path.clone()).await?; let array = reader.scan()?.into_array_stream()?.read_all().await?; total_events += array.len(); diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 1fa11ac14da..70b9b27c153 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -281,7 +281,7 @@ mod test { // [read] let array = session .open_options() - .open(path.clone()) + .open_path(path.clone()) .await? .scan()? .with_filter(gt(root(), lit(2u64))) @@ -323,7 +323,7 @@ mod test { // [compact read] let recovered_array = session .open_options() - .open(path.clone()) + .open_path(path.clone()) .await? .scan()? .into_array_stream()? @@ -370,7 +370,7 @@ mod test { // Read the file back, but project down to just the "value" column. let projected = session .open_options() - .open(path.clone()) + .open_path(path.clone()) .await? .scan()? .with_projection(select(["value"], root()))