blob: 9544e902254f25d303ac9baf5dc5335c8869270b [file] [log] [blame]
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -----------------------------------------------------------------------------
//
// Multi-threaded worker
//
// Author: Skal (pascal.massimino@gmail.com)
// James Zern (jzern@google.com)
#include "src/utils/thread_utils.h"
#include <cstddef>
#include <cstdint>
#include "src/wp2/base.h"
#if defined(WP2_USE_THREAD)
#if defined(_WIN32)
#include <process.h>
#include <windows.h>
#else
#include <pthread.h>
#endif
#endif
#include <cassert>
#include "src/utils/utils.h"
namespace WP2 {
#ifdef WP2_USE_THREAD
#if defined(_WIN32)
typedef HANDLE pthread_t;
typedef CRITICAL_SECTION pthread_mutex_t;
typedef struct {
uint32_t stack_size;
} pthread_attr_t;
#if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater
#define USE_WINDOWS_CONDITION_VARIABLE
typedef CONDITION_VARIABLE pthread_cond_t;
#else
typedef struct {
HANDLE waiting_sem;
HANDLE received_sem;
HANDLE signal_event;
} pthread_cond_t;
#endif // _WIN32_WINNT >= 0x600
#ifndef WINAPI_FAMILY_PARTITION
#define WINAPI_PARTITION_DESKTOP 1
#define WINAPI_FAMILY_PARTITION(x) x
#endif
#if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
#define USE_CREATE_THREAD
#endif
#endif // _WIN32
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t condition;
pthread_t thread;
} WorkerImpl;
#if defined(_WIN32)
//------------------------------------------------------------------------------
// simplistic pthread emulation layer
// _beginthreadex requires __stdcall
#define THREADFN unsigned int __stdcall
#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
#if _WIN32_WINNT >= 0x0501 // Windows XP or greater
#define WaitForSingleObject(obj, timeout) \
WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)
#endif
static int pthread_attr_init(pthread_attr_t* attr) {
if (attr != NULL) attr->stack_size = 0;
return 0;
}
static int pthread_attr_destroy(pthread_attr_t* attr) { return 0; }
static int pthread_attr_setstacksize(pthread_attr_t* attr, size_t stack_size) {
if (attr != NULL) attr->stack_size = stack_size;
return 1;
}
static int pthread_create(pthread_t* const thread, const pthread_attr_t* attr,
unsigned int (__stdcall *start)(void*), void* arg) {
const size_t stack_size = (attr != NULL) ? attr->stack_size : 0u;
#ifdef USE_CREATE_THREAD
*thread = CreateThread(NULL, /* lpThreadAttributes */
stack_size, /* dwStackSize */
start, arg, 0, /* dwCreationFlags */
NULL); /* lpThreadId */
#else
*thread = (pthread_t)_beginthreadex(NULL, /* void *security */
stack_size, /* unsigned stack_size */
start, arg, 0, /* unsigned initflag */
NULL); /* unsigned *thrdaddr */
#endif
if (*thread == NULL) return 1;
SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
return 0;
}
static int pthread_join(pthread_t thread, void** value_ptr) {
(void)value_ptr;
return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
CloseHandle(thread) == 0);
}
// Mutex
static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
(void)mutexattr;
#if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater
InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);
#else
InitializeCriticalSection(mutex);
#endif
return 0;
}
static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
EnterCriticalSection(mutex);
return 0;
}
static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
LeaveCriticalSection(mutex);
return 0;
}
static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
DeleteCriticalSection(mutex);
return 0;
}
// Condition
static int pthread_cond_destroy(pthread_cond_t* const condition) {
int ok = 1;
#ifdef USE_WINDOWS_CONDITION_VARIABLE
(void)condition;
#else
ok &= (CloseHandle(condition->waiting_sem) != 0);
ok &= (CloseHandle(condition->received_sem) != 0);
ok &= (CloseHandle(condition->signal_event) != 0);
#endif
return !ok;
}
static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
(void)cond_attr;
#ifdef USE_WINDOWS_CONDITION_VARIABLE
InitializeConditionVariable(condition);
#else
condition->waiting_sem = CreateSemaphore(NULL, 0, 1, NULL);
condition->received_sem = CreateSemaphore(NULL, 0, 1, NULL);
condition->signal_event = CreateEvent(NULL, FALSE, FALSE, NULL);
if (condition->waiting_sem == NULL || condition->received_sem == NULL ||
condition->signal_event == NULL) {
pthread_cond_destroy(condition);
return 1;
}
#endif
return 0;
}
static int pthread_cond_signal(pthread_cond_t* const condition) {
int ok = 1;
#ifdef USE_WINDOWS_CONDITION_VARIABLE
WakeConditionVariable(condition);
#else
if (WaitForSingleObject(condition->waiting_sem, 0) == WAIT_OBJECT_0) {
// a thread is waiting in pthread_cond_wait: allow it to be notified
ok = SetEvent(condition->signal_event);
// wait until the event is consumed so the signaler cannot consume
// the event via its own pthread_cond_wait.
ok &= (WaitForSingleObject(condition->received_sem, INFINITE) !=
WAIT_OBJECT_0);
}
#endif
return !ok;
}
static int pthread_cond_wait(pthread_cond_t* const condition,
pthread_mutex_t* const mutex) {
int ok;
#ifdef USE_WINDOWS_CONDITION_VARIABLE
ok = SleepConditionVariableCS(condition, mutex, INFINITE);
#else
// note that there is a consumer available so the signal isn't dropped in
// pthread_cond_signal
if (!ReleaseSemaphore(condition->waiting_sem, 1, NULL)) return 1;
// now unlock the mutex so pthread_cond_signal may be issued
pthread_mutex_unlock(mutex);
ok =
(WaitForSingleObject(condition->signal_event, INFINITE) == WAIT_OBJECT_0);
ok &= ReleaseSemaphore(condition->received_sem, 1, NULL);
pthread_mutex_lock(mutex);
#endif
return !ok;
}
#else // !_WIN32
#define THREADFN void*
#define THREAD_RETURN(val) val
#endif // _WIN32
//------------------------------------------------------------------------------
static THREADFN ThreadLoop(void* ptr) {
Worker* const worker = (Worker*)ptr;
assert(worker != nullptr);
WorkerImpl* const impl = (WorkerImpl*)worker->GetImpl();
assert(impl != nullptr);
bool done = false;
while (!done) {
pthread_mutex_lock(&impl->mutex);
while (worker->state_ == Worker::OK) { // wait in idling mode
pthread_cond_wait(&impl->condition, &impl->mutex);
}
if (worker->state_ == Worker::WORK) {
worker->status_ = worker->Execute();
worker->state_ = Worker::OK;
} else if (worker->state_ == Worker::NOT_OK) { // finish the worker
done = true;
}
// signal to the main thread that we're done (for Sync())
pthread_mutex_unlock(&impl->mutex);
pthread_cond_signal(&impl->condition);
}
return THREAD_RETURN(nullptr); // Thread is finished
}
// main thread state control
static void ChangeState(WorkerBase* const worker,
decltype(WorkerBase::state_) new_state) {
// No-op when attempting to change state on a thread that didn't come up.
// Checking state_ without acquiring the lock first would result in a data
// race.
assert(worker != nullptr);
WorkerImpl* const impl = (WorkerImpl*)worker->GetImpl();
if (impl == nullptr) return;
pthread_mutex_lock(&impl->mutex);
if (worker->state_ >= Worker::OK) {
// wait for the worker to finish
while (worker->state_ != Worker::OK) {
pthread_cond_wait(&impl->condition, &impl->mutex);
}
// assign new status and release the working thread if needed
if (new_state != Worker::OK) {
worker->state_ = new_state;
// Note the associated mutex does not need to be held when signaling the
// condition. Unlocking the mutex first may improve performance in some
// implementations, avoiding the case where the waiting thread can't
// reacquire the mutex when woken.
pthread_mutex_unlock(&impl->mutex);
pthread_cond_signal(&impl->condition);
return;
}
}
pthread_mutex_unlock(&impl->mutex);
}
#endif // WP2_USE_THREAD
//------------------------------------------------------------------------------
struct ThreadLock::Impl : public WP2Allocable {
#ifdef WP2_USE_THREAD
pthread_mutex_t mutex;
#endif
};
ThreadLock::ThreadLock() {
#ifdef WP2_USE_THREAD
// If new Impl() fails, Acquire() will return WP2_STATUS_OUT_OF_MEMORY.
impl_ = new (WP2Allocable::nothrow) Impl();
if ((impl_ != nullptr) && (pthread_mutex_init(&impl_->mutex, nullptr) != 0)) {
delete impl_;
impl_ = nullptr;
}
#else
impl_ = nullptr;
#endif
}
ThreadLock::~ThreadLock() {
#ifdef WP2_USE_THREAD
if (impl_ != nullptr) {
pthread_mutex_destroy(&impl_->mutex);
delete impl_;
}
#endif
}
WP2Status ThreadLock::Acquire() {
#ifdef WP2_USE_THREAD
WP2_CHECK_ALLOC_OK(impl_ != nullptr);
WP2_CHECK_OK(pthread_mutex_lock(&impl_->mutex) == 0,
WP2_STATUS_OUT_OF_MEMORY);
#endif
return WP2_STATUS_OK;
}
void ThreadLock::Release() {
#ifdef WP2_USE_THREAD
if (impl_ != nullptr) pthread_mutex_unlock(&impl_->mutex);
#endif
}
//------------------------------------------------------------------------------
WP2Status WorkerBase::Start(bool do_mt, uint32_t stack_size) {
const auto& I = GetWorkerInterface();
WP2_CHECK_OK(I.Reset(this, do_mt, stack_size), WP2_STATUS_OUT_OF_MEMORY);
I.Launch(this);
return WP2_STATUS_OK;
}
WP2Status WorkerBase::End() {
const auto& I = GetWorkerInterface();
const WP2Status status = I.Sync(this);
I.End(this);
WP2_CHECK_STATUS(status);
return WP2_STATUS_OK;
}
//------------------------------------------------------------------------------
static class : public WorkerInterface {
WP2Status Sync(WorkerBase* const worker) const override {
if (worker == nullptr) return WP2_STATUS_NULL_PARAMETER;
#ifdef WP2_USE_THREAD
ChangeState(worker, Worker::OK);
#endif
assert(worker->state_ <= Worker::OK);
return worker->status_;
}
bool Reset(WorkerBase* const worker, bool do_mt,
uint32_t stack_size) const override {
if (worker == nullptr) return false;
bool ok = true;
worker->status_ = WP2_STATUS_OK;
if (worker->state_ < Worker::OK) {
worker->state_ = Worker::OK;
#ifdef WP2_USE_THREAD
if (do_mt) {
WorkerImpl* const impl = (WorkerImpl*)WP2Calloc(1, sizeof(WorkerImpl));
worker->SetImpl(impl);
if (impl == nullptr) return false;
if (pthread_mutex_init(&impl->mutex, NULL)) goto Error;
if (pthread_cond_init(&impl->condition, NULL)) {
pthread_mutex_destroy(&impl->mutex);
goto Error;
}
pthread_attr_t attr;
ok = (pthread_attr_init(&attr) == 0);
if (ok && stack_size > 0) {
ok = (pthread_attr_setstacksize(&attr, stack_size) == 0);
}
if (ok) {
pthread_mutex_lock(&impl->mutex);
ok = (pthread_create(&impl->thread, &attr, ThreadLoop, worker) == 0);
if (ok) worker->state_ = Worker::OK;
pthread_mutex_unlock(&impl->mutex);
}
if (pthread_attr_destroy(&attr) != 0) ok = false;
if (!ok) {
pthread_mutex_destroy(&impl->mutex);
pthread_cond_destroy(&impl->condition);
Error:
WP2Free(impl);
worker->SetImpl(nullptr);
worker->state_ = Worker::NOT_OK;
return false;
}
}
#endif
} else if (worker->state_ == Worker::WORK) {
ok = (Sync(worker) == WP2_STATUS_OK);
}
assert(!ok || worker->state_ == Worker::OK);
return ok;
}
void Launch(WorkerBase* const worker) const override {
if (worker == nullptr) return; // !?
if (worker->state_ == Worker::OK) {
#ifdef WP2_USE_THREAD
if (worker->GetImpl() != nullptr) {
ChangeState(worker, Worker::WORK);
return;
}
// fall through
#endif
worker->status_ = worker->Execute(); // direct call
}
}
void End(WorkerBase* const worker) const override {
if (worker == nullptr) return; // !?
#ifdef WP2_USE_THREAD
WorkerImpl* const impl = (WorkerImpl*)worker->GetImpl();
if (impl != nullptr) {
ChangeState(worker, Worker::NOT_OK);
pthread_join(impl->thread, NULL);
pthread_mutex_destroy(&impl->mutex);
pthread_cond_destroy(&impl->condition);
WP2Free(impl);
worker->SetImpl(nullptr);
assert(worker->state_ == Worker::NOT_OK);
return;
}
#endif
worker->state_ = Worker::NOT_OK;
assert(worker->GetImpl() == nullptr);
}
} kDefaultInterface; // NOLINT
//------------------------------------------------------------------------------
static const WorkerInterface* g_worker_interface = &kDefaultInterface;
void SetWorkerInterface(const WorkerInterface& winterface) {
g_worker_interface = &winterface;
}
const WorkerInterface& GetWorkerInterface() { return *g_worker_interface; }
//------------------------------------------------------------------------------
} // namespace WP2