diff --git a/crates/guest-rust/src/rt/async_support/future_support.rs b/crates/guest-rust/src/rt/async_support/future_support.rs index d18798549..37beb3007 100644 --- a/crates/guest-rust/src/rt/async_support/future_support.rs +++ b/crates/guest-rust/src/rt/async_support/future_support.rs @@ -118,12 +118,46 @@ use std::alloc::Layout; use std::fmt; use std::future::{Future, IntoFuture}; use std::marker; +use std::mem::{self, ManuallyDrop}; use std::pin::Pin; use std::ptr; use std::sync::atomic::{AtomicU32, Ordering::Relaxed}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Wake, Waker}; +/// Helper trait which encapsulates the various operations which can happen +/// with a future. +pub trait FutureOps { + /// The Rust type that's sent or received on this future. + type Payload; + + /// The `future.new` intrinsic. + fn new(&mut self) -> u64; + /// The canonical ABI layout of the type that this future is + /// sending/receiving. + fn elem_layout(&mut self) -> Layout; + /// Converts a Rust type to its canonical ABI representation. + unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8); + /// Used to deallocate any Rust-owned lists in the canonical ABI + /// representation for when a value is successfully sent but needs to be + /// cleaned up. + unsafe fn dealloc_lists(&mut self, dst: *mut u8); + /// Converts from the canonical ABI representation to a Rust value. + unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload; + /// The `future.write` intrinsic + unsafe fn start_write(&mut self, future: u32, val: *const u8) -> u32; + /// The `future.read` intrinsic + unsafe fn start_read(&mut self, future: u32, val: *mut u8) -> u32; + /// The `future.cancel-read` intrinsic + unsafe fn cancel_read(&mut self, future: u32) -> u32; + /// The `future.cancel-write` intrinsic + unsafe fn cancel_write(&mut self, future: u32) -> u32; + /// The `future.drop-readable` intrinsic + unsafe fn drop_readable(&mut self, future: u32); + /// The `future.drop-writable` intrinsic + unsafe fn drop_writable(&mut self, future: u32); +} + /// Function table used for [`FutureWriter`] and [`FutureReader`] /// /// Instances of this table are generated by `wit_bindgen::generate!`. This is @@ -175,6 +209,44 @@ pub struct FutureVtable { pub new: unsafe extern "C" fn() -> u64, } +impl FutureOps for &'static FutureVtable { + type Payload = T; + + fn new(&mut self) -> u64 { + unsafe { (self.new)() } + } + fn elem_layout(&mut self) -> Layout { + self.layout + } + unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) { + (self.lower)(payload, dst) + } + unsafe fn dealloc_lists(&mut self, dst: *mut u8) { + (self.dealloc_lists)(dst) + } + unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload { + (self.lift)(dst) + } + unsafe fn start_write(&mut self, future: u32, val: *const u8) -> u32 { + (self.start_write)(future, val) + } + unsafe fn start_read(&mut self, future: u32, val: *mut u8) -> u32 { + (self.start_read)(future, val) + } + unsafe fn cancel_read(&mut self, future: u32) -> u32 { + (self.cancel_read)(future) + } + unsafe fn cancel_write(&mut self, future: u32) -> u32 { + (self.cancel_write)(future) + } + unsafe fn drop_readable(&mut self, future: u32) { + (self.drop_readable)(future) + } + unsafe fn drop_writable(&mut self, future: u32) { + (self.drop_writable)(future) + } +} + /// Helper function to create a new read/write pair for a component model /// future. /// @@ -186,14 +258,29 @@ pub unsafe fn future_new( default: fn() -> T, vtable: &'static FutureVtable, ) -> (FutureWriter, FutureReader) { + let (tx, rx) = unsafe { raw_future_new(vtable) }; + (FutureWriter::new(tx, default), rx) +} + +/// Helper function to create a new read/write pair for a component model +/// future. +/// +/// # Unsafety +/// +/// This function is unsafe as it requires the functions within `vtable` to +/// correctly uphold the contracts of the component model. +pub unsafe fn raw_future_new(mut ops: O) -> (RawFutureWriter, RawFutureReader) +where + O: FutureOps + Clone, +{ unsafe { - let handles = (vtable.new)(); + let handles = ops.new(); let reader = handles as u32; let writer = (handles >> 32) as u32; rtdebug!("future.new() = [{writer}, {reader}]"); ( - FutureWriter::new(writer, default, vtable), - FutureReader::new(reader, vtable), + RawFutureWriter::new(writer, ops.clone()), + RawFutureReader::new(reader, ops), ) } } @@ -203,8 +290,7 @@ pub unsafe fn future_new( /// A [`FutureWriter`] can be used to send a single value of `T` to the other /// end of a `future`. In a sense this is similar to a oneshot channel in Rust. pub struct FutureWriter { - handle: u32, - vtable: &'static FutureVtable, + raw: ManuallyDrop>>, /// Whether or not a value should be written during `drop`. /// @@ -227,13 +313,11 @@ impl FutureWriter { /// /// This function is unsafe as it requires the functions within `vtable` to /// correctly uphold the contracts of the component model. - #[doc(hidden)] - pub unsafe fn new(handle: u32, default: fn() -> T, vtable: &'static FutureVtable) -> Self { + unsafe fn new(raw: RawFutureWriter<&'static FutureVtable>, default: fn() -> T) -> Self { Self { - handle, + raw: ManuallyDrop::new(raw), default, should_write_default_value: true, - vtable, } } @@ -264,17 +348,18 @@ impl FutureWriter { /// will be lost. There is also [`FutureWrite::cancel`] which can be used to /// possibly re-acquire `value` and `self` if the operation was cancelled. /// In such a situation the operation can be retried at a future date. - pub fn write(self, value: T) -> FutureWrite { - FutureWrite { - op: WaitableOperation::new(FutureWriteOp(marker::PhantomData), (self, value)), - } + pub fn write(mut self, value: T) -> FutureWrite { + let raw = unsafe { ManuallyDrop::take(&mut self.raw).write(value) }; + let default = self.default; + mem::forget(self); + FutureWrite { raw, default } } } impl fmt::Debug for FutureWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FutureWriter") - .field("handle", &self.handle) + .field("handle", &self.raw.handle) .finish() } } @@ -282,103 +367,228 @@ impl fmt::Debug for FutureWriter { impl Drop for FutureWriter { fn drop(&mut self) { // If a value has not yet been written into this writer than that must - // be done so now. Perform a "clone" of `self` ensuring that - // `should_write_default_value` is set to `false` to avoid infinite - // loops by accident. The cloned `FutureWriter` will be responsible for - // performing the `drop-writable` call below once the write has - // completed. + // be done so now. Take the `raw` writer and perform the write via a + // waker that drives the future. // - // Note, though, that if `should_write_default_value` is `false` then a - // write has already happened and we can go ahead and just synchronously - // drop this writer as we would any other handle. + // If `should_write_default_value` is `false` then a write has already + // happened and we can go ahead and just synchronously drop this writer + // as we would any other handle. if self.should_write_default_value { - let clone = FutureWriter { - handle: self.handle, - default: self.default, - should_write_default_value: false, - vtable: self.vtable, - }; - let value = (clone.default)(); - let write = clone.write(value); - Arc::new(DeferredWrite::new(write)).wake(); + let raw = unsafe { ManuallyDrop::take(&mut self.raw) }; + let value = (self.default)(); + raw.write_and_forget(value); } else { - unsafe { - rtdebug!("future.drop-writable({})", self.handle); - (self.vtable.drop_writable)(self.handle); - } + unsafe { ManuallyDrop::drop(&mut self.raw) } } } } -/// Helper structure which behaves both as a future of sorts and an executor of -/// sorts. -/// -/// This type is constructed in `Drop for FutureWriter` to send out a -/// default value when no other has been written. This manages the -/// `FutureWrite` operation happening internally through a `Wake` -/// implementation. That means that this is a sort of cyclical future which, -/// when woken, will complete the write operation. +/// Represents a write operation which may be cancelled prior to completion. /// -/// The purpose of this is to be a "lightweight" way of "spawn"-ing a future -/// write to happen in the background. Crucially, however, this doesn't require -/// the `async-spawn` feature and instead works with the `wasip3_task` C ABI -/// structures (which spawn doesn't support). -struct DeferredWrite { - write: Mutex>, -} - -// TODO -unsafe impl Send for DeferredWrite {} -unsafe impl Sync for DeferredWrite {} - -impl DeferredWrite { - fn new(write: FutureWrite) -> DeferredWrite { - DeferredWrite { - write: Mutex::new(write), +/// This is returned by [`FutureWriter::write`]. +pub struct FutureWrite { + raw: RawFutureWrite<&'static FutureVtable>, + default: fn() -> T, +} + +/// Result of [`FutureWrite::cancel`]. +#[derive(Debug)] +pub enum FutureWriteCancel { + /// The cancel request raced with the receipt of the sent value, and the + /// value was actually sent. Neither the value nor the writer are made + /// available here as both are gone. + AlreadySent, + + /// The other end was dropped before cancellation happened. + /// + /// In this case the original value is returned back to the caller but the + /// writer itself is not longer accessible as it's no longer usable. + Dropped(T), + + /// The pending write was successfully cancelled and the value being written + /// is returned along with the writer to resume again in the future if + /// necessary. + Cancelled(T, FutureWriter), +} + +impl Future for FutureWrite { + type Output = Result<(), FutureWriteError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.pin_project().poll(cx) + } +} + +impl FutureWrite { + fn pin_project(self: Pin<&mut Self>) -> Pin<&mut RawFutureWrite<&'static FutureVtable>> { + // SAFETY: we've chosen that when `Self` is pinned that it translates to + // always pinning the inner field, so that's codified here. + unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().raw) } + } + + /// Cancel this write if it hasn't already completed. + /// + /// This method can be used to cancel a write-in-progress and re-acquire + /// the writer and the value being sent. Note that the write operation may + /// succeed racily or the other end may also drop racily, and these + /// outcomes are reflected in the returned value here. + /// + /// # Panics + /// + /// Panics if the operation has already been completed via `Future::poll`, + /// or if this method is called twice. + pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel { + let default = self.default; + match self.pin_project().cancel() { + RawFutureWriteCancel::AlreadySent => FutureWriteCancel::AlreadySent, + RawFutureWriteCancel::Dropped(val) => FutureWriteCancel::Dropped(val), + RawFutureWriteCancel::Cancelled(val, raw) => FutureWriteCancel::Cancelled( + val, + FutureWriter { + raw: ManuallyDrop::new(raw), + default, + should_write_default_value: true, + }, + ), } } } -impl Wake for DeferredWrite { - fn wake(self: Arc) { - // When a `wake` signal comes in that should happen in two locations: - // - // 1. When `DeferredWrite` is initially constructed. - // 2. When an event comes in indicating that the internal write has - // completed. - // - // The implementation here is the same in both cases. A clone of `self` - // is converted to a `Waker`, and this `Waker` notably owns the - // internal future itself. The internal write operation is then pushed - // forward (e.g. it's issued in (1) or checked up on in (2)). +impl Drop for FutureWrite { + fn drop(&mut self) { + if self.raw.op.is_done() { + return; + } + + // Although the underlying `WaitableOperation` will already + // auto-cancel-on-drop we need to specially handle that here because if + // the cancellation goes through then it means that no value will have + // been written to this future which will cause a trap. By using + // `Self::cancel` it's ensured that if cancellation succeeds a + // `FutureWriter` is created. In `Drop for FutureWriter` that'll handle + // the last-ditch write-default logic. // - // If `Pending` is returned then `waker` should have been stored away - // within the `wasip3_task` C ABI structure. Otherwise it should not - // have been stored away and `self` should be the sole reference which - // means everything will get cleaned up when this function returns. - let poll = { - let waker = Waker::from(self.clone()); - let mut cx = Context::from_waker(&waker); - let mut write = self.write.lock().unwrap(); - unsafe { Pin::new_unchecked(&mut *write).poll(&mut cx) } - }; - if poll.is_ready() { - assert_eq!(Arc::strong_count(&self), 1); - } else { - assert!(Arc::strong_count(&self) > 1); + // SAFETY: we're in the destructor here so the value `self` is about + // to go away and we can guarantee we're not moving out of it. + let pin = unsafe { Pin::new_unchecked(self) }; + pin.cancel(); + } +} + +/// Raw version of [`FutureWriter`]. +pub struct RawFutureWriter { + handle: u32, + ops: O, +} + +impl RawFutureWriter { + unsafe fn new(handle: u32, ops: O) -> Self { + Self { handle, ops } + } + + /// Same as [`FutureWriter::write`], but the raw version. + pub fn write(self, value: O::Payload) -> RawFutureWrite { + RawFutureWrite { + op: WaitableOperation::new(FutureWriteOp(marker::PhantomData), (self, value)), + } + } + + /// Writes `value` in the background. + /// + /// This does not block and is not cancellable. + pub fn write_and_forget(self, value: O::Payload) + where + O: 'static, + { + return Arc::new(DeferredWrite { + write: Mutex::new(self.write(value)), + }) + .wake(); + + /// Helper structure which behaves both as a future of sorts and an + /// executor of sorts. + /// + /// This type is constructed in `Drop for FutureWriter` to send out a + /// default value when no other has been written. This manages the + /// `FutureWrite` operation happening internally through a `Wake` + /// implementation. That means that this is a sort of cyclical future + /// which, when woken, will complete the write operation. + /// + /// The purpose of this is to be a "lightweight" way of "spawn"-ing a + /// future write to happen in the background. Crucially, however, this + /// doesn't require the `async-spawn` feature and instead works with the + /// `wasip3_task` C ABI structures (which spawn doesn't support). + struct DeferredWrite { + write: Mutex>, + } + + // SAFETY: Needed to satisfy `Waker::from` but otherwise should be ok + // because wasm doesn't have threads anyway right now. + unsafe impl Send for DeferredWrite {} + unsafe impl Sync for DeferredWrite {} + + impl Wake for DeferredWrite { + fn wake(self: Arc) { + // When a `wake` signal comes in that should happen in two + // locations: + // + // 1. When `DeferredWrite` is initially constructed. + // 2. When an event comes in indicating that the internal write + // has completed. + // + // The implementation here is the same in both cases. A clone of + // `self` is converted to a `Waker`, and this `Waker` notably + // owns the internal future itself. The internal write operation + // is then pushed forward (e.g. it's issued in (1) or checked up + // on in (2)). + // + // If `Pending` is returned then `waker` should have been stored + // away within the `wasip3_task` C ABI structure. Otherwise it + // should not have been stored away and `self` should be the + // sole reference which means everything will get cleaned up + // when this function returns. + let poll = { + let waker = Waker::from(self.clone()); + let mut cx = Context::from_waker(&waker); + let mut write = self.write.lock().unwrap(); + unsafe { Pin::new_unchecked(&mut *write).poll(&mut cx) } + }; + if poll.is_ready() { + assert_eq!(Arc::strong_count(&self), 1); + } else { + assert!(Arc::strong_count(&self) > 1); + } + assert_eq!(Arc::weak_count(&self), 0); + } + } + } +} + +impl fmt::Debug for RawFutureWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RawFutureWriter") + .field("handle", &self.handle) + .finish() + } +} + +impl Drop for RawFutureWriter { + fn drop(&mut self) { + unsafe { + rtdebug!("future.drop-writable({})", self.handle); + self.ops.drop_writable(self.handle); } - assert_eq!(Arc::weak_count(&self), 0); } } /// Represents a write operation which may be cancelled prior to completion. /// /// This is returned by [`FutureWriter::write`]. -pub struct FutureWrite { - op: WaitableOperation>, +pub struct RawFutureWrite { + op: WaitableOperation>, } -struct FutureWriteOp(marker::PhantomData); +struct FutureWriteOp(marker::PhantomData); enum WriteComplete { Written, @@ -386,16 +596,13 @@ enum WriteComplete { Cancelled(T), } -unsafe impl WaitableOp for FutureWriteOp -where - T: 'static, -{ - type Start = (FutureWriter, T); - type InProgress = (FutureWriter, Option); - type Result = (WriteComplete, FutureWriter); - type Cancel = FutureWriteCancel; +unsafe impl WaitableOp for FutureWriteOp { + type Start = (RawFutureWriter, O::Payload); + type InProgress = (RawFutureWriter, Option); + type Result = (WriteComplete, RawFutureWriter); + type Cancel = RawFutureWriteCancel; - fn start(&self, (writer, value): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (mut writer, value): Self::Start) -> (u32, Self::InProgress) { // TODO: it should be safe to store the lower-destination in // `WaitableOperation` using `Pin` memory and such, but that would // require some type-level trickery to get a correctly-sized value @@ -406,23 +613,23 @@ where // In lieu of that a dedicated location on the heap is created for the // lowering, and then `value`, as an owned value, is lowered into this // pointer to initialize it. - let (ptr, cleanup) = Cleanup::new(writer.vtable.layout); - // SAFETY: `ptr` is allocated with `vtable.layout` and should be + let (ptr, cleanup) = Cleanup::new(writer.ops.elem_layout()); + // SAFETY: `ptr` is allocated with `ops.layout` and should be // safe to use here. let code = unsafe { - (writer.vtable.lower)(value, ptr); - (writer.vtable.start_write)(writer.handle, ptr) + writer.ops.lower(value, ptr); + writer.ops.start_write(writer.handle, ptr) }; rtdebug!("future.write({}, {ptr:?}) = {code:#x}", writer.handle); (code, (writer, cleanup)) } - fn start_cancelled(&self, (writer, value): Self::Start) -> Self::Cancel { - FutureWriteCancel::Cancelled(value, writer) + fn start_cancelled(&mut self, (writer, value): Self::Start) -> Self::Cancel { + RawFutureWriteCancel::Cancelled(value, writer) } fn in_progress_update( - &self, + &mut self, (mut writer, cleanup): Self::InProgress, code: u32, ) -> Result { @@ -443,13 +650,8 @@ where super::DROPPED | super::CANCELLED => { // SAFETY: we're the ones managing `ptr` so we know it's safe to // pass here. - let value = unsafe { (writer.vtable.lift)(ptr) }; + let value = unsafe { writer.ops.lift(ptr) }; let status = if code == super::DROPPED { - // This writer has been witnessed to be dropped, meaning that - // `writer` is going to get destroyed soon as this return - // value propagates up the stack. There's no need to write - // the default value, so set this to `false`. - writer.should_write_default_value = false; WriteComplete::Dropped(value) } else { WriteComplete::Cancelled(value) @@ -467,13 +669,10 @@ where // Afterwards the `cleanup` itself is naturally dropped and cleaned // up. super::COMPLETED => { - // A value was written, so no need to write the default value. - writer.should_write_default_value = false; - // SAFETY: we're the ones managing `ptr` so we know it's safe to // pass here. unsafe { - (writer.vtable.dealloc_lists)(ptr); + writer.ops.dealloc_lists(ptr); } Ok((WriteComplete::Written, writer)) } @@ -482,35 +681,35 @@ where } } - fn in_progress_waitable(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 { writer.handle } - fn in_progress_cancel(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `writer` and all the various operational bits, // so this relies on `WaitableOperation` being safe. - let code = unsafe { (writer.vtable.cancel_write)(writer.handle) }; + let code = unsafe { writer.ops.cancel_write(writer.handle) }; rtdebug!("future.cancel-write({}) = {code:#x}", writer.handle); code } - fn result_into_cancel(&self, (result, writer): Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, (result, writer): Self::Result) -> Self::Cancel { match result { // The value was actually sent, meaning we can't yield back the // future nor the value. - WriteComplete::Written => FutureWriteCancel::AlreadySent, + WriteComplete::Written => RawFutureWriteCancel::AlreadySent, // The value was not sent because the other end either hung up or we // successfully cancelled. In both cases return back the value here // with the writer. - WriteComplete::Dropped(val) => FutureWriteCancel::Dropped(val), - WriteComplete::Cancelled(val) => FutureWriteCancel::Cancelled(val, writer), + WriteComplete::Dropped(val) => RawFutureWriteCancel::Dropped(val), + WriteComplete::Cancelled(val) => RawFutureWriteCancel::Cancelled(val, writer), } } } -impl Future for FutureWrite { - type Output = Result<(), FutureWriteError>; +impl Future for RawFutureWrite { + type Output = Result<(), FutureWriteError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.pin_project() @@ -524,25 +723,16 @@ impl Future for FutureWrite { } } -impl FutureWrite { - fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { +impl RawFutureWrite { + fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { // SAFETY: we've chosen that when `Self` is pinned that it translates to // always pinning the inner field, so that's codified here. unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) } } - /// Cancel this write if it hasn't already completed. - /// - /// This method can be used to cancel a write-in-progress and re-acquire - /// the writer and the value being sent. Note that the write operation may - /// succeed racily or the other end may also drop racily, and these - /// outcomes are reflected in the returned value here. - /// - /// # Panics - /// - /// Panics if the operation has already been completed via `Future::poll`, - /// or if this method is called twice. - pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel { + /// Same as [`FutureWrite::cancel`], but returns a [`RawFutureWriteCancel`] + /// instead. + pub fn cancel(self: Pin<&mut Self>) -> RawFutureWriteCancel { self.pin_project().cancel() } } @@ -570,7 +760,7 @@ impl std::error::Error for FutureWriteError {} /// Result of [`FutureWrite::cancel`]. #[derive(Debug)] -pub enum FutureWriteCancel { +pub enum RawFutureWriteCancel { /// The cancel request raced with the receipt of the sent value, and the /// value was actually sent. Neither the value nor the writer are made /// available here as both are gone. @@ -580,34 +770,43 @@ pub enum FutureWriteCancel { /// /// In this case the original value is returned back to the caller but the /// writer itself is not longer accessible as it's no longer usable. - Dropped(T), + Dropped(O::Payload), /// The pending write was successfully cancelled and the value being written /// is returned along with the writer to resume again in the future if /// necessary. - Cancelled(T, FutureWriter), + Cancelled(O::Payload, RawFutureWriter), } /// Represents the readable end of a Component Model `future`. -pub struct FutureReader { +pub type FutureReader = RawFutureReader<&'static FutureVtable>; + +/// Represents the readable end of a Component Model `future`. +pub struct RawFutureReader { handle: AtomicU32, - vtable: &'static FutureVtable, + ops: O, } -impl fmt::Debug for FutureReader { +impl fmt::Debug for RawFutureReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FutureReader") + f.debug_struct("RawFutureReader") .field("handle", &self.handle) .finish() } } -impl FutureReader { - #[doc(hidden)] - pub fn new(handle: u32, vtable: &'static FutureVtable) -> Self { +impl RawFutureReader { + /// Raw constructor for a future reader. + /// + /// Takes ownership of the `handle` provided. + /// + /// # Safety + /// + /// The `ops` specified must be both valid and well-typed for `handle`. + pub unsafe fn new(handle: u32, ops: O) -> Self { Self { handle: AtomicU32::new(handle), - vtable, + ops, } } @@ -630,27 +829,27 @@ impl FutureReader { } } -impl IntoFuture for FutureReader { - type Output = T; - type IntoFuture = FutureRead; +impl IntoFuture for RawFutureReader { + type Output = O::Payload; + type IntoFuture = RawFutureRead; /// Convert this object into a `Future` which will resolve when a value is /// written to the writable end of this `future`. fn into_future(self) -> Self::IntoFuture { - FutureRead { + RawFutureRead { op: WaitableOperation::new(FutureReadOp(marker::PhantomData), self), } } } -impl Drop for FutureReader { +impl Drop for RawFutureReader { fn drop(&mut self) { let Some(handle) = self.opt_handle() else { return; }; unsafe { rtdebug!("future.drop-readable({handle})"); - (self.vtable.drop_readable)(handle); + self.ops.drop_readable(handle); } } } @@ -659,43 +858,46 @@ impl Drop for FutureReader { /// /// This represents a read operation on a [`FutureReader`] and is created via /// `IntoFuture`. -pub struct FutureRead { - op: WaitableOperation>, +pub type FutureRead = RawFutureRead<&'static FutureVtable>; + +/// Represents a read operation which may be cancelled prior to completion. +/// +/// This represents a read operation on a [`FutureReader`] and is created via +/// `IntoFuture`. +pub struct RawFutureRead { + op: WaitableOperation>, } -struct FutureReadOp(marker::PhantomData); +struct FutureReadOp(marker::PhantomData); enum ReadComplete { Value(T), Cancelled, } -unsafe impl WaitableOp for FutureReadOp -where - T: 'static, -{ - type Start = FutureReader; - type InProgress = (FutureReader, Option); - type Result = (ReadComplete, FutureReader); - type Cancel = Result>; +unsafe impl WaitableOp for FutureReadOp { + type Start = RawFutureReader; + type InProgress = (RawFutureReader, Option); + type Result = (ReadComplete, RawFutureReader); + type Cancel = Result>; - fn start(&self, reader: Self::Start) -> (u32, Self::InProgress) { - let (ptr, cleanup) = Cleanup::new(reader.vtable.layout); + fn start(&mut self, mut reader: Self::Start) -> (u32, Self::InProgress) { + let (ptr, cleanup) = Cleanup::new(reader.ops.elem_layout()); // SAFETY: `ptr` is allocated with `vtable.layout` and should be // safe to use here. Its lifetime for the async operation is hinged on // `WaitableOperation` being safe. - let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr) }; + let code = unsafe { reader.ops.start_read(reader.handle(), ptr) }; rtdebug!("future.read({}, {ptr:?}) = {code:#x}", reader.handle()); (code, (reader, cleanup)) } - fn start_cancelled(&self, state: Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, state: Self::Start) -> Self::Cancel { Err(state) } fn in_progress_update( - &self, - (reader, cleanup): Self::InProgress, + &mut self, + (mut reader, cleanup): Self::InProgress, code: u32, ) -> Result { match ReturnCode::decode(code) { @@ -717,7 +919,7 @@ where // SAFETY: we're the ones managing `ptr` so we know it's safe to // pass here. - let value = unsafe { (reader.vtable.lift)(ptr) }; + let value = unsafe { reader.ops.lift(ptr) }; Ok((ReadComplete::Value(value), reader)) } @@ -725,19 +927,19 @@ where } } - fn in_progress_waitable(&self, (reader, _): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (reader, _): &Self::InProgress) -> u32 { reader.handle() } - fn in_progress_cancel(&self, (reader, _): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (reader, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `reader` and all the various operational bits, // so this relies on `WaitableOperation` being safe. - let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) }; + let code = unsafe { reader.ops.cancel_read(reader.handle()) }; rtdebug!("future.cancel-read({}) = {code:#x}", reader.handle()); code } - fn result_into_cancel(&self, (value, reader): Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, (value, reader): Self::Result) -> Self::Cancel { match value { // The value was actually read, so thread that through here. ReadComplete::Value(value) => Ok(value), @@ -749,8 +951,8 @@ where } } -impl Future for FutureRead { - type Output = T; +impl Future for RawFutureRead { + type Output = O::Payload; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.pin_project() @@ -765,8 +967,8 @@ impl Future for FutureRead { } } -impl FutureRead { - fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { +impl RawFutureRead { + fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { // SAFETY: we've chosen that when `Self` is pinned that it translates to // always pinning the inner field, so that's codified here. unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) } @@ -786,7 +988,7 @@ impl FutureRead { /// Panics if the operation has already been completed via `Future::poll`, /// or if this method is called twice. Additionally if this method completes /// then calling `poll` again on `self` will panic. - pub fn cancel(self: Pin<&mut Self>) -> Result> { + pub fn cancel(self: Pin<&mut Self>) -> Result> { self.pin_project().cancel() } } diff --git a/crates/guest-rust/src/rt/async_support/stream_support.rs b/crates/guest-rust/src/rt/async_support/stream_support.rs index 597850cc3..ff11e6dbc 100644 --- a/crates/guest-rust/src/rt/async_support/stream_support.rs +++ b/crates/guest-rust/src/rt/async_support/stream_support.rs @@ -243,7 +243,7 @@ where type Result = (StreamResult, AbiBuffer); type Cancel = (StreamResult, AbiBuffer); - fn start(&self, (writer, buf): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (writer, buf): Self::Start) -> (u32, Self::InProgress) { if writer.done { return (DROPPED, (writer, buf)); } @@ -259,12 +259,12 @@ where (code, (writer, buf)) } - fn start_cancelled(&self, (_writer, buf): Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, (_writer, buf): Self::Start) -> Self::Cancel { (StreamResult::Cancelled, buf) } fn in_progress_update( - &self, + &mut self, (writer, mut buf): Self::InProgress, code: u32, ) -> Result { @@ -285,11 +285,11 @@ where } } - fn in_progress_waitable(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 { writer.handle } - fn in_progress_cancel(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `writer` and all the various operational bits, // so this relies on `WaitableOperation` being safe. let code = unsafe { (writer.vtable.cancel_write)(writer.handle) }; @@ -297,7 +297,7 @@ where code } - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel { result } } @@ -460,7 +460,7 @@ where type Result = (StreamResult, Vec); type Cancel = (StreamResult, Vec); - fn start(&self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) { if reader.done { return (DROPPED, (reader, buf, None)); } @@ -493,12 +493,12 @@ where (code, (reader, buf, cleanup)) } - fn start_cancelled(&self, (_, buf): Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, (_, buf): Self::Start) -> Self::Cancel { (StreamResult::Cancelled, buf) } fn in_progress_update( - &self, + &mut self, (reader, mut buf, cleanup): Self::InProgress, code: u32, ) -> Result { @@ -555,11 +555,11 @@ where } } - fn in_progress_waitable(&self, (reader, ..): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (reader, ..): &Self::InProgress) -> u32 { reader.handle() } - fn in_progress_cancel(&self, (reader, ..): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (reader, ..): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `reader` and all the various operational bits, // so this relies on `WaitableOperation` being safe. let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) }; @@ -567,7 +567,7 @@ where code } - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel { result } } diff --git a/crates/guest-rust/src/rt/async_support/subtask.rs b/crates/guest-rust/src/rt/async_support/subtask.rs index b902a1f34..750156225 100644 --- a/crates/guest-rust/src/rt/async_support/subtask.rs +++ b/crates/guest-rust/src/rt/async_support/subtask.rs @@ -42,14 +42,14 @@ pub unsafe trait Subtask { /// The in-memory layout of both parameters and results allocated with /// parameters coming first. - fn abi_layout(&self) -> Layout; + fn abi_layout(&mut self) -> Layout; /// The offset, in bytes, from the start of `ABI_LAYOUT` to where the /// results will be stored. - fn results_offset(&self) -> usize; + fn results_offset(&mut self) -> usize; /// The raw function import using `[async-lower]` and the canonical ABI. - unsafe fn call_import(&self, params: Self::ParamsLower, results: *mut u8) -> u32; + unsafe fn call_import(&mut self, params: Self::ParamsLower, results: *mut u8) -> u32; /// Bindings-generated version of lowering `params`. /// @@ -60,22 +60,22 @@ pub unsafe trait Subtask { /// Note that `ParamsLower` may return `dst` if there are more ABI /// parameters than are allowed flat params (as specified by the canonical /// ABI). - unsafe fn params_lower(&self, params: Self::Params, dst: *mut u8) -> Self::ParamsLower; + unsafe fn params_lower(&mut self, params: Self::Params, dst: *mut u8) -> Self::ParamsLower; /// Bindings-generated version of deallocating any lists stored within /// `lower`. - unsafe fn params_dealloc_lists(&self, lower: Self::ParamsLower); + unsafe fn params_dealloc_lists(&mut self, lower: Self::ParamsLower); /// Bindings-generated version of deallocating not only owned lists within /// `lower` but also deallocating any owned resources. - unsafe fn params_dealloc_lists_and_own(&self, lower: Self::ParamsLower); + unsafe fn params_dealloc_lists_and_own(&mut self, lower: Self::ParamsLower); /// Bindings-generated version of lifting the results stored at `src`. - unsafe fn results_lift(&self, src: *mut u8) -> Self::Results; + unsafe fn results_lift(&mut self, src: *mut u8) -> Self::Results; /// Helper function to actually perform this asynchronous call with /// `params`. - fn call(&self, params: Self::Params) -> impl Future + fn call(&mut self, params: Self::Params) -> impl Future where Self: Sized, { @@ -93,7 +93,7 @@ pub unsafe trait Subtask { } } -struct SubtaskOps<'a, T>(&'a T); +struct SubtaskOps<'a, T>(&'a mut T); struct Start { params: T::Params, @@ -105,7 +105,7 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { type Result = Result; type Cancel = Result; - fn start(&self, state: Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, state: Self::Start) -> (u32, Self::InProgress) { unsafe { let (ptr_params, cleanup) = Cleanup::new(self.0.abi_layout()); let ptr_results = ptr_params.add(self.0.results_offset()); @@ -128,12 +128,12 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { } } - fn start_cancelled(&self, _state: Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, _state: Self::Start) -> Self::Cancel { Err(()) } fn in_progress_update( - &self, + &mut self, mut state: Self::InProgress, code: u32, ) -> Result { @@ -164,7 +164,8 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { // Note that by dropping `state` here we'll both deallocate the // params/results storage area as well as the subtask handle // itself. - unsafe { Ok(Ok(self.0.results_lift(state.ptr_results(self.0)))) } + let ptr = state.ptr_results(self.0); + unsafe { Ok(Ok(self.0.results_lift(ptr))) } } // This subtask was dropped which forced cancellation. Said @@ -209,18 +210,18 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { } } - fn in_progress_waitable(&self, state: &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, state: &Self::InProgress) -> u32 { // This shouldn't get called in the one case this isn't present: when // `STATUS_RETURNED` is returned and no waitable is created. That's the // `unwrap()` condition here. state.subtask.as_ref().unwrap().handle.get() } - fn in_progress_cancel(&self, state: &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, state: &mut Self::InProgress) -> u32 { unsafe { cancel(self.in_progress_waitable(state)) } } - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel { result } } @@ -247,7 +248,7 @@ struct InProgress { } impl InProgress { - fn flag_started(&mut self, op: &T) { + fn flag_started(&mut self, op: &mut T) { assert!(!self.started); self.started = true; @@ -259,7 +260,7 @@ impl InProgress { } } - fn ptr_results(&self, op: &T) -> *mut u8 { + fn ptr_results(&mut self, op: &mut T) -> *mut u8 { // SAFETY: the `T` trait has unsafely promised us that the offset is // in-bounds of the allocation layout. unsafe { diff --git a/crates/guest-rust/src/rt/async_support/waitable.rs b/crates/guest-rust/src/rt/async_support/waitable.rs index 603dfb689..850cd8f3c 100644 --- a/crates/guest-rust/src/rt/async_support/waitable.rs +++ b/crates/guest-rust/src/rt/async_support/waitable.rs @@ -84,7 +84,7 @@ pub unsafe trait WaitableOp { /// This method will actually call `{future,stream}.{read,write}` with /// `state` provided. The return code of the intrinsic is returned here /// along with the `InProgress` state. - fn start(&self, state: Self::Start) -> (u32, Self::InProgress); + fn start(&mut self, state: Self::Start) -> (u32, Self::InProgress); /// Optionally complete the async operation. /// @@ -95,18 +95,18 @@ pub unsafe trait WaitableOp { /// * a new status code has been received by an async export's `callback` /// * cancellation returned a code to be processed here fn in_progress_update( - &self, + &mut self, state: Self::InProgress, code: u32, ) -> Result; /// Conversion from the "start" state to the "cancel" result, needed when an /// operation is cancelled before it's started. - fn start_cancelled(&self, state: Self::Start) -> Self::Cancel; + fn start_cancelled(&mut self, state: Self::Start) -> Self::Cancel; /// Acquires the component-model `waitable` index that the `InProgress` /// state is waiting on. - fn in_progress_waitable(&self, state: &Self::InProgress) -> u32; + fn in_progress_waitable(&mut self, state: &Self::InProgress) -> u32; /// Initiates a request for cancellation of this operation. Returns the /// status code returned by the `{future,stream}.cancel-{read,write}` @@ -117,12 +117,12 @@ pub unsafe trait WaitableOp { /// instead the operation must be complete with the returned code. That may /// mean that this intrinsic can block while figuring things out in the /// component model ABI, for example. - fn in_progress_cancel(&self, state: &Self::InProgress) -> u32; + fn in_progress_cancel(&mut self, state: &mut Self::InProgress) -> u32; /// Converts a "completion result" into a "cancel result". This is necessary /// when an in-progress operation is cancelled so the in-progress result is /// first acquired and then transitioned to a cancel request. - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel; + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel; } enum WaitableOperationState { @@ -151,7 +151,7 @@ where fn pin_project( self: Pin<&mut Self>, ) -> ( - &S, + &mut S, &mut WaitableOperationState, Pin<&mut CompletionStatus>, ) { @@ -163,7 +163,7 @@ where unsafe { let me = self.get_unchecked_mut(); ( - &me.op, + &mut me.op, &mut me.state, Pin::new_unchecked(&mut me.completion_status), ) @@ -441,6 +441,11 @@ where Poll::Pending => unreachable!(), } } + + /// Returns whether or not this operation has completed. + pub fn is_done(&self) -> bool { + matches!(self.state, WaitableOperationState::Done) + } } impl Future for WaitableOperation { @@ -453,17 +458,15 @@ impl Future for WaitableOperation { impl Drop for WaitableOperation { fn drop(&mut self) { - // SAFETY: we're in the destructor here so the value `self` is about - // to go away and we can guarantee we're not moving out of it. - let mut pin = unsafe { Pin::new_unchecked(self) }; - - let (_, state, _) = pin.as_mut().pin_project(); - // If this operation has already completed then skip cancellation, // otherwise it's our job to cancel anything in-flight. - if let WaitableOperationState::Done = state { + if self.is_done() { return; } + + // SAFETY: we're in the destructor here so the value `self` is about + // to go away and we can guarantee we're not moving out of it. + let pin = unsafe { Pin::new_unchecked(self) }; pin.cancel(); } } diff --git a/crates/rust/src/interface.rs b/crates/rust/src/interface.rs index dd2795650..ce5b947ed 100644 --- a/crates/rust/src/interface.rs +++ b/crates/rust/src/interface.rs @@ -883,7 +883,7 @@ unsafe impl<'a> _Subtask for _MySubtask<'a> {{ uwriteln!( self.src, r#" -fn abi_layout(&self) -> ::core::alloc::Layout {{ +fn abi_layout(&mut self) -> ::core::alloc::Layout {{ unsafe {{ ::core::alloc::Layout::from_size_align_unchecked({}, {}) }} @@ -901,7 +901,10 @@ fn abi_layout(&self) -> ::core::alloc::Layout {{ } None => "0".to_string(), }; - uwriteln!(self.src, "fn results_offset(&self) -> usize {{ {offset} }}"); + uwriteln!( + self.src, + "fn results_offset(&mut self) -> usize {{ {offset} }}" + ); // Generate `fn call_import` let import_name = &func.name; @@ -922,7 +925,7 @@ fn abi_layout(&self) -> ::core::alloc::Layout {{ uwriteln!( self.src, r#" -unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u32 {{ +unsafe fn call_import(&mut self, _params: Self::ParamsLower, _results: *mut u8) -> u32 {{ {intrinsic} unsafe {{ call({args}) as u32 }} }} @@ -942,7 +945,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u ); uwriteln!( self.src, - "unsafe fn params_dealloc_lists(&self, _params: Self::ParamsLower) {{" + "unsafe fn params_dealloc_lists(&mut self, _params: Self::ParamsLower) {{" ); uwriteln!(self.src, "{dealloc_lists}"); uwriteln!(self.src, "}}"); @@ -956,7 +959,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u ); uwriteln!( self.src, - "unsafe fn params_dealloc_lists_and_own(&self, _params: Self::ParamsLower) {{" + "unsafe fn params_dealloc_lists_and_own(&mut self, _params: Self::ParamsLower) {{" ); uwriteln!(self.src, "{dealloc_lists_and_own}"); uwriteln!(self.src, "}}"); @@ -1000,7 +1003,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u } uwriteln!( self.src, - "unsafe fn params_lower(&self, ({}): Self::Params, _ptr: *mut u8) -> Self::ParamsLower {{", + "unsafe fn params_lower(&mut self, ({}): Self::Params, _ptr: *mut u8) -> Self::ParamsLower {{", param_lowers.join(" "), ); for lower in lowers.iter() { @@ -1015,7 +1018,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u }; uwriteln!( self.src, - "unsafe fn results_lift(&self, _ptr: *mut u8) -> Self::Results {{" + "unsafe fn results_lift(&mut self, _ptr: *mut u8) -> Self::Results {{" ); uwriteln!(self.src, "{lift}"); uwriteln!(self.src, "}}");