diff --git a/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs b/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs index 415a1091f..4667aa0d5 100644 --- a/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs +++ b/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs @@ -3,15 +3,18 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use std::{ + collections::VecDeque, hint::assert_unchecked, ops::ControlFlow, - sync::{Arc, atomic::AtomicBool}, + sync::{ + Arc, Condvar, + atomic::{AtomicBool, Ordering as StdOrdering}, + }, thread::{self, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }; use ecmascript_atomics::Ordering; -use ecmascript_futex::{ECMAScriptAtomicWait, FutexError}; use crate::{ ecmascript::{ @@ -27,6 +30,7 @@ use crate::{ to_integer_or_infinity, to_number, try_result_into_js, try_to_index, unwrap_try, validate_index, validate_typed_array, }, + ecmascript::{WaitResult, WaiterList, WaiterRecord}, engine::{Bindable, GcScope, Global, NoGcScope, Scopable}, heap::{ObjectEntry, WellKnownSymbols}, }; @@ -666,35 +670,32 @@ impl AtomicsObject { }; // 6. Let block be buffer.[[ArrayBufferData]]. // 8. Let WL be GetWaiterList(block, byteIndexInBuffer). - let is_big_int_64_array = matches!(typed_array, AnyTypedArray::SharedBigInt64Array(_)); - let slot = buffer.as_slice(agent).slice_from(byte_index_in_buffer); - let n = if is_big_int_64_array { - // SAFETY: offset was checked. - let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if c == usize::MAX { - // Force the notify count down into a reasonable range: the - // ecmascript_futex may return usize::MAX if the OS doesn't - // give us a count number. - slot.notify_all().min(i32::MAX as usize) - } else { - slot.notify_many(c) - } - } else { - // SAFETY: offset was checked. - let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if c == usize::MAX { - // Force the notify count down into a reasonable range: the - // ecmascript_futex may return usize::MAX if the OS doesn't - // give us a count number. - slot.notify_all().min(i32::MAX as usize) - } else { - slot.notify_many(c) - } - }; + let data_block = buffer.get_data_block(agent); // 9. Perform EnterCriticalSection(WL). + // SAFETY: buffer is a valid SharedArrayBuffer it cannot be detached, so the data block is non-dangling. + let mut n = 0; + let Some(waiters) = (unsafe { data_block.get_waiters() }) else { + return Ok(0.into()); + }; + + let mut guard = waiters.lock().unwrap(); // 10. Let S be RemoveWaiters(WL, c). + let Some(list) = guard.get_mut(&byte_index_in_buffer) else { + return Ok(0.into()); + }; + // 11. For each element W of S, do // a. Perform NotifyWaiter(WL, W). + while n < c { + let Some(waiter) = list.waiters.pop_front() else { + break; + }; + waiter.notified.store(true, StdOrdering::Release); + waiter.condvar.notify_one(); + n += 1; + } + drop(guard); + // 12. Perform LeaveCriticalSection(WL). // 13. Let n be the number of elements in S. // 14. Return 𝔽(n). @@ -1486,37 +1487,66 @@ fn do_wait_critical<'gc, const IS_ASYNC: bool, const IS_I64: bool>( // 28. Perform AddWaiter(WL, waiterRecord). // 29. If mode is sync, then if !IS_ASYNC { - // a. Perform SuspendThisAgent(WL, waiterRecord). - let result = if IS_I64 { - let v = v as u64; - // SAFETY: buffer is still live and index was checked. + let data_block = buffer.get_data_block(agent); + // SAFETY: buffer is a valid SharedArrayBuffer it cannot be detached, so the data block is non-dangling. + let waiters = unsafe { data_block.get_or_init_waiters() }; + let waiter_record = Arc::new(WaiterRecord { + condvar: Condvar::new(), + notified: AtomicBool::new(false), + }); + let mut guard = waiters.lock().unwrap(); + + // Re-read value under critical section to avoid TOCTOU race. + let slot = data_block.as_racy_slice().slice_from(byte_index_in_buffer); + let v_changed = if IS_I64 { let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) - } + v as u64 != slot.load(Ordering::SeqCst) } else { - let v = v as u32; - // SAFETY: buffer is still live and index was checked. let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) - } + v as i32 as u32 != slot.load(Ordering::SeqCst) }; - // 31. Perform LeaveCriticalSection(WL). - // 32. If mode is sync, return waiterRecord.[[Result]]. + if v_changed { + return BUILTIN_STRING_MEMORY.not_equal.into(); + } - match result { - Ok(_) => BUILTIN_STRING_MEMORY.ok.into(), - Err(err) => match err { - FutexError::Timeout => BUILTIN_STRING_MEMORY.timed_out.into(), - FutexError::NotEqual => BUILTIN_STRING_MEMORY.not_equal.into(), - FutexError::Unknown => panic!(), - }, + // a. Perform SuspendThisAgent(WL, waiterRecord). + guard + .entry(byte_index_in_buffer) + .or_insert_with(|| WaiterList { + waiters: VecDeque::new(), + }) + .waiters + .push_back(waiter_record.clone()); + + if t == u64::MAX { + while !waiter_record.notified.load(StdOrdering::Acquire) { + guard = waiter_record.condvar.wait(guard).unwrap(); + } + } else { + let deadline = Instant::now() + Duration::from_millis(t); + loop { + if waiter_record.notified.load(StdOrdering::Acquire) { + break; + } + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + if let Some(list) = guard.get_mut(&byte_index_in_buffer) { + list.waiters.retain(|w| !Arc::ptr_eq(w, &waiter_record)); + } + // 31. Perform LeaveCriticalSection(WL). + // 32. If mode is sync, return waiterRecord.[[Result]]. + return BUILTIN_STRING_MEMORY.timed_out.into(); + } + let (new_guard, _) = waiter_record + .condvar + .wait_timeout(guard, remaining) + .unwrap(); + guard = new_guard; + } } + // 31. Perform LeaveCriticalSection(WL). + // 32. If mode is sync, return waiterRecord.[[Result]]. + BUILTIN_STRING_MEMORY.ok.into() } else { let promise_capability = PromiseCapability::new(agent, gc); let promise = Global::new(agent, promise_capability.promise.unbind()); @@ -1623,7 +1653,7 @@ fn create_wait_result_object<'gc>( #[derive(Debug)] struct WaitAsyncJobInner { promise_to_resolve: Global>, - join_handle: JoinHandle>, + join_handle: JoinHandle, _has_timeout: bool, } @@ -1662,18 +1692,8 @@ impl WaitAsyncJob { // c. Perform LeaveCriticalSection(WL). let promise_capability = PromiseCapability::from_promise(promise, true); let result = match result { - Ok(_) => BUILTIN_STRING_MEMORY.ok.into(), - Err(FutexError::NotEqual) => BUILTIN_STRING_MEMORY.ok.into(), - Err(FutexError::Timeout) => BUILTIN_STRING_MEMORY.timed_out.into(), - Err(FutexError::Unknown) => { - let error = agent.throw_exception_with_static_message( - ExceptionType::Error, - "unknown error occurred", - gc, - ); - promise_capability.reject(agent, error.value(), gc); - return Ok(()); - } + WaitResult::Ok | WaitResult::NotEqual => BUILTIN_STRING_MEMORY.ok.into(), + WaitResult::TimedOut => BUILTIN_STRING_MEMORY.timed_out.into(), }; unwrap_try(promise_capability.try_resolve(agent, result, gc)); // d. Return unused. @@ -1701,26 +1721,62 @@ fn enqueue_atomics_wait_async_job( let signal = Arc::new(AtomicBool::new(false)); let s = signal.clone(); let handle = thread::spawn(move || { + // SAFETY: buffer is a cloned SharedDataBlock; non-dangling. + let waiters = unsafe { buffer.get_or_init_waiters() }; + let waiter_record = Arc::new(WaiterRecord { + condvar: Condvar::new(), + notified: AtomicBool::new(false), + }); + let mut guard = waiters.lock().unwrap(); + + // Re-check the value under the critical section. let slot = buffer.as_racy_slice().slice_from(byte_index_in_buffer); - if IS_I64 { - let v = v as u64; - // SAFETY: buffer is still live and index was checked. + let v_not_equal = if IS_I64 { let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - s.store(true, std::sync::atomic::Ordering::Release); - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) - } + v as u64 != slot.load(Ordering::SeqCst) } else { - let v = v as i32 as u32; - // SAFETY: buffer is still live and index was checked. let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - s.store(true, std::sync::atomic::Ordering::Release); - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) + v as i32 as u32 != slot.load(Ordering::SeqCst) + }; + + // Signal the main thread that we have the lock and are about to sleep. + s.store(true, StdOrdering::Release); + + if v_not_equal { + return WaitResult::NotEqual; + } + + guard + .entry(byte_index_in_buffer) + .or_insert_with(|| WaiterList { + waiters: VecDeque::new(), + }) + .waiters + .push_back(waiter_record.clone()); + + if t == u64::MAX { + while !waiter_record.notified.load(StdOrdering::Acquire) { + guard = waiter_record.condvar.wait(guard).unwrap(); + } + WaitResult::Ok + } else { + let deadline = Instant::now() + Duration::from_millis(t); + loop { + if waiter_record.notified.load(StdOrdering::Acquire) { + return WaitResult::Ok; + } + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + if let Some(list) = guard.get_mut(&byte_index_in_buffer) { + list.waiters.retain(|w| !Arc::ptr_eq(w, &waiter_record)); + } + return WaitResult::TimedOut; + } + let (new_guard, _) = waiter_record + .condvar + .wait_timeout(guard, remaining) + .unwrap(); + guard = new_guard; } } }); @@ -1732,7 +1788,7 @@ fn enqueue_atomics_wait_async_job( _has_timeout: t != u64::MAX, }))), }; - while !signal.load(std::sync::atomic::Ordering::Acquire) { + while !signal.load(StdOrdering::Acquire) { // Wait until the thread has started up and is about to go to sleep. } // 2. Let now be the time value (UTC) identifying the current time. diff --git a/nova_vm/src/ecmascript/types/spec/data_block.rs b/nova_vm/src/ecmascript/types/spec/data_block.rs index f430ec6c9..794c73fa3 100644 --- a/nova_vm/src/ecmascript/types/spec/data_block.rs +++ b/nova_vm/src/ecmascript/types/spec/data_block.rs @@ -5,9 +5,11 @@ //! ### [6.2.9 Data Blocks](https://tc39.es/ecma262/#sec-data-blocks) #[cfg(feature = "shared-array-buffer")] -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering}; #[cfg(feature = "shared-array-buffer")] use std::hint::assert_unchecked; +#[cfg(feature = "shared-array-buffer")] +use std::sync::{Arc, Condvar, Mutex}; use core::{ f32, f64, @@ -419,6 +421,29 @@ impl SharedDataBlockMaxByteLength { } } +#[cfg(feature = "shared-array-buffer")] +pub struct WaiterRecord { + pub condvar: Condvar, + pub notified: AtomicBool, +} + +/// Result of an `Atomics.wait` or `Atomics.waitAsync` operation. +#[derive(Debug)] +#[cfg(feature = "shared-array-buffer")] +pub enum WaitResult { + Ok, + TimedOut, + NotEqual, +} + +#[cfg(feature = "shared-array-buffer")] +pub struct WaiterList { + pub waiters: std::collections::VecDeque>, +} + +#[cfg(feature = "shared-array-buffer")] +pub type SharedWaiterMap = Mutex>; + /// # [Shared Data Block](https://tc39.es/ecma262/#sec-data-blocks) /// /// The Shared Data Block specification type is used to describe a distinct and @@ -439,6 +464,7 @@ impl SharedDataBlockMaxByteLength { /// ```rust,ignore /// #[repr(C)] /// struct StaticSharedDataBuffer { +/// waiters: AtomicPtr, /// rc: AtomicUsize, /// bytes: [AtomicU8; N], /// } @@ -449,12 +475,18 @@ impl SharedDataBlockMaxByteLength { /// #[repr(C)] /// struct GrowableSharedDataBuffer { /// byte_length: AtomicUsize, +/// waiters: AtomicPtr, /// rc: AtomicUsize, /// bytes: [AtomicU8; N], /// } /// ``` /// -/// The `ptr` field points to the start of the `bytes` +/// The `ptr` field points to the start of the `bytes`. +/// +/// The `waiters` pointer is initially null. It is lazily initialized via +/// a compare-and-swap the first time any thread calls `Atomics.wait` or +/// `Atomics.waitAsync` on this block. Its lifetime is managed by the +/// buffer's existing reference count. /// /// Note that the "viewed" byte length of the buffer is defined inside the /// buffer when the SharedDataBlock is growable. @@ -507,8 +539,8 @@ impl Drop for SharedDataBlock { return; } let growable = self.is_growable(); - // SAFETY: SharedDataBlock guarantees we have a AtomicUsize allocated - // before the bytes. + // SAFETY: SharedDataBlock guarantees we have waiters_ptr and rc + // allocated before the bytes. rc is 1 slot before ptr. let rc_ptr = unsafe { self.ptr.as_ptr().cast::().sub(1) }; { // SAFETY: the RC is definitely still allocated, as we haven't @@ -534,24 +566,36 @@ impl Drop for SharedDataBlock { return; } } + + // We are the last holder. Drop the waiter map if it was initialized. + // SAFETY: non-dangling, and we're the sole owner now. + let waiters_ptr = unsafe { self.get_waiters_ptr() }; + let waiters = waiters_ptr.load(Ordering::Acquire); + if !waiters.is_null() { + // SAFETY: the pointer was allocated via Box::into_raw in + // get_or_init_waiters, and we are the last holder. + let _ = unsafe { Box::from_raw(waiters) }; + } + let max_byte_length = self.max_byte_length(); - // SAFETY: if we're here then we're the last holder of the data block. let (size, base_ptr) = if growable { - // This is a growable SharedDataBlock that we're working with here. - // SAFETY: layout guaranteed by type unsafe { ( - max_byte_length - .unchecked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>()), - rc_ptr.sub(1), + max_byte_length.unchecked_add(core::mem::size_of::<( + AtomicUsize, + AtomicUsize, + AtomicUsize, + )>()), + rc_ptr.sub(2), ) } } else { unsafe { ( - max_byte_length.unchecked_add(core::mem::size_of::()), - rc_ptr, + max_byte_length + .unchecked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>()), + rc_ptr.sub(1), ) } }; @@ -559,8 +603,8 @@ impl Drop for SharedDataBlock { // SAFETY: As per the CAS loop on the reference count, we are the only // referrer to the racy memory. We can thus deallocate the ECMAScript // memory; this effectively grows our Rust memory from being just the - // RC and possible byte length value, into also containing the byte - // data. + // Waiters pointer, RC, and possible byte length value, into also + // containing the byte data. let _ = unsafe { memory.exit() }; // SAFETY: layout guaranteed by type. let layout = unsafe { Layout::from_size_align(size, 8).unwrap_unchecked() }; @@ -605,10 +649,10 @@ impl SharedDataBlock { use ecmascript_atomics::RacyMemory; let alloc_size = if growable { // Growable SharedArrayBuffer - size.checked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>())? + size.checked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize, AtomicUsize)>())? } else { // Static SharedArrayBuffer - size.checked_add(core::mem::size_of::())? + size.checked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>())? }; let Ok(layout) = Layout::from_size_align(alloc_size, 8) else { return None; @@ -622,23 +666,26 @@ impl SharedDataBlock { // SAFETY: properly allocated, everything is fine. unsafe { base_ptr.write(byte_length) }; // SAFETY: allocation size is - // (AtomicUsize, AtomicUsize, [AtomicU8; max_byte_length]) - unsafe { base_ptr.add(1) } + // (AtomicUsize, AtomicUsize, AtomicUsize, [AtomicU8; max_byte_length]) + // Skip byte_length and waiters to reach rc. + unsafe { base_ptr.add(2) } } else { - base_ptr + // Skip waiters to reach rc. + unsafe { base_ptr.add(1) } }; { // SAFETY: we're the only owner of this data. unsafe { rc_ptr.write(1) }; } - // SAFETY: the pointer is len + usize + + // SAFETY: ptr is past waiters_ptr and rc let ptr = unsafe { rc_ptr.add(1) }; // SAFETY: ptr does point to size bytes of readable and writable // Rust memory. After this call, that memory is deallocated and we // receive a new RacyMemory in its stead. Reads and writes through // it are undefined behaviour. Note though that we still have the - // RC and possible length values before the pointer; those are in - // normal Rust memory. + // Waiters pointer, RC, and possible length values before the + // pointer; those are in normal Rust memory. let ptr = unsafe { RacyMemory::::enter(ptr.cast(), size) }; Some(Self { ptr: ptr.as_slice().into_raw_parts().0, @@ -662,7 +709,7 @@ impl SharedDataBlock { /// /// Must not be a dangling SharedDataBlock. unsafe fn get_rc(&self) -> &AtomicUsize { - // SAFETY: type guarantees layout + // SAFETY: type guarantees layout; rc is 1 slot before ptr. unsafe { self.ptr.as_ptr().cast::().sub(1).as_ref() } } @@ -673,7 +720,7 @@ impl SharedDataBlock { /// Must be a growable, non-dangling SharedDataBlock. unsafe fn get_byte_length(&self) -> &AtomicUsize { // SAFETY: caller guarantees growable; type guarantees layout. - unsafe { self.ptr.as_ptr().cast::().sub(2).as_ref() } + unsafe { self.ptr.as_ptr().cast::().sub(3).as_ref() } } /// Returns the byte length of the SharedArrayBuffer. @@ -720,6 +767,88 @@ impl SharedDataBlock { self.max_byte_length.is_growable() } + /// Get a reference to the atomic waiters pointer. + /// + /// ## Safety + /// + /// Must not be a dangling SharedDataBlock. + unsafe fn get_waiters_ptr(&self) -> &AtomicPtr { + // SAFETY: type guarantees layout; waiters_ptr is 2 slots before ptr. + unsafe { + self.ptr + .as_ptr() + .cast::>() + .sub(2) + .as_ref() + } + } + + /// Get or lazily initialize the shared waiter map for this data block. + /// + /// On first call, allocates a new `SharedWaiterMap` and attempts to + /// store it via compare-and-swap. If another thread wins the race, + /// the locally allocated map is dropped and the winner's map is used. + /// + /// ## Safety + /// + /// Must not be a dangling SharedDataBlock. + pub(crate) unsafe fn get_or_init_waiters(&self) -> &SharedWaiterMap { + // SAFETY: caller guarantees non-dangling. + let waiters_atomic = unsafe { self.get_waiters_ptr() }; + let current = waiters_atomic.load(Ordering::Acquire); + if !current.is_null() { + // SAFETY: non-null means it was previously initialized; the + // buffer RC keeps the allocation alive. + return unsafe { &*current }; + } + + let new_map = Box::into_raw(Box::new(SharedWaiterMap::new( + std::collections::HashMap::new(), + ))); + match waiters_atomic.compare_exchange( + core::ptr::null_mut(), + new_map, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // We won the race; our map is now the canonical one. + // SAFETY: we just stored it and the buffer RC keeps it alive. + unsafe { &*new_map } + } + Err(winner) => { + // Another thread already initialized the waiters pointer. + // Drop our allocation and use theirs. + // SAFETY: new_map was just allocated by us and never shared. + let _ = unsafe { Box::from_raw(new_map) }; + // SAFETY: winner is the non-null pointer stored by the + // winning thread; the buffer RC keeps it alive. + unsafe { &*winner } + } + } + } + + /// Get the shared waiter map if it has been initialized. + /// + /// Returns `None` if no thread has ever called `get_or_init_waiters` on + /// this block (i.e. no `Atomics.wait` / `Atomics.waitAsync` has occurred). + /// + /// ## Safety + /// + /// Must not be a dangling SharedDataBlock. + pub(crate) unsafe fn get_waiters(&self) -> Option<&SharedWaiterMap> { + // SAFETY: caller guarantees non-dangling. + let waiters_atomic = unsafe { self.get_waiters_ptr() }; + let current = waiters_atomic.load(Ordering::Acquire); + if current.is_null() { + None + } else { + // SAFETY: non-null means it was previously initialized; the + // buffer RC keeps the allocation alive. + Some(unsafe { &*current }) + } + } + /// Read a value at the given aligned offset and with the given ordering. /// /// Returns `None` if the offset is not correctly aligned or the index is