Reland [wasm] Allow the initialization of a single compilation unit

Tsan figured out that I called compilation_units_.empty() outside a
lock.

Original message:
This CL adds a new function {InitializeCompilationUnit} to initialize
a single compilation unit and not just all compilation units at once.
This is necessary for streaming compilation eventually. This also
required some refactoring on how the working queue for compilation units
works. Previously the synchronization was done with an atomic counter,
now it is done with a lock. Note that the code to finish compilation
of a module still only works if the working queue gets only empty when
all work is done. I plan to change this in a different CL.

Since the code would not be tested without streaming compilation, I added
an experimental flag and a test to test the new code.

R=clemensh@chromium.org, mtrofin@chromium.org

Change-Id: Ia83560d1c70f0802271a88c514e0d1cb0458f6c4
Reviewed-on: https://chromium-review.googlesource.com/561458
Reviewed-by: Clemens Hammacher <clemensh@chromium.org>
Commit-Queue: Andreas Haas <ahaas@chromium.org>
Cr-Commit-Position: refs/heads/master@{#46454}
diff --git a/src/wasm/module-compiler.cc b/src/wasm/module-compiler.cc
index 555ad54..4196645 100644
--- a/src/wasm/module-compiler.cc
+++ b/src/wasm/module-compiler.cc
@@ -129,15 +129,13 @@
   DisallowHandleDereference no_deref;
   DisallowCodeDependencyChange no_dependency_change;
 
-  // - 1 because AtomicIncrement returns the value after the atomic increment.
-  // Bail out fast if there's no work to do.
-  size_t index = next_unit_.Increment(1) - 1;
-  if (index >= compilation_units_.size()) {
-    return false;
+  std::unique_ptr<compiler::WasmCompilationUnit> unit;
+  {
+    base::LockGuard<base::Mutex> guard(&compilation_units_mutex_);
+    if (compilation_units_.empty()) return false;
+    unit = std::move(compilation_units_.back());
+    compilation_units_.pop_back();
   }
-
-  std::unique_ptr<compiler::WasmCompilationUnit> unit =
-      std::move(compilation_units_.at(index));
   unit->ExecuteCompilation();
   {
     base::LockGuard<base::Mutex> guard(&result_mutex_);
@@ -151,20 +149,23 @@
   return true;
 }
 
-size_t ModuleCompiler::InitializeParallelCompilation(
+size_t ModuleCompiler::InitializeCompilationUnits(
     const std::vector<WasmFunction>& functions, ModuleBytesEnv& module_env) {
   uint32_t start = module_env.module_env.module->num_imported_functions +
                    FLAG_skip_compiling_wasm_funcs;
   uint32_t num_funcs = static_cast<uint32_t>(functions.size());
   uint32_t funcs_to_compile = start > num_funcs ? 0 : num_funcs - start;
-  compilation_units_.reserve(funcs_to_compile);
+  CompilationUnitBuilder builder(this);
   for (uint32_t i = start; i < num_funcs; ++i) {
     const WasmFunction* func = &functions[i];
-    constexpr bool is_sync = true;
-    compilation_units_.push_back(std::unique_ptr<compiler::WasmCompilationUnit>(
-        new compiler::WasmCompilationUnit(isolate_, &module_env, func,
-                                          centry_stub_, !is_sync)));
+    uint32_t buffer_offset = func->code.offset();
+    Vector<const uint8_t> bytes(
+        module_env.wire_bytes.start() + func->code.offset(),
+        func->code.end_offset() - func->code.offset());
+    WasmName name = module_env.wire_bytes.GetName(func);
+    builder.AddUnit(&module_env.module_env, func, buffer_offset, bytes, name);
   }
+  builder.Commit();
   return funcs_to_compile;
 }
 
@@ -191,7 +192,12 @@
     results[func_index] = result;
     ++finished;
   }
-  RestartCompilationTasks();
+  bool do_restart;
+  {
+    base::LockGuard<base::Mutex> guard(&compilation_units_mutex_);
+    do_restart = !compilation_units_.empty();
+  }
+  if (do_restart) RestartCompilationTasks();
   return finished;
 }
 
@@ -241,34 +247,32 @@
 
   // 1) The main thread allocates a compilation unit for each wasm function
   //    and stores them in the vector {compilation_units}.
-  InitializeParallelCompilation(module->functions, *module_env);
-
+  InitializeCompilationUnits(module->functions, *module_env);
   executed_units_.EnableThrottling();
 
   // 2) The main thread spawns {CompilationTask} instances which run on
   //    the background threads.
   RestartCompilationTasks();
 
-  size_t finished_functions = 0;
-  while (finished_functions < compilation_units_.size()) {
-    // 3.a) The background threads and the main thread pick one compilation
-    //      unit at a time and execute the parallel phase of the compilation
-    //      unit. After finishing the execution of the parallel phase, the
-    //      result is enqueued in {executed_units}.
-    //      The foreground task bypasses waiting on memory threshold, because
-    //      its results will immediately be converted to code (below).
-    FetchAndExecuteCompilationUnit();
-
+  // 3.a) The background threads and the main thread pick one compilation
+  //      unit at a time and execute the parallel phase of the compilation
+  //      unit. After finishing the execution of the parallel phase, the
+  //      result is enqueued in {executed_units}.
+  //      The foreground task bypasses waiting on memory threshold, because
+  //      its results will immediately be converted to code (below).
+  while (FetchAndExecuteCompilationUnit()) {
     // 3.b) If {executed_units} contains a compilation unit, the main thread
     //      dequeues it and finishes the compilation unit. Compilation units
     //      are finished concurrently to the background threads to save
     //      memory.
-    finished_functions += FinishCompilationUnits(results, thrower);
+    FinishCompilationUnits(results, thrower);
   }
   // 4) After the parallel phase of all compilation units has started, the
   //    main thread waits for all {CompilationTask} instances to finish - which
   //    happens once they all realize there's no next work item to process.
   background_task_manager_.CancelAndWait();
+  // Finish all compilation units which have been executed while we waited.
+  FinishCompilationUnits(results, thrower);
 }
 
 void ModuleCompiler::CompileSequentially(ModuleBytesEnv* module_env,
@@ -2136,7 +2140,8 @@
                         ->NumberOfAvailableBackgroundThreads())));
     job_->module_bytes_env_.reset(new ModuleBytesEnv(
         module, job_->temp_instance_.get(), job_->wire_bytes_));
-    job_->outstanding_units_ = job_->compiler_->InitializeParallelCompilation(
+
+    job_->outstanding_units_ = job_->compiler_->InitializeCompilationUnits(
         module->functions, *job_->module_bytes_env_);
 
     job_->DoAsync<ExecuteAndFinishCompilationUnits>(num_background_tasks);
diff --git a/src/wasm/module-compiler.h b/src/wasm/module-compiler.h
index bb00581..fd8eda9 100644
--- a/src/wasm/module-compiler.h
+++ b/src/wasm/module-compiler.h
@@ -39,6 +39,44 @@
     void RunInternal() override;
   };
 
+  // The CompilationUnitBuilder builds compilation units and stores them in an
+  // internal buffer. The buffer is moved into the working queue of the
+  // ModuleCompiler when {Commit} is called.
+  class CompilationUnitBuilder {
+   public:
+    explicit CompilationUnitBuilder(ModuleCompiler* compiler)
+        : compiler_(compiler) {}
+
+    ~CompilationUnitBuilder() { DCHECK(units_.empty()); }
+
+    void AddUnit(ModuleEnv* module_env, const WasmFunction* function,
+                 uint32_t buffer_offset, Vector<const uint8_t> bytes,
+                 WasmName name) {
+      constexpr bool is_sync = true;
+      units_.emplace_back(new compiler::WasmCompilationUnit(
+          compiler_->isolate_, module_env,
+          wasm::FunctionBody{function->sig, buffer_offset, bytes.begin(),
+                             bytes.end()},
+          name, function->func_index, compiler_->centry_stub_, is_sync));
+    }
+
+    void Commit() {
+      {
+        base::LockGuard<base::Mutex> guard(
+            &compiler_->compilation_units_mutex_);
+        compiler_->compilation_units_.insert(
+            compiler_->compilation_units_.end(),
+            std::make_move_iterator(units_.begin()),
+            std::make_move_iterator(units_.end()));
+      }
+      units_.clear();
+    }
+
+   private:
+    ModuleCompiler* compiler_;
+    std::vector<std::unique_ptr<compiler::WasmCompilationUnit>> units_;
+  };
+
   class CodeGenerationSchedule {
    public:
     explicit CodeGenerationSchedule(
@@ -89,8 +127,8 @@
     return executed_units_.ShouldIncreaseWorkload();
   }
 
-  size_t InitializeParallelCompilation(
-      const std::vector<WasmFunction>& functions, ModuleBytesEnv& module_env);
+  size_t InitializeCompilationUnits(const std::vector<WasmFunction>& functions,
+                                    ModuleBytesEnv& module_env);
 
   void ReopenHandlesInDeferredScope();
 
@@ -134,9 +172,9 @@
   bool is_sync_;
   std::vector<std::unique_ptr<compiler::WasmCompilationUnit>>
       compilation_units_;
+  base::Mutex compilation_units_mutex_;
   CodeGenerationSchedule executed_units_;
   base::Mutex result_mutex_;
-  base::AtomicNumber<size_t> next_unit_;
   const size_t num_background_tasks_;
   // This flag should only be set while holding result_mutex_.
   bool finisher_is_running_ = false;
diff --git a/test/mjsunit/wasm/async-compile.js b/test/mjsunit/wasm/async-compile.js
index b95930a..854e8bb 100644
--- a/test/mjsunit/wasm/async-compile.js
+++ b/test/mjsunit/wasm/async-compile.js
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-// Flags: --expose-wasm --allow-natives-syntax
+// Flags: --wasm-async-compilation --expose-wasm --allow-natives-syntax
 
 load("test/mjsunit/wasm/wasm-constants.js");
 load("test/mjsunit/wasm/wasm-module-builder.js");