/*
* Copyright 2016 WebAssembly Community Group participants
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef optimizing_incremental_module_builder_h
#define optimizing_incremental_module_builder_h
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include <wasm.h>
#include <pass.h>
#include <support/threads.h>
namespace wasm {
#ifdef BINARYEN_THREAD_DEBUG
static std::mutex debugMutex; // a distinct name, so the bool `debug` member below cannot shadow it
#define DEBUG_THREAD(x)                                                     \
  {                                                                         \
    std::lock_guard<std::mutex> lock(debugMutex);                           \
    std::cerr << "[OptimizingIncrementalModuleBuilder Threading (thread: "  \
              << std::this_thread::get_id() << ")] " << x << '\n';          \
  }
#else
#define DEBUG_THREAD(x)
#endif
//
// OptimizingIncrementalModuleBuilder
//
// Helps build wasm modules efficiently. If you build a module one
// function at a time, and you want those functions optimized, this class
// starts optimizing them on worker threads *while you are still adding*.
// It runs function optimization passes at that time, and then at the end
// it runs global module-level optimization passes. The result is a fully
// optimized module, optimized while being generated.
//
// This might also be faster than normal module optimization since it
// runs all passes on each function before moving on to the next one,
// which is better for data locality.
//
// Usage: Create an instance, passing it the module and the total
// number of functions. Then call addFunction as you create new
// functions (this also adds each one to the module). Finally, call
// finish() when all functions have been added.
//
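// For example, a minimal usage sketch (`module`, `numFuncs`, `passOptions`
// and `makeFunction` are hypothetical stand-ins for the caller's own code):
//
//   OptimizingIncrementalModuleBuilder builder(
//     &module, numFuncs, passOptions,
//     [](PassRunner& runner) { /* add any pre-passes here */ },
//     /* debug= */ false, /* validateGlobally= */ true);
//   for (Index i = 0; i < numFuncs; i++) {
//     builder.addFunction(makeFunction(i)); // adds to module and queues it
//   }
//   builder.finish(); // blocks until the module is fully optimized
//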
// This avoids locking by using atomics. We allocate an array of nullptrs
// that represent all the functions, and as we add a function, we place it
// at the next index. Each worker reads from the start to the end, and
// when it finds a function pointer, it atomically swaps it with nullptr
// and optimizes that function. There is also an end marker, which is
// neither nullptr nor the address of a valid function; it indicates an
// entry that has not yet been filled in. In other words,
// * the main thread fills everything with the end marker
// * the main thread transforms a marker entry into a function
// * workers pause when they see the marker
// * workers skip over nullptrs
// * workers transform functions into nullptrs, and optimize them
// * we keep an atomic count of the number of active workers and
// the number of optimized functions.
// * after adding a function, the main thread wakes up workers if
// it calculates there is work for them.
// * a lock is used for going to sleep and waking up.
// Locking should be rare, as optimization is
// generally slower than generation; in the optimal case, we never
// lock beyond the first step, and all further work is lock-free.
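//
// As a sketch, the heart of a worker's scan over entry i looks like this
// (see workerMain below for the full version, with sleeping and wakeups):
//
//   if (list[i].load() != endMarker) {     // has the main thread filled it in?
//     Function* func = list[i].exchange(nullptr);
//     if (func) optimizeFunction(func);    // non-null: we won the race for it
//   }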
//
class OptimizingIncrementalModuleBuilder {
Module* wasm;
uint32_t numFunctions;
PassOptions passOptions;
std::function<void (PassRunner&)> addPrePasses;
Function* endMarker;
std::atomic<Function*>* list;
uint32_t nextFunction; // only used on main thread
uint32_t numWorkers;
std::vector<std::unique_ptr<std::thread>> threads;
std::atomic<uint32_t> liveWorkers, activeWorkers, availableFuncs, finishedFuncs;
std::mutex mutex;
std::condition_variable condition;
bool finishing;
bool debug;
bool validateGlobally;
public:
// numFunctions must be equal to or greater than the number of functions that
// will be added. Knowing this bound helps avoid locking.
OptimizingIncrementalModuleBuilder(Module* wasm, Index numFunctions,
                                   PassOptions passOptions,
                                   std::function<void (PassRunner&)> addPrePasses,
                                   bool debug, bool validateGlobally)
  : wasm(wasm), numFunctions(numFunctions), passOptions(passOptions),
    addPrePasses(addPrePasses), endMarker(nullptr), list(nullptr),
    nextFunction(0), numWorkers(0), liveWorkers(0), activeWorkers(0),
    availableFuncs(0), finishedFuncs(0), finishing(false), debug(debug),
    validateGlobally(validateGlobally) {
if (numFunctions == 0 || debug) {
// if there are no functions to optimize, or we are in non-parallel debug mode, don't create any threads.
return;
}
// Before parallelism, create all passes on the main thread here, to ensure
// prepareToRun() is called for each pass before we start to optimize functions.
{
PassRunner passRunner(wasm, passOptions);
addPrePasses(passRunner);
passRunner.addDefaultFunctionOptimizationPasses();
}
// prepare work list
endMarker = new Function();
list = new std::atomic<Function*>[numFunctions];
for (uint32_t i = 0; i < numFunctions; i++) {
list[i].store(endMarker);
}
// create workers
DEBUG_THREAD("creating workers");
numWorkers = ThreadPool::getNumCores();
assert(numWorkers >= 1);
liveWorkers.store(0);
activeWorkers.store(0);
for (uint32_t i = 0; i < numWorkers; i++) { // TODO: one less, and add it at the very end, to not compete with main thread?
createWorker();
}
waitUntilAllReady();
DEBUG_THREAD("workers are ready");
// prepare the rest of the initial state
availableFuncs.store(0);
finishedFuncs.store(0);
}
~OptimizingIncrementalModuleBuilder() {
delete[] list;
delete endMarker;
}
// Add a function to the module, and queue it to be optimized
void addFunction(Function* func) {
wasm->addFunction(func);
if (debug) return; // we optimize at the end if debugging
queueFunction(func);
// wake workers if needed
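// availableFuncs approximates how many entries currently have useful work,
// so waking more workers than that would just send them back to sleep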
auto wake = availableFuncs.load();
for (uint32_t i = 0; i < wake; i++) {
wakeWorker();
}
}
// All functions have been added; block until all are optimized, then run
// the global optimizations. When this returns, the module is ready and
// optimized.
void finish() {
if (debug) {
// in debug mode, optimize each function now that we are done adding functions,
// then optimize globally
PassRunner passRunner(wasm, passOptions);
passRunner.setDebug(true);
passRunner.setValidateGlobally(validateGlobally);
addPrePasses(passRunner);
passRunner.addDefaultFunctionOptimizationPasses();
passRunner.addDefaultGlobalOptimizationPasses();
passRunner.run();
return;
}
DEBUG_THREAD("finish()ing");
assert(nextFunction == numFunctions);
wakeAllWorkers();
waitUntilAllFinished();
optimizeGlobally();
// TODO: clear side thread allocators from module allocator, as these threads were transient
}
private:
void createWorker() {
DEBUG_THREAD("create a worker");
threads.emplace_back(std::make_unique<std::thread>(workerMain, this));
}
void wakeWorker() {
DEBUG_THREAD("wake a worker");
std::lock_guard<std::mutex> lock(mutex);
condition.notify_one();
}
void wakeAllWorkers() {
DEBUG_THREAD("wake all workers");
std::lock_guard<std::mutex> lock(mutex);
condition.notify_all();
}
void waitUntilAllReady() {
DEBUG_THREAD("wait until all workers are ready");
std::unique_lock<std::mutex> lock(mutex);
if (liveWorkers.load() < numWorkers) {
condition.wait(lock, [this]() { return liveWorkers.load() == numWorkers; });
}
}
void waitUntilAllFinished() {
DEBUG_THREAD("wait until all workers are finished");
{
std::unique_lock<std::mutex> lock(mutex);
finishing = true;
if (liveWorkers.load() > 0) {
condition.wait(lock, [this]() { return liveWorkers.load() == 0; });
}
}
DEBUG_THREAD("joining");
for (auto& thread : threads) thread->join();
DEBUG_THREAD("joined");
}
void queueFunction(Function* func) {
DEBUG_THREAD("queue function");
assert(nextFunction < numFunctions); // TODO: if we are given more than we expected, use a slower work queue?
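// this atomic store publishes the function: a worker that reads the entry
// from now on sees it instead of the end marker, and may claim it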
list[nextFunction++].store(func);
availableFuncs++;
}
void optimizeGlobally() {
PassRunner passRunner(wasm, passOptions);
passRunner.addDefaultGlobalOptimizationPasses();
passRunner.run();
}
// worker code
void optimizeFunction(Function* func) {
PassRunner passRunner(wasm, passOptions);
addPrePasses(passRunner);
passRunner.addDefaultFunctionOptimizationPasses();
passRunner.runFunction(func);
}
static void workerMain(OptimizingIncrementalModuleBuilder* self) {
DEBUG_THREAD("workerMain");
{
std::lock_guard<std::mutex> lock(self->mutex);
self->liveWorkers++;
self->activeWorkers++;
self->condition.notify_all();
}
for (uint32_t i = 0; i < self->numFunctions; i++) {
DEBUG_THREAD("workerMain iteration " << i);
if (self->list[i].load() == self->endMarker) {
// sleep, this entry isn't ready yet
DEBUG_THREAD("workerMain sleep");
self->activeWorkers--;
{
std::unique_lock<std::mutex> lock(self->mutex);
if (!self->finishing) { // while waiting for the lock, things may have ended
self->condition.wait(lock);
}
}
// we were woken (perhaps spuriously); retry this entry
DEBUG_THREAD("workerMain continue");
self->activeWorkers++;
i--; // compensate for the loop increment, so we re-check entry i
continue;
}
DEBUG_THREAD("workerMain exchange item");
auto* func = self->list[i].exchange(nullptr);
if (func == nullptr) {
DEBUG_THREAD("workerMain sees was already taken");
continue; // someone else has taken this one
}
// we have work to do!
DEBUG_THREAD("workerMain work on " << size_t(func));
self->availableFuncs--;
self->optimizeFunction(func);
self->finishedFuncs++;
}
DEBUG_THREAD("workerMain ready to exit");
{
std::lock_guard<std::mutex> lock(self->mutex);
self->liveWorkers--;
self->condition.notify_all();
}
DEBUG_THREAD("workerMain exiting");
}
};
} // namespace wasm
#endif // optimizing_incremental_module_builder_h