// Copyright 2022 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/objects/js-atomics-synchronization.h"
#include "src/base/macros.h"
#include "src/base/platform/yield-processor.h"
#include "src/execution/isolate-inl.h"
#include "src/objects/js-atomics-synchronization-inl.h"
#include "src/objects/js-promise-inl.h"
#include "src/objects/waiter-queue-node.h"
#include "src/sandbox/external-pointer-inl.h"
namespace v8 {
namespace internal {
namespace detail {
class WaiterQueueNode;
}
namespace {
// TODO(v8:12547): Move this logic into a static method JSPromise::PerformThen
// so that other callsites like this one can use it.
// Set fulfill/reject handlers for a JSPromise object.
MaybeDirectHandle<JSReceiver> PerformPromiseThen(
Isolate* isolate, DirectHandle<JSReceiver> promise,
DirectHandle<Object> fulfill_handler,
MaybeDirectHandle<JSFunction> maybe_reject_handler = {}) {
DCHECK(IsCallable(*fulfill_handler));
DirectHandle<Object> reject_handler = isolate->factory()->undefined_value();
if (!maybe_reject_handler.is_null()) {
reject_handler = maybe_reject_handler.ToHandleChecked();
}
DirectHandle<Object> args[] = {fulfill_handler, reject_handler};
DirectHandle<Object> then_result;
ASSIGN_RETURN_ON_EXCEPTION(
isolate, then_result,
Execution::CallBuiltin(isolate, isolate->promise_then(), promise,
base::VectorOf(args)));
return Cast<JSReceiver>(then_result);
}
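// Creates a builtin context holding `mutex` and `unlocked_promise`, builds
// resolve/reject handlers from the atomics mutex async-unlock
// SharedFunctionInfos, installs them on `waiting_for_callback_promise` via
// PerformPromiseThen, and returns the context.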
MaybeDirectHandle<Context> SetAsyncUnlockHandlers(
Isolate* isolate, DirectHandle<JSAtomicsMutex> mutex,
DirectHandle<JSReceiver> waiting_for_callback_promise,
DirectHandle<JSPromise> unlocked_promise) {
DirectHandle<Context> handlers_context =
isolate->factory()->NewBuiltinContext(
isolate->native_context(), JSAtomicsMutex::kAsyncContextLength);
handlers_context->set(JSAtomicsMutex::kMutexAsyncContextSlot, *mutex);
handlers_context->set(JSAtomicsMutex::kUnlockedPromiseAsyncContextSlot,
*unlocked_promise);
DirectHandle<SharedFunctionInfo> resolve_info(
isolate->heap()->atomics_mutex_async_unlock_resolve_handler_sfi(),
isolate);
DirectHandle<JSFunction> resolver_callback =
Factory::JSFunctionBuilder{isolate, resolve_info, handlers_context}
.set_map(isolate->strict_function_without_prototype_map())
.set_allocation_type(AllocationType::kYoung)
.Build();
DirectHandle<SharedFunctionInfo> reject_info(
isolate->heap()->atomics_mutex_async_unlock_reject_handler_sfi(),
isolate);
DirectHandle<JSFunction> reject_callback =
Factory::JSFunctionBuilder{isolate, reject_info, handlers_context}
.set_map(isolate->strict_function_without_prototype_map())
.set_allocation_type(AllocationType::kYoung)
.Build();
RETURN_ON_EXCEPTION(isolate,
PerformPromiseThen(isolate, waiting_for_callback_promise,
resolver_callback, reject_callback));
return handlers_context;
}
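// Keeps `promise` alive by adding it to the native context's
// atomics_waitasync_promises set.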
void AddPromiseToNativeContext(Isolate* isolate,
DirectHandle<JSPromise> promise) {
DirectHandle<NativeContext> native_context(isolate->native_context());
Handle<OrderedHashSet> promises(native_context->atomics_waitasync_promises(),
isolate);
promises = OrderedHashSet::Add(isolate, promises, promise).ToHandleChecked();
native_context->set_atomics_waitasync_promises(*promises);
}
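// Removes `promise` from the native context's atomics_waitasync_promises set;
// the promise is expected to be present.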
void RemovePromiseFromNativeContext(Isolate* isolate,
DirectHandle<JSPromise> promise) {
Handle<OrderedHashSet> promises(
isolate->native_context()->atomics_waitasync_promises(), isolate);
bool was_deleted = OrderedHashSet::Delete(isolate, *promises, *promise);
DCHECK(was_deleted);
USE(was_deleted);
promises = OrderedHashSet::Shrink(isolate, promises);
isolate->native_context()->set_atomics_waitasync_promises(*promises);
}
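// Returns a weak v8::Global handle to `object`, so that holding it from an
// async waiter node does not keep the object alive.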
template <typename T>
Global<T> GetWeakGlobal(Isolate* isolate, Local<T> object) {
auto* v8_isolate = reinterpret_cast<v8::Isolate*>(isolate);
v8::Global<T> global(v8_isolate, object);
global.SetWeak();
return global;
}
} // namespace
namespace detail {
// The waiter queue lock guard provides a RAII-style mechanism for locking the
// waiter queue. It is a non-copyable, non-movable object, and a new state must
// be set before destroying the guard.
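// Typical usage (see e.g. UnlockSlowPath and QueueWaiter below):
//
//   StateT current_state = state->load(std::memory_order_relaxed);
//   WaiterQueueLockGuard guard(state, current_state);
//   // ... mutate the waiter queue ...
//   guard.set_new_state(new_state);  // Required before the guard is destroyed.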
class V8_NODISCARD WaiterQueueLockGuard final {
using StateT = JSSynchronizationPrimitive::StateT;
public:
// Spinlock to acquire the IsWaiterQueueLockedField bit. current_state is
// updated to the last value of the state before the waiter queue lock was
// acquired.
explicit WaiterQueueLockGuard(std::atomic<StateT>* state,
StateT& current_state)
: state_(state), new_state_(kInvalidState) {
while (!JSSynchronizationPrimitive::TryLockWaiterQueueExplicit(
state, current_state)) {
YIELD_PROCESSOR;
}
}
// Constructor for creating a wrapper around a state whose waiter queue
// is already locked and owned by this thread.
explicit WaiterQueueLockGuard(std::atomic<StateT>* state, bool is_locked)
: state_(state), new_state_(kInvalidState) {
CHECK(is_locked);
DCHECK(JSSynchronizationPrimitive::IsWaiterQueueLockedField::decode(
state->load()));
}
WaiterQueueLockGuard(const WaiterQueueLockGuard&) = delete;
WaiterQueueLockGuard() = delete;
~WaiterQueueLockGuard() {
DCHECK_NOT_NULL(state_);
DCHECK_NE(new_state_, kInvalidState);
DCHECK(JSSynchronizationPrimitive::IsWaiterQueueLockedField::decode(
state_->load()));
new_state_ = JSSynchronizationPrimitive::IsWaiterQueueLockedField::update(
new_state_, false);
state_->store(new_state_, std::memory_order_release);
}
void set_new_state(StateT new_state) { new_state_ = new_state; }
static std::optional<WaiterQueueLockGuard>
NewAlreadyLockedWaiterQueueLockGuard(std::atomic<StateT>* state) {
return std::optional<WaiterQueueLockGuard>(std::in_place, state, true);
}
private:
static constexpr StateT kInvalidState =
~JSSynchronizationPrimitive::kEmptyState;
std::atomic<StateT>* state_;
StateT new_state_;
};
class V8_NODISCARD SyncWaiterQueueNode final : public WaiterQueueNode {
public:
explicit SyncWaiterQueueNode(Isolate* requester)
: WaiterQueueNode(requester), should_wait_(true) {}
void Wait() {
AllowGarbageCollection allow_before_parking;
requester_->main_thread_local_heap()->ExecuteWhileParked([this]() {
base::MutexGuard guard(&wait_lock_);
while (should_wait_) {
wait_cond_var_.Wait(&wait_lock_);
}
});
}
// Returns false if timed out, true otherwise.
bool WaitFor(const base::TimeDelta& rel_time) {
bool result;
AllowGarbageCollection allow_before_parking;
requester_->main_thread_local_heap()->ExecuteWhileParked([this, rel_time,
&result]() {
base::MutexGuard guard(&wait_lock_);
base::TimeTicks current_time = base::TimeTicks::Now();
base::TimeTicks timeout_time = current_time + rel_time;
for (;;) {
if (!should_wait_) {
result = true;
return;
}
current_time = base::TimeTicks::Now();
if (current_time >= timeout_time) {
result = false;
return;
}
base::TimeDelta time_until_timeout = timeout_time - current_time;
bool wait_res = wait_cond_var_.WaitFor(&wait_lock_, time_until_timeout);
USE(wait_res);
// The wake up may have been spurious, so loop again.
}
});
return result;
}
void Notify() override {
base::MutexGuard guard(&wait_lock_);
should_wait_ = false;
wait_cond_var_.NotifyOne();
SetNotInListForVerification();
}
bool IsSameIsolateForAsyncCleanup(Isolate* isolate) override {
// Sync waiters are only queued while the thread is sleeping, so there
// should not be sync nodes while cleaning up the isolate.
DCHECK_NE(requester_, isolate);
return false;
}
void CleanupMatchingAsyncWaiters(const DequeueMatcher& matcher) override {
UNREACHABLE();
}
private:
void SetReadyForAsyncCleanup() override { UNREACHABLE(); }
base::Mutex wait_lock_;
base::ConditionVariable wait_cond_var_;
bool should_wait_;
};
template <typename T>
class AsyncWaiterNotifyTask : public CancelableTask {
public:
AsyncWaiterNotifyTask(CancelableTaskManager* cancelable_task_manager,
typename T::AsyncWaiterNodeType* node)
: CancelableTask(cancelable_task_manager), node_(node) {}
void RunInternal() override {
if (node_->requester_->cancelable_task_manager()->canceled()) return;
T::HandleAsyncNotify(node_);
}
private:
typename T::AsyncWaiterNodeType* node_;
};
template <typename T>
class AsyncWaiterTimeoutTask : public CancelableTask {
public:
AsyncWaiterTimeoutTask(CancelableTaskManager* cancelable_task_manager,
typename T::AsyncWaiterNodeType* node)
: CancelableTask(cancelable_task_manager), node_(node) {}
void RunInternal() override {
if (node_->requester_->cancelable_task_manager()->canceled()) return;
T::HandleAsyncTimeout(node_);
}
private:
typename T::AsyncWaiterNodeType* node_;
};
template <typename T>
class V8_NODISCARD AsyncWaiterQueueNode final : public WaiterQueueNode {
public:
static AsyncWaiterQueueNode<T>* NewAsyncWaiterStoredInIsolate(
Isolate* requester, DirectHandle<T> synchronization_primitive,
Handle<JSPromise> internal_waiting_promise,
MaybeHandle<JSPromise> unlocked_promise = {}) {
auto waiter =
std::unique_ptr<AsyncWaiterQueueNode<T>>(new AsyncWaiterQueueNode<T>(
requester, synchronization_primitive, internal_waiting_promise,
unlocked_promise));
AsyncWaiterQueueNode<T>* raw_waiter = waiter.get();
requester->async_waiter_queue_nodes().push_back(std::move(waiter));
return raw_waiter;
}
// Creates a minimal LockAsyncWaiterQueueNode so that the isolate can keep
// track of the locked mutexes and release them in case of isolate deinit.
static AsyncWaiterQueueNode<T>* NewLockedAsyncWaiterStoredInIsolate(
Isolate* requester, DirectHandle<T> synchronization_primitive) {
DCHECK(IsJSAtomicsMutex(*synchronization_primitive));
auto waiter = std::unique_ptr<AsyncWaiterQueueNode<T>>(
new AsyncWaiterQueueNode<T>(requester, synchronization_primitive));
AsyncWaiterQueueNode<T>* raw_waiter = waiter.get();
requester->async_waiter_queue_nodes().push_back(std::move(waiter));
return raw_waiter;
}
TaskRunner* task_runner() { return task_runner_.get(); }
Local<v8::Context> GetNativeContext() {
v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
return native_context_.Get(v8_isolate);
}
Handle<JSPromise> GetInternalWaitingPromise() {
v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
Handle<JSPromise> internal_waiting_promise =
Utils::OpenHandle(*internal_waiting_promise_.Get(v8_isolate));
return internal_waiting_promise;
}
bool IsEmpty() const { return synchronization_primitive_.IsEmpty(); }
Handle<T> GetSynchronizationPrimitive() {
v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
Handle<T> synchronization_primitive =
Cast<T>(Utils::OpenHandle(*synchronization_primitive_.Get(v8_isolate)));
return synchronization_primitive;
}
Handle<JSPromise> GetUnlockedPromise() {
v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
Handle<JSPromise> unlocked_promise =
Cast<JSPromise>(Utils::OpenHandle(*unlocked_promise_.Get(v8_isolate)));
return unlocked_promise;
}
void Notify() override {
SetNotInListForVerification();
CancelableTaskManager* task_manager = requester_->cancelable_task_manager();
if (task_manager->canceled()) return;
auto notify_task =
std::make_unique<AsyncWaiterNotifyTask<T>>(task_manager, this);
notify_task_id_ = notify_task->id();
task_runner_->PostNonNestableTask(std::move(notify_task));
}
bool IsSameIsolateForAsyncCleanup(Isolate* isolate) override {
return requester_ == isolate;
}
void CleanupMatchingAsyncWaiters(const DequeueMatcher& matcher) override {
T::CleanupMatchingAsyncWaiters(requester_, this, matcher);
}
// Removes the node from the isolate's `async_waiter_queue_nodes` list; the
// passed node will be invalid after this call since the corresponding
// unique_ptr is deleted upon removal.
static void RemoveFromAsyncWaiterQueueList(AsyncWaiterQueueNode<T>* node) {
node->requester_->async_waiter_queue_nodes().remove_if(
[=](std::unique_ptr<WaiterQueueNode>& n) { return n.get() == node; });
}
private:
friend JSAtomicsMutex;
friend JSAtomicsCondition;
friend AsyncWaiterNotifyTask<T>;
friend AsyncWaiterTimeoutTask<T>;
explicit AsyncWaiterQueueNode(Isolate* requester,
DirectHandle<T> synchronization_primitive)
: WaiterQueueNode(requester),
notify_task_id_(CancelableTaskManager::kInvalidTaskId) {
native_context_ =
GetWeakGlobal(requester, Utils::ToLocal(requester->native_context()));
synchronization_primitive_ = GetWeakGlobal(
requester, Utils::ToLocal(Cast<JSObject>(synchronization_primitive)));
}
explicit AsyncWaiterQueueNode(
Isolate* requester, DirectHandle<T> synchronization_primitive,
DirectHandle<JSPromise> internal_waiting_promise,
MaybeDirectHandle<JSPromise> unlocked_promise)
: WaiterQueueNode(requester),
notify_task_id_(CancelableTaskManager::kInvalidTaskId) {
v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester);
task_runner_ =
V8::GetCurrentPlatform()->GetForegroundTaskRunner(v8_isolate);
timeout_task_id_ = CancelableTaskManager::kInvalidTaskId;
native_context_ =
GetWeakGlobal(requester, Utils::ToLocal(requester->native_context()));
synchronization_primitive_ = GetWeakGlobal(
requester, Utils::ToLocal(Cast<JSObject>(synchronization_primitive)));
internal_waiting_promise_ = GetWeakGlobal(
requester, Utils::PromiseToLocal(internal_waiting_promise));
if (!unlocked_promise.is_null()) {
DCHECK(IsJSAtomicsMutex(*synchronization_primitive));
unlocked_promise_ = GetWeakGlobal(
requester, Utils::PromiseToLocal(unlocked_promise.ToHandleChecked()));
}
}
void SetReadyForAsyncCleanup() override { ready_for_async_cleanup_ = true; }
std::shared_ptr<TaskRunner> task_runner_;
CancelableTaskManager::Id timeout_task_id_;
CancelableTaskManager::Id notify_task_id_;
bool ready_for_async_cleanup_ = false;
// The node holds weak global handles to the V8 objects required to handle
// the notification.
Global<v8::Context> native_context_;
// `internal_waiting_promise_` is the internal first promise of the chain. See
// comments in `JSAtomicsMutex` and `JSAtomicsCondition`.
Global<v8::Promise> internal_waiting_promise_;
Global<v8::Object> synchronization_primitive_;
// `unlocked_promise_` is the user-exposed promise used to handle timeouts;
// it should be empty for `JSAtomicsCondition`.
Global<v8::Promise> unlocked_promise_;
};
} // namespace detail
using detail::SyncWaiterQueueNode;
using LockAsyncWaiterQueueNode = detail::AsyncWaiterQueueNode<JSAtomicsMutex>;
using WaitAsyncWaiterQueueNode =
detail::AsyncWaiterQueueNode<JSAtomicsCondition>;
using AsyncWaitTimeoutTask = detail::AsyncWaiterTimeoutTask<JSAtomicsCondition>;
using AsyncLockTimeoutTask = detail::AsyncWaiterTimeoutTask<JSAtomicsMutex>;
// static
void JSSynchronizationPrimitive::IsolateDeinit(Isolate* isolate) {
CleanupAsyncWaiterLists(isolate, [=](WaiterQueueNode* waiter) {
return waiter->IsSameIsolateForAsyncCleanup(isolate);
});
}
void JSSynchronizationPrimitive::CleanupAsyncWaiterLists(
Isolate* isolate, DequeueMatcher matcher) {
DisallowGarbageCollection no_gc;
std::list<std::unique_ptr<WaiterQueueNode>>& async_waiter_queue_nodes_list =
isolate->async_waiter_queue_nodes();
if (!async_waiter_queue_nodes_list.empty()) {
// There is no allocation in the following code, so there shouldn't be any
// GC, but we use a HandleScope to dereference the global handles.
HandleScope handle_scope(isolate);
auto it = async_waiter_queue_nodes_list.begin();
while (it != async_waiter_queue_nodes_list.end()) {
WaiterQueueNode* async_node = it->get();
if (!matcher(async_node)) {
it++;
continue;
}
async_node->CleanupMatchingAsyncWaiters(matcher);
it = async_waiter_queue_nodes_list.erase(it);
}
}
}
// static
bool JSSynchronizationPrimitive::TryLockWaiterQueueExplicit(
std::atomic<StateT>* state, StateT& expected) {
// Try to acquire the queue lock.
expected = IsWaiterQueueLockedField::update(expected, false);
return state->compare_exchange_weak(
expected, IsWaiterQueueLockedField::update(expected, true),
std::memory_order_acquire, std::memory_order_relaxed);
}
// static
void JSSynchronizationPrimitive::SetWaiterQueueStateOnly(
std::atomic<StateT>* state, StateT new_state) {
// Set the new state changing only the waiter queue bits.
DCHECK_EQ(new_state & ~kWaiterQueueMask, 0);
StateT expected = state->load(std::memory_order_relaxed);
StateT desired;
do {
desired = new_state | (expected & ~kWaiterQueueMask);
} while (!state->compare_exchange_weak(
expected, desired, std::memory_order_release, std::memory_order_relaxed));
}
Tagged<Object> JSSynchronizationPrimitive::NumWaitersForTesting(
Isolate* requester) {
DisallowGarbageCollection no_gc;
std::atomic<StateT>* state = AtomicStatePtr();
StateT current_state = state->load(std::memory_order_acquire);
// There are no waiters.
if (!HasWaitersField::decode(current_state)) return Smi::FromInt(0);
int num_waiters;
{
// If this is counting the number of waiters on a mutex, the js mutex
// can be taken by another thread without acquiring the queue lock. We
// handle the state manually to release the queue lock without changing the
// "is locked" bit.
while (!TryLockWaiterQueueExplicit(state, current_state)) {
YIELD_PROCESSOR;
}
if (!HasWaitersField::decode(current_state)) {
// The queue was emptied while waiting for the queue lock.
SetWaiterQueueStateOnly(state, kEmptyState);
return Smi::FromInt(0);
}
// Get the waiter queue head.
WaiterQueueNode* waiter_head = DestructivelyGetWaiterQueueHead(requester);
DCHECK_NOT_NULL(waiter_head);
num_waiters = WaiterQueueNode::LengthFromHead(waiter_head);
// Release the queue lock and reinstall the same queue head by creating a
// new state.
DCHECK_EQ(state->load(),
IsWaiterQueueLockedField::update(current_state, true));
StateT new_state = SetWaiterQueueHead(requester, waiter_head, kEmptyState);
new_state = IsWaiterQueueLockedField::update(new_state, false);
SetWaiterQueueStateOnly(state, new_state);
}
return Smi::FromInt(num_waiters);
}
// TODO(lpardosixtos): Consider making and caching a canonical map for this
// result object, like we do for the iterator result object.
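// The result object has the shape { value, success }; e.g. the async lock
// timeout path resolves with { value: undefined, success: false } (see
// HandleAsyncTimeout).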
// static
DirectHandle<JSObject> JSAtomicsMutex::CreateResultObject(
Isolate* isolate, DirectHandle<Object> value, bool success) {
DirectHandle<JSObject> result =
isolate->factory()->NewJSObject(isolate->object_function());
DirectHandle<Object> success_value = isolate->factory()->ToBoolean(success);
JSObject::AddProperty(isolate, result, "value", value,
PropertyAttributes::NONE);
JSObject::AddProperty(isolate, result, "success", success_value,
PropertyAttributes::NONE);
return result;
}
// static
void JSAtomicsMutex::CleanupMatchingAsyncWaiters(Isolate* isolate,
WaiterQueueNode* node,
DequeueMatcher matcher) {
auto* async_node = static_cast<LockAsyncWaiterQueueNode*>(node);
if (async_node->ready_for_async_cleanup_) {
// Whenever a node needs to be looked up in the waiter queue we also remove
// any other matching nodes and mark them as ready for async cleanup. This
// way we avoid taking the queue lock multiple times, which could slow down
// other threads.
return;
}
if (async_node->IsEmpty()) {
// The node's underlying synchronization primitive has been collected, so
// delete it.
async_node->SetNotInListForVerification();
return;
}
DirectHandle<JSAtomicsMutex> mutex =
async_node->GetSynchronizationPrimitive();
std::atomic<StateT>* state = mutex->AtomicStatePtr();
StateT current_state = state->load(std::memory_order_relaxed);
// The details of updating the state in this function are too complicated
// for the waiter queue lock guard to manage, so handle the state manually.
while (!TryLockWaiterQueueExplicit(state, current_state)) {
YIELD_PROCESSOR;
}
bool was_locked_by_this_thread = mutex->IsCurrentThreadOwner();
WaiterQueueNode* waiter_head =
mutex->DestructivelyGetWaiterQueueHead(isolate);
if (waiter_head) {
// Dequeue all the matching waiters.
WaiterQueueNode::DequeueAllMatchingForAsyncCleanup(&waiter_head, matcher);
if (!async_node->ready_for_async_cleanup_) {
// The node was not in the queue, so it has already been notified.
// Notify the next head unless the lock is held by a different thread;
// otherwise the queue may stall.
if (waiter_head && (!IsLockedField::decode(current_state) ||
was_locked_by_this_thread)) {
// Notify the next head unless the lock is already taken, in which case
// the lock owner will notify the next waiter.
WaiterQueueNode* old_head = WaiterQueueNode::Dequeue(&waiter_head);
old_head->Notify();
}
}
}
StateT new_state = kUnlockedUncontended;
new_state = mutex->SetWaiterQueueHead(isolate, waiter_head, new_state);
new_state = IsWaiterQueueLockedField::update(new_state, false);
if (was_locked_by_this_thread) {
mutex->ClearOwnerThread();
new_state = IsLockedField::update(new_state, false);
state->store(new_state, std::memory_order_release);
} else {
SetWaiterQueueStateOnly(state, new_state);
}
}
// static
bool JSAtomicsMutex::TryLockExplicit(std::atomic<StateT>* state,
StateT& expected) {
// Try to lock a possibly contended mutex.
expected = IsLockedField::update(expected, false);
return state->compare_exchange_weak(
expected, IsLockedField::update(expected, true),
std::memory_order_acquire, std::memory_order_relaxed);
}
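// Spins with exponential backoff trying to acquire the JS mutex lock.
// Returns true on success and false if the lock is still contended after the
// spin budget is exhausted.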
bool JSAtomicsMutex::BackoffTryLock(Isolate* requester,
DirectHandle<JSAtomicsMutex> mutex,
std::atomic<StateT>* state) {
// The backoff algorithm is copied from PartitionAlloc's SpinningMutex.
constexpr int kSpinCount = 64;
constexpr int kMaxBackoff = 16;
int tries = 0;
int backoff = 1;
StateT current_state = state->load(std::memory_order_relaxed);
do {
if (JSAtomicsMutex::TryLockExplicit(state, current_state)) return true;
for (int yields = 0; yields < backoff; yields++) {
YIELD_PROCESSOR;
tries++;
}
backoff = std::min(kMaxBackoff, backoff << 1);
} while (tries < kSpinCount);
return false;
}
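// Either acquires the JS mutex lock or, if it is held by another thread,
// enqueues `this_waiter` onto the waiter queue. Returns true if the waiter
// was enqueued and false if the lock was acquired instead.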
bool JSAtomicsMutex::MaybeEnqueueNode(Isolate* requester,
DirectHandle<JSAtomicsMutex> mutex,
std::atomic<StateT>* state,
WaiterQueueNode* this_waiter) {
DCHECK_NOT_NULL(this_waiter);
// Try to acquire the queue lock, which is itself a spinlock.
StateT current_state = state->load(std::memory_order_relaxed);
std::optional<WaiterQueueLockGuard> waiter_queue_lock_guard =
LockWaiterQueueOrJSMutex(state, current_state);
if (!waiter_queue_lock_guard.has_value()) {
// There is no waiter queue lock guard, so the lock was acquired.
DCHECK(IsLockedField::decode(state->load()));
return false;
}
// With the queue lock held, enqueue the requester onto the waiter queue.
WaiterQueueNode* waiter_head =
mutex->DestructivelyGetWaiterQueueHead(requester);
WaiterQueueNode::Enqueue(&waiter_head, this_waiter);
// Install the new waiter queue head and release the queue lock.
DCHECK_EQ(state->load(),
IsWaiterQueueLockedField::update(current_state, true));
StateT new_state =
mutex->SetWaiterQueueHead(requester, waiter_head, current_state);
// The lock is held, just not by us, so don't set the current thread id as
// the owner.
DCHECK(IsLockedField::decode(current_state));
new_state = IsLockedField::update(new_state, true);
waiter_queue_lock_guard->set_new_state(new_state);
return true;
}
// static
std::optional<WaiterQueueLockGuard> JSAtomicsMutex::LockWaiterQueueOrJSMutex(
std::atomic<StateT>* state, StateT& current_state) {
for (;;) {
if (IsLockedField::decode(current_state) &&
TryLockWaiterQueueExplicit(state, current_state)) {
return WaiterQueueLockGuard::NewAlreadyLockedWaiterQueueLockGuard(state);
}
// Also check for the lock having been released by another thread during
// attempts to acquire the queue lock.
if (TryLockExplicit(state, current_state)) return std::nullopt;
YIELD_PROCESSOR;
}
}
bool JSAtomicsMutex::LockJSMutexOrDequeueTimedOutWaiter(
Isolate* requester, std::atomic<StateT>* state,
WaiterQueueNode* timed_out_waiter) {
// First acquire the queue lock, which is itself a spinlock.
StateT current_state = state->load(std::memory_order_relaxed);
// There are no waiters, but the js mutex lock may be held by another thread.
if (!HasWaitersField::decode(current_state)) return false;
// The details of updating the state in this function are too complicated
// for the waiter queue lock guard to manage, so handle the state manually.
while (!TryLockWaiterQueueExplicit(state, current_state)) {
YIELD_PROCESSOR;
}
WaiterQueueNode* waiter_head = DestructivelyGetWaiterQueueHead(requester);
if (waiter_head == nullptr) {
// The queue is empty, but the js mutex lock may be held by another thread;
// release the waiter queue bit without changing the "is locked" bit.
DCHECK(!HasWaitersField::decode(current_state));
SetWaiterQueueStateOnly(state, kUnlockedUncontended);
return false;
}
WaiterQueueNode* dequeued_node = WaiterQueueNode::DequeueMatching(
&waiter_head,
[&](WaiterQueueNode* node) { return node == timed_out_waiter; });
// Release the queue lock and install the new waiter queue head.
DCHECK_EQ(state->load(),
IsWaiterQueueLockedField::update(current_state, true));
StateT new_state = kUnlockedUncontended;
new_state = SetWaiterQueueHead(requester, waiter_head, new_state);
if (!dequeued_node) {
// The timed out waiter was not in the queue, so it must have been dequeued
// and notified between the time this thread woke up and the time it
// acquired the queue lock, so there is a risk that the next queue head is
// never notified. Try to take the js mutex lock here; if we succeed, the
// next node will be notified by this thread; otherwise, it will be notified
// by the thread holding the lock now.
// Since we use strong CAS below, we know that the js mutex lock will be
// held by either this thread or another thread that can't go through the
// unlock fast path because this thread is holding the waiter queue lock.
// Hence, it is safe to always set the "is locked" bit in new_state.
new_state = IsLockedField::update(new_state, true);
DCHECK(!IsWaiterQueueLockedField::decode(new_state));
current_state = IsLockedField::update(current_state, false);
if (state->compare_exchange_strong(current_state, new_state,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// The CAS atomically released the waiter queue lock and acquired the js
// mutex lock.
return true;
}
DCHECK(IsLockedField::decode(state->load()));
state->store(new_state, std::memory_order_release);
return false;
}
SetWaiterQueueStateOnly(state, new_state);
return false;
}
// static
bool JSAtomicsMutex::LockSlowPath(Isolate* requester,
DirectHandle<JSAtomicsMutex> mutex,
std::atomic<StateT>* state,
std::optional<base::TimeDelta> timeout) {
for (;;) {
// Spin for a little bit to try to acquire the lock, so as to be fast under
// microcontention.
if (BackoffTryLock(requester, mutex, state)) return true;
// At this point the lock is considered contended, so try to go to sleep and
// put the requester thread on the waiter queue.
// Allocate a waiter queue node on-stack, since this thread is going to
// sleep and will be blocked anyway.
SyncWaiterQueueNode this_waiter(requester);
if (!MaybeEnqueueNode(requester, mutex, state, &this_waiter)) return true;
bool rv;
// Wait for another thread to release the lock and wake us up.
if (timeout) {
rv = this_waiter.WaitFor(*timeout);
// Reload the state pointer after wake up in case of shared GC while
// blocked.
state = mutex->AtomicStatePtr();
if (!rv) {
// If timed out, remove ourselves from the waiter list, which is usually
// done by the thread performing the notifying.
rv = mutex->LockJSMutexOrDequeueTimedOutWaiter(requester, state,
&this_waiter);
return rv;
}
} else {
this_waiter.Wait();
// Reload the state pointer after wake up in case of shared GC while
// blocked.
state = mutex->AtomicStatePtr();
}
// After wake up we try to acquire the lock again by spinning, as the
// contention at the point of going to sleep should not be correlated with
// contention at the point of waking up.
}
}
void JSAtomicsMutex::UnlockSlowPath(Isolate* requester,
std::atomic<StateT>* state) {
// The fast path unconditionally cleared the owner thread.
DCHECK_EQ(ThreadId::Invalid().ToInteger(),
AtomicOwnerThreadIdPtr()->load(std::memory_order_relaxed));
// To wake a sleeping thread, first acquire the queue lock, which is itself
// a spinlock.
StateT current_state = state->load(std::memory_order_relaxed);
WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
if (!HasWaitersField::decode(current_state)) {
// All waiters were removed while waiting for the queue lock, possibly by
// timing out. Release both the lock and the queue lock.
StateT new_state = IsLockedField::update(current_state, false);
waiter_queue_lock_guard.set_new_state(new_state);
return;
}
WaiterQueueNode* waiter_head = DestructivelyGetWaiterQueueHead(requester);
DCHECK_NOT_NULL(waiter_head);
WaiterQueueNode* old_head = WaiterQueueNode::Dequeue(&waiter_head);
// Release both the lock and the queue lock, and install the new waiter queue
// head.
StateT new_state = IsLockedField::update(current_state, false);
new_state = SetWaiterQueueHead(requester, waiter_head, new_state);
waiter_queue_lock_guard.set_new_state(new_state);
old_head->Notify();
}
// The lockAsync flow is controlled by a series of promises:
// 1. `internal_locked_promise`, a promise that settles when the mutex is
// locked. When this promise is resolved, the callback is run. Not exposed to
// user code.
// 2. `waiting_for_callback_promise`, a promise that settles when the callback
// completes. When this promise settles, the mutex is unlocked.
// 3. `unlocked_promise`, a promise that settles when the mutex is unlocked,
// either explicitly or by timeout. Returned by lockAsync.
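// Illustrative usage (the call shape matches the example in
// UnlockAsyncLockedMutex below; the settled value follows the
// { value, success } shape built by CreateResultObject, e.g.
// { value: undefined, success: false } on timeout):
//
//   let mutex = new Atomics.Mutex();
//   let result = await Atomics.Mutex.lockAsync(mutex, async function() {
//     // ... critical section ...
//   });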
// static
MaybeDirectHandle<JSPromise> JSAtomicsMutex::LockOrEnqueuePromise(
Isolate* requester, DirectHandle<JSAtomicsMutex> mutex,
DirectHandle<Object> callback, std::optional<base::TimeDelta> timeout) {
Handle<JSPromise> internal_locked_promise =
requester->factory()->NewJSPromise();
DirectHandle<JSReceiver> waiting_for_callback_promise;
ASSIGN_RETURN_ON_EXCEPTION(
requester, waiting_for_callback_promise,
PerformPromiseThen(requester, internal_locked_promise, callback));
Handle<JSPromise> unlocked_promise = requester->factory()->NewJSPromise();
// Set the async unlock handlers here so we can throw without any additional
// cleanup if the inner `promise_then` call fails. Keep a reference to
// the handlers' synthetic context so we can store the waiter node in it once
// the node is created.
DirectHandle<Context> handlers_context;
ASSIGN_RETURN_ON_EXCEPTION(
requester, handlers_context,
SetAsyncUnlockHandlers(requester, mutex, waiting_for_callback_promise,
unlocked_promise));
LockAsyncWaiterQueueNode* waiter_node = nullptr;
bool locked = LockAsync(requester, mutex, internal_locked_promise,
unlocked_promise, &waiter_node, timeout);
if (locked) {
// Create a LockAsyncWaiterQueueNode to be queued in the async locked
// waiter queue.
DCHECK(!waiter_node);
waiter_node = LockAsyncWaiterQueueNode::NewLockedAsyncWaiterStoredInIsolate(
requester, mutex);
}
// Don't use kWaiterQueueNodeTag here as that will cause the pointer to be
// stored in the shared external pointer table, which is not necessary since
// this object is only visible in this thread.
DirectHandle<Foreign> wrapper =
requester->factory()->NewForeign<kWaiterQueueForeignTag>(
reinterpret_cast<Address>(waiter_node));
handlers_context->set(JSAtomicsMutex::kAsyncLockedWaiterAsyncContextSlot,
*wrapper);
return unlocked_promise;
}
// static
bool JSAtomicsMutex::LockAsync(Isolate* requester,
DirectHandle<JSAtomicsMutex> mutex,
Handle<JSPromise> internal_locked_promise,
MaybeHandle<JSPromise> unlocked_promise,
LockAsyncWaiterQueueNode** waiter_node,
std::optional<base::TimeDelta> timeout) {
bool locked =
LockImpl(requester, mutex, timeout, [=](std::atomic<StateT>* state) {
return LockAsyncSlowPath(requester, mutex, state,
internal_locked_promise, unlocked_promise,
waiter_node, timeout);
});
if (locked) {
// Resolve `internal_locked_promise` instead of synchronously running the
// callback. This guarantees that the callback is run in a microtask
// regardless of the current state of the mutex.
MaybeDirectHandle<Object> result = JSPromise::Resolve(
internal_locked_promise, requester->factory()->undefined_value());
USE(result);
} else {
// If the promise is not resolved, keep it alive in a set in the native
// context. The promise will be resolved and removed from the set in
// `JSAtomicsMutex::HandleAsyncNotify` or
// `JSAtomicsMutex::HandleAsyncTimeout`.
AddPromiseToNativeContext(requester, internal_locked_promise);
}
return locked;
}
// static
DirectHandle<JSPromise> JSAtomicsMutex::LockAsyncWrapperForWait(
Isolate* requester, DirectHandle<JSAtomicsMutex> mutex) {
Handle<JSPromise> internal_locked_promise =
requester->factory()->NewJSPromise();
AsyncWaiterNodeType* waiter_node = nullptr;
LockAsync(requester, mutex, internal_locked_promise, {}, &waiter_node);
return internal_locked_promise;
}
// static
bool JSAtomicsMutex::LockAsyncSlowPath(
Isolate* isolate, DirectHandle<JSAtomicsMutex> mutex,
std::atomic<StateT>* state, Handle<JSPromise> internal_locked_promise,
MaybeHandle<JSPromise> unlocked_promise,
LockAsyncWaiterQueueNode** waiter_node,
std::optional<base::TimeDelta> timeout) {
// Spin for a little bit to try to acquire the lock, so as to be fast under
// microcontention.
if (BackoffTryLock(isolate, mutex, state)) {
return true;
}
// At this point the lock is considered contended, so create a new async
// waiter node in the C++ heap. Its lifetime is managed by the requester's
// `async_waiter_queue_nodes` list.
LockAsyncWaiterQueueNode* this_waiter =
LockAsyncWaiterQueueNode::NewAsyncWaiterStoredInIsolate(
isolate, mutex, internal_locked_promise, unlocked_promise);
if (!MaybeEnqueueNode(isolate, mutex, state, this_waiter)) {
return true;
}
if (timeout) {
// Start a timer to run the `AsyncLockTimeoutTask` after the timeout.
TaskRunner* task_runner = this_waiter->task_runner();
auto task = std::make_unique<AsyncLockTimeoutTask>(
isolate->cancelable_task_manager(), this_waiter);
this_waiter->timeout_task_id_ = task->id();
task_runner->PostNonNestableDelayedTask(std::move(task),
timeout->InSecondsF());
}
*waiter_node = this_waiter;
return false;
}
// static
bool JSAtomicsMutex::LockOrEnqueueAsyncNode(Isolate* isolate,
DirectHandle<JSAtomicsMutex> mutex,
LockAsyncWaiterQueueNode* waiter) {
std::atomic<StateT>* state = mutex->AtomicStatePtr();
// Spin for a little bit to try to acquire the lock, so as to be fast under
// microcontention.
if (BackoffTryLock(isolate, mutex, state)) {
return true;
}
return !MaybeEnqueueNode(isolate, mutex, state, waiter);
}
void JSAtomicsMutex::UnlockAsyncLockedMutex(
Isolate* requester, DirectHandle<Foreign> async_locked_waiter_wrapper) {
LockAsyncWaiterQueueNode* waiter_node =
reinterpret_cast<LockAsyncWaiterQueueNode*>(
async_locked_waiter_wrapper->foreign_address<kWaiterQueueForeignTag>(
IsolateForSandbox(requester)));
LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter_node);
if (IsCurrentThreadOwner()) {
Unlock(requester);
return;
}
// If this is reached, the lock was already released by this thread.
// This can happen if waitAsync is called without awaiting or due to
// promise prototype tampering. Setting Promise.prototype.then to a
// non-callable will cause the `waiting_for_callback_promise` (defined in
// LockOrEnqueuePromise) reactions to be called even if the async callback
// is not resolved; as a consequence, the following code will try to unlock
// the mutex twice:
//
// let mutex = new Atomics.Mutex();
// let cv = new Atomics.Condition();
// Promise.prototype.then = undefined;
// Atomics.Mutex.lockAsync(mutex, async function() {
// await Atomics.Condition.waitAsync(cv, mutex);
// });
}
bool JSAtomicsMutex::DequeueTimedOutAsyncWaiter(
Isolate* requester, DirectHandle<JSAtomicsMutex> mutex,
std::atomic<StateT>* state, WaiterQueueNode* timed_out_waiter) {
// First acquire the queue lock, which is itself a spinlock.
StateT current_state = state->load(std::memory_order_relaxed);
// There are no waiters, but the js mutex lock may be held by another thread.
if (!HasWaitersField::decode(current_state)) return false;
// The details of updating the state in this function are too complicated
// for the waiter queue lock guard to manage, so handle the state manually.
while (!TryLockWaiterQueueExplicit(state, current_state)) {
YIELD_PROCESSOR;
}
// Get the waiter queue head.
WaiterQueueNode* waiter_head =
mutex->DestructivelyGetWaiterQueueHead(requester);
if (waiter_head == nullptr) {
// The queue is empty, but the js mutex lock may be held by another thread;
// release the waiter queue bit without changing the "is locked" bit.
DCHECK(!HasWaitersField::decode(current_state));
SetWaiterQueueStateOnly(state, kUnlockedUncontended);
return false;
}
WaiterQueueNode* dequeued_node = WaiterQueueNode::DequeueMatching(
&waiter_head,
[&](WaiterQueueNode* node) { return node == timed_out_waiter; });
// Release the queue lock and install the new waiter queue head.
DCHECK_EQ(state->load(),
IsWaiterQueueLockedField::update(current_state, true));
StateT new_state = kUnlockedUncontended;
new_state = mutex->SetWaiterQueueHead(requester, waiter_head, new_state);
SetWaiterQueueStateOnly(state, new_state);
return dequeued_node != nullptr;
}
// static
void JSAtomicsMutex::HandleAsyncTimeout(LockAsyncWaiterQueueNode* waiter) {
Isolate* requester = waiter->requester_;
HandleScope scope(requester);
if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
// The native context was destroyed so the lock_promise was already removed
// from the native context. Remove the node from the async unlocked waiter
// list.
LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
return;
}
v8::Context::Scope contextScope(waiter->GetNativeContext());
DirectHandle<JSAtomicsMutex> js_mutex = waiter->GetSynchronizationPrimitive();
bool dequeued = JSAtomicsMutex::DequeueTimedOutAsyncWaiter(
requester, js_mutex, js_mutex->AtomicStatePtr(), waiter);
// If the waiter is no longer in the queue, then its corresponding notify
// task is already in the event loop; this doesn't guarantee that the lock
// will be taken by the time the notify task runs, so cancel the notify task.
if (!dequeued) {
TryAbortResult abort_result =
requester->cancelable_task_manager()->TryAbort(waiter->notify_task_id_);
DCHECK_EQ(abort_result, TryAbortResult::kTaskAborted);
USE(abort_result);
}
DirectHandle<JSPromise> lock_promise = waiter->GetInternalWaitingPromise();
DirectHandle<JSPromise> lock_async_promise = waiter->GetUnlockedPromise();
DirectHandle<JSObject> result = CreateResultObject(
requester, requester->factory()->undefined_value(), false);
auto resolve_result = JSPromise::Resolve(lock_async_promise, result);
USE(resolve_result);
LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
RemovePromiseFromNativeContext(requester, lock_promise);
}
// static
void JSAtomicsMutex::HandleAsyncNotify(LockAsyncWaiterQueueNode* waiter) {
Isolate* requester = waiter->requester_;
HandleScope scope(requester);
if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
// The native context was destroyed, so the promise was already removed. But
// it is possible that other threads are holding references to the
// synchronization primitive. Try to notify the next waiter.
if (!waiter->synchronization_primitive_.IsEmpty()) {
DirectHandle<JSAtomicsMutex> js_mutex =
waiter->GetSynchronizationPrimitive();
std::atomic<StateT>* state = js_mutex->AtomicStatePtr();
StateT current_state = state->load(std::memory_order_acquire);
if (HasWaitersField::decode(current_state)) {
// Another thread might take the lock while we are notifying the next
// waiter, so manually release the queue lock without changing the
// IsLockedField bit.
while (!TryLockWaiterQueueExplicit(state, current_state)) {
YIELD_PROCESSOR;
}
WaiterQueueNode* waiter_head =
js_mutex->DestructivelyGetWaiterQueueHead(requester);
if (waiter_head) {
WaiterQueueNode* old_head = WaiterQueueNode::Dequeue(&waiter_head);
old_head->Notify();
}
StateT new_state =
js_mutex->SetWaiterQueueHead(requester, waiter_head, kEmptyState);
new_state = IsWaiterQueueLockedField::update(new_state, false);
SetWaiterQueueStateOnly(state, new_state);
}
}
LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
return;
}
v8::Context::Scope contextScope(waiter->GetNativeContext());
DirectHandle<JSAtomicsMutex> js_mutex = waiter->GetSynchronizationPrimitive();
DirectHandle<JSPromise> promise = waiter->GetInternalWaitingPromise();
bool locked = LockOrEnqueueAsyncNode(requester, js_mutex, waiter);
if (locked) {
if (waiter->timeout_task_id_ != CancelableTaskManager::kInvalidTaskId) {
TryAbortResult abort_result =
requester->cancelable_task_manager()->TryAbort(
waiter->timeout_task_id_);
DCHECK_EQ(abort_result, TryAbortResult::kTaskAborted);
USE(abort_result);
}
if (waiter->unlocked_promise_.IsEmpty()) {
// This node came from an async wait notify giving control back to an
// async lock call, so we don't need to put the node in the locked waiter
// list because the original LockAsyncWaiterQueueNode is already in
// the locked waiter list.
LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
}
js_mutex->SetCurrentThreadAsOwner();
auto resolve_result =
JSPromise::Resolve(promise, requester->factory()->undefined_value());
USE(resolve_result);
RemovePromiseFromNativeContext(requester, promise);
}
}
// static
void JSAtomicsCondition::CleanupMatchingAsyncWaiters(Isolate* isolate,
WaiterQueueNode* node,
DequeueMatcher matcher) {
auto* async_node = static_cast<WaitAsyncWaiterQueueNode*>(node);
if (async_node->ready_for_async_cleanup_) {
// The node is not in the waiter queue and there is no HandleNotify task
// for it in the event loop. So it is safe to delete it.
return;
}
if (async_node->IsEmpty()) {
// The node's underlying synchronization primitive has been collected, so
// delete it.
async_node->SetNotInListForVerification();
return;
}
DirectHandle<JSAtomicsCondition> cv =
async_node->GetSynchronizationPrimitive();
std::atomic<StateT>* state = cv->AtomicStatePtr();
StateT current_state = state->load(std::memory_order_relaxed);
WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
WaiterQueueNode* waiter_head = cv->DestructivelyGetWaiterQueueHead(isolate);
if (waiter_head) {
WaiterQueueNode::DequeueAllMatchingForAsyncCleanup(&waiter_head, matcher);
}
StateT new_state =
cv->SetWaiterQueueHead(isolate, waiter_head, current_state);
waiter_queue_lock_guard.set_new_state(new_state);
}
// static
void JSAtomicsCondition::QueueWaiter(Isolate* requester,
DirectHandle<JSAtomicsCondition> cv,
WaiterQueueNode* waiter) {
// The state pointer should not be used outside of this block as a shared GC
// may reallocate it after waiting.
std::atomic<StateT>* state = cv->AtomicStatePtr();
// Try to acquire the queue lock, which is itself a spinlock.
StateT current_state = state->load(std::memory_order_relaxed);
WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
// With the queue lock held, enqueue the requester onto the waiter queue.
WaiterQueueNode* waiter_head = cv->DestructivelyGetWaiterQueueHead(requester);
WaiterQueueNode::Enqueue(&waiter_head, waiter);
// Release the queue lock and install the new waiter queue head.
DCHECK_EQ(state->load(),
IsWaiterQueueLockedField::update(current_state, true));
StateT new_state =
cv->SetWaiterQueueHead(requester, waiter_head, current_state);
waiter_queue_lock_guard.set_new_state(new_state);
}
// static
bool JSAtomicsCondition::WaitFor(Isolate* requester,
DirectHandle<JSAtomicsCondition> cv,
DirectHandle<JSAtomicsMutex> mutex,
std::optional<base::TimeDelta> timeout) {
DisallowGarbageCollection no_gc;
bool rv;
{
// Allocate a waiter queue node on-stack, since this thread is going to
// sleep and will be blocked anyway.
SyncWaiterQueueNode this_waiter(requester);
JSAtomicsCondition::QueueWaiter(requester, cv, &this_waiter);
// Release the mutex and wait for another thread to wake us up, reacquiring
// the mutex upon wakeup.
mutex->Unlock(requester);
if (timeout) {
rv = this_waiter.WaitFor(*timeout);
if (!rv) {
// If timed out, remove ourselves from the waiter list, which is usually
// done by the thread performing the notifying.
std::atomic<StateT>* state = cv->AtomicStatePtr();
DequeueExplicit(
requester, cv, state, [&](WaiterQueueNode** waiter_head) {
WaiterQueueNode* dequeued = WaiterQueueNode::DequeueMatching(
waiter_head,
[&](WaiterQueueNode* node) { return node == &this_waiter; });
return dequeued ? 1 : 0;
});
}
} else {
this_waiter.Wait();
rv = true;
}
}
JSAtomicsMutex::Lock(requester, mutex);
return rv;
}
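// Acquires the queue lock, runs `action_under_lock` on the waiter queue head,
// installs the (possibly modified) head as the new state, and returns the
// number of waiters dequeued by the action.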
// static
uint32_t JSAtomicsCondition::DequeueExplicit(
Isolate* requester, DirectHandle<JSAtomicsCondition> cv,
std::atomic<StateT>* state, const DequeueAction& action_under_lock) {
// First acquire the queue lock, which is itself a spinlock.
StateT current_state = state->load(std::memory_order_relaxed);
if (!HasWaitersField::decode(current_state)) return 0;
WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
// Get the waiter queue head.
WaiterQueueNode* waiter_head = cv->DestructivelyGetWaiterQueueHead(requester);
// If there's no waiter to wake up, release the queue lock by setting the
// state to the empty state.
if (waiter_head == nullptr) {
StateT new_state = kEmptyState;
waiter_queue_lock_guard.set_new_state(new_state);
return 0;
}
uint32_t num_dequeued_waiters = action_under_lock(&waiter_head);
// Release the queue lock and install the new waiter queue head.
DCHECK_EQ(state->load(),
IsWaiterQueueLockedField::update(current_state, true));
StateT new_state =
cv->SetWaiterQueueHead(requester, waiter_head, current_state);
waiter_queue_lock_guard.set_new_state(new_state);
return num_dequeued_waiters;
}
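// Notifies up to `count` waiters, or all waiters if `count` is kAllWaiters,
// and returns the number of waiters actually notified.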
// static
uint32_t JSAtomicsCondition::Notify(Isolate* requester,
DirectHandle<JSAtomicsCondition> cv,
uint32_t count) {
std::atomic<StateT>* state = cv->AtomicStatePtr();
// Dequeue count waiters.
return DequeueExplicit(
requester, cv, state, [=](WaiterQueueNode** waiter_head) -> uint32_t {
WaiterQueueNode* old_head;
if (count == 1) {
old_head = WaiterQueueNode::Dequeue(waiter_head);
if (!old_head) return 0;
old_head->Notify();
return 1;
}
if (count == kAllWaiters) {
old_head = *waiter_head;
*waiter_head = nullptr;
} else {
old_head = WaiterQueueNode::Split(waiter_head, count);
}
if (!old_head) return 0;
// Notify while holding the queue lock to avoid notifying
// waiters that have been deleted in other threads.
return old_head->NotifyAllInList();
});
}
// The waitAsync flow is controlled by two chained promises, with lock_promise
// being the return value of the API.
// 1. `internal_waiting_promise`, which will be resolved either in the notify
//    task or in the timeout task.
// 2. `lock_promise`, which will be resolved when the lock is acquired after
// waiting.
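// Illustrative usage (mirroring the example in UnlockAsyncLockedMutex above):
//
//   Atomics.Mutex.lockAsync(mutex, async function() {
//     await Atomics.Condition.waitAsync(cv, mutex);
//   });
//
// The promise returned by waitAsync settles after the waiter is notified (or
// times out) and the mutex has been re-acquired.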
// static
MaybeDirectHandle<JSReceiver> JSAtomicsCondition::WaitAsync(
Isolate* requester, DirectHandle<JSAtomicsCondition> cv,
DirectHandle<JSAtomicsMutex> mutex,
std::optional<base::TimeDelta> timeout) {
Handle<JSPromise> internal_waiting_promise =
requester->factory()->NewJSPromise();
DirectHandle<Context> handler_context =
requester->factory()->NewBuiltinContext(requester->native_context(),
kAsyncContextLength);
handler_context->set(kMutexAsyncContextSlot, *mutex);
handler_context->set(kConditionVariableAsyncContextSlot, *cv);
DirectHandle<SharedFunctionInfo> info(
requester->heap()->atomics_condition_acquire_lock_sfi(), requester);
DirectHandle<JSFunction> lock_function =
Factory::JSFunctionBuilder{requester, info, handler_context}
.set_map(requester->strict_function_without_prototype_map())
.Build();
DirectHandle<JSReceiver> lock_promise;
ASSIGN_RETURN_ON_EXCEPTION(
requester, lock_promise,
PerformPromiseThen(requester, internal_waiting_promise, lock_function));
// Create a new async waiter node in the C++ heap. Its lifetime is managed by
// the requester's `async_waiter_queue_nodes` list.
WaitAsyncWaiterQueueNode* this_waiter =
WaitAsyncWaiterQueueNode::NewAsyncWaiterStoredInIsolate(
requester, cv, internal_waiting_promise);
QueueWaiter(requester, cv, this_waiter);
if (timeout) {
TaskRunner* task_runner = this_waiter->task_runner();
auto task = std::make_unique<AsyncWaitTimeoutTask>(
requester->cancelable_task_manager(), this_waiter);
this_waiter->timeout_task_id_ = task->id();
task_runner->PostNonNestableDelayedTask(std::move(task),
timeout->InSecondsF());
}
mutex->Unlock(requester);
// Keep the wait promise alive in the native context.
AddPromiseToNativeContext(requester, internal_waiting_promise);
return lock_promise;
}
// static
void JSAtomicsCondition::HandleAsyncTimeout(WaitAsyncWaiterQueueNode* waiter) {
Isolate* requester = waiter->requester_;
if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
// The native context was destroyed so the promise was already removed
// from the native context. Remove the node from the async unlocked waiter
// list.
WaitAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
return;
}
HandleScope scope(requester);
DirectHandle<JSAtomicsCondition> cv = waiter->GetSynchronizationPrimitive();
std::atomic<StateT>* state = cv->AtomicStatePtr();
uint32_t num_dequeued =
DequeueExplicit(requester, cv, state, [&](WaiterQueueNode** waiter_head) {
WaiterQueueNode* dequeued = WaiterQueueNode::DequeueMatching(
waiter_head, [&](WaiterQueueNode* node) { return node == waiter; });
return dequeued ? 1 : 0;
});
// If the waiter is not in the queue, the notify task is already in the event
// loop, so cancel the notify task.
if (num_dequeued == 0) {
TryAbortResult abort_result =
requester->cancelable_task_manager()->TryAbort(waiter->notify_task_id_);
DCHECK_EQ(abort_result, TryAbortResult::kTaskAborted);
USE(abort_result);
}
// Reset the timeout task id to kInvalidTaskId; otherwise the notify task will
// try to cancel it.
waiter->timeout_task_id_ = CancelableTaskManager::kInvalidTaskId;
JSAtomicsCondition::HandleAsyncNotify(waiter);
}
// static
void JSAtomicsCondition::HandleAsyncNotify(WaitAsyncWaiterQueueNode* waiter) {
Isolate* requester = waiter->requester_;
if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
// The native context was destroyed so the promise was already removed
// from the native context. Remove the node from the async unlocked waiter
// list.
WaitAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
return;
}
HandleScope scope(requester);
if (waiter->timeout_task_id_ != CancelableTaskManager::kInvalidTaskId) {
TryAbortResult abort_result =
requester->cancelable_task_manager()->TryAbort(
waiter->timeout_task_id_);
DCHECK_EQ(abort_result, TryAbortResult::kTaskAborted);
USE(abort_result);
}
v8::Context::Scope contextScope(waiter->GetNativeContext());
DirectHandle<JSPromise> promise = waiter->GetInternalWaitingPromise();
MaybeDirectHandle<Object> result =
JSPromise::Resolve(promise, requester->factory()->undefined_value());
USE(result);
WaitAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
RemovePromiseFromNativeContext(requester, promise);
}
} // namespace internal
} // namespace v8