From 9f127337f1ab6bf7e441ad2c0f870923c469c5da Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 6 Nov 2025 13:41:41 -0800 Subject: [PATCH 1/2] Add more mutability to internal async traits As usage expands a bit beyond just this crate it can be useful to have mutability in the guts of the implementation. It's also easy enough to provide this everywhere so it's not much of a burden to plumb it around. --- .../src/rt/async_support/future_support.rs | 24 ++++++------ .../src/rt/async_support/stream_support.rs | 24 ++++++------ .../src/rt/async_support/subtask.rs | 37 ++++++++++--------- .../src/rt/async_support/waitable.rs | 16 ++++---- crates/rust/src/interface.rs | 17 +++++---- 5 files changed, 61 insertions(+), 57 deletions(-) diff --git a/crates/guest-rust/src/rt/async_support/future_support.rs b/crates/guest-rust/src/rt/async_support/future_support.rs index d18798549..07dbab14a 100644 --- a/crates/guest-rust/src/rt/async_support/future_support.rs +++ b/crates/guest-rust/src/rt/async_support/future_support.rs @@ -395,7 +395,7 @@ where type Result = (WriteComplete, FutureWriter); type Cancel = FutureWriteCancel; - fn start(&self, (writer, value): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (writer, value): Self::Start) -> (u32, Self::InProgress) { // TODO: it should be safe to store the lower-destination in // `WaitableOperation` using `Pin` memory and such, but that would // require some type-level trickery to get a correctly-sized value @@ -417,12 +417,12 @@ where (code, (writer, cleanup)) } - fn start_cancelled(&self, (writer, value): Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, (writer, value): Self::Start) -> Self::Cancel { FutureWriteCancel::Cancelled(value, writer) } fn in_progress_update( - &self, + &mut self, (mut writer, cleanup): Self::InProgress, code: u32, ) -> Result { @@ -482,11 +482,11 @@ where } } - fn in_progress_waitable(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 { writer.handle } - fn in_progress_cancel(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `writer` and all the various operational bits, // so this relies on `WaitableOperation` being safe. let code = unsafe { (writer.vtable.cancel_write)(writer.handle) }; @@ -494,7 +494,7 @@ where code } - fn result_into_cancel(&self, (result, writer): Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, (result, writer): Self::Result) -> Self::Cancel { match result { // The value was actually sent, meaning we can't yield back the // future nor the value. @@ -679,7 +679,7 @@ where type Result = (ReadComplete, FutureReader); type Cancel = Result>; - fn start(&self, reader: Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, reader: Self::Start) -> (u32, Self::InProgress) { let (ptr, cleanup) = Cleanup::new(reader.vtable.layout); // SAFETY: `ptr` is allocated with `vtable.layout` and should be // safe to use here. Its lifetime for the async operation is hinged on @@ -689,12 +689,12 @@ where (code, (reader, cleanup)) } - fn start_cancelled(&self, state: Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, state: Self::Start) -> Self::Cancel { Err(state) } fn in_progress_update( - &self, + &mut self, (reader, cleanup): Self::InProgress, code: u32, ) -> Result { @@ -725,11 +725,11 @@ where } } - fn in_progress_waitable(&self, (reader, _): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (reader, _): &Self::InProgress) -> u32 { reader.handle() } - fn in_progress_cancel(&self, (reader, _): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (reader, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `reader` and all the various operational bits, // so this relies on `WaitableOperation` being safe. let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) }; @@ -737,7 +737,7 @@ where code } - fn result_into_cancel(&self, (value, reader): Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, (value, reader): Self::Result) -> Self::Cancel { match value { // The value was actually read, so thread that through here. ReadComplete::Value(value) => Ok(value), diff --git a/crates/guest-rust/src/rt/async_support/stream_support.rs b/crates/guest-rust/src/rt/async_support/stream_support.rs index 597850cc3..ff11e6dbc 100644 --- a/crates/guest-rust/src/rt/async_support/stream_support.rs +++ b/crates/guest-rust/src/rt/async_support/stream_support.rs @@ -243,7 +243,7 @@ where type Result = (StreamResult, AbiBuffer); type Cancel = (StreamResult, AbiBuffer); - fn start(&self, (writer, buf): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (writer, buf): Self::Start) -> (u32, Self::InProgress) { if writer.done { return (DROPPED, (writer, buf)); } @@ -259,12 +259,12 @@ where (code, (writer, buf)) } - fn start_cancelled(&self, (_writer, buf): Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, (_writer, buf): Self::Start) -> Self::Cancel { (StreamResult::Cancelled, buf) } fn in_progress_update( - &self, + &mut self, (writer, mut buf): Self::InProgress, code: u32, ) -> Result { @@ -285,11 +285,11 @@ where } } - fn in_progress_waitable(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 { writer.handle } - fn in_progress_cancel(&self, (writer, _): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `writer` and all the various operational bits, // so this relies on `WaitableOperation` being safe. let code = unsafe { (writer.vtable.cancel_write)(writer.handle) }; @@ -297,7 +297,7 @@ where code } - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel { result } } @@ -460,7 +460,7 @@ where type Result = (StreamResult, Vec); type Cancel = (StreamResult, Vec); - fn start(&self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) { if reader.done { return (DROPPED, (reader, buf, None)); } @@ -493,12 +493,12 @@ where (code, (reader, buf, cleanup)) } - fn start_cancelled(&self, (_, buf): Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, (_, buf): Self::Start) -> Self::Cancel { (StreamResult::Cancelled, buf) } fn in_progress_update( - &self, + &mut self, (reader, mut buf, cleanup): Self::InProgress, code: u32, ) -> Result { @@ -555,11 +555,11 @@ where } } - fn in_progress_waitable(&self, (reader, ..): &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, (reader, ..): &Self::InProgress) -> u32 { reader.handle() } - fn in_progress_cancel(&self, (reader, ..): &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, (reader, ..): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `reader` and all the various operational bits, // so this relies on `WaitableOperation` being safe. let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) }; @@ -567,7 +567,7 @@ where code } - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel { result } } diff --git a/crates/guest-rust/src/rt/async_support/subtask.rs b/crates/guest-rust/src/rt/async_support/subtask.rs index b902a1f34..750156225 100644 --- a/crates/guest-rust/src/rt/async_support/subtask.rs +++ b/crates/guest-rust/src/rt/async_support/subtask.rs @@ -42,14 +42,14 @@ pub unsafe trait Subtask { /// The in-memory layout of both parameters and results allocated with /// parameters coming first. - fn abi_layout(&self) -> Layout; + fn abi_layout(&mut self) -> Layout; /// The offset, in bytes, from the start of `ABI_LAYOUT` to where the /// results will be stored. - fn results_offset(&self) -> usize; + fn results_offset(&mut self) -> usize; /// The raw function import using `[async-lower]` and the canonical ABI. - unsafe fn call_import(&self, params: Self::ParamsLower, results: *mut u8) -> u32; + unsafe fn call_import(&mut self, params: Self::ParamsLower, results: *mut u8) -> u32; /// Bindings-generated version of lowering `params`. /// @@ -60,22 +60,22 @@ pub unsafe trait Subtask { /// Note that `ParamsLower` may return `dst` if there are more ABI /// parameters than are allowed flat params (as specified by the canonical /// ABI). - unsafe fn params_lower(&self, params: Self::Params, dst: *mut u8) -> Self::ParamsLower; + unsafe fn params_lower(&mut self, params: Self::Params, dst: *mut u8) -> Self::ParamsLower; /// Bindings-generated version of deallocating any lists stored within /// `lower`. - unsafe fn params_dealloc_lists(&self, lower: Self::ParamsLower); + unsafe fn params_dealloc_lists(&mut self, lower: Self::ParamsLower); /// Bindings-generated version of deallocating not only owned lists within /// `lower` but also deallocating any owned resources. - unsafe fn params_dealloc_lists_and_own(&self, lower: Self::ParamsLower); + unsafe fn params_dealloc_lists_and_own(&mut self, lower: Self::ParamsLower); /// Bindings-generated version of lifting the results stored at `src`. - unsafe fn results_lift(&self, src: *mut u8) -> Self::Results; + unsafe fn results_lift(&mut self, src: *mut u8) -> Self::Results; /// Helper function to actually perform this asynchronous call with /// `params`. - fn call(&self, params: Self::Params) -> impl Future + fn call(&mut self, params: Self::Params) -> impl Future where Self: Sized, { @@ -93,7 +93,7 @@ pub unsafe trait Subtask { } } -struct SubtaskOps<'a, T>(&'a T); +struct SubtaskOps<'a, T>(&'a mut T); struct Start { params: T::Params, @@ -105,7 +105,7 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { type Result = Result; type Cancel = Result; - fn start(&self, state: Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, state: Self::Start) -> (u32, Self::InProgress) { unsafe { let (ptr_params, cleanup) = Cleanup::new(self.0.abi_layout()); let ptr_results = ptr_params.add(self.0.results_offset()); @@ -128,12 +128,12 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { } } - fn start_cancelled(&self, _state: Self::Start) -> Self::Cancel { + fn start_cancelled(&mut self, _state: Self::Start) -> Self::Cancel { Err(()) } fn in_progress_update( - &self, + &mut self, mut state: Self::InProgress, code: u32, ) -> Result { @@ -164,7 +164,8 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { // Note that by dropping `state` here we'll both deallocate the // params/results storage area as well as the subtask handle // itself. - unsafe { Ok(Ok(self.0.results_lift(state.ptr_results(self.0)))) } + let ptr = state.ptr_results(self.0); + unsafe { Ok(Ok(self.0.results_lift(ptr))) } } // This subtask was dropped which forced cancellation. Said @@ -209,18 +210,18 @@ unsafe impl WaitableOp for SubtaskOps<'_, T> { } } - fn in_progress_waitable(&self, state: &Self::InProgress) -> u32 { + fn in_progress_waitable(&mut self, state: &Self::InProgress) -> u32 { // This shouldn't get called in the one case this isn't present: when // `STATUS_RETURNED` is returned and no waitable is created. That's the // `unwrap()` condition here. state.subtask.as_ref().unwrap().handle.get() } - fn in_progress_cancel(&self, state: &Self::InProgress) -> u32 { + fn in_progress_cancel(&mut self, state: &mut Self::InProgress) -> u32 { unsafe { cancel(self.in_progress_waitable(state)) } } - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel { + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel { result } } @@ -247,7 +248,7 @@ struct InProgress { } impl InProgress { - fn flag_started(&mut self, op: &T) { + fn flag_started(&mut self, op: &mut T) { assert!(!self.started); self.started = true; @@ -259,7 +260,7 @@ impl InProgress { } } - fn ptr_results(&self, op: &T) -> *mut u8 { + fn ptr_results(&mut self, op: &mut T) -> *mut u8 { // SAFETY: the `T` trait has unsafely promised us that the offset is // in-bounds of the allocation layout. unsafe { diff --git a/crates/guest-rust/src/rt/async_support/waitable.rs b/crates/guest-rust/src/rt/async_support/waitable.rs index 603dfb689..c456ad627 100644 --- a/crates/guest-rust/src/rt/async_support/waitable.rs +++ b/crates/guest-rust/src/rt/async_support/waitable.rs @@ -84,7 +84,7 @@ pub unsafe trait WaitableOp { /// This method will actually call `{future,stream}.{read,write}` with /// `state` provided. The return code of the intrinsic is returned here /// along with the `InProgress` state. - fn start(&self, state: Self::Start) -> (u32, Self::InProgress); + fn start(&mut self, state: Self::Start) -> (u32, Self::InProgress); /// Optionally complete the async operation. /// @@ -95,18 +95,18 @@ pub unsafe trait WaitableOp { /// * a new status code has been received by an async export's `callback` /// * cancellation returned a code to be processed here fn in_progress_update( - &self, + &mut self, state: Self::InProgress, code: u32, ) -> Result; /// Conversion from the "start" state to the "cancel" result, needed when an /// operation is cancelled before it's started. - fn start_cancelled(&self, state: Self::Start) -> Self::Cancel; + fn start_cancelled(&mut self, state: Self::Start) -> Self::Cancel; /// Acquires the component-model `waitable` index that the `InProgress` /// state is waiting on. - fn in_progress_waitable(&self, state: &Self::InProgress) -> u32; + fn in_progress_waitable(&mut self, state: &Self::InProgress) -> u32; /// Initiates a request for cancellation of this operation. Returns the /// status code returned by the `{future,stream}.cancel-{read,write}` @@ -117,12 +117,12 @@ pub unsafe trait WaitableOp { /// instead the operation must be complete with the returned code. That may /// mean that this intrinsic can block while figuring things out in the /// component model ABI, for example. - fn in_progress_cancel(&self, state: &Self::InProgress) -> u32; + fn in_progress_cancel(&mut self, state: &mut Self::InProgress) -> u32; /// Converts a "completion result" into a "cancel result". This is necessary /// when an in-progress operation is cancelled so the in-progress result is /// first acquired and then transitioned to a cancel request. - fn result_into_cancel(&self, result: Self::Result) -> Self::Cancel; + fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel; } enum WaitableOperationState { @@ -151,7 +151,7 @@ where fn pin_project( self: Pin<&mut Self>, ) -> ( - &S, + &mut S, &mut WaitableOperationState, Pin<&mut CompletionStatus>, ) { @@ -163,7 +163,7 @@ where unsafe { let me = self.get_unchecked_mut(); ( - &me.op, + &mut me.op, &mut me.state, Pin::new_unchecked(&mut me.completion_status), ) diff --git a/crates/rust/src/interface.rs b/crates/rust/src/interface.rs index dd2795650..ce5b947ed 100644 --- a/crates/rust/src/interface.rs +++ b/crates/rust/src/interface.rs @@ -883,7 +883,7 @@ unsafe impl<'a> _Subtask for _MySubtask<'a> {{ uwriteln!( self.src, r#" -fn abi_layout(&self) -> ::core::alloc::Layout {{ +fn abi_layout(&mut self) -> ::core::alloc::Layout {{ unsafe {{ ::core::alloc::Layout::from_size_align_unchecked({}, {}) }} @@ -901,7 +901,10 @@ fn abi_layout(&self) -> ::core::alloc::Layout {{ } None => "0".to_string(), }; - uwriteln!(self.src, "fn results_offset(&self) -> usize {{ {offset} }}"); + uwriteln!( + self.src, + "fn results_offset(&mut self) -> usize {{ {offset} }}" + ); // Generate `fn call_import` let import_name = &func.name; @@ -922,7 +925,7 @@ fn abi_layout(&self) -> ::core::alloc::Layout {{ uwriteln!( self.src, r#" -unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u32 {{ +unsafe fn call_import(&mut self, _params: Self::ParamsLower, _results: *mut u8) -> u32 {{ {intrinsic} unsafe {{ call({args}) as u32 }} }} @@ -942,7 +945,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u ); uwriteln!( self.src, - "unsafe fn params_dealloc_lists(&self, _params: Self::ParamsLower) {{" + "unsafe fn params_dealloc_lists(&mut self, _params: Self::ParamsLower) {{" ); uwriteln!(self.src, "{dealloc_lists}"); uwriteln!(self.src, "}}"); @@ -956,7 +959,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u ); uwriteln!( self.src, - "unsafe fn params_dealloc_lists_and_own(&self, _params: Self::ParamsLower) {{" + "unsafe fn params_dealloc_lists_and_own(&mut self, _params: Self::ParamsLower) {{" ); uwriteln!(self.src, "{dealloc_lists_and_own}"); uwriteln!(self.src, "}}"); @@ -1000,7 +1003,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u } uwriteln!( self.src, - "unsafe fn params_lower(&self, ({}): Self::Params, _ptr: *mut u8) -> Self::ParamsLower {{", + "unsafe fn params_lower(&mut self, ({}): Self::Params, _ptr: *mut u8) -> Self::ParamsLower {{", param_lowers.join(" "), ); for lower in lowers.iter() { @@ -1015,7 +1018,7 @@ unsafe fn call_import(&self, _params: Self::ParamsLower, _results: *mut u8) -> u }; uwriteln!( self.src, - "unsafe fn results_lift(&self, _ptr: *mut u8) -> Self::Results {{" + "unsafe fn results_lift(&mut self, _ptr: *mut u8) -> Self::Results {{" ); uwriteln!(self.src, "{lift}"); uwriteln!(self.src, "}}"); From 7b58b3b8d664ad12486d7e40ef79b1a4d8e5921c Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 6 Nov 2025 14:01:20 -0800 Subject: [PATCH 2/2] Generalize bindings for futures This commit is similar to #1390 in that the goal is to enable external systems to use the Rust bindings here without requiring the exact shape of the vtable structure and such. This is, for example, intended to be useful when working on `wit-dylib`. --- .../src/rt/async_support/future_support.rs | 546 ++++++++++++------ .../src/rt/async_support/waitable.rs | 17 +- 2 files changed, 384 insertions(+), 179 deletions(-) diff --git a/crates/guest-rust/src/rt/async_support/future_support.rs b/crates/guest-rust/src/rt/async_support/future_support.rs index 07dbab14a..37beb3007 100644 --- a/crates/guest-rust/src/rt/async_support/future_support.rs +++ b/crates/guest-rust/src/rt/async_support/future_support.rs @@ -118,12 +118,46 @@ use std::alloc::Layout; use std::fmt; use std::future::{Future, IntoFuture}; use std::marker; +use std::mem::{self, ManuallyDrop}; use std::pin::Pin; use std::ptr; use std::sync::atomic::{AtomicU32, Ordering::Relaxed}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Wake, Waker}; +/// Helper trait which encapsulates the various operations which can happen +/// with a future. +pub trait FutureOps { + /// The Rust type that's sent or received on this future. + type Payload; + + /// The `future.new` intrinsic. + fn new(&mut self) -> u64; + /// The canonical ABI layout of the type that this future is + /// sending/receiving. + fn elem_layout(&mut self) -> Layout; + /// Converts a Rust type to its canonical ABI representation. + unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8); + /// Used to deallocate any Rust-owned lists in the canonical ABI + /// representation for when a value is successfully sent but needs to be + /// cleaned up. + unsafe fn dealloc_lists(&mut self, dst: *mut u8); + /// Converts from the canonical ABI representation to a Rust value. + unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload; + /// The `future.write` intrinsic + unsafe fn start_write(&mut self, future: u32, val: *const u8) -> u32; + /// The `future.read` intrinsic + unsafe fn start_read(&mut self, future: u32, val: *mut u8) -> u32; + /// The `future.cancel-read` intrinsic + unsafe fn cancel_read(&mut self, future: u32) -> u32; + /// The `future.cancel-write` intrinsic + unsafe fn cancel_write(&mut self, future: u32) -> u32; + /// The `future.drop-readable` intrinsic + unsafe fn drop_readable(&mut self, future: u32); + /// The `future.drop-writable` intrinsic + unsafe fn drop_writable(&mut self, future: u32); +} + /// Function table used for [`FutureWriter`] and [`FutureReader`] /// /// Instances of this table are generated by `wit_bindgen::generate!`. This is @@ -175,6 +209,44 @@ pub struct FutureVtable { pub new: unsafe extern "C" fn() -> u64, } +impl FutureOps for &'static FutureVtable { + type Payload = T; + + fn new(&mut self) -> u64 { + unsafe { (self.new)() } + } + fn elem_layout(&mut self) -> Layout { + self.layout + } + unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) { + (self.lower)(payload, dst) + } + unsafe fn dealloc_lists(&mut self, dst: *mut u8) { + (self.dealloc_lists)(dst) + } + unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload { + (self.lift)(dst) + } + unsafe fn start_write(&mut self, future: u32, val: *const u8) -> u32 { + (self.start_write)(future, val) + } + unsafe fn start_read(&mut self, future: u32, val: *mut u8) -> u32 { + (self.start_read)(future, val) + } + unsafe fn cancel_read(&mut self, future: u32) -> u32 { + (self.cancel_read)(future) + } + unsafe fn cancel_write(&mut self, future: u32) -> u32 { + (self.cancel_write)(future) + } + unsafe fn drop_readable(&mut self, future: u32) { + (self.drop_readable)(future) + } + unsafe fn drop_writable(&mut self, future: u32) { + (self.drop_writable)(future) + } +} + /// Helper function to create a new read/write pair for a component model /// future. /// @@ -186,14 +258,29 @@ pub unsafe fn future_new( default: fn() -> T, vtable: &'static FutureVtable, ) -> (FutureWriter, FutureReader) { + let (tx, rx) = unsafe { raw_future_new(vtable) }; + (FutureWriter::new(tx, default), rx) +} + +/// Helper function to create a new read/write pair for a component model +/// future. +/// +/// # Unsafety +/// +/// This function is unsafe as it requires the functions within `vtable` to +/// correctly uphold the contracts of the component model. +pub unsafe fn raw_future_new(mut ops: O) -> (RawFutureWriter, RawFutureReader) +where + O: FutureOps + Clone, +{ unsafe { - let handles = (vtable.new)(); + let handles = ops.new(); let reader = handles as u32; let writer = (handles >> 32) as u32; rtdebug!("future.new() = [{writer}, {reader}]"); ( - FutureWriter::new(writer, default, vtable), - FutureReader::new(reader, vtable), + RawFutureWriter::new(writer, ops.clone()), + RawFutureReader::new(reader, ops), ) } } @@ -203,8 +290,7 @@ pub unsafe fn future_new( /// A [`FutureWriter`] can be used to send a single value of `T` to the other /// end of a `future`. In a sense this is similar to a oneshot channel in Rust. pub struct FutureWriter { - handle: u32, - vtable: &'static FutureVtable, + raw: ManuallyDrop>>, /// Whether or not a value should be written during `drop`. /// @@ -227,13 +313,11 @@ impl FutureWriter { /// /// This function is unsafe as it requires the functions within `vtable` to /// correctly uphold the contracts of the component model. - #[doc(hidden)] - pub unsafe fn new(handle: u32, default: fn() -> T, vtable: &'static FutureVtable) -> Self { + unsafe fn new(raw: RawFutureWriter<&'static FutureVtable>, default: fn() -> T) -> Self { Self { - handle, + raw: ManuallyDrop::new(raw), default, should_write_default_value: true, - vtable, } } @@ -264,17 +348,18 @@ impl FutureWriter { /// will be lost. There is also [`FutureWrite::cancel`] which can be used to /// possibly re-acquire `value` and `self` if the operation was cancelled. /// In such a situation the operation can be retried at a future date. - pub fn write(self, value: T) -> FutureWrite { - FutureWrite { - op: WaitableOperation::new(FutureWriteOp(marker::PhantomData), (self, value)), - } + pub fn write(mut self, value: T) -> FutureWrite { + let raw = unsafe { ManuallyDrop::take(&mut self.raw).write(value) }; + let default = self.default; + mem::forget(self); + FutureWrite { raw, default } } } impl fmt::Debug for FutureWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FutureWriter") - .field("handle", &self.handle) + .field("handle", &self.raw.handle) .finish() } } @@ -282,103 +367,228 @@ impl fmt::Debug for FutureWriter { impl Drop for FutureWriter { fn drop(&mut self) { // If a value has not yet been written into this writer than that must - // be done so now. Perform a "clone" of `self` ensuring that - // `should_write_default_value` is set to `false` to avoid infinite - // loops by accident. The cloned `FutureWriter` will be responsible for - // performing the `drop-writable` call below once the write has - // completed. + // be done so now. Take the `raw` writer and perform the write via a + // waker that drives the future. // - // Note, though, that if `should_write_default_value` is `false` then a - // write has already happened and we can go ahead and just synchronously - // drop this writer as we would any other handle. + // If `should_write_default_value` is `false` then a write has already + // happened and we can go ahead and just synchronously drop this writer + // as we would any other handle. if self.should_write_default_value { - let clone = FutureWriter { - handle: self.handle, - default: self.default, - should_write_default_value: false, - vtable: self.vtable, - }; - let value = (clone.default)(); - let write = clone.write(value); - Arc::new(DeferredWrite::new(write)).wake(); + let raw = unsafe { ManuallyDrop::take(&mut self.raw) }; + let value = (self.default)(); + raw.write_and_forget(value); } else { - unsafe { - rtdebug!("future.drop-writable({})", self.handle); - (self.vtable.drop_writable)(self.handle); - } + unsafe { ManuallyDrop::drop(&mut self.raw) } } } } -/// Helper structure which behaves both as a future of sorts and an executor of -/// sorts. -/// -/// This type is constructed in `Drop for FutureWriter` to send out a -/// default value when no other has been written. This manages the -/// `FutureWrite` operation happening internally through a `Wake` -/// implementation. That means that this is a sort of cyclical future which, -/// when woken, will complete the write operation. +/// Represents a write operation which may be cancelled prior to completion. /// -/// The purpose of this is to be a "lightweight" way of "spawn"-ing a future -/// write to happen in the background. Crucially, however, this doesn't require -/// the `async-spawn` feature and instead works with the `wasip3_task` C ABI -/// structures (which spawn doesn't support). -struct DeferredWrite { - write: Mutex>, -} - -// TODO -unsafe impl Send for DeferredWrite {} -unsafe impl Sync for DeferredWrite {} - -impl DeferredWrite { - fn new(write: FutureWrite) -> DeferredWrite { - DeferredWrite { - write: Mutex::new(write), +/// This is returned by [`FutureWriter::write`]. +pub struct FutureWrite { + raw: RawFutureWrite<&'static FutureVtable>, + default: fn() -> T, +} + +/// Result of [`FutureWrite::cancel`]. +#[derive(Debug)] +pub enum FutureWriteCancel { + /// The cancel request raced with the receipt of the sent value, and the + /// value was actually sent. Neither the value nor the writer are made + /// available here as both are gone. + AlreadySent, + + /// The other end was dropped before cancellation happened. + /// + /// In this case the original value is returned back to the caller but the + /// writer itself is not longer accessible as it's no longer usable. + Dropped(T), + + /// The pending write was successfully cancelled and the value being written + /// is returned along with the writer to resume again in the future if + /// necessary. + Cancelled(T, FutureWriter), +} + +impl Future for FutureWrite { + type Output = Result<(), FutureWriteError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.pin_project().poll(cx) + } +} + +impl FutureWrite { + fn pin_project(self: Pin<&mut Self>) -> Pin<&mut RawFutureWrite<&'static FutureVtable>> { + // SAFETY: we've chosen that when `Self` is pinned that it translates to + // always pinning the inner field, so that's codified here. + unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().raw) } + } + + /// Cancel this write if it hasn't already completed. + /// + /// This method can be used to cancel a write-in-progress and re-acquire + /// the writer and the value being sent. Note that the write operation may + /// succeed racily or the other end may also drop racily, and these + /// outcomes are reflected in the returned value here. + /// + /// # Panics + /// + /// Panics if the operation has already been completed via `Future::poll`, + /// or if this method is called twice. + pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel { + let default = self.default; + match self.pin_project().cancel() { + RawFutureWriteCancel::AlreadySent => FutureWriteCancel::AlreadySent, + RawFutureWriteCancel::Dropped(val) => FutureWriteCancel::Dropped(val), + RawFutureWriteCancel::Cancelled(val, raw) => FutureWriteCancel::Cancelled( + val, + FutureWriter { + raw: ManuallyDrop::new(raw), + default, + should_write_default_value: true, + }, + ), } } } -impl Wake for DeferredWrite { - fn wake(self: Arc) { - // When a `wake` signal comes in that should happen in two locations: - // - // 1. When `DeferredWrite` is initially constructed. - // 2. When an event comes in indicating that the internal write has - // completed. - // - // The implementation here is the same in both cases. A clone of `self` - // is converted to a `Waker`, and this `Waker` notably owns the - // internal future itself. The internal write operation is then pushed - // forward (e.g. it's issued in (1) or checked up on in (2)). +impl Drop for FutureWrite { + fn drop(&mut self) { + if self.raw.op.is_done() { + return; + } + + // Although the underlying `WaitableOperation` will already + // auto-cancel-on-drop we need to specially handle that here because if + // the cancellation goes through then it means that no value will have + // been written to this future which will cause a trap. By using + // `Self::cancel` it's ensured that if cancellation succeeds a + // `FutureWriter` is created. In `Drop for FutureWriter` that'll handle + // the last-ditch write-default logic. // - // If `Pending` is returned then `waker` should have been stored away - // within the `wasip3_task` C ABI structure. Otherwise it should not - // have been stored away and `self` should be the sole reference which - // means everything will get cleaned up when this function returns. - let poll = { - let waker = Waker::from(self.clone()); - let mut cx = Context::from_waker(&waker); - let mut write = self.write.lock().unwrap(); - unsafe { Pin::new_unchecked(&mut *write).poll(&mut cx) } - }; - if poll.is_ready() { - assert_eq!(Arc::strong_count(&self), 1); - } else { - assert!(Arc::strong_count(&self) > 1); + // SAFETY: we're in the destructor here so the value `self` is about + // to go away and we can guarantee we're not moving out of it. + let pin = unsafe { Pin::new_unchecked(self) }; + pin.cancel(); + } +} + +/// Raw version of [`FutureWriter`]. +pub struct RawFutureWriter { + handle: u32, + ops: O, +} + +impl RawFutureWriter { + unsafe fn new(handle: u32, ops: O) -> Self { + Self { handle, ops } + } + + /// Same as [`FutureWriter::write`], but the raw version. + pub fn write(self, value: O::Payload) -> RawFutureWrite { + RawFutureWrite { + op: WaitableOperation::new(FutureWriteOp(marker::PhantomData), (self, value)), + } + } + + /// Writes `value` in the background. + /// + /// This does not block and is not cancellable. + pub fn write_and_forget(self, value: O::Payload) + where + O: 'static, + { + return Arc::new(DeferredWrite { + write: Mutex::new(self.write(value)), + }) + .wake(); + + /// Helper structure which behaves both as a future of sorts and an + /// executor of sorts. + /// + /// This type is constructed in `Drop for FutureWriter` to send out a + /// default value when no other has been written. This manages the + /// `FutureWrite` operation happening internally through a `Wake` + /// implementation. That means that this is a sort of cyclical future + /// which, when woken, will complete the write operation. + /// + /// The purpose of this is to be a "lightweight" way of "spawn"-ing a + /// future write to happen in the background. Crucially, however, this + /// doesn't require the `async-spawn` feature and instead works with the + /// `wasip3_task` C ABI structures (which spawn doesn't support). + struct DeferredWrite { + write: Mutex>, + } + + // SAFETY: Needed to satisfy `Waker::from` but otherwise should be ok + // because wasm doesn't have threads anyway right now. + unsafe impl Send for DeferredWrite {} + unsafe impl Sync for DeferredWrite {} + + impl Wake for DeferredWrite { + fn wake(self: Arc) { + // When a `wake` signal comes in that should happen in two + // locations: + // + // 1. When `DeferredWrite` is initially constructed. + // 2. When an event comes in indicating that the internal write + // has completed. + // + // The implementation here is the same in both cases. A clone of + // `self` is converted to a `Waker`, and this `Waker` notably + // owns the internal future itself. The internal write operation + // is then pushed forward (e.g. it's issued in (1) or checked up + // on in (2)). + // + // If `Pending` is returned then `waker` should have been stored + // away within the `wasip3_task` C ABI structure. Otherwise it + // should not have been stored away and `self` should be the + // sole reference which means everything will get cleaned up + // when this function returns. + let poll = { + let waker = Waker::from(self.clone()); + let mut cx = Context::from_waker(&waker); + let mut write = self.write.lock().unwrap(); + unsafe { Pin::new_unchecked(&mut *write).poll(&mut cx) } + }; + if poll.is_ready() { + assert_eq!(Arc::strong_count(&self), 1); + } else { + assert!(Arc::strong_count(&self) > 1); + } + assert_eq!(Arc::weak_count(&self), 0); + } + } + } +} + +impl fmt::Debug for RawFutureWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RawFutureWriter") + .field("handle", &self.handle) + .finish() + } +} + +impl Drop for RawFutureWriter { + fn drop(&mut self) { + unsafe { + rtdebug!("future.drop-writable({})", self.handle); + self.ops.drop_writable(self.handle); } - assert_eq!(Arc::weak_count(&self), 0); } } /// Represents a write operation which may be cancelled prior to completion. /// /// This is returned by [`FutureWriter::write`]. -pub struct FutureWrite { - op: WaitableOperation>, +pub struct RawFutureWrite { + op: WaitableOperation>, } -struct FutureWriteOp(marker::PhantomData); +struct FutureWriteOp(marker::PhantomData); enum WriteComplete { Written, @@ -386,16 +596,13 @@ enum WriteComplete { Cancelled(T), } -unsafe impl WaitableOp for FutureWriteOp -where - T: 'static, -{ - type Start = (FutureWriter, T); - type InProgress = (FutureWriter, Option); - type Result = (WriteComplete, FutureWriter); - type Cancel = FutureWriteCancel; +unsafe impl WaitableOp for FutureWriteOp { + type Start = (RawFutureWriter, O::Payload); + type InProgress = (RawFutureWriter, Option); + type Result = (WriteComplete, RawFutureWriter); + type Cancel = RawFutureWriteCancel; - fn start(&mut self, (writer, value): Self::Start) -> (u32, Self::InProgress) { + fn start(&mut self, (mut writer, value): Self::Start) -> (u32, Self::InProgress) { // TODO: it should be safe to store the lower-destination in // `WaitableOperation` using `Pin` memory and such, but that would // require some type-level trickery to get a correctly-sized value @@ -406,19 +613,19 @@ where // In lieu of that a dedicated location on the heap is created for the // lowering, and then `value`, as an owned value, is lowered into this // pointer to initialize it. - let (ptr, cleanup) = Cleanup::new(writer.vtable.layout); - // SAFETY: `ptr` is allocated with `vtable.layout` and should be + let (ptr, cleanup) = Cleanup::new(writer.ops.elem_layout()); + // SAFETY: `ptr` is allocated with `ops.layout` and should be // safe to use here. let code = unsafe { - (writer.vtable.lower)(value, ptr); - (writer.vtable.start_write)(writer.handle, ptr) + writer.ops.lower(value, ptr); + writer.ops.start_write(writer.handle, ptr) }; rtdebug!("future.write({}, {ptr:?}) = {code:#x}", writer.handle); (code, (writer, cleanup)) } fn start_cancelled(&mut self, (writer, value): Self::Start) -> Self::Cancel { - FutureWriteCancel::Cancelled(value, writer) + RawFutureWriteCancel::Cancelled(value, writer) } fn in_progress_update( @@ -443,13 +650,8 @@ where super::DROPPED | super::CANCELLED => { // SAFETY: we're the ones managing `ptr` so we know it's safe to // pass here. - let value = unsafe { (writer.vtable.lift)(ptr) }; + let value = unsafe { writer.ops.lift(ptr) }; let status = if code == super::DROPPED { - // This writer has been witnessed to be dropped, meaning that - // `writer` is going to get destroyed soon as this return - // value propagates up the stack. There's no need to write - // the default value, so set this to `false`. - writer.should_write_default_value = false; WriteComplete::Dropped(value) } else { WriteComplete::Cancelled(value) @@ -467,13 +669,10 @@ where // Afterwards the `cleanup` itself is naturally dropped and cleaned // up. super::COMPLETED => { - // A value was written, so no need to write the default value. - writer.should_write_default_value = false; - // SAFETY: we're the ones managing `ptr` so we know it's safe to // pass here. unsafe { - (writer.vtable.dealloc_lists)(ptr); + writer.ops.dealloc_lists(ptr); } Ok((WriteComplete::Written, writer)) } @@ -489,7 +688,7 @@ where fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `writer` and all the various operational bits, // so this relies on `WaitableOperation` being safe. - let code = unsafe { (writer.vtable.cancel_write)(writer.handle) }; + let code = unsafe { writer.ops.cancel_write(writer.handle) }; rtdebug!("future.cancel-write({}) = {code:#x}", writer.handle); code } @@ -498,19 +697,19 @@ where match result { // The value was actually sent, meaning we can't yield back the // future nor the value. - WriteComplete::Written => FutureWriteCancel::AlreadySent, + WriteComplete::Written => RawFutureWriteCancel::AlreadySent, // The value was not sent because the other end either hung up or we // successfully cancelled. In both cases return back the value here // with the writer. - WriteComplete::Dropped(val) => FutureWriteCancel::Dropped(val), - WriteComplete::Cancelled(val) => FutureWriteCancel::Cancelled(val, writer), + WriteComplete::Dropped(val) => RawFutureWriteCancel::Dropped(val), + WriteComplete::Cancelled(val) => RawFutureWriteCancel::Cancelled(val, writer), } } } -impl Future for FutureWrite { - type Output = Result<(), FutureWriteError>; +impl Future for RawFutureWrite { + type Output = Result<(), FutureWriteError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.pin_project() @@ -524,25 +723,16 @@ impl Future for FutureWrite { } } -impl FutureWrite { - fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { +impl RawFutureWrite { + fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { // SAFETY: we've chosen that when `Self` is pinned that it translates to // always pinning the inner field, so that's codified here. unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) } } - /// Cancel this write if it hasn't already completed. - /// - /// This method can be used to cancel a write-in-progress and re-acquire - /// the writer and the value being sent. Note that the write operation may - /// succeed racily or the other end may also drop racily, and these - /// outcomes are reflected in the returned value here. - /// - /// # Panics - /// - /// Panics if the operation has already been completed via `Future::poll`, - /// or if this method is called twice. - pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel { + /// Same as [`FutureWrite::cancel`], but returns a [`RawFutureWriteCancel`] + /// instead. + pub fn cancel(self: Pin<&mut Self>) -> RawFutureWriteCancel { self.pin_project().cancel() } } @@ -570,7 +760,7 @@ impl std::error::Error for FutureWriteError {} /// Result of [`FutureWrite::cancel`]. #[derive(Debug)] -pub enum FutureWriteCancel { +pub enum RawFutureWriteCancel { /// The cancel request raced with the receipt of the sent value, and the /// value was actually sent. Neither the value nor the writer are made /// available here as both are gone. @@ -580,34 +770,43 @@ pub enum FutureWriteCancel { /// /// In this case the original value is returned back to the caller but the /// writer itself is not longer accessible as it's no longer usable. - Dropped(T), + Dropped(O::Payload), /// The pending write was successfully cancelled and the value being written /// is returned along with the writer to resume again in the future if /// necessary. - Cancelled(T, FutureWriter), + Cancelled(O::Payload, RawFutureWriter), } /// Represents the readable end of a Component Model `future`. -pub struct FutureReader { +pub type FutureReader = RawFutureReader<&'static FutureVtable>; + +/// Represents the readable end of a Component Model `future`. +pub struct RawFutureReader { handle: AtomicU32, - vtable: &'static FutureVtable, + ops: O, } -impl fmt::Debug for FutureReader { +impl fmt::Debug for RawFutureReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FutureReader") + f.debug_struct("RawFutureReader") .field("handle", &self.handle) .finish() } } -impl FutureReader { - #[doc(hidden)] - pub fn new(handle: u32, vtable: &'static FutureVtable) -> Self { +impl RawFutureReader { + /// Raw constructor for a future reader. + /// + /// Takes ownership of the `handle` provided. + /// + /// # Safety + /// + /// The `ops` specified must be both valid and well-typed for `handle`. + pub unsafe fn new(handle: u32, ops: O) -> Self { Self { handle: AtomicU32::new(handle), - vtable, + ops, } } @@ -630,27 +829,27 @@ impl FutureReader { } } -impl IntoFuture for FutureReader { - type Output = T; - type IntoFuture = FutureRead; +impl IntoFuture for RawFutureReader { + type Output = O::Payload; + type IntoFuture = RawFutureRead; /// Convert this object into a `Future` which will resolve when a value is /// written to the writable end of this `future`. fn into_future(self) -> Self::IntoFuture { - FutureRead { + RawFutureRead { op: WaitableOperation::new(FutureReadOp(marker::PhantomData), self), } } } -impl Drop for FutureReader { +impl Drop for RawFutureReader { fn drop(&mut self) { let Some(handle) = self.opt_handle() else { return; }; unsafe { rtdebug!("future.drop-readable({handle})"); - (self.vtable.drop_readable)(handle); + self.ops.drop_readable(handle); } } } @@ -659,32 +858,35 @@ impl Drop for FutureReader { /// /// This represents a read operation on a [`FutureReader`] and is created via /// `IntoFuture`. -pub struct FutureRead { - op: WaitableOperation>, +pub type FutureRead = RawFutureRead<&'static FutureVtable>; + +/// Represents a read operation which may be cancelled prior to completion. +/// +/// This represents a read operation on a [`FutureReader`] and is created via +/// `IntoFuture`. +pub struct RawFutureRead { + op: WaitableOperation>, } -struct FutureReadOp(marker::PhantomData); +struct FutureReadOp(marker::PhantomData); enum ReadComplete { Value(T), Cancelled, } -unsafe impl WaitableOp for FutureReadOp -where - T: 'static, -{ - type Start = FutureReader; - type InProgress = (FutureReader, Option); - type Result = (ReadComplete, FutureReader); - type Cancel = Result>; +unsafe impl WaitableOp for FutureReadOp { + type Start = RawFutureReader; + type InProgress = (RawFutureReader, Option); + type Result = (ReadComplete, RawFutureReader); + type Cancel = Result>; - fn start(&mut self, reader: Self::Start) -> (u32, Self::InProgress) { - let (ptr, cleanup) = Cleanup::new(reader.vtable.layout); + fn start(&mut self, mut reader: Self::Start) -> (u32, Self::InProgress) { + let (ptr, cleanup) = Cleanup::new(reader.ops.elem_layout()); // SAFETY: `ptr` is allocated with `vtable.layout` and should be // safe to use here. Its lifetime for the async operation is hinged on // `WaitableOperation` being safe. - let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr) }; + let code = unsafe { reader.ops.start_read(reader.handle(), ptr) }; rtdebug!("future.read({}, {ptr:?}) = {code:#x}", reader.handle()); (code, (reader, cleanup)) } @@ -695,7 +897,7 @@ where fn in_progress_update( &mut self, - (reader, cleanup): Self::InProgress, + (mut reader, cleanup): Self::InProgress, code: u32, ) -> Result { match ReturnCode::decode(code) { @@ -717,7 +919,7 @@ where // SAFETY: we're the ones managing `ptr` so we know it's safe to // pass here. - let value = unsafe { (reader.vtable.lift)(ptr) }; + let value = unsafe { reader.ops.lift(ptr) }; Ok((ReadComplete::Value(value), reader)) } @@ -732,7 +934,7 @@ where fn in_progress_cancel(&mut self, (reader, _): &mut Self::InProgress) -> u32 { // SAFETY: we're managing `reader` and all the various operational bits, // so this relies on `WaitableOperation` being safe. - let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) }; + let code = unsafe { reader.ops.cancel_read(reader.handle()) }; rtdebug!("future.cancel-read({}) = {code:#x}", reader.handle()); code } @@ -749,8 +951,8 @@ where } } -impl Future for FutureRead { - type Output = T; +impl Future for RawFutureRead { + type Output = O::Payload; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.pin_project() @@ -765,8 +967,8 @@ impl Future for FutureRead { } } -impl FutureRead { - fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { +impl RawFutureRead { + fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation>> { // SAFETY: we've chosen that when `Self` is pinned that it translates to // always pinning the inner field, so that's codified here. unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) } @@ -786,7 +988,7 @@ impl FutureRead { /// Panics if the operation has already been completed via `Future::poll`, /// or if this method is called twice. Additionally if this method completes /// then calling `poll` again on `self` will panic. - pub fn cancel(self: Pin<&mut Self>) -> Result> { + pub fn cancel(self: Pin<&mut Self>) -> Result> { self.pin_project().cancel() } } diff --git a/crates/guest-rust/src/rt/async_support/waitable.rs b/crates/guest-rust/src/rt/async_support/waitable.rs index c456ad627..850cd8f3c 100644 --- a/crates/guest-rust/src/rt/async_support/waitable.rs +++ b/crates/guest-rust/src/rt/async_support/waitable.rs @@ -441,6 +441,11 @@ where Poll::Pending => unreachable!(), } } + + /// Returns whether or not this operation has completed. + pub fn is_done(&self) -> bool { + matches!(self.state, WaitableOperationState::Done) + } } impl Future for WaitableOperation { @@ -453,17 +458,15 @@ impl Future for WaitableOperation { impl Drop for WaitableOperation { fn drop(&mut self) { - // SAFETY: we're in the destructor here so the value `self` is about - // to go away and we can guarantee we're not moving out of it. - let mut pin = unsafe { Pin::new_unchecked(self) }; - - let (_, state, _) = pin.as_mut().pin_project(); - // If this operation has already completed then skip cancellation, // otherwise it's our job to cancel anything in-flight. - if let WaitableOperationState::Done = state { + if self.is_done() { return; } + + // SAFETY: we're in the destructor here so the value `self` is about + // to go away and we can guarantee we're not moving out of it. + let pin = unsafe { Pin::new_unchecked(self) }; pin.cancel(); } }