| /* |
| * Copyright 2016 WebAssembly Community Group participants |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef optimizing_incremental_module_builder_h |
| #define optimizing_incremental_module_builder_h |
| |
| #include <wasm.h> |
| #include <support/threads.h> |
| |
| namespace wasm { |
| |
| #ifdef BINARYEN_THREAD_DEBUG |
| static std::mutex debug; |
| #define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[OptimizingIncrementalModuleBuilder Threading (thread: " << std::this_thread::get_id() << ")] " << x; std::cerr << '\n'; } |
| #else |
| #define DEBUG_THREAD(x) |
| #endif |
| |
| // |
| // OptimizingIncrementalModuleBuilder |
| // |
| // Helps build wasm modules efficiently. If you build a module by |
| // adding function by function, and you want to optimize them, this class |
| // starts optimizing using worker threads *while you are still adding*. |
| // It runs function optimization passes at that time, and then at the end |
| // it runs global module-level optimization passes. The result is a fully |
| // optimized module, optimized while being generated. |
| // |
| // This might also be faster than normal module optimization since it |
| // runs all passes on each function, then goes on to the next function |
| // which is better for data locality. |
| // |
| // Usage: Create an instance, passing it the module and the total |
| // number of functions. Then call addFunction as you have |
| // new functions to add (this also adds it to the module). Finally, |
| // call finish() when all functions have been added. |
| // |
| // This avoids locking by using atomics. We allocate an array of nullptrs |
| // that represent all the functions, and as we add a function, we place it |
| // at the next index. Each worker will read from the start to the end, |
| // and when a non-nullptr is found, the worker optimizes that function and |
| // nulls it. There is also an end marker that is not nullptr nor the address of |
| // a valid function, which represents that beyond this point we have not |
| // yet filled in. In other words, |
| // * the main thread fills everything with the end marker |
| // * the main thread transforms a marker entry into a function |
| // * workers pause when they see the marker |
| // * workers skip over nullptrs |
| // * workers transform functions into nullptrs, and optimize them |
| // * we keep an atomic count of the number of active workers and |
| // the number of optimized functions. |
| // * after adding a function, the main thread wakes up workers if |
| // it calculates there is work for them. |
| // * a lock is used for going to sleep and waking up. |
| // Locking should be rare, as optimization is |
| // generally slower than generation; in the optimal case, we never |
| // lock beyond the first step, and all further work is lock-free. |
| // |
| |
| class OptimizingIncrementalModuleBuilder { |
| Module* wasm; |
| uint32_t numFunctions; |
| PassOptions passOptions; |
| std::function<void (PassRunner&)> addPrePasses; |
| Function* endMarker; |
| std::atomic<Function*>* list; |
| uint32_t nextFunction; // only used on main thread |
| uint32_t numWorkers; |
| std::vector<std::unique_ptr<std::thread>> threads; |
| std::atomic<uint32_t> liveWorkers, activeWorkers, availableFuncs, finishedFuncs; |
| std::mutex mutex; |
| std::condition_variable condition; |
| bool finishing; |
| bool debug; |
| bool validateGlobally; |
| |
| public: |
| // numFunctions must be equal to the number of functions allocated, or higher. Knowing |
| // this bounds helps avoid locking. |
| OptimizingIncrementalModuleBuilder(Module* wasm, Index numFunctions, PassOptions passOptions, std::function<void (PassRunner&)> addPrePasses, bool debug, bool validateGlobally) |
| : wasm(wasm), numFunctions(numFunctions), passOptions(passOptions), addPrePasses(addPrePasses), endMarker(nullptr), list(nullptr), nextFunction(0), |
| numWorkers(0), liveWorkers(0), activeWorkers(0), availableFuncs(0), finishedFuncs(0), |
| finishing(false), debug(debug), validateGlobally(validateGlobally) { |
| if (numFunctions == 0 || debug) { |
| // if no functions to be optimized, or debug non-parallel mode, don't create any threads. |
| return; |
| } |
| |
| // Before parallelism, create all passes on the main thread here, to ensure |
| // prepareToRun() is called for each pass before we start to optimize functions. |
| { |
| PassRunner passRunner(wasm, passOptions); |
| addPrePasses(passRunner); |
| passRunner.addDefaultFunctionOptimizationPasses(); |
| } |
| |
| // prepare work list |
| endMarker = new Function(); |
| list = new std::atomic<Function*>[numFunctions]; |
| for (uint32_t i = 0; i < numFunctions; i++) { |
| list[i].store(endMarker); |
| } |
| // create workers |
| DEBUG_THREAD("creating workers"); |
| numWorkers = ThreadPool::getNumCores(); |
| assert(numWorkers >= 1); |
| liveWorkers.store(0); |
| activeWorkers.store(0); |
| for (uint32_t i = 0; i < numWorkers; i++) { // TODO: one less, and add it at the very end, to not compete with main thread? |
| createWorker(); |
| } |
| waitUntilAllReady(); |
| DEBUG_THREAD("workers are ready"); |
| // prepare the rest of the initial state |
| availableFuncs.store(0); |
| finishedFuncs.store(0); |
| } |
| |
| ~OptimizingIncrementalModuleBuilder() { |
| delete[] list; |
| delete endMarker; |
| } |
| |
| // Add a function to the module, and to be optimized |
| void addFunction(Function* func) { |
| wasm->addFunction(func); |
| if (debug) return; // we optimize at the end if debugging |
| queueFunction(func); |
| // wake workers if needed |
| auto wake = availableFuncs.load(); |
| for (uint32_t i = 0; i < wake; i++) { |
| wakeWorker(); |
| } |
| } |
| |
| // All functions have been added, block until all are optimized, and then do |
| // global optimizations. When this returns, the module is ready and optimized. |
| void finish() { |
| if (debug) { |
| // in debug mode, optimize each function now that we are done adding functions, |
| // then optimize globally |
| PassRunner passRunner(wasm, passOptions); |
| passRunner.setDebug(true); |
| passRunner.setValidateGlobally(validateGlobally); |
| addPrePasses(passRunner); |
| passRunner.addDefaultFunctionOptimizationPasses(); |
| passRunner.addDefaultGlobalOptimizationPasses(); |
| passRunner.run(); |
| return; |
| } |
| DEBUG_THREAD("finish()ing"); |
| assert(nextFunction == numFunctions); |
| wakeAllWorkers(); |
| waitUntilAllFinished(); |
| optimizeGlobally(); |
| // TODO: clear side thread allocators from module allocator, as these threads were transient |
| } |
| |
| private: |
| void createWorker() { |
| DEBUG_THREAD("create a worker"); |
| threads.emplace_back(make_unique<std::thread>(workerMain, this)); |
| } |
| |
| void wakeWorker() { |
| DEBUG_THREAD("wake a worker"); |
| std::lock_guard<std::mutex> lock(mutex); |
| condition.notify_one(); |
| } |
| |
| void wakeAllWorkers() { |
| DEBUG_THREAD("wake all workers"); |
| std::lock_guard<std::mutex> lock(mutex); |
| condition.notify_all(); |
| } |
| |
| void waitUntilAllReady() { |
| DEBUG_THREAD("wait until all workers are ready"); |
| std::unique_lock<std::mutex> lock(mutex); |
| if (liveWorkers.load() < numWorkers) { |
| condition.wait(lock, [this]() { return liveWorkers.load() == numWorkers; }); |
| } |
| } |
| |
| void waitUntilAllFinished() { |
| DEBUG_THREAD("wait until all workers are finished"); |
| { |
| std::unique_lock<std::mutex> lock(mutex); |
| finishing = true; |
| if (liveWorkers.load() > 0) { |
| condition.wait(lock, [this]() { return liveWorkers.load() == 0; }); |
| } |
| } |
| DEBUG_THREAD("joining"); |
| for (auto& thread : threads) thread->join(); |
| DEBUG_THREAD("joined"); |
| } |
| |
| void queueFunction(Function* func) { |
| DEBUG_THREAD("queue function"); |
| assert(nextFunction < numFunctions); // TODO: if we are given more than we expected, use a slower work queue? |
| list[nextFunction++].store(func); |
| availableFuncs++; |
| } |
| |
| void optimizeGlobally() { |
| PassRunner passRunner(wasm, passOptions); |
| passRunner.addDefaultGlobalOptimizationPasses(); |
| passRunner.run(); |
| } |
| |
| // worker code |
| |
| void optimizeFunction(Function* func) { |
| PassRunner passRunner(wasm, passOptions); |
| addPrePasses(passRunner); |
| passRunner.addDefaultFunctionOptimizationPasses(); |
| passRunner.runFunction(func); |
| } |
| |
| static void workerMain(OptimizingIncrementalModuleBuilder* self) { |
| DEBUG_THREAD("workerMain"); |
| { |
| std::lock_guard<std::mutex> lock(self->mutex); |
| self->liveWorkers++; |
| self->activeWorkers++; |
| self->condition.notify_all(); |
| } |
| for (uint32_t i = 0; i < self->numFunctions; i++) { |
| DEBUG_THREAD("workerMain iteration " << i); |
| if (self->list[i].load() == self->endMarker) { |
| // sleep, this entry isn't ready yet |
| DEBUG_THREAD("workerMain sleep"); |
| self->activeWorkers--; |
| { |
| std::unique_lock<std::mutex> lock(self->mutex); |
| if (!self->finishing) { // while waiting for the lock, things may have ended |
| self->condition.wait(lock); |
| } |
| } |
| // continue |
| DEBUG_THREAD("workerMain continue"); |
| self->activeWorkers++; |
| i--; |
| continue; |
| } |
| DEBUG_THREAD("workerMain exchange item"); |
| auto* func = self->list[i].exchange(nullptr); |
| if (func == nullptr) { |
| DEBUG_THREAD("workerMain sees was already taken"); |
| continue; // someone else has taken this one |
| } |
| // we have work to do! |
| DEBUG_THREAD("workerMain work on " << size_t(func)); |
| self->availableFuncs--; |
| self->optimizeFunction(func); |
| self->finishedFuncs++; |
| } |
| DEBUG_THREAD("workerMain ready to exit"); |
| { |
| std::lock_guard<std::mutex> lock(self->mutex); |
| self->liveWorkers--; |
| self->condition.notify_all(); |
| } |
| DEBUG_THREAD("workerMain exiting"); |
| } |
| }; |
| |
| } // namespace wasm |
| |
| #endif // optimizing_incremental_module_builder_h |
| |