diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index db6437a..566d773 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -32,6 +32,7 @@ jobs:
     uses: smol-rs/.github/.github/workflows/clippy.yml@main
     with:
       bench: false # We run clippy using stable rustc, but our benchmarks use #![feature(test)].
+      # features: 'std,portable-atomic,allocator-api2' # TODO: Action needs to support a new input to exclude nightly features.
   security_audit:
     uses: smol-rs/.github/.github/workflows/security_audit.yml@main
     permissions:
@@ -68,6 +69,11 @@ jobs:
           CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: valgrind -v --error-exitcode=1 --error-limit=no --leak-check=full --show-leak-kinds=definite,indirect --errors-for-leak-kinds=definite,indirect --track-origins=yes --fair-sched=yes --gen-suppressions=all
       - name: Run cargo test (with portable-atomic enabled)
         run: cargo test --features portable-atomic
+      - name: Run cargo test (with allocator-api2 enabled)
+        run: cargo test --features allocator-api2
+      - name: Run cargo test (with nightly allocator_api enabled)
+        run: cargo test --features allocator_api
+        if: startsWith(matrix.rust, 'nightly')
       - name: Clone async-executor
         run: git clone https://github.com/smol-rs/async-executor.git
       - name: Add patch section
diff --git a/Cargo.toml b/Cargo.toml
index 3073d84..f5974cd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,12 +16,19 @@ exclude = ["/.*"]
 
 [features]
 default = ["std"]
+allocator_api = []
 std = []
 
 [dependencies]
 # Uses portable-atomic polyfill atomics on targets without them
 portable-atomic = { version = "1", optional = true, default-features = false }
 
+[dependencies.allocator-api2]
+version = "0.3.1"
+optional = true
+default-features = false
+features = ["alloc"]
+
 [dev-dependencies]
 atomic-waker = "1"
 easy-parallel = "3"
diff --git a/src/alloc_api.rs b/src/alloc_api.rs
new file mode 100644
index 0000000..5f9cfa1
--- /dev/null
+++ b/src/alloc_api.rs
@@ -0,0 +1,106 @@
+pub(crate) use self::inner::{allocate, deallocate, Allocator, Global};
+
+// Nightly `allocator_api` feature case.
+#[cfg(feature = "allocator_api")]
+mod inner {
+    pub use crate::alloc::alloc::{Allocator, Global};
+
+    use crate::alloc::alloc::Layout;
+    use core::ptr::NonNull;
+
+    pub(crate) fn allocate<A: Allocator>(alloc: &A, layout: Layout) -> Result<NonNull<u8>, ()> {
+        match alloc.allocate(layout) {
+            Ok(ptr) => Ok(ptr.cast()),
+            Err(_) => Err(()),
+        }
+    }
+
+    /// # Safety
+    ///
+    /// - `ptr` must have been allocated by `alloc` using the same allocator.
+    /// - `layout` must be the same layout that was used to allocate `ptr`.
+    /// - `ptr` must not be used after this call.
+    pub(crate) unsafe fn deallocate<A: Allocator>(alloc: &A, ptr: *mut u8, layout: Layout) {
+        alloc.deallocate(NonNull::new_unchecked(ptr), layout)
+    }
+}
+
+// Non-nightly `allocator-api2` feature case.
+#[cfg(all(feature = "allocator-api2", not(feature = "allocator_api")))]
+mod inner {
+    pub use allocator_api2::alloc::{Allocator, Global};
+
+    use crate::alloc::alloc::Layout;
+    use core::ptr::NonNull;
+
+    pub(crate) fn allocate<A: Allocator>(alloc: &A, layout: Layout) -> Result<NonNull<u8>, ()> {
+        match alloc.allocate(layout) {
+            Ok(ptr) => Ok(ptr.cast()),
+            Err(_) => Err(()),
+        }
+    }
+
+    /// # Safety
+    ///
+    /// - `ptr` must have been allocated by `alloc` using the same allocator.
+    /// - `layout` must be the same layout that was used to allocate `ptr`.
+    /// - `ptr` must not be used after this call.
+    pub(crate) unsafe fn deallocate<A: Allocator>(alloc: &A, ptr: *mut u8, layout: Layout) {
+        alloc.deallocate(NonNull::new_unchecked(ptr), layout)
+    }
+}
+
+// Default case without the allocator API.
+#[cfg(not(any(feature = "allocator_api", feature = "allocator-api2")))]
+mod inner {
+    use crate::alloc::alloc::{alloc, dealloc, Layout};
+    use core::ptr::NonNull;
+
+    /// # Safety
+    ///
+    /// Used only within the crate and implemented only for [`Global`].
+    pub unsafe trait Allocator {
+        fn allocate(&self, layout: Layout) -> Result<NonNull<u8>, ()>;
+
+        /// # Safety
+        ///
+        /// - `ptr` must have been returned by a previous call to `allocate` on this allocator.
+        /// - `layout` must be the same layout used in that `allocate` call.
+        unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout);
+    }
+
+    #[derive(Copy, Clone)]
+    pub struct Global;
+
+    impl Default for Global {
+        #[inline]
+        fn default() -> Self {
+            Global
+        }
+    }
+
+    unsafe impl Allocator for Global {
+        #[inline]
+        fn allocate(&self, layout: Layout) -> Result<NonNull<u8>, ()> {
+            unsafe { NonNull::new(alloc(layout)).ok_or(()) }
+        }
+
+        #[inline]
+        unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
+            dealloc(ptr.as_ptr(), layout);
+        }
+    }
+
+    pub(crate) fn allocate<A: Allocator>(alloc: &A, layout: Layout) -> Result<NonNull<u8>, ()> {
+        alloc.allocate(layout)
+    }
+
+    /// # Safety
+    ///
+    /// - `ptr` must have been allocated by `alloc` using the same allocator.
+    /// - `layout` must be the same layout that was used to allocate `ptr`.
+    /// - `ptr` must not be used after this call.
+    pub(crate) unsafe fn deallocate<A: Allocator>(alloc: &A, ptr: *mut u8, layout: Layout) {
+        alloc.deallocate(NonNull::new_unchecked(ptr), layout)
+    }
+}
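For orientation, the fallback `Global` implementation above reduces to a plain `alloc`/`dealloc` round trip. A standalone sketch of the same flow (illustrative only; it calls `std` directly instead of the crate-private shim):

```rust
use std::alloc::{alloc, dealloc, Layout};
use std::ptr::NonNull;

fn roundtrip() {
    let layout = Layout::new::<[u64; 4]>();
    // Mirrors `Global::allocate`: a null pointer becomes an error.
    let ptr: NonNull<u8> = NonNull::new(unsafe { alloc(layout) }).expect("allocation failed");
    // Mirrors `Global::deallocate`.
    // SAFETY: `ptr` was returned by `alloc` with this exact layout and is not used again.
    unsafe { dealloc(ptr.as_ptr(), layout) };
}
```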
diff --git a/src/lib.rs b/src/lib.rs
index 290a749..204f1cb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -80,6 +80,7 @@
 #![doc(
     html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
 )]
+#![cfg_attr(feature = "allocator_api", feature(allocator_api))]
 
 extern crate alloc;
 #[cfg(feature = "std")]
@@ -105,6 +106,7 @@ macro_rules! leap_unwrap {
     }};
 }
 
+mod alloc_api;
 mod header;
 mod raw;
 mod runnable;
@@ -117,5 +119,14 @@ pub use crate::runnable::{
 };
 pub use crate::task::{FallibleTask, Task};
 
+#[cfg(any(feature = "allocator_api", feature = "allocator-api2"))]
+pub use crate::runnable::{spawn_in, spawn_unchecked_in};
+
 #[cfg(feature = "std")]
 pub use crate::runnable::spawn_local;
+
+#[cfg(all(
+    feature = "std",
+    any(feature = "allocator_api", feature = "allocator-api2")
+))]
+pub use crate::runnable::spawn_local_in;
diff --git a/src/raw.rs b/src/raw.rs
index 89dd16d..7122646 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -8,6 +8,7 @@ use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
 use core::sync::atomic::Ordering;
 
+use crate::alloc_api::{self, Allocator};
 use crate::header::{DropWakerAction, Header, HeaderWithMetadata};
 use crate::runnable::{Schedule, ScheduleInfo};
 use crate::state::*;
@@ -68,10 +69,13 @@ pub(crate) struct TaskLayout {
 
     /// Offset into the task at which the output is stored.
     pub(crate) offset_r: usize,
+
+    /// Offset into the task at which the allocator is stored.
+    pub(crate) offset_a: usize,
 }
 
 /// Raw pointers to the fields inside a task.
-pub(crate) struct RawTask<F, T, S, M> {
+pub(crate) struct RawTask<F, T, S, M, A> {
     /// The task header.
     pub(crate) header: *const HeaderWithMetadata<M>,
 
     /// The schedule function.
     pub(crate) schedule: *const S,
 
     /// The future.
     pub(crate) future: *mut F,
 
     /// The output of the future.
     pub(crate) output: *mut Result<T, Panic>,
+
+    /// The task allocator.
+    pub(crate) allocator: *const A,
 }
 
-impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}
+impl<F, T, S, M, A> Copy for RawTask<F, T, S, M, A> {}
 
-impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
+impl<F, T, S, M, A> Clone for RawTask<F, T, S, M, A> {
     fn clone(&self) -> Self {
         *self
     }
 }
 
-impl<F, T, S, M> RawTask<F, T, S, M> {
+impl<F, T, S, M, A> RawTask<F, T, S, M, A> {
     pub(crate) const TASK_LAYOUT: TaskLayout = Self::eval_task_layout();
 
     /// Computes the memory layout for a task.
@@ -104,6 +111,7 @@ impl<F, T, S, M> RawTask<F, T, S, M> {
         let layout_s = Layout::new::<S>();
         let layout_f = Layout::new::<F>();
         let layout_r = Layout::new::<Result<T, Panic>>();
+        let layout_a = Layout::new::<A>();
 
         // Compute the layout for `union { F, T }`.
         let size_union = max(layout_f.size(), layout_r.size());
@@ -117,11 +125,18 @@ impl<F, T, S, M> RawTask<F, T, S, M> {
         let offset_f = offset_union;
         let offset_r = offset_union;
 
+        let (layout, offset_a) = if layout_a.size() > 0 {
+            leap_unwrap!(layout.extend(layout_a))
+        } else {
+            (layout, layout.size())
+        };
+
         TaskLayout {
             layout: unsafe { layout.into_std() },
             offset_s,
             offset_f,
             offset_r,
+            offset_a,
         }
     }
 }
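The `offset_a` branch above keeps zero-sized allocators free: for a ZST the layout is left untouched and the offset degenerates to `layout.size()`. The same computation as a standalone sketch over `std`'s `Layout` (illustrative; the real code uses the crate's const-friendly layout wrapper and `leap_unwrap!`):

```rust
use std::alloc::Layout;

// Extend a task layout with space for an allocator of type `A`.
fn extend_with_allocator<A>(layout: Layout) -> (Layout, usize) {
    let layout_a = Layout::new::<A>();
    if layout_a.size() > 0 {
        // Stateful allocator: reserve space for it at the end of the task.
        layout.extend(layout_a).expect("layout overflow")
    } else {
        // Zero-sized allocator such as `Global`: no extra space is needed, and
        // reads/writes through the resulting offset are zero-sized.
        (layout, layout.size())
    }
}
```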
@@ -133,13 +148,14 @@ impl RawTask {
 /// Use a macro to brute force inlining to minimize stack copies of potentially
 /// large futures.
 macro_rules! allocate_task {
-    ($f:tt, $s:tt, $m:tt, $builder:ident, $schedule:ident, $raw:ident => $future:block) => {{
-        let allocation =
-            alloc::alloc::alloc(RawTask::<$f, <$f as Future>::Output, $s, $m>::TASK_LAYOUT.layout);
+    ($f:tt, $s:tt, $m:tt, $a:tt, $builder:ident, $schedule:ident, $alloc:ident, $raw:ident => $future:block) => {{
         // Allocate enough space for the entire task.
-        let ptr = NonNull::new(allocation as *mut ()).unwrap_or_else(|| crate::utils::abort());
+        let allocation =
+            crate::alloc_api::allocate::<$a>(&$alloc, RawTask::<$f, <$f as Future>::Output, $s, $m, $a>::TASK_LAYOUT.layout);
+        let allocation = allocation.unwrap_or_else(|()| crate::utils::abort());
+        let ptr = allocation.cast::<()>();
 
-        let $raw = RawTask::<$f, <$f as Future>::Output, $s, $m>::from_ptr(ptr.as_ptr());
+        let $raw = RawTask::<$f, <$f as Future>::Output, $s, $m, $a>::from_ptr(ptr.as_ptr());
 
         let crate::Builder {
             metadata,
@@ -155,7 +171,7 @@ macro_rules! allocate_task {
                 #[cfg(feature = "portable-atomic")]
                 state: portable_atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
                 awaiter: core::cell::UnsafeCell::new(None),
-                vtable: &RawTask::<$f, <$f as Future>::Output, $s, $m>::TASK_VTABLE,
+                vtable: &RawTask::<$f, <$f as Future>::Output, $s, $m, $a>::TASK_VTABLE,
                 #[cfg(feature = "std")]
                 propagate_panic,
             },
         });
 
         // Write the schedule function as the third field of the task.
-        ($raw.schedule as *mut S).write($schedule);
+        ($raw.schedule as *mut $s).write($schedule);
 
         // Explicitly avoid using abort_on_panic here to avoid extra stack
         // copies of the future on lower optimization levels.
@@ -174,6 +190,9 @@ macro_rules! allocate_task {
         $raw.future.write($future);
         // (&(*raw.header).metadata)
 
+        // Write the allocator as the fifth field of the task.
+        ($raw.allocator as *mut $a).write($alloc);
+
         mem::forget(bomb);
 
         ptr
     }};
@@ -181,10 +200,11 @@ macro_rules! allocate_task {
 pub(crate) use allocate_task;
 
-impl<F, S, M> RawTask<F, F::Output, S, M>
+impl<F, S, M, A> RawTask<F, F::Output, S, M, A>
 where
     F: Future,
     S: Schedule<M>,
+    A: Allocator,
 {
     pub(crate) const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
         Header::clone_waker,
@@ -197,7 +217,7 @@ where
         raw_waker_vtable: &Self::RAW_WAKER_VTABLE,
         schedule: schedule::<S, M>,
         drop_future: drop_future::<F, M>,
-        destroy: destroy::<S, M>,
+        destroy: destroy::<S, M, A>,
         run: Self::run,
         layout_info: &Self::TASK_LAYOUT,
     };
@@ -211,6 +231,7 @@ where
             schedule: ptr.add_byte(Self::TASK_LAYOUT.offset_s) as *const S,
             future: ptr.add_byte(Self::TASK_LAYOUT.offset_f) as *mut F,
             output: ptr.add_byte(Self::TASK_LAYOUT.offset_r) as *mut Result<F::Output, Panic>,
+            allocator: ptr.add_byte(Self::TASK_LAYOUT.offset_a) as *const A,
         }
     }
 }
@@ -678,11 +699,14 @@ unsafe fn wake_by_ref<S: Schedule<M>, M>(ptr: *const ()) {
 /// The schedule function will be dropped, and the task will then get deallocated.
 /// The task must be closed before this function is called.
 #[inline]
-unsafe fn destroy<S, M>(ptr: *const ()) {
+unsafe fn destroy<S, M, A: Allocator>(ptr: *const ()) {
     let header = ptr as *const Header;
     let task_layout = (*header).vtable.layout_info;
     let schedule = ptr.add_byte(task_layout.offset_s);
 
+    let allocator = ptr.add_byte(task_layout.offset_a);
+    let allocator = core::ptr::read(allocator as *const A);
+
     // We need a safeguard against panics because destructors can panic.
     abort_on_panic(|| {
         // Drop the header along with the metadata.
@@ -690,10 +714,14 @@ unsafe fn destroy<S, M>(ptr: *const ()) {
         // Drop the schedule function.
         (schedule as *mut S).drop_in_place();
+
+        // Don't drop the allocator here; it has already been moved out by `ptr::read`.
     });
 
     // Finally, deallocate the memory reserved by the task.
-    alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
+    alloc_api::deallocate(&allocator, ptr as *mut u8, task_layout.layout);
+
+    // The allocator itself is dropped when it goes out of scope here.
 }
 
 /// Drops a task reference (`Runnable` or `Waker`).
diff --git a/src/runnable.rs b/src/runnable.rs
index cada62d..ea5452e 100644
--- a/src/runnable.rs
+++ b/src/runnable.rs
@@ -6,6 +6,7 @@ use core::ptr::NonNull;
 use core::sync::atomic::Ordering;
 use core::task::Waker;
 
+use crate::alloc_api::Global;
 use crate::header::Header;
 use crate::header::HeaderWithMetadata;
 use crate::raw::drop_ref;
@@ -13,6 +14,9 @@ use crate::raw::{allocate_task, RawTask};
 use crate::state::*;
 use crate::Task;
 
+#[cfg(any(feature = "allocator_api", feature = "allocator-api2"))]
+use crate::alloc_api::Allocator;
+
 mod sealed {
     use super::*;
     pub trait Sealed {}
@@ -280,8 +284,8 @@ impl Builder<()> {
 // Use a macro to brute force inlining to minimize stack copies of potentially
 // large futures.
 macro_rules! spawn_unchecked {
-    ($f:tt, $s:tt, $m:tt, $builder:ident, $schedule:ident, $raw:ident => $future:block) => {{
-        let ptr = allocate_task!($f, $s, $m, $builder, $schedule, $raw => $future);
+    ($f:tt, $s:tt, $m:tt, $a:tt, $builder:ident, $schedule:ident, $alloc:ident, $raw:ident => $future:block) => {{
+        let ptr = allocate_task!($f, $s, $m, $a, $builder, $schedule, $alloc, $raw => $future);
 
         #[allow(unused_unsafe)]
         // SAFTETY: The task was just allocated above.
@@ -384,7 +388,32 @@ impl Builder {
         S: Schedule + Send + Sync + 'static,
     {
         unsafe {
-            spawn_unchecked!(Fut, S, M, self, schedule, raw => { future(&(*raw.header).metadata) })
+            spawn_unchecked!(Fut, S, M, Global, self, schedule, Global, raw => { future(&(*raw.header).metadata) })
         }
     }
 
+    /// Creates a new task with the provided allocator.
+    ///
+    /// This function is the same as [`spawn()`], except it allocates memory for the task using
+    /// the provided [`Allocator`].
+    ///
+    /// This function is only available when the `allocator_api` or `allocator-api2` feature is
+    /// enabled.
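+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (illustrative; assumes the `allocator-api2` feature and the `flume`
+    /// channel used by the crate's other examples):
+    ///
+    /// ```ignore
+    /// use allocator_api2::alloc::Global;
+    ///
+    /// let future = |()| async { 1 + 2 };
+    /// let (s, r) = flume::unbounded();
+    /// let schedule = move |runnable| s.send(runnable).unwrap();
+    ///
+    /// // Allocate the task with an explicit allocator.
+    /// let (runnable, task) = async_task::Builder::new().spawn_in(future, schedule, Global);
+    /// ```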
+    #[cfg(any(feature = "allocator_api", feature = "allocator-api2"))]
+    pub fn spawn_in<F, Fut, S, A>(
+        self,
+        future: F,
+        schedule: S,
+        alloc: A,
+    ) -> (Runnable<M>, Task<Fut::Output, M>)
+    where
+        F: FnOnce(&M) -> Fut,
+        Fut: Future + Send + 'static,
+        Fut::Output: Send + 'static,
+        S: Schedule<M> + Send + Sync + 'static,
+        A: Allocator + Send + Sync + 'static,
+    {
+        unsafe {
+            spawn_unchecked!(Fut, S, M, A, self, schedule, alloc, raw => { future(&(*raw.header).metadata) })
+        }
+    }
 
@@ -432,60 +461,53 @@ impl Builder {
         Fut::Output: 'static,
         S: Schedule<M> + Send + Sync + 'static,
     {
-        use std::mem::ManuallyDrop;
-        use std::pin::Pin;
-        use std::task::{Context, Poll};
-        use std::thread::{self, ThreadId};
-
-        #[inline]
-        fn thread_id() -> ThreadId {
-            std::thread_local! {
-                static ID: ThreadId = thread::current().id();
-            }
-            ID.try_with(|id| *id)
-                .unwrap_or_else(|_| thread::current().id())
-        }
-
-        struct Checked<F> {
-            id: ThreadId,
-            inner: ManuallyDrop<F>,
-        }
+        // Wrap the future into one that checks which thread it's on.
+        let future = move |meta| {
+            let future = future(meta);
 
-        impl<F> Drop for Checked<F> {
-            fn drop(&mut self) {
-                assert!(
-                    self.id == thread_id(),
-                    "local task dropped by a thread that didn't spawn it"
-                );
-                unsafe {
-                    ManuallyDrop::drop(&mut self.inner);
-                }
+            local::Checked {
+                id: local::thread_id(),
+                inner: std::mem::ManuallyDrop::new(future),
             }
-        }
-
-        impl<F: Future> Future for Checked<F> {
-            type Output = F::Output;
+        };
 
-            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-                assert!(
-                    self.id == thread_id(),
-                    "local task polled by a thread that didn't spawn it"
-                );
-                unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
-            }
-        }
+        unsafe { self.spawn_unchecked(future, schedule) }
+    }
 
+    /// Creates a new thread-local task with the provided allocator.
+    ///
+    /// This function is the same as [`spawn_local()`], except it allocates memory for the task
+    /// using the provided [`Allocator`].
+    ///
+    /// This function is only available when the `std` feature and either the `allocator_api` or
+    /// `allocator-api2` feature are enabled.
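+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (illustrative; assumes the `std` and `allocator-api2` features and the
+    /// `flume` dev-dependency):
+    ///
+    /// ```ignore
+    /// use allocator_api2::alloc::Global;
+    /// use std::rc::Rc;
+    ///
+    /// // The future does not have to be `Send` for a thread-local task.
+    /// let future = |()| async { Rc::new(1 + 2) };
+    /// let (s, r) = flume::unbounded();
+    /// let schedule = move |runnable| s.send(runnable).unwrap();
+    ///
+    /// let (runnable, task) = async_task::Builder::new().spawn_local_in(future, schedule, Global);
+    /// ```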
+    #[cfg(all(
+        feature = "std",
+        any(feature = "allocator_api", feature = "allocator-api2")
+    ))]
+    pub fn spawn_local_in<F, Fut, S, A>(
+        self,
+        future: F,
+        schedule: S,
+        alloc: A,
+    ) -> (Runnable<M>, Task<Fut::Output, M>)
+    where
+        F: FnOnce(&M) -> Fut,
+        Fut: Future + 'static,
+        Fut::Output: 'static,
+        S: Schedule<M> + Send + Sync + 'static,
+        A: Allocator + 'static,
+    {
+        // Wrap the future into one that checks which thread it's on.
+        let future = move |meta| {
+            let future = future(meta);
+
+            local::Checked {
+                id: local::thread_id(),
+                inner: std::mem::ManuallyDrop::new(future),
+            }
+        };
+
+        unsafe { self.spawn_unchecked_in(future, schedule, alloc) }
+    }
 
     /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
     ///
@@ -531,7 +553,44 @@ impl Builder {
         S: Schedule<M>,
         M: 'a,
     {
-        spawn_unchecked!(Fut, S, M, self, schedule, raw => { future(&(*raw.header).metadata) })
+        spawn_unchecked!(Fut, S, M, Global, self, schedule, Global, raw => { future(&(*raw.header).metadata) })
+    }
+
+    /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds, with the provided
+    /// allocator.
+    ///
+    /// This function is the same as [`spawn_unchecked()`], except it allocates memory for the
+    /// task using the provided [`Allocator`].
+    ///
+    /// This function is only available when the `allocator_api` or `allocator-api2` feature is
+    /// enabled.
+    ///
+    /// # Safety
+    ///
+    /// - If `Fut` is not [`Send`], its [`Runnable`] must be used and dropped on the original
+    ///   thread.
+    /// - If `Fut` is not `'static`, borrowed non-metadata variables must outlive its
+    ///   [`Runnable`].
+    /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s
+    ///   [`Waker`] must be used and dropped on the original thread.
+    /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
+    ///   [`Runnable`]'s [`Waker`].
+    /// - If `alloc` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
+    ///   must be used and dropped on the original thread.
+    /// - If `alloc` is not `'static`, borrowed variables must outlive all instances of the
+    ///   [`Runnable`]'s [`Waker`].
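+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (illustrative; assumes the `allocator-api2` feature and the `flume`
+    /// dev-dependency). Every obligation above is satisfied here because the future, schedule
+    /// function and allocator are all `Send`, `Sync` and `'static`:
+    ///
+    /// ```ignore
+    /// use allocator_api2::alloc::Global;
+    ///
+    /// let future = |()| async { 1 + 2 };
+    /// let (s, r) = flume::unbounded();
+    /// let schedule = move |runnable| s.send(runnable).unwrap();
+    ///
+    /// // SAFETY: nothing is borrowed and everything is `Send`, `Sync` and `'static`.
+    /// let (runnable, task) =
+    ///     unsafe { async_task::Builder::new().spawn_unchecked_in(future, schedule, Global) };
+    /// ```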
+    #[cfg(any(feature = "allocator_api", feature = "allocator-api2"))]
+    pub unsafe fn spawn_unchecked_in<'a, F, Fut, S, A>(
+        self,
+        future: F,
+        schedule: S,
+        alloc: A,
+    ) -> (Runnable<M>, Task<Fut::Output, M>)
+    where
+        F: FnOnce(&'a M) -> Fut,
+        Fut: Future + 'a,
+        S: Schedule<M>,
+        M: 'a,
+        A: Allocator,
+    {
+        spawn_unchecked!(Fut, S, M, A, self, schedule, alloc, raw => { future(&(*raw.header).metadata) })
     }
 }
 
@@ -573,7 +632,25 @@ where
     S: Schedule + Send + Sync + 'static,
 {
     let builder = Builder::new();
-    unsafe { spawn_unchecked!(F, S, (), builder, schedule, raw => { future }) }
+    unsafe { spawn_unchecked!(F, S, (), Global, builder, schedule, Global, raw => { future }) }
+}
+
+/// Creates a new task with the provided allocator.
+///
+/// This function is the same as [`spawn()`], except it allocates memory for the task using the
+/// provided [`Allocator`].
+///
+/// This function is only available when the `allocator_api` or `allocator-api2` feature is
+/// enabled.
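+///
+/// # Examples
+///
+/// A minimal sketch (illustrative; assumes the `allocator-api2` feature and the `flume`
+/// dev-dependency):
+///
+/// ```ignore
+/// use allocator_api2::alloc::Global;
+///
+/// // The future inside the task.
+/// let future = async {
+///     println!("Hello, world!");
+/// };
+///
+/// // A function that schedules the task when it gets woken up.
+/// let (s, r) = flume::unbounded();
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future, the schedule function and an allocator.
+/// let (runnable, task) = async_task::spawn_in(future, schedule, Global);
+/// ```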
future { + ($name:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(#[allow(dead_code)] Box); + + impl Future for Fut { + type Output = Box; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + $poll.fetch_add(1, Ordering::SeqCst); + Poll::Ready(Box::new(42)) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a future with event counters. +// +// Usage: `panic_future!(f, POLL, DROP)` +// +// The future `f` sleeps for 400 ms and then panics. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! panic_future { + ($name:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(#[allow(dead_code)] Box); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(Duration::from_millis(400)); + panic!() + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Guard(#[allow(dead_code)] Box); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + move |_runnable: Runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + } + }; + }; +} + +// Creates an allocator with event counters. +// +// Usage: `allocator!(a, ALLOC, DEALLOC, DROP)` +// +// The allocator `a` wraps `Global` and tracks allocations. +// When `allocate` is called, `ALLOC` is incremented. +// When `deallocate` is called, `DEALLOC` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! 
+macro_rules! allocator {
+    ($name:pat, $alloc:ident, $dealloc:ident, $drop:ident) => {
+        static $alloc: AtomicUsize = AtomicUsize::new(0);
+        static $dealloc: AtomicUsize = AtomicUsize::new(0);
+        static $drop: AtomicUsize = AtomicUsize::new(0);
+
+        let $name = {
+            struct TrackedAllocator(#[allow(dead_code)] Box<i32>);
+
+            unsafe impl Send for TrackedAllocator {}
+            unsafe impl Sync for TrackedAllocator {}
+
+            impl Drop for TrackedAllocator {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1, Ordering::SeqCst);
+                }
+            }
+
+            unsafe impl Allocator for TrackedAllocator {
+                fn allocate(
+                    &self,
+                    layout: std::alloc::Layout,
+                ) -> Result<std::ptr::NonNull<[u8]>, AllocError> {
+                    $alloc.fetch_add(1, Ordering::SeqCst);
+                    Global.allocate(layout)
+                }
+
+                unsafe fn deallocate(
+                    &self,
+                    ptr: std::ptr::NonNull<u8>,
+                    layout: std::alloc::Layout,
+                ) {
+                    $dealloc.fetch_add(1, Ordering::SeqCst);
+                    Global.deallocate(ptr, layout)
+                }
+            }
+
+            TrackedAllocator(Box::new(0))
+        };
+    };
+}
+
+#[test]
+fn detach_and_drop() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    allocator!(a, ALLOC, DEALLOC, DROP_A);
+    let (runnable, task) = async_task::spawn_in(f, s, a);
+
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    task.detach();
+    assert_eq!(POLL.load(Ordering::SeqCst), 0);
+    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    drop(runnable);
+    assert_eq!(POLL.load(Ordering::SeqCst), 0);
+    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 1);
+}
+
+#[test]
+fn cancel_and_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    allocator!(a, ALLOC, DEALLOC, DROP_A);
+    let (runnable, task) = async_task::spawn_in(f, s, a);
+
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    drop(task);
+    assert_eq!(POLL.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    runnable.run();
+    assert_eq!(POLL.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 1);
+}
+
+#[test]
+fn run_and_cancel() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    allocator!(a, ALLOC, DEALLOC, DROP_A);
+    let (runnable, task) = async_task::spawn_in(f, s, a);
+
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    assert_eq!(POLL.load(Ordering::SeqCst), 0);
+    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    runnable.run();
+    assert_eq!(POLL.load(Ordering::SeqCst), 1);
+    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    drop(task);
+    assert_eq!(POLL.load(Ordering::SeqCst), 1);
+    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 1);
+}
+
+#[test]
+fn cancel_during_run() {
+    panic_future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    allocator!(a, ALLOC, DEALLOC, DROP_A);
+    let (runnable, task) = async_task::spawn_in(f, s, a);
+
+    assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+    assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+    assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+
+    Parallel::new()
+        .add(|| {
+            assert!(catch_unwind(|| runnable.run()).is_err());
+            assert_eq!(POLL.load(Ordering::SeqCst), 1);
+            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
+            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
+            assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+            assert_eq!(DEALLOC.load(Ordering::SeqCst), 1);
+            assert_eq!(DROP_A.load(Ordering::SeqCst), 1);
+        })
+        .add(|| {
+            thread::sleep(Duration::from_millis(200));
+
+            drop(task);
+            assert_eq!(POLL.load(Ordering::SeqCst), 1);
+            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
+            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
+            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
+            assert_eq!(ALLOC.load(Ordering::SeqCst), 1);
+            assert_eq!(DEALLOC.load(Ordering::SeqCst), 0);
+            assert_eq!(DROP_A.load(Ordering::SeqCst), 0);
+        })
+        .run();
+}
diff --git a/tests/panic.rs b/tests/panic.rs
index 726e385..7b20348 100644
--- a/tests/panic.rs
+++ b/tests/panic.rs
@@ -14,7 +14,7 @@ use smol::future;
 //
 // Usage: `future!(f, POLL, DROP)`
 //
-// The future `f` sleeps for 200 ms and then panics.
+// The future `f` sleeps for 400 ms and then panics.
 // When it gets polled, `POLL` is incremented.
 // When it gets dropped, `DROP` is incremented.
 macro_rules! future {