blob: 68e72fb7cf8d768b72a2e93c4ca0d6b929ce3b34 [file] [log] [blame]
#include <condition_variable>
#include <cstdlib>
#include <mutex>
#include <thread>
#include "napi.h"
#if (NAPI_VERSION > 3)
using namespace Napi;
namespace {
struct TestData {
TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)){};
// Native Promise returned to JavaScript
Promise::Deferred deferred;
// List of threads created for test. This list only ever accessed via main
// thread.
std::vector<std::thread> threads = {};
ThreadSafeFunction tsfn = ThreadSafeFunction();
// These variables are only accessed from the main thread.
bool mainWantsRelease = false;
size_t expected_calls = 0;
};
void FinalizerCallback(Napi::Env env, TestData* finalizeData) {
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env, true));
delete finalizeData;
}
/**
* See threadsafe_function_sum.js for descriptions of the tests in this file
*/
void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
callback.Call({Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}
static Value TestWithTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();
// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData* testData = new TestData(Promise::Deferred::New(info.Env()));
ThreadSafeFunction tsfn = ThreadSafeFunction::New(
info.Env(),
cb,
"Test",
0,
threadCount,
std::function<decltype(FinalizerCallback)>(FinalizerCallback),
testData);
for (int i = 0; i < threadCount; ++i) {
// A copy of the ThreadSafeFunction will go to the thread entry point
testData->threads.push_back(std::thread(entryWithTSFN, tsfn, i));
}
return testData->deferred.Promise();
}
// Task instance created for each new std::thread
class DelayedTSFNTask {
public:
// Each instance has its own tsfn
ThreadSafeFunction tsfn;
// Thread-safety
std::mutex mtx;
std::condition_variable cv;
// Entry point for std::thread
void entryDelayedTSFN(int threadId) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [this] { return this->tsfn != nullptr; });
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
callback.Call({Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
};
};
struct TestDataDelayed {
TestDataDelayed(Promise::Deferred&& deferred)
: deferred(std::move(deferred)){};
~TestDataDelayed() { taskInsts.clear(); };
// Native Promise returned to JavaScript
Promise::Deferred deferred;
// List of threads created for test. This list only ever accessed via main
// thread.
std::vector<std::thread> threads = {};
// List of DelayedTSFNThread instances
std::vector<std::unique_ptr<DelayedTSFNTask>> taskInsts = {};
ThreadSafeFunction tsfn = ThreadSafeFunction();
};
void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed* finalizeData) {
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env, true));
delete finalizeData;
}
static Value TestDelayedTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();
TestDataDelayed* testData =
new TestDataDelayed(Promise::Deferred::New(info.Env()));
testData->tsfn =
ThreadSafeFunction::New(info.Env(),
cb,
"Test",
0,
threadCount,
std::function<decltype(FinalizerCallbackDelayed)>(
FinalizerCallbackDelayed),
testData);
for (int i = 0; i < threadCount; ++i) {
testData->taskInsts.push_back(
std::unique_ptr<DelayedTSFNTask>(new DelayedTSFNTask()));
testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN,
testData->taskInsts.back().get(),
i));
}
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
for (auto& task : testData->taskInsts) {
std::lock_guard<std::mutex> lk(task->mtx);
task->tsfn = testData->tsfn;
task->cv.notify_all();
}
return testData->deferred.Promise();
}
void AcquireFinalizerCallback(Napi::Env env,
TestData* finalizeData,
TestData* context) {
(void)context;
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env, true));
delete finalizeData;
}
void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
tsfn.Acquire();
TestData* testData = tsfn.GetContext();
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
// This lambda runs on the main thread so it's OK to access the variables
// `expected_calls` and `mainWantsRelease`.
testData->expected_calls--;
if (testData->expected_calls == 0 && testData->mainWantsRelease)
testData->tsfn.Release();
callback.Call({Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}
static Value CreateThread(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
// Counting expected calls like this only works because on the JS side this
// binding is called from a synchronous loop. This means the main loop has no
// chance to run the tsfn JS callback before we've counted how many threads
// the JS intends to create.
testData->expected_calls++;
ThreadSafeFunction tsfn = testData->tsfn;
int threadId = testData->threads.size();
// A copy of the ThreadSafeFunction will go to the thread entry point
testData->threads.push_back(std::thread(entryAcquire, tsfn, threadId));
return Number::New(info.Env(), threadId);
}
static Value StopThreads(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
testData->mainWantsRelease = true;
return info.Env().Undefined();
}
static Value TestAcquire(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Napi::Env env = info.Env();
// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData* testData = new TestData(Promise::Deferred::New(info.Env()));
testData->tsfn =
ThreadSafeFunction::New(env,
cb,
"Test",
0,
1,
testData,
std::function<decltype(AcquireFinalizerCallback)>(
AcquireFinalizerCallback),
testData);
Object result = Object::New(env);
result["createThread"] =
Function::New(env, CreateThread, "createThread", testData);
result["stopThreads"] =
Function::New(env, StopThreads, "stopThreads", testData);
result["promise"] = testData->deferred.Promise();
return result;
}
} // namespace
Object InitThreadSafeFunctionSum(Env env) {
Object exports = Object::New(env);
exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
exports["testWithTSFN"] = Function::New(env, TestWithTSFN);
exports["testAcquire"] = Function::New(env, TestAcquire);
return exports;
}
#endif