[wasm] Switch compilation to Jobs API

Use the new Jobs API for WebAssembly compilation. This avoids having to
schedule as many background tasks as there are worker threads. Instead,
a single job specifies its maximum concurrency, which changes dynamically
as new compilation units become available.
This also removes the artificial deadline we previously used to ensure
that other tasks get some share of the CPU resources if needed.

Even though this CL moves actual wasm function compilation completely
over to the Jobs API, other similar tasks (like wrapper compilation) are
still using the Task API and need to be ported in a follow-up CL.
Also, we still use the same priority for baseline compilation and
tier-up. We should split this into two jobs with different priorities in
a follow-up CL. This will also allow us to block only on baseline
compilation, where we currently block on both.
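
To make the pattern concrete, here is a minimal, illustrative sketch of
how a job with dynamic concurrency is posted via the Jobs API. This is
not the code added by this CL; the {Sketch*} names and the shared atomic
counter are stand-ins for {BackgroundCompileJob} and
{current_compile_concurrency_} below, and only v8::JobTask,
v8::JobDelegate, v8::JobHandle, v8::Platform::PostJob and
v8::TaskPriority are real API:

  #include <atomic>
  #include <memory>
  #include "v8-platform.h"  // v8::JobTask, v8::JobDelegate, v8::JobHandle

  class SketchCompileJob : public v8::JobTask {
   public:
    explicit SketchCompileJob(std::shared_ptr<std::atomic<int>> concurrency)
        : concurrency_(std::move(concurrency)) {}

    void Run(v8::JobDelegate* delegate) override {
      // Process units until the queue drains or the scheduler asks us to
      // yield; this replaces the old fixed 50ms-200ms deadlines.
      while (!delegate->ShouldYield()) {
        if (!ProcessOneUnit()) return;  // queue is empty
      }
    }

    size_t GetMaxConcurrency() const override {
      // Queried by the platform; grows as units are added and drops to
      // zero once the queue drains, so we never have to post one task per
      // worker thread up front.
      return static_cast<size_t>(concurrency_->load());
    }

   private:
    bool ProcessOneUnit() { return false; }  // placeholder for real work

    std::shared_ptr<std::atomic<int>> concurrency_;
  };

  void PostSketchJob(v8::Platform* platform,
                     std::shared_ptr<std::atomic<int>> concurrency) {
    std::unique_ptr<v8::JobHandle> handle = platform->PostJob(
        v8::TaskPriority::kUserVisible,
        std::make_unique<SketchCompileJob>(std::move(concurrency)));
    // When more units are added later, bump {concurrency} and call
    // handle->NotifyConcurrencyIncrease() instead of posting new tasks.
    handle->Join();
  }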

R=ahaas@chromium.org
CC=gab@chromium.org

Bug: chromium:1101340
Change-Id: I5656697753346e5fdb15d578425cdb949ac6e364
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_rel_ng
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_isolates_rel_ng
Cq-Include-Trybots: luci.chromium.try:linux-rel
Cq-Include-Trybots: luci.v8.try:v8_linux_blink_rel
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2280100
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Reviewed-by: Andreas Haas <ahaas@chromium.org>
Reviewed-by: Thibaud Michaud <thibaudm@chromium.org>
Cr-Commit-Position: refs/heads/master@{#69239}
diff --git a/src/wasm/compilation-environment.h b/src/wasm/compilation-environment.h
index 1b27106..29b359e 100644
--- a/src/wasm/compilation-environment.h
+++ b/src/wasm/compilation-environment.h
@@ -13,6 +13,9 @@
 #include "src/wasm/wasm-tier.h"
 
 namespace v8 {
+
+class JobHandle;
+
 namespace internal {
 
 class Counters;
@@ -104,7 +107,7 @@
 
   ~CompilationState();
 
-  void AbortCompilation();
+  void CancelCompilation();
 
   void SetError();
 
diff --git a/src/wasm/module-compiler.cc b/src/wasm/module-compiler.cc
index fdfd52d..4957ddc 100644
--- a/src/wasm/module-compiler.cc
+++ b/src/wasm/module-compiler.cc
@@ -406,6 +406,39 @@
   }
 };
 
+// {JobHandle} is not thread safe in general (at least both the
+// {DefaultJobHandle} and chromium's {base::JobHandle} are not). Hence, protect
+// concurrent accesses via a mutex.
+class ThreadSafeJobHandle {
+ public:
+  explicit ThreadSafeJobHandle(std::shared_ptr<JobHandle> job_handle)
+      : job_handle_(std::move(job_handle)) {}
+
+  void NotifyConcurrencyIncrease() {
+    base::MutexGuard guard(&mutex_);
+    job_handle_->NotifyConcurrencyIncrease();
+  }
+
+  void Join() {
+    base::MutexGuard guard(&mutex_);
+    job_handle_->Join();
+  }
+
+  void Cancel() {
+    base::MutexGuard guard(&mutex_);
+    job_handle_->Cancel();
+  }
+
+  bool IsRunning() const {
+    base::MutexGuard guard(&mutex_);
+    return job_handle_->IsRunning();
+  }
+
+ private:
+  mutable base::Mutex mutex_;
+  std::shared_ptr<JobHandle> job_handle_;
+};
+
 // The {CompilationStateImpl} keeps track of the compilation state of the
 // owning NativeModule, i.e. which functions are left to be compiled.
 // It contains a task manager to allow parallel and asynchronous background
@@ -416,9 +449,9 @@
   CompilationStateImpl(const std::shared_ptr<NativeModule>& native_module,
                        std::shared_ptr<Counters> async_counters);
 
-  // Cancel all background compilation and wait for all tasks to finish. Call
-  // this before destructing this object.
-  void AbortCompilation();
+  // Cancel all background compilation, without waiting for compile tasks to
+  // finish.
+  void CancelCompilation();
 
   // Initialize compilation progress. Set compilation tiers to expect for
   // baseline and top tier compilation. Must be set before {AddCompilationUnits}
@@ -456,10 +489,14 @@
   void OnFinishedUnits(Vector<WasmCode*>);
   void OnFinishedJSToWasmWrapperUnits(int num);
 
-  void OnBackgroundTaskStopped(int task_id, const WasmFeatures& detected);
-  void UpdateDetectedFeatures(const WasmFeatures& detected);
+  int GetFreeCompileTaskId();
+  void OnCompilationStopped(int task_id, const WasmFeatures& detected);
   void PublishDetectedFeatures(Isolate*);
-  void RestartBackgroundTasks();
+  // Ensure that a compilation job is running, and increase its concurrency if
+  // needed.
+  void ScheduleCompileJobForNewUnits(int new_units);
+
+  size_t NumOutstandingCompilations() const;
 
   void SetError();
 
@@ -503,29 +540,6 @@
     return background_compile_token_;
   }
 
-  double GetCompilationDeadline(double now) {
-    // Execute for at least 50ms. Try to distribute deadlines of different tasks
-    // such that every 5ms one task stops. No task should execute longer than
-    // 200ms though.
-    constexpr double kMinLimit = 50. / base::Time::kMillisecondsPerSecond;
-    constexpr double kMaxLimit = 200. / base::Time::kMillisecondsPerSecond;
-    constexpr double kGapBetweenTasks = 5. / base::Time::kMillisecondsPerSecond;
-    double min_deadline = now + kMinLimit;
-    double max_deadline = now + kMaxLimit;
-    double next_deadline =
-        next_compilation_deadline_.load(std::memory_order_relaxed);
-    while (true) {
-      double deadline =
-          std::max(min_deadline, std::min(max_deadline, next_deadline));
-      if (next_compilation_deadline_.compare_exchange_weak(
-              next_deadline, deadline + kGapBetweenTasks,
-              std::memory_order_relaxed)) {
-        return deadline;
-      }
-      // Otherwise, retry with the updated {next_deadline}.
-    }
-  }
-
  private:
   // Trigger callbacks according to the internal counters below
   // (outstanding_...), plus the given events.
@@ -541,17 +555,14 @@
   // using relaxed semantics.
   std::atomic<bool> compile_failed_{false};
 
-  const int max_background_tasks_ = 0;
+  // The atomic counter is shared with the compilation job. It's increased
+  // when units are added, and reset to zero when the queue runs empty.
+  std::shared_ptr<std::atomic<int>> current_compile_concurrency_ =
+      std::make_shared<std::atomic<int>>(0);
+  const int max_compile_concurrency_ = 0;
 
   CompilationUnitQueues compilation_unit_queues_;
 
-  // Each compilation task executes until a certain deadline. The
-  // {CompilationStateImpl} orchestrates the deadlines such that they are
-  // evenly distributed and not all tasks stop at the same time. This removes
-  // contention during publishing of compilation results and also gives other
-  // tasks a fair chance to utilize the worker threads on a regular basis.
-  std::atomic<double> next_compilation_deadline_{0};
-
   // Index of the next wrapper to compile in {js_to_wasm_wrapper_units_}.
   std::atomic<int> js_to_wasm_wrapper_id_{0};
   // Wrapper compilation units are stored in shared_ptrs so that they are kept
@@ -566,9 +577,11 @@
   //////////////////////////////////////////////////////////////////////////////
   // Protected by {mutex_}:
 
-  // Set of unused task ids; <= {max_background_tasks_} many.
+  // Set of unused task ids; <= {max_compile_concurrency_} many.
   std::vector<int> available_task_ids_;
 
+  std::shared_ptr<ThreadSafeJobHandle> current_compile_job_;
+
   // Features detected to be used in this module. Features can be detected
   // as a module is being compiled.
   WasmFeatures detected_features_ = WasmFeatures::None();
@@ -657,7 +670,7 @@
 
 CompilationState::~CompilationState() { Impl(this)->~CompilationStateImpl(); }
 
-void CompilationState::AbortCompilation() { Impl(this)->AbortCompilation(); }
+void CompilationState::CancelCompilation() { Impl(this)->CancelCompilation(); }
 
 void CompilationState::SetError() { Impl(this)->SetError(); }
 
@@ -1029,97 +1042,86 @@
   counters->wasm_reloc_size()->Increment(code.relocation_info().length());
 }
 
-constexpr int kMainThreadTaskId = -1;
+enum CompilationExecutionResult : int8_t { kNoMoreUnits, kYield };
 
-bool ExecuteJSToWasmWrapperCompilationUnits(
-    const std::shared_ptr<BackgroundCompileToken>& token) {
+CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
+    const std::shared_ptr<BackgroundCompileToken>& token,
+    JobDelegate* delegate) {
   std::shared_ptr<JSToWasmWrapperCompilationUnit> wrapper_unit = nullptr;
   int num_processed_wrappers = 0;
-  do {
-    // TODO(thibaudm): Reschedule the compilation task if it takes too long, so
-    // that the background thread is not blocked.
-    {
-      BackgroundCompileScope compile_scope(token);
-      if (compile_scope.cancelled()) return false;
-      wrapper_unit = compile_scope.compilation_state()
-                         ->GetNextJSToWasmWrapperCompilationUnit();
-    }
-    if (wrapper_unit) {
-      wrapper_unit->Execute();
-      ++num_processed_wrappers;
-    }
-  } while (wrapper_unit);
-  if (num_processed_wrappers > 0) {
+
+  {
     BackgroundCompileScope compile_scope(token);
-    if (compile_scope.cancelled()) return false;
-    compile_scope.compilation_state()->OnFinishedJSToWasmWrapperUnits(
-        num_processed_wrappers);
+    if (compile_scope.cancelled()) return kNoMoreUnits;
+    wrapper_unit = compile_scope.compilation_state()
+                       ->GetNextJSToWasmWrapperCompilationUnit();
+    if (!wrapper_unit) return kNoMoreUnits;
   }
-  return true;
+
+  while (true) {
+    wrapper_unit->Execute();
+    ++num_processed_wrappers;
+    bool yield = delegate->ShouldYield();
+    BackgroundCompileScope compile_scope(token);
+    if (compile_scope.cancelled()) return kNoMoreUnits;
+    if (yield ||
+        !(wrapper_unit = compile_scope.compilation_state()
+                             ->GetNextJSToWasmWrapperCompilationUnit())) {
+      compile_scope.compilation_state()->OnFinishedJSToWasmWrapperUnits(
+          num_processed_wrappers);
+      return yield ? kYield : kNoMoreUnits;
+    }
+  }
 }
 
-// Run by the main thread and background tasks to take part in compilation.
-// Returns whether any units were executed.
-bool ExecuteCompilationUnits(
+// Run by the {BackgroundCompileJob} (on any thread).
+CompilationExecutionResult ExecuteCompilationUnits(
     const std::shared_ptr<BackgroundCompileToken>& token, Counters* counters,
-    int task_id, CompileBaselineOnly baseline_only) {
-  TRACE_COMPILE("Compiling (task %d)...\n", task_id);
+    JobDelegate* delegate, CompileBaselineOnly baseline_only) {
   TRACE_EVENT0("v8.wasm", "wasm.ExecuteCompilationUnits");
 
   // Execute JS to Wasm wrapper units first, so that they are ready to be
   // finalized by the main thread when the kFinishedBaselineCompilation event is
   // triggered.
-  if (!ExecuteJSToWasmWrapperCompilationUnits(token)) {
-    return false;
+  if (ExecuteJSToWasmWrapperCompilationUnits(token, delegate) == kYield) {
+    return kYield;
   }
 
-  const bool is_foreground = task_id == kMainThreadTaskId;
-  // The main thread uses task id 0, which might collide with one of the
-  // background tasks. This is fine, as it will only cause some contention on
-  // the one queue, but work otherwise.
-  if (is_foreground) task_id = 0;
-
-  Platform* platform = V8::GetCurrentPlatform();
-  double compilation_start = platform->MonotonicallyIncreasingTime();
-
   // These fields are initialized in a {BackgroundCompileScope} before
   // starting compilation.
-  double deadline = 0;
   base::Optional<CompilationEnv> env;
   std::shared_ptr<WireBytesStorage> wire_bytes;
   std::shared_ptr<const WasmModule> module;
-  WasmEngine* wasm_engine = nullptr;
+  WasmEngine* wasm_engine;
+  int task_id;
   base::Optional<WasmCompilationUnit> unit;
+
   WasmFeatures detected_features = WasmFeatures::None();
 
-  auto stop = [is_foreground, task_id,
-               &detected_features](BackgroundCompileScope& compile_scope) {
-    if (is_foreground) {
-      compile_scope.compilation_state()->UpdateDetectedFeatures(
-          detected_features);
-    } else {
-      compile_scope.compilation_state()->OnBackgroundTaskStopped(
-          task_id, detected_features);
-    }
+  auto stop = [&detected_features,
+               &task_id](BackgroundCompileScope& compile_scope) {
+    compile_scope.compilation_state()->OnCompilationStopped(task_id,
+                                                            detected_features);
   };
 
   // Preparation (synchronized): Initialize the fields above and get the first
   // compilation unit.
   {
     BackgroundCompileScope compile_scope(token);
-    if (compile_scope.cancelled()) return false;
+    if (compile_scope.cancelled()) return kNoMoreUnits;
     auto* compilation_state = compile_scope.compilation_state();
-    deadline = compilation_state->GetCompilationDeadline(compilation_start);
     env.emplace(compile_scope.native_module()->CreateCompilationEnv());
     wire_bytes = compilation_state->GetWireBytesStorage();
     module = compile_scope.native_module()->shared_module();
     wasm_engine = compile_scope.native_module()->engine();
+    task_id = compilation_state->GetFreeCompileTaskId();
     unit = compilation_state->GetNextCompilationUnit(task_id, baseline_only);
     if (!unit) {
       stop(compile_scope);
-      return false;
+      return kNoMoreUnits;
     }
   }
+  TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id);
 
   std::vector<WasmCompilationResult> results_to_publish;
 
@@ -1167,10 +1169,12 @@
         wasm_engine, &env.value(), wire_bytes, counters, &detected_features);
     results_to_publish.emplace_back(std::move(result));
 
+    bool yield = delegate->ShouldYield();
+
     // (synchronized): Publish the compilation result and get the next unit.
     {
       BackgroundCompileScope compile_scope(token);
-      if (compile_scope.cancelled()) return true;
+      if (compile_scope.cancelled()) return kNoMoreUnits;
       if (!results_to_publish.back().succeeded()) {
         // Compile error.
         compile_scope.compilation_state()->SetError();
@@ -1180,23 +1184,19 @@
       }
 
       // Get next unit.
-      if (FLAG_predictable ||
-          deadline < platform->MonotonicallyIncreasingTime()) {
-        unit = {};
-      } else {
-        unit = compile_scope.compilation_state()->GetNextCompilationUnit(
-            task_id, baseline_only);
-      }
-
-      if (!unit) {
+      if (yield ||
+          !(unit = compile_scope.compilation_state()->GetNextCompilationUnit(
+                task_id, baseline_only))) {
         publish_results(&compile_scope);
         stop(compile_scope);
-        return true;
-      } else if (unit->tier() == ExecutionTier::kTurbofan) {
-        // Before executing a TurboFan unit, ensure to publish all previous
-        // units. If we compiled Liftoff before, we need to publish them anyway
-        // to ensure fast completion of baseline compilation, if we compiled
-        // TurboFan before, we publish to reduce peak memory consumption.
+        return yield ? kYield : kNoMoreUnits;
+      }
+
+      // Before executing a TurboFan unit, ensure to publish all previous
+      // units. If we compiled Liftoff before, we need to publish them anyway
+      // to ensure fast completion of baseline compilation, if we compiled
+      // TurboFan before, we publish to reduce peak memory consumption.
+      if (unit->tier() == ExecutionTier::kTurbofan) {
         publish_results(&compile_scope);
       }
     }
@@ -1205,7 +1205,7 @@
   DCHECK(compilation_failed);
   USE(compilation_failed);
   token->Cancel();
-  return true;
+  return kNoMoreUnits;
 }
 
 using JSToWasmWrapperKey = std::pair<bool, FunctionSig>;
@@ -1387,18 +1387,8 @@
   // Initialize the compilation units and kick off background compile tasks.
   InitializeCompilationUnits(isolate, native_module);
 
-  // If tiering is disabled, the main thread can execute any unit (all of them
-  // are part of initial compilation). Otherwise, just execute baseline units.
-  bool is_tiering = compilation_state->compile_mode() == CompileMode::kTiering;
-  auto baseline_only = is_tiering ? kBaselineOnly : kBaselineOrTopTier;
-  // The main threads contributes to the compilation.
-  while (ExecuteCompilationUnits(compilation_state->background_compile_token(),
-                                 isolate->counters(), kMainThreadTaskId,
-                                 baseline_only)) {
-    // Continue executing compilation units.
-  }
-
   // Now wait until baseline compilation finished.
+  // TODO(clemensb): Contribute to compilation while waiting.
   baseline_finished_semaphore->Wait();
 
   compilation_state->PublishDetectedFeatures(isolate);
@@ -1412,26 +1402,50 @@
 }
 
 // The runnable task that performs compilations in the background.
-class BackgroundCompileTask : public CancelableTask {
+class BackgroundCompileJob : public JobTask {
  public:
-  explicit BackgroundCompileTask(CancelableTaskManager* manager,
-                                 std::shared_ptr<BackgroundCompileToken> token,
-                                 std::shared_ptr<Counters> async_counters,
-                                 int task_id)
-      : CancelableTask(manager),
-        token_(std::move(token)),
+  explicit BackgroundCompileJob(
+      std::shared_ptr<BackgroundCompileToken> token,
+      std::shared_ptr<Counters> async_counters,
+      std::shared_ptr<std::atomic<int>> current_concurrency,
+      int max_concurrency)
+      : token_(std::move(token)),
         async_counters_(std::move(async_counters)),
-        task_id_(task_id) {}
+        current_concurrency_(std::move(current_concurrency)),
+        max_concurrency_(max_concurrency) {}
 
-  void RunInternal() override {
-    ExecuteCompilationUnits(token_, async_counters_.get(), task_id_,
-                            kBaselineOrTopTier);
+  void Run(JobDelegate* delegate) override {
+    if (ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
+                                kBaselineOrTopTier) == kYield) {
+      return;
+    }
+    // Otherwise we didn't find any more units to execute. Reduce the available
+    // concurrency to zero, but then check whether any more units were added in
+    // the meantime, and increase back if necessary.
+    current_concurrency_->store(0);
+    {
+      BackgroundCompileScope scope(token_);
+      if (scope.cancelled()) return;
+      size_t outstanding_units =
+          scope.compilation_state()->NumOutstandingCompilations();
+      if (outstanding_units == 0) return;
+      // On a race between this thread and the thread which scheduled the units,
+      // this might increase concurrency more than needed, which is fine. It
+      // will be reduced again when the first task finds no more work to do.
+      scope.compilation_state()->ScheduleCompileJobForNewUnits(
+          static_cast<int>(outstanding_units));
+    }
+  }
+
+  size_t GetMaxConcurrency() const override {
+    return std::min(max_concurrency_, current_concurrency_->load());
   }
 
  private:
   const std::shared_ptr<BackgroundCompileToken> token_;
   const std::shared_ptr<Counters> async_counters_;
-  const int task_id_;
+  const std::shared_ptr<std::atomic<int>> current_concurrency_;
+  const int max_concurrency_;
 };
 
 }  // namespace
@@ -1507,15 +1521,8 @@
         }
       });
 
-  // The main thread contributes to the compilation.
-  constexpr Counters* kNoCounters = nullptr;
-  while (ExecuteCompilationUnits(compilation_state->background_compile_token(),
-                                 kNoCounters, kMainThreadTaskId,
-                                 kBaselineOnly)) {
-    // Continue executing compilation units.
-  }
-
   // Now wait until all compilation units finished.
+  // TODO(clemensb): Contribute to compilation while waiting.
   recompilation_finished_semaphore->Wait();
   DCHECK(!compilation_state->failed());
 }
@@ -1623,7 +1630,7 @@
   // If the runtime objects were not created yet, then initial compilation did
   // not finish yet. In this case we can abort compilation.
   if (native_module_ && module_object_.is_null()) {
-    Impl(native_module_->compilation_state())->AbortCompilation();
+    Impl(native_module_->compilation_state())->CancelCompilation();
   }
   // Tell the streaming decoder that the AsyncCompileJob is not available
   // anymore.
@@ -2191,7 +2198,7 @@
   // Check if there is already a CompiledModule, in which case we have to clean
   // up the CompilationStateImpl as well.
   if (job_->native_module_) {
-    Impl(job_->native_module_->compilation_state())->AbortCompilation();
+    Impl(job_->native_module_->compilation_state())->CancelCompilation();
 
     job_->DoSync<AsyncCompileJob::DecodeFail,
                  AsyncCompileJob::kUseExistingForegroundTask>(error);
@@ -2489,7 +2496,9 @@
   return true;
 }
 
-int GetMaxBackgroundTasks() {
+// TODO(wasm): Try to avoid the {NumberOfWorkerThreads} calls, grow queues
+// dynamically instead.
+int GetMaxCompileConcurrency() {
   int num_worker_threads = V8::GetCurrentPlatform()->NumberOfWorkerThreads();
   return std::min(FLAG_wasm_num_compilation_tasks, num_worker_threads);
 }
@@ -2505,17 +2514,17 @@
                         ? CompileMode::kTiering
                         : CompileMode::kRegular),
       async_counters_(std::move(async_counters)),
-      max_background_tasks_(std::max(GetMaxBackgroundTasks(), 1)),
-      compilation_unit_queues_(max_background_tasks_),
-      available_task_ids_(max_background_tasks_) {
-  for (int i = 0; i < max_background_tasks_; ++i) {
+      max_compile_concurrency_(std::max(GetMaxCompileConcurrency(), 1)),
+      compilation_unit_queues_(max_compile_concurrency_),
+      available_task_ids_(max_compile_concurrency_) {
+  for (int i = 0; i < max_compile_concurrency_; ++i) {
     // Ids are popped on task creation, so reverse this list. This ensures that
     // the first background task gets id 0.
-    available_task_ids_[i] = max_background_tasks_ - 1 - i;
+    available_task_ids_[i] = max_compile_concurrency_ - 1 - i;
   }
 }
 
-void CompilationStateImpl::AbortCompilation() {
+void CompilationStateImpl::CancelCompilation() {
   background_compile_token_->Cancel();
   // No more callbacks after abort.
   base::MutexGuard callbacks_guard(&callbacks_mutex_);
@@ -2690,7 +2699,9 @@
                                    js_to_wasm_wrapper_units.begin(),
                                    js_to_wasm_wrapper_units.end());
 
-  RestartBackgroundTasks();
+  size_t total_units = baseline_units.size() + top_tier_units.size() +
+                       js_to_wasm_wrapper_units.size();
+  ScheduleCompileJobForNewUnits(static_cast<int>(total_units));
 }
 
 void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
@@ -2887,26 +2898,26 @@
   }
 }
 
-void CompilationStateImpl::OnBackgroundTaskStopped(
-    int task_id, const WasmFeatures& detected) {
-  {
-    base::MutexGuard guard(&mutex_);
-    DCHECK_EQ(0, std::count(available_task_ids_.begin(),
-                            available_task_ids_.end(), task_id));
-    DCHECK_GT(max_background_tasks_, available_task_ids_.size());
-    available_task_ids_.push_back(task_id);
-    detected_features_.Add(detected);
+int CompilationStateImpl::GetFreeCompileTaskId() {
+  base::MutexGuard guard(&mutex_);
+  if (V8_UNLIKELY(available_task_ids_.empty())) {
+    FATAL(
+        "The platform is running the compile job with more concurrency than "
+        "returned by {GetMaxConcurrency()}.");
   }
-
-  // The background task could have stopped while we were adding new units, or
-  // because it reached its deadline. In both cases we need to restart tasks to
-  // avoid a potential deadlock.
-  RestartBackgroundTasks();
+  int id = available_task_ids_.back();
+  available_task_ids_.pop_back();
+  return id;
 }
 
-void CompilationStateImpl::UpdateDetectedFeatures(
-    const WasmFeatures& detected) {
+void CompilationStateImpl::OnCompilationStopped(int task_id,
+                                                const WasmFeatures& detected) {
+  DCHECK_GT(max_compile_concurrency_, task_id);
   base::MutexGuard guard(&mutex_);
+  DCHECK_EQ(0, std::count(available_task_ids_.begin(),
+                          available_task_ids_.end(), task_id));
+  available_task_ids_.push_back(task_id);
+  DCHECK_GE(max_compile_concurrency_, available_task_ids_.size());
   detected_features_.Add(detected);
 }
 
@@ -2918,43 +2929,48 @@
   UpdateFeatureUseCounts(isolate, detected_features_);
 }
 
-void CompilationStateImpl::RestartBackgroundTasks() {
-  // Create new tasks, but only spawn them after releasing the mutex, because
-  // some platforms (e.g. the predictable platform) might execute tasks right
-  // away.
-  std::vector<std::unique_ptr<Task>> new_tasks;
-  {
-    base::MutexGuard guard(&mutex_);
-    // Explicit fast path (quite common): If no more task ids are available
-    // (i.e. {max_background_tasks_} tasks are already running), spawn nothing.
-    if (available_task_ids_.empty()) return;
-    // No need to restart tasks if compilation already failed.
-    if (failed()) return;
-
-    size_t max_num_restart = compilation_unit_queues_.GetTotalSize();
-    if (js_to_wasm_wrapper_id_ <
-        static_cast<int>(js_to_wasm_wrapper_units_.size())) {
-      max_num_restart +=
-          js_to_wasm_wrapper_units_.size() - js_to_wasm_wrapper_id_;
-    }
-
-    while (!available_task_ids_.empty() && max_num_restart-- > 0) {
-      int task_id = available_task_ids_.back();
-      available_task_ids_.pop_back();
-      new_tasks.emplace_back(
-          native_module_->engine()
-              ->NewBackgroundCompileTask<BackgroundCompileTask>(
-                  background_compile_token_, async_counters_, task_id));
-    }
+void CompilationStateImpl::ScheduleCompileJobForNewUnits(int new_units) {
+  // Increase the {current_compile_concurrency_} counter and remember the old
+  // value to check whether it increased towards {max_compile_concurrency_}.
+  // In that case, we need to notify the compile job about the increased
+  // concurrency.
+  DCHECK_LT(0, new_units);
+  int old_units = current_compile_concurrency_->load();
+  while (!current_compile_concurrency_->compare_exchange_weak(
+      old_units, old_units + new_units)) {
+    // Retry with updated {old_units}.
   }
+  bool concurrency_increased = old_units < max_compile_concurrency_;
 
-  // Spawn all tasts with default priority (avoid
-  // {CallLowPriorityTaskOnWorkerThread}) even for tier up, because low priority
-  // tasks will be severely delayed even if background threads are idle (see
-  // https://crbug.com/1094928).
-  for (auto& task : new_tasks) {
-    V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(task));
+  base::MutexGuard guard(&mutex_);
+  if (current_compile_job_ && current_compile_job_->IsRunning()) {
+    if (concurrency_increased) {
+      current_compile_job_->NotifyConcurrencyIncrease();
+    }
+    return;
   }
+  if (failed()) return;
+
+  std::unique_ptr<JobTask> new_compile_job =
+      std::make_unique<BackgroundCompileJob>(
+          background_compile_token_, async_counters_,
+          current_compile_concurrency_, max_compile_concurrency_);
+  // TODO(wasm): Lower priority for TurboFan-only jobs.
+  std::shared_ptr<JobHandle> handle = V8::GetCurrentPlatform()->PostJob(
+      TaskPriority::kUserVisible, std::move(new_compile_job));
+  native_module_->engine()->ShepherdCompileJobHandle(handle);
+  current_compile_job_ =
+      std::make_unique<ThreadSafeJobHandle>(std::move(handle));
+}
+
+size_t CompilationStateImpl::NumOutstandingCompilations() const {
+  size_t next_wrapper = js_to_wasm_wrapper_id_.load(std::memory_order_relaxed);
+  size_t outstanding_wrappers =
+      next_wrapper >= js_to_wasm_wrapper_units_.size()
+          ? 0
+          : js_to_wasm_wrapper_units_.size() - next_wrapper;
+  size_t outstanding_functions = compilation_unit_queues_.GetTotalSize();
+  return outstanding_wrappers + outstanding_functions;
 }
 
 void CompilationStateImpl::SetError() {
diff --git a/src/wasm/module-compiler.h b/src/wasm/module-compiler.h
index 161a30c..855205d 100644
--- a/src/wasm/module-compiler.h
+++ b/src/wasm/module-compiler.h
@@ -67,7 +67,8 @@
 
 void TriggerTierUp(Isolate*, NativeModule*, int func_index);
 
-int GetMaxBackgroundTasks();
+// Get the maximum concurrency for parallel compilation.
+int GetMaxCompileConcurrency();
 
 template <typename Key, typename Hash>
 class WrapperQueue {
diff --git a/src/wasm/module-instantiate.cc b/src/wasm/module-instantiate.cc
index 0d32729..bdda660 100644
--- a/src/wasm/module-instantiate.cc
+++ b/src/wasm/module-instantiate.cc
@@ -1373,7 +1373,8 @@
   }
 
   CancelableTaskManager task_manager;
-  const int max_background_tasks = GetMaxBackgroundTasks();
+  // TODO(wasm): Switch this to the Jobs API.
+  const int max_background_tasks = GetMaxCompileConcurrency();
   for (int i = 0; i < max_background_tasks; ++i) {
     auto task = std::make_unique<CompileImportWrapperTask>(
         &task_manager, isolate_->wasm_engine(), isolate_->counters(),
diff --git a/src/wasm/wasm-code-manager.cc b/src/wasm/wasm-code-manager.cc
index 11da708..1f137f1 100644
--- a/src/wasm/wasm-code-manager.cc
+++ b/src/wasm/wasm-code-manager.cc
@@ -1507,7 +1507,7 @@
   TRACE_HEAP("Deleting native module: %p\n", this);
   // Cancel all background compilation before resetting any field of the
   // NativeModule or freeing anything.
-  compilation_state_->AbortCompilation();
+  compilation_state_->CancelCompilation();
   engine_->FreeNativeModule(this);
   // Free the import wrapper cache before releasing the {WasmCode} objects in
   // {owned_code_}. The destructor of {WasmImportWrapperCache} still needs to
diff --git a/src/wasm/wasm-engine.cc b/src/wasm/wasm-engine.cc
index 3e755bd..3e669f8 100644
--- a/src/wasm/wasm-engine.cc
+++ b/src/wasm/wasm-engine.cc
@@ -401,8 +401,33 @@
   gdb_server_.reset();
 #endif  // V8_ENABLE_WASM_GDB_REMOTE_DEBUGGING
 
-  // Synchronize on all background compile tasks.
-  background_compile_task_manager_.CancelAndWait();
+  // Collect the live modules into a vector first, then cancel them while
+  // releasing our lock. This will allow the background tasks to finish.
+  std::vector<std::shared_ptr<NativeModule>> live_modules;
+  {
+    base::MutexGuard guard(&mutex_);
+    for (auto& entry : native_modules_) {
+      if (auto shared_ptr = entry.second->weak_ptr.lock()) {
+        live_modules.emplace_back(std::move(shared_ptr));
+      }
+    }
+  }
+
+  for (auto& native_module : live_modules) {
+    native_module->compilation_state()->CancelCompilation();
+  }
+  live_modules.clear();
+
+  // Now wait for all background compile tasks to actually finish.
+  std::vector<std::shared_ptr<JobHandle>> compile_job_handles;
+  {
+    base::MutexGuard guard(&mutex_);
+    compile_job_handles = compile_job_handles_;
+  }
+  for (auto& job_handle : compile_job_handles) {
+    if (job_handle->IsRunning()) job_handle->Cancel();
+  }
+
   // All AsyncCompileJobs have been canceled.
   DCHECK(async_compile_jobs_.empty());
   // All Isolates have been deregistered.
@@ -1306,6 +1331,14 @@
   }
 }
 
+void WasmEngine::ShepherdCompileJobHandle(
+    std::shared_ptr<JobHandle> job_handle) {
+  DCHECK_NOT_NULL(job_handle);
+  base::MutexGuard guard(&mutex_);
+  // TODO(clemensb): Add occasional cleanup of finished handles.
+  compile_job_handles_.emplace_back(std::move(job_handle));
+}
+
 void WasmEngine::TriggerGC(int8_t gc_sequence_index) {
   DCHECK(!mutex_.TryLock());
   DCHECK_NULL(current_gc_info_);
diff --git a/src/wasm/wasm-engine.h b/src/wasm/wasm-engine.h
index 8e59b3f..6fe5f22 100644
--- a/src/wasm/wasm-engine.h
+++ b/src/wasm/wasm-engine.h
@@ -243,12 +243,6 @@
   void AddIsolate(Isolate* isolate);
   void RemoveIsolate(Isolate* isolate);
 
-  template <typename T, typename... Args>
-  std::unique_ptr<T> NewBackgroundCompileTask(Args&&... args) {
-    return std::make_unique<T>(&background_compile_task_manager_,
-                               std::forward<Args>(args)...);
-  }
-
   // Trigger code logging for the given code objects in all Isolates which have
   // access to the NativeModule containing this code. This method can be called
   // from background threads.
@@ -338,6 +332,10 @@
                                    const std::shared_ptr<NativeModule>&,
                                    Vector<const char> source_url = {});
 
+  // Take shared ownership of a compile job handle, such that we can synchronize
+  // on that before the engine dies.
+  void ShepherdCompileJobHandle(std::shared_ptr<JobHandle>);
+
   // Call on process start and exit.
   static void InitializeOncePerProcess();
   static void GlobalTearDown();
@@ -372,10 +370,6 @@
   WasmCodeManager code_manager_;
   AccountingAllocator allocator_;
 
-  // Task manager managing all background compile jobs. Before shut down of the
-  // engine, they must all be finished because they access the allocator.
-  CancelableTaskManager background_compile_task_manager_;
-
 #ifdef V8_ENABLE_WASM_GDB_REMOTE_DEBUGGING
   // Implements a GDB-remote stub for WebAssembly debugging.
   std::unique_ptr<gdb_server::GdbServer> gdb_server_;
@@ -403,6 +397,10 @@
   std::unordered_map<NativeModule*, std::unique_ptr<NativeModuleInfo>>
       native_modules_;
 
+  // Background compile jobs that are still running. We need to join them before
+  // the engine gets deleted. Otherwise we don't care when exactly they finish.
+  std::vector<std::shared_ptr<JobHandle>> compile_job_handles_;
+
   // Size of code that became dead since the last GC. If this exceeds a certain
   // threshold, a new GC is triggered.
   size_t new_potentially_dead_code_size_ = 0;
diff --git a/test/cctest/wasm/test-streaming-compilation.cc b/test/cctest/wasm/test-streaming-compilation.cc
index c700ae8..b86f7dd 100644
--- a/test/cctest/wasm/test-streaming-compilation.cc
+++ b/test/cctest/wasm/test-streaming-compilation.cc
@@ -31,6 +31,20 @@
     i::V8::SetPlatformForTesting(this);
   }
 
+  ~MockPlatform() {
+    for (auto* job_handle : job_handles_) job_handle->ResetPlatform();
+  }
+
+  std::unique_ptr<v8::JobHandle> PostJob(
+      v8::TaskPriority priority,
+      std::unique_ptr<v8::JobTask> job_task) override {
+    auto orig_job_handle = TestPlatform::PostJob(priority, std::move(job_task));
+    auto job_handle =
+        std::make_unique<MockJobHandle>(std::move(orig_job_handle), this);
+    job_handles_.insert(job_handle.get());
+    return job_handle;
+  }
+
   std::shared_ptr<TaskRunner> GetForegroundTaskRunner(
       v8::Isolate* isolate) override {
     return task_runner_;
@@ -42,7 +56,12 @@
 
   bool IdleTasksEnabled(v8::Isolate* isolate) override { return false; }
 
-  void ExecuteTasks() { task_runner_->ExecuteTasks(); }
+  void ExecuteTasks() {
+    for (auto* job_handle : job_handles_) {
+      if (job_handle->IsRunning()) job_handle->Join();
+    }
+    task_runner_->ExecuteTasks();
+  }
 
  private:
   class MockTaskRunner final : public TaskRunner {
@@ -75,7 +94,32 @@
     std::queue<std::unique_ptr<v8::Task>> tasks_;
   };
 
+  class MockJobHandle : public JobHandle {
+   public:
+    explicit MockJobHandle(std::unique_ptr<JobHandle> orig_handle,
+                           MockPlatform* platform)
+        : orig_handle_(std::move(orig_handle)), platform_(platform) {}
+
+    ~MockJobHandle() {
+      if (platform_) platform_->job_handles_.erase(this);
+    }
+
+    void ResetPlatform() { platform_ = nullptr; }
+
+    void NotifyConcurrencyIncrease() override {
+      orig_handle_->NotifyConcurrencyIncrease();
+    }
+    void Join() override { orig_handle_->Join(); }
+    void Cancel() override { orig_handle_->Cancel(); }
+    bool IsRunning() override { return orig_handle_->IsRunning(); }
+
+   private:
+    std::unique_ptr<JobHandle> orig_handle_;
+    MockPlatform* platform_;
+  };
+
   std::shared_ptr<MockTaskRunner> task_runner_;
+  std::unordered_set<MockJobHandle*> job_handles_;
 };
 
 namespace {