Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 139 additions & 83 deletions nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::{
collections::VecDeque,
hint::assert_unchecked,
ops::ControlFlow,
sync::{Arc, atomic::AtomicBool},
sync::{
Arc, Condvar,
atomic::{AtomicBool, Ordering as StdOrdering},
},
thread::{self, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

use ecmascript_atomics::Ordering;
use ecmascript_futex::{ECMAScriptAtomicWait, FutexError};

use crate::{
ecmascript::{
Expand All @@ -27,6 +30,7 @@ use crate::{
to_integer_or_infinity, to_number, try_result_into_js, try_to_index, unwrap_try,
validate_index, validate_typed_array,
},
ecmascript::{WaitResult, WaiterList, WaiterRecord},
engine::{Bindable, GcScope, Global, NoGcScope, Scopable},
heap::{ObjectEntry, WellKnownSymbols},
};
Expand Down Expand Up @@ -666,35 +670,32 @@ impl AtomicsObject {
};
// 6. Let block be buffer.[[ArrayBufferData]].
// 8. Let WL be GetWaiterList(block, byteIndexInBuffer).
let is_big_int_64_array = matches!(typed_array, AnyTypedArray::SharedBigInt64Array(_));
let slot = buffer.as_slice(agent).slice_from(byte_index_in_buffer);
let n = if is_big_int_64_array {
// SAFETY: offset was checked.
let slot = unsafe { slot.as_aligned::<u64>().unwrap_unchecked() };
if c == usize::MAX {
// Force the notify count down into a reasonable range: the
// ecmascript_futex may return usize::MAX if the OS doesn't
// give us a count number.
slot.notify_all().min(i32::MAX as usize)
} else {
slot.notify_many(c)
}
} else {
// SAFETY: offset was checked.
let slot = unsafe { slot.as_aligned::<u32>().unwrap_unchecked() };
if c == usize::MAX {
// Force the notify count down into a reasonable range: the
// ecmascript_futex may return usize::MAX if the OS doesn't
// give us a count number.
slot.notify_all().min(i32::MAX as usize)
} else {
slot.notify_many(c)
}
};
let data_block = buffer.get_data_block(agent);
// 9. Perform EnterCriticalSection(WL).
// SAFETY: buffer is a valid SharedArrayBuffer it cannot be detached, so the data block is non-dangling.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Detached and dangling are two different things. 0-sized SAB's are "dangling", ie. they don't have the backing allocation.

let mut n = 0;
let Some(waiters) = (unsafe { data_block.get_waiters() }) else {
return Ok(0.into());
};

let mut guard = waiters.lock().unwrap();
// 10. Let S be RemoveWaiters(WL, c).
let Some(list) = guard.get_mut(&byte_index_in_buffer) else {
return Ok(0.into());
};

// 11. For each element W of S, do
// a. Perform NotifyWaiter(WL, W).
while n < c {
let Some(waiter) = list.waiters.pop_front() else {
break;
};
waiter.notified.store(true, StdOrdering::Release);
waiter.condvar.notify_one();
n += 1;
}
drop(guard);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I'd move this comment below step 12 to make it clear this is where/how the critical section ends.


// 12. Perform LeaveCriticalSection(WL).
// 13. Let n be the number of elements in S.
// 14. Return 𝔽(n).
Expand Down Expand Up @@ -1486,37 +1487,66 @@ fn do_wait_critical<'gc, const IS_ASYNC: bool, const IS_I64: bool>(
// 28. Perform AddWaiter(WL, waiterRecord).
// 29. If mode is sync, then
if !IS_ASYNC {
// a. Perform SuspendThisAgent(WL, waiterRecord).
let result = if IS_I64 {
let v = v as u64;
// SAFETY: buffer is still live and index was checked.
let data_block = buffer.get_data_block(agent);
// SAFETY: buffer is a valid SharedArrayBuffer it cannot be detached, so the data block is non-dangling.
let waiters = unsafe { data_block.get_or_init_waiters() };
let waiter_record = Arc::new(WaiterRecord {
condvar: Condvar::new(),
notified: AtomicBool::new(false),
});
let mut guard = waiters.lock().unwrap();

// Re-read value under critical section to avoid TOCTOU race.
let slot = data_block.as_racy_slice().slice_from(byte_index_in_buffer);
let v_changed = if IS_I64 {
let slot = unsafe { slot.as_aligned::<u64>().unwrap_unchecked() };
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
}
v as u64 != slot.load(Ordering::SeqCst)
} else {
let v = v as u32;
// SAFETY: buffer is still live and index was checked.
let slot = unsafe { slot.as_aligned::<u32>().unwrap_unchecked() };
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
}
v as i32 as u32 != slot.load(Ordering::SeqCst)
};
// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
if v_changed {
return BUILTIN_STRING_MEMORY.not_equal.into();
}

match result {
Ok(_) => BUILTIN_STRING_MEMORY.ok.into(),
Err(err) => match err {
FutexError::Timeout => BUILTIN_STRING_MEMORY.timed_out.into(),
FutexError::NotEqual => BUILTIN_STRING_MEMORY.not_equal.into(),
FutexError::Unknown => panic!(),
},
// a. Perform SuspendThisAgent(WL, waiterRecord).
guard
.entry(byte_index_in_buffer)
.or_insert_with(|| WaiterList {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I think you could derive Default for WaiterList and others and use .or_default() here.

waiters: VecDeque::new(),
})
.waiters
.push_back(waiter_record.clone());

if t == u64::MAX {
while !waiter_record.notified.load(StdOrdering::Acquire) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: We should probably use wait_timeout_while to avoid having to deal with the deadlines and whatnot ourselves. Should simplify the code fairly nicely.

guard = waiter_record.condvar.wait(guard).unwrap();
}
} else {
let deadline = Instant::now() + Duration::from_millis(t);
loop {
if waiter_record.notified.load(StdOrdering::Acquire) {
break;
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
if let Some(list) = guard.get_mut(&byte_index_in_buffer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: I think looping with the guard held here (and above) means that no other thread can start waiting on the SharedDataBlock while we've yet to be woken up. That of course shouldn't be the case, it should be possible for roughly any number of wakers to exist on the same SDB concurrently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, this shouldn't be an issue, because the lock is released when we call: guard = waiter_record.condvar.wait(guard).unwrap(); a few lines below.

So, if I understand this correctly, we will acquire the lock to check the value inside the datablock and then we release it when we call wait, and it is only reacquired when the thread wakes up.

list.waiters.retain(|w| !Arc::ptr_eq(w, &waiter_record));
}
// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
return BUILTIN_STRING_MEMORY.timed_out.into();
}
let (new_guard, _) = waiter_record
.condvar
.wait_timeout(guard, remaining)
.unwrap();
guard = new_guard;
}
}
// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
BUILTIN_STRING_MEMORY.ok.into()
} else {
let promise_capability = PromiseCapability::new(agent, gc);
let promise = Global::new(agent, promise_capability.promise.unbind());
Expand Down Expand Up @@ -1623,7 +1653,7 @@ fn create_wait_result_object<'gc>(
#[derive(Debug)]
struct WaitAsyncJobInner {
promise_to_resolve: Global<Promise<'static>>,
join_handle: JoinHandle<Result<(), FutexError>>,
join_handle: JoinHandle<WaitResult>,
_has_timeout: bool,
}

Expand Down Expand Up @@ -1662,18 +1692,8 @@ impl WaitAsyncJob {
// c. Perform LeaveCriticalSection(WL).
let promise_capability = PromiseCapability::from_promise(promise, true);
let result = match result {
Ok(_) => BUILTIN_STRING_MEMORY.ok.into(),
Err(FutexError::NotEqual) => BUILTIN_STRING_MEMORY.ok.into(),
Err(FutexError::Timeout) => BUILTIN_STRING_MEMORY.timed_out.into(),
Err(FutexError::Unknown) => {
let error = agent.throw_exception_with_static_message(
ExceptionType::Error,
"unknown error occurred",
gc,
);
promise_capability.reject(agent, error.value(), gc);
return Ok(());
}
WaitResult::Ok | WaitResult::NotEqual => BUILTIN_STRING_MEMORY.ok.into(),
WaitResult::TimedOut => BUILTIN_STRING_MEMORY.timed_out.into(),
};
unwrap_try(promise_capability.try_resolve(agent, result, gc));
// d. Return unused.
Expand Down Expand Up @@ -1701,26 +1721,62 @@ fn enqueue_atomics_wait_async_job<const IS_I64: bool>(
let signal = Arc::new(AtomicBool::new(false));
let s = signal.clone();
let handle = thread::spawn(move || {
// SAFETY: buffer is a cloned SharedDataBlock; non-dangling.
let waiters = unsafe { buffer.get_or_init_waiters() };
let waiter_record = Arc::new(WaiterRecord {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Could create a WaiterRecord::new_shared() helper function that returns Arc<WaiterRecord>.

condvar: Condvar::new(),
notified: AtomicBool::new(false),
});
let mut guard = waiters.lock().unwrap();

// Re-check the value under the critical section.
let slot = buffer.as_racy_slice().slice_from(byte_index_in_buffer);
if IS_I64 {
let v = v as u64;
// SAFETY: buffer is still live and index was checked.
let v_not_equal = if IS_I64 {
let slot = unsafe { slot.as_aligned::<u64>().unwrap_unchecked() };
s.store(true, std::sync::atomic::Ordering::Release);
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
}
v as u64 != slot.load(Ordering::SeqCst)
} else {
let v = v as i32 as u32;
// SAFETY: buffer is still live and index was checked.
let slot = unsafe { slot.as_aligned::<u32>().unwrap_unchecked() };
s.store(true, std::sync::atomic::Ordering::Release);
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
v as i32 as u32 != slot.load(Ordering::SeqCst)
};

// Signal the main thread that we have the lock and are about to sleep.
s.store(true, StdOrdering::Release);

if v_not_equal {
return WaitResult::NotEqual;
}

guard
.entry(byte_index_in_buffer)
.or_insert_with(|| WaiterList {
waiters: VecDeque::new(),
})
.waiters
.push_back(waiter_record.clone());

if t == u64::MAX {
while !waiter_record.notified.load(StdOrdering::Acquire) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Here too we could use wait_while.

guard = waiter_record.condvar.wait(guard).unwrap();
}
WaitResult::Ok
} else {
let deadline = Instant::now() + Duration::from_millis(t);
loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Here too we could use wait_timeout_while.

if waiter_record.notified.load(StdOrdering::Acquire) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: I think we should be fine using just Relaxed for the notification. Waking up is our synchronisation, we only really care about the value of the bool here and it is atomic on its own.

return WaitResult::Ok;
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
if let Some(list) = guard.get_mut(&byte_index_in_buffer) {
list.waiters.retain(|w| !Arc::ptr_eq(w, &waiter_record));
}
return WaitResult::TimedOut;
}
let (new_guard, _) = waiter_record
.condvar
.wait_timeout(guard, remaining)
.unwrap();
guard = new_guard;
}
}
});
Expand All @@ -1732,7 +1788,7 @@ fn enqueue_atomics_wait_async_job<const IS_I64: bool>(
_has_timeout: t != u64::MAX,
}))),
};
while !signal.load(std::sync::atomic::Ordering::Acquire) {
while !signal.load(StdOrdering::Acquire) {
// Wait until the thread has started up and is about to go to sleep.
}
// 2. Let now be the time value (UTC) identifying the current time.
Expand Down
Loading