blob: 70c0a81f3d2b8c2ec4d86adb7b778d84bb5b0b6e [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/workers/experimental/task.h"
#include "third_party/blink/renderer/bindings/core/v8/script_function.h"
#include "third_party/blink/renderer/bindings/core/v8/serialization/serialized_script_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_core.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_function.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_task.h"
#include "third_party/blink/renderer/bindings/core/v8/worker_or_worklet_script_controller.h"
#include "third_party/blink/renderer/core/workers/experimental/task_worklet_global_scope.h"
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
#include "third_party/blink/renderer/core/workers/worker_or_worklet_global_scope.h"
#include "third_party/blink/renderer/platform/cross_thread_functional.h"
namespace blink {
// worker_thread_ only.
class TaskBase::AsyncFunctionCompleted : public ScriptFunction {
public:
static v8::Local<v8::Function> CreateFunction(ScriptState* script_state,
TaskBase* task,
State state) {
return (new AsyncFunctionCompleted(script_state, task, state))
->BindToV8Function();
}
ScriptValue Call(ScriptValue v) override {
task_->TaskCompletedOnWorkerThread(v.V8Value(), state_);
return ScriptValue();
}
private:
AsyncFunctionCompleted(ScriptState* script_state, TaskBase* task, State state)
: ScriptFunction(script_state), task_(task), state_(state) {}
CrossThreadPersistent<TaskBase> task_;
const State state_;
};
TaskBase::TaskBase(TaskType task_type,
ScriptState* script_state,
const ScriptValue& function,
const String& function_name)
: task_type_(task_type),
self_keep_alive_(this),
function_name_(function_name.IsolatedCopy()) {
DCHECK(IsMainThread());
DCHECK(task_type_ == TaskType::kUserInteraction ||
task_type_ == TaskType::kIdleTask);
// TODO(japhet): Handle serialization failures
v8::Isolate* isolate = script_state->GetIsolate();
if (!function.IsEmpty()) {
function_ = SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, function.V8Value()->ToString(isolate));
}
}
void TaskBase::InitializeArgumentsOnMainThread(
ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const Vector<ScriptValue>& arguments) {
v8::Isolate* isolate = script_state->GetIsolate();
arguments_.resize(arguments.size());
HeapVector<Member<TaskBase>> prerequisites;
Vector<size_t> prerequisites_indices;
for (size_t i = 0; i < arguments_.size(); i++) {
// Normal case: if the argument isn't a Task, just serialize it.
if (!V8Task::hasInstance(arguments[i].V8Value(), isolate)) {
arguments_[i].serialized_value =
SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, arguments[i].V8Value());
continue;
}
TaskBase* prerequisite =
ToScriptWrappable(arguments[i].V8Value().As<v8::Object>())
->ToImpl<Task>();
prerequisites.push_back(prerequisite);
prerequisites_indices.push_back(i);
}
worker_thread_ = SelectThread(prerequisites, thread_provider);
worker_thread_->IncrementTasksInProgressCount();
if (prerequisites.IsEmpty()) {
MutexLocker lock(mutex_);
MaybeStartTask();
return;
}
// Prior to this point, other Task instances don't have a reference
// to |this| yet, so no need to lock mutex_. RegisterDependencies() populates
// those references, so RegisterDependencies() and any logic thereafter must
// consider the potential for data races.
RegisterDependencies(prerequisites, prerequisites_indices);
}
// static
ThreadPoolThread* TaskBase::SelectThread(
const HeapVector<Member<TaskBase>>& prerequisites,
ThreadPoolThreadProvider* thread_provider) {
DCHECK(IsMainThread());
HashCountedSet<ThreadPoolThread*> prerequisite_location_counts;
size_t max_prerequisite_location_count = 0;
ThreadPoolThread* max_prerequisite_thread = nullptr;
for (TaskBase* prerequisite : prerequisites) {
// Track which thread the prerequisites will run on. Put this task on the
// thread where the most prerequisites reside.
ThreadPoolThread* thread = prerequisite->worker_thread_;
prerequisite_location_counts.insert(thread);
unsigned thread_count = prerequisite_location_counts.count(thread);
if (thread_count > max_prerequisite_location_count) {
max_prerequisite_location_count = thread_count;
max_prerequisite_thread = thread;
}
}
return max_prerequisite_thread ? max_prerequisite_thread
: thread_provider->GetLeastBusyThread();
}
// Should only be called from constructor. Split out in to a helper because
// clang appears to exempt constructors from thread safety analysis.
void TaskBase::RegisterDependencies(
const HeapVector<Member<TaskBase>>& prerequisites,
const Vector<size_t>& prerequisites_indices) {
DCHECK(IsMainThread());
{
MutexLocker lock(mutex_);
prerequisites_remaining_ = prerequisites.size();
}
for (size_t i = 0; i < prerequisites.size(); i++) {
TaskBase* prerequisite = prerequisites[i];
size_t prerequisite_index = prerequisites_indices[i];
{
MutexLocker lock(prerequisite->mutex_);
if (!prerequisite->HasFinished()) {
prerequisite->dependents_.emplace_back(
MakeGarbageCollected<Dependent>(this, prerequisite_index));
continue;
}
}
PostCrossThreadTask(
*prerequisite->worker_thread_->GetTaskRunner(task_type_), FROM_HERE,
CrossThreadBind(&TaskBase::PassResultToDependentOnWorkerThread,
WrapCrossThreadPersistent(prerequisite),
prerequisite_index, WrapCrossThreadPersistent(this)));
}
}
TaskBase::~TaskBase() {
DCHECK(IsMainThread());
DCHECK(HasFinished());
DCHECK(!function_);
DCHECK(arguments_.IsEmpty());
DCHECK(!prerequisites_remaining_);
DCHECK(dependents_.IsEmpty());
}
scoped_refptr<SerializedScriptValue> TaskBase::GetSerializedResult() {
DCHECK(IsMainThread() || worker_thread_->IsCurrentThread());
MutexLocker lock(mutex_);
DCHECK(HasFinished());
if (!serialized_result_ && worker_thread_->IsCurrentThread()) {
DCHECK(v8_result_);
ScriptState::Scope scope(
worker_thread_->GlobalScope()->ScriptController()->GetScriptState());
v8::Isolate* isolate = ToIsolate(worker_thread_->GlobalScope());
serialized_result_ = SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, v8_result_->GetResult(isolate));
}
return serialized_result_;
}
void TaskBase::PassResultToDependentOnWorkerThread(size_t dependent_index,
TaskBase* dependent) {
DCHECK(worker_thread_->IsCurrentThread());
bool failed = false;
{
MutexLocker lock(mutex_);
DCHECK(HasFinished());
failed = state_ == State::kFailed;
}
// Only serialize if the dependent needs the result on a different thread.
// Otherwise, use the unserialized result from v8.
scoped_refptr<SerializedScriptValue> serialized_result =
dependent->IsTargetThreadForArguments() ? nullptr : GetSerializedResult();
V8ResultHolder* v8_result =
dependent->IsTargetThreadForArguments() ? v8_result_.Get() : nullptr;
dependent->PrerequisiteFinished(dependent_index, v8_result, serialized_result,
failed);
}
void TaskBase::PrerequisiteFinished(
size_t index,
V8ResultHolder* v8_result,
scoped_refptr<SerializedScriptValue> serialized_result,
bool failed) {
DCHECK(v8_result || serialized_result);
DCHECK(!v8_result || !serialized_result);
MutexLocker lock(mutex_);
DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
DCHECK_GT(prerequisites_remaining_, 0u);
prerequisites_remaining_--;
if (failed)
AdvanceState(State::kCancelPending);
arguments_[index].v8_value = v8_result;
arguments_[index].serialized_value = serialized_result;
MaybeStartTask();
}
void TaskBase::MaybeStartTask() {
if (prerequisites_remaining_)
return;
DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
PostCrossThreadTask(*worker_thread_->GetTaskRunner(task_type_), FROM_HERE,
CrossThreadBind(&TaskBase::StartTaskOnWorkerThread,
WrapCrossThreadPersistent(this)));
}
bool TaskBase::WillStartTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
{
MutexLocker lock(mutex_);
DCHECK(!prerequisites_remaining_);
switch (state_) {
case State::kPending:
AdvanceState(State::kStarted);
break;
case State::kCancelPending:
return false;
case State::kStarted:
case State::kCompleted:
case State::kFailed:
NOTREACHED();
break;
}
}
return true;
}
void TaskBase::TaskCompletedOnWorkerThread(v8::Local<v8::Value> v8_result,
State state) {
DCHECK(worker_thread_->IsCurrentThread());
v8_result_ =
new V8ResultHolder(ToIsolate(worker_thread_->GlobalScope()), v8_result);
function_ = nullptr;
arguments_.clear();
Vector<CrossThreadPersistent<Dependent>> dependents_to_notify;
{
MutexLocker lock(mutex_);
AdvanceState(state);
dependents_to_notify.swap(dependents_);
}
for (auto& dependent : dependents_to_notify)
PassResultToDependentOnWorkerThread(dependent->index, dependent->task);
PostCrossThreadTask(
*worker_thread_->GetParentExecutionContextTaskRunners()->Get(
TaskType::kInternalWorker),
FROM_HERE,
CrossThreadBind(&TaskBase::TaskCompleted, WrapCrossThreadPersistent(this),
state == State::kCompleted));
}
void TaskBase::RunTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
// No other thread should be touching function_ or arguments_ at this point,
// so no mutex needed while actually running the task.
WorkerOrWorkletGlobalScope* global_scope = worker_thread_->GlobalScope();
ScriptState::Scope scope(global_scope->ScriptController()->GetScriptState());
v8::Isolate* isolate = ToIsolate(global_scope);
v8::Local<v8::Context> context = isolate->GetCurrentContext();
v8::Local<v8::Function> script_function;
v8::Local<v8::Value> receiver;
if (function_) {
String core_script =
"(" + ToCoreString(function_->Deserialize(isolate).As<v8::String>()) +
")";
v8::MaybeLocal<v8::Script> script =
v8::Script::Compile(context, V8String(isolate, core_script));
script_function = script.ToLocalChecked()
->Run(context)
.ToLocalChecked()
.As<v8::Function>();
receiver = script_function;
} else if (worker_thread_->IsWorklet()) {
TaskWorkletGlobalScope* task_worklet_global_scope =
static_cast<TaskWorkletGlobalScope*>(global_scope);
script_function = task_worklet_global_scope->GetProcessFunctionForName(
function_name_, isolate);
receiver =
task_worklet_global_scope->GetInstanceForName(function_name_, isolate);
}
if (script_function.IsEmpty()) {
TaskCompletedOnWorkerThread(V8String(isolate, "Invalid task"),
State::kFailed);
return;
}
Vector<v8::Local<v8::Value>> params(arguments_.size());
for (size_t i = 0; i < arguments_.size(); i++) {
DCHECK(!arguments_[i].serialized_value || !arguments_[i].v8_value);
if (arguments_[i].serialized_value)
params[i] = arguments_[i].serialized_value->Deserialize(isolate);
else
params[i] = arguments_[i].v8_value->GetResult(isolate);
}
v8::TryCatch block(isolate);
v8::MaybeLocal<v8::Value> ret =
script_function->Call(context, receiver, params.size(), params.data());
if (block.HasCaught()) {
TaskCompletedOnWorkerThread(block.Exception()->ToString(isolate),
State::kFailed);
return;
}
DCHECK(!ret.IsEmpty());
v8::Local<v8::Value> return_value = ret.ToLocalChecked();
if (return_value->IsPromise()) {
v8::Local<v8::Promise> promise = return_value.As<v8::Promise>();
if (promise->State() == v8::Promise::kPending) {
// Wait for the promise to resolve before calling
// TaskCompletedOnWorkerThread.
ScriptState* script_state = ScriptState::Current(isolate);
ScriptPromise(script_state, promise)
.Then(AsyncFunctionCompleted::CreateFunction(script_state, this,
State::kCompleted),
AsyncFunctionCompleted::CreateFunction(script_state, this,
State::kFailed));
return;
}
return_value = promise->Result();
}
TaskCompletedOnWorkerThread(return_value, State::kCompleted);
}
void TaskBase::TaskCompleted(bool was_successful) {
DCHECK(IsMainThread());
worker_thread_->DecrementTasksInProgressCount();
self_keep_alive_.Clear();
}
void TaskBase::AdvanceState(State new_state) {
switch (new_state) {
case State::kPending:
NOTREACHED() << "kPending should only be set via initialization";
break;
case State::kStarted:
DCHECK_EQ(State::kPending, state_);
break;
case State::kCancelPending:
DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
break;
case State::kCompleted:
DCHECK_EQ(State::kStarted, state_);
break;
case State::kFailed:
DCHECK(state_ == State::kStarted || state_ == State::kCancelPending);
break;
}
state_ = new_state;
}
Vector<ScriptValue> GetResolverArgument(ScriptState* script_state, Task* task) {
v8::Isolate* isolate = script_state->GetIsolate();
return Vector<ScriptValue>({ScriptValue(
script_state,
ToV8(task, isolate->GetCurrentContext()->Global(), isolate))});
}
ScriptPromise Task::result(ScriptState* script_state) {
DCHECK(IsMainThread());
if (!resolve_task_) {
resolve_task_ =
MakeGarbageCollected<ResolveTask>(script_state, task_type_, this);
}
return resolve_task_->GetPromise();
}
void Task::cancel() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ == State::kPending)
AdvanceState(State::kCancelPending);
}
void Task::StartTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
if (!WillStartTaskOnWorkerThread()) {
WorkerOrWorkletGlobalScope* global_scope = worker_thread_->GlobalScope();
v8::Isolate* isolate = ToIsolate(global_scope);
ScriptState::Scope scope(
global_scope->ScriptController()->GetScriptState());
TaskCompletedOnWorkerThread(V8String(isolate, "Task aborted"),
State::kFailed);
return;
}
RunTaskOnWorkerThread();
}
void Task::Trace(Visitor* visitor) {
ScriptWrappable::Trace(visitor);
TaskBase::Trace(visitor);
visitor->Trace(resolve_task_);
}
ResolveTask::ResolveTask(ScriptState* script_state,
TaskType task_type,
Task* prerequisite)
: TaskBase(task_type, script_state, ScriptValue(), String()),
resolver_(ScriptPromiseResolver::Create(script_state)) {
DCHECK(IsMainThread());
// It's safe to pass a nullptr ThreadPoolThreadProivder here because it
// is only used to select a thread if there are no prerequisites, but a
// ResolveTask always has exactly one prerequisite.
InitializeArgumentsOnMainThread(
nullptr, script_state, GetResolverArgument(script_state, prerequisite));
}
void ResolveTask::StartTaskOnWorkerThread() {
// Just take the sole argument and use it as the return value that will be
// given to the promise resolver.
{
MutexLocker lock(mutex_);
serialized_result_ = arguments_[0].serialized_value;
}
TaskCompletedOnWorkerThread(
v8::Local<v8::Value>(),
WillStartTaskOnWorkerThread() ? State::kCompleted : State::kFailed);
}
void ResolveTask::TaskCompleted(bool was_successful) {
DCHECK(IsMainThread());
ScriptState* script_state = resolver_->GetScriptState();
if (!script_state->ContextIsValid())
return;
ScriptState::Scope scope(script_state);
v8::Local<v8::Value> value =
GetSerializedResult()->Deserialize(script_state->GetIsolate());
if (was_successful)
resolver_->Resolve(value);
else
resolver_->Reject(v8::Exception::Error(value.As<v8::String>()));
TaskBase::TaskCompleted(was_successful);
}
void ResolveTask::Trace(Visitor* visitor) {
TaskBase::Trace(visitor);
visitor->Trace(resolver_);
}
} // namespace blink