blob: 963188cfc05cc1673d41e248fe917da6d364813d [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "platform/impl/windows/thread_pool.h"
#include <stdio.h>
#include <iomanip>
#include <iostream>
#include "platform/impl/windows/runner.h"
#include "platform/public/logging.h"
namespace location {
namespace nearby {
namespace windows {
#define POOL_NAME_BUFFER_SIZE 64
#define EVENT_NAME_BUFFER_SIZE 64
__declspec(align(8)) volatile long ThreadPool::instance_ = // NOLINT
0; // NOLINT because the Windows function takes a volatile long
DWORD WINAPI ThreadPool::_ThreadProc(LPVOID pParam) {
DWORD wait;
ThreadPool* pool;
DWORD threadId = GetCurrentThreadId();
HANDLE waits[2];
std::unique_ptr<Runner> runner;
_ASSERT(pParam != NULL);
if (NULL == pParam) {
NEARBY_LOGS(ERROR) << __func__ << ": pParam must not be null.";
return -1;
}
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": Starting thread id: " << threadId;
pool = static_cast<ThreadPool*>(pParam);
waits[0] = pool->GetWaitHandle(threadId);
waits[1] = pool->GetShutdownHandle();
loop_here:
wait = WaitForMultipleObjects(2, waits, FALSE, INFINITE);
if (wait == 1) {
if (pool->CheckThreadStop()) {
if (pool->GetWorkingThreadCount() < 1) {
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": Pool is being destroyed, and working thread "
"count is 0, thread exiting.";
return 0;
}
}
}
// a new function was added, go and get it
runner = nullptr;
NEARBY_LOGS(VERBOSE) << "Info: " << __func__ << ": On thread id: " << threadId
<< ", checking for work.";
if (pool->GetThreadProc(threadId, std::move(runner))) {
pool->BusyNotify(threadId);
runner->Run();
pool->FinishNotify(threadId); // tell the pool, i am now free
}
goto loop_here;
NEARBY_LOGS(VERBOSE) << "Info: " << __func__ << ": Thread shutdown occurred.";
return 0;
}
ThreadPool::ThreadPool(int nPoolSize, bool bCreateNow)
: function_list_(std::make_unique<FunctionList>()),
thread_map_(std::make_unique<ThreadMap>()),
thread_handles_(nullptr),
wait_for_threads_to_die_ms_(500),
notify_shutdown_(nullptr) {
// The MAXIMUM_WAIT_OBJECTS is the limiting factor, currently
// windows has a max of 64. This means we can only wait on up
// to 64 threads, anything more gives undesirable results.
if (nPoolSize > 63) {
NEARBY_LOGS(ERROR) << __func__ << ": Thread pool max size exceeded.";
throw ThreadPoolException("Thread pool max size exceeded.");
}
pool_state_ = State::Destroyed;
pool_size_ = nPoolSize;
InitializeCriticalSection(&critical_section_);
if (bCreateNow) {
if (!Create()) {
NEARBY_LOGS(ERROR) << __func__ << ": Thread pool creation failed.";
throw ThreadPoolException("Thread pool creation failed.");
}
}
}
bool ThreadPool::Create() {
if (pool_state_ != State::Destroyed) {
// To create a new pool, destroy the existing one first
NEARBY_LOGS(ERROR) << __func__
<< ": Attempt to create a new thread pool before "
"destroying the old one.";
return false;
}
char buffer[POOL_NAME_BUFFER_SIZE];
snprintf(buffer, POOL_NAME_BUFFER_SIZE, "Pool%d",
(uint32_t)(InterlockedIncrement(
&ThreadPool::instance_))); // InterlockedIncrement done here
// since there's no access to the
// instance_ var except through the
// interlocked functions
pool_name_ = std::string(buffer);
// create the event which will signal the threads to stop
std::string eventName;
notify_shutdown_ = CreateEvent(NULL, TRUE, FALSE, NULL);
_ASSERT(notify_shutdown_ != NULL);
if (!notify_shutdown_) {
NEARBY_LOGS(ERROR) << "Error: " << __func__
<< ": Failed to create thread shut down event.";
return false;
}
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
int numCPU = sysinfo.dwNumberOfProcessors;
int threadsToCreate = 0;
// We are going to initially allocate the first n
// threads based on the number of logical cores
if (pool_size_ > numCPU) {
threadsToCreate = STARTUP_THREAD_COUNT;
} else {
threadsToCreate = pool_size_;
}
thread_handles_ = new HANDLE[pool_size_];
// create the threads
for (int index = 0; index < threadsToCreate; index++) {
CreateThreadPoolThread(&thread_handles_[index]);
}
pool_state_ = State::Ready;
return true;
}
ThreadPool::~ThreadPool() {
Destroy();
ReleaseMemory();
DeleteCriticalSection(&critical_section_);
}
DWORD ThreadPool::CreateThreadPoolThread(HANDLE* handles) {
HANDLE thread;
DWORD threadId;
std::unique_ptr<ThreadData> threadData = std::make_unique<ThreadData>();
char buffer[EVENT_NAME_BUFFER_SIZE];
snprintf(buffer, EVENT_NAME_BUFFER_SIZE, "PID:%ld IID:%d TDX:%d",
GetCurrentProcessId(),
(uint32_t)(InterlockedAdd(&ThreadPool::instance_, 0)),
(int)thread_map_->size());
thread = CreateThread(NULL, 0, ThreadPool::_ThreadProc, this,
CREATE_SUSPENDED, &threadId);
_ASSERT(NULL != thread);
if (NULL == thread) {
NEARBY_LOGS(ERROR) << "Error: " << __func__ << ": Failed to create thread.";
return NULL;
}
if (thread) {
// add the entry to the map of threads
EnterCriticalSection(&critical_section_);
threadData->free = true;
threadData->wait_handle = CreateEventA(NULL, TRUE, FALSE, buffer);
threadData->thread_handle = thread;
threadData->thread_id = threadId;
thread_map_->insert(ThreadMap::value_type(threadId, std::move(threadData)));
*handles = thread;
LeaveCriticalSection(&critical_section_);
ResumeThread(thread);
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": Thread created, handle: " << thread
<< ", id: " << threadId;
return threadId;
} else {
NEARBY_LOGS(ERROR) << "Error: " << __func__ << ": Failed to create thread.";
return NULL;
}
}
void ThreadPool::ReleaseMemory() {
// empty all collections
EnterCriticalSection(&critical_section_);
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": Clearing the function list.";
function_list_->clear();
NEARBY_LOGS(VERBOSE) << "Info: " << __func__ << ": Clearing the thread map.";
thread_map_->clear();
LeaveCriticalSection(&critical_section_);
}
void ThreadPool::Destroy() {
if (pool_state_ == State::Destroying || pool_state_ == State::Destroyed)
return;
NEARBY_LOGS(VERBOSE) << "Info: " << __func__ << ": Destroying thread pool.";
pool_state_ = State::Destroying;
bool notDone = true;
EnterCriticalSection(&critical_section_);
ThreadMap::iterator iter = thread_map_->begin();
int index = 0;
// Build an array of handles
while (iter != thread_map_->end()) {
thread_handles_[index] = iter->second->thread_handle;
index++;
iter++;
}
LeaveCriticalSection(&critical_section_);
// tell all threads to shutdown.
_ASSERT(NULL != notify_shutdown_);
SetEvent(GetShutdownHandle());
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": Setting waits for the threads to exit.";
if (pool_size_ == 1) {
// Waits until the specified object is in the signaled state or the time-out
// interval elapses.
// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitforsingleobject
auto wait = WaitForSingleObject(
thread_handles_[0], // A handle to the object
INFINITE // The time-out interval, in milliseconds.
);
} else {
auto wait =
// Waits until one or all of the specified objects are in the signaled
// state or the time-out interval elapses.
// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitformultipleobjects
WaitForMultipleObjects(
index, // The number of object handles in the array.
thread_handles_, // An array of object handles.
true, // If this parameter is TRUE, the function returns when the
// state of all objects in the handles array are signaled.
INFINITE // The time-out interval, in milliseconds.
);
}
NEARBY_LOGS(VERBOSE) << "Info: " << __func__ << ": All threads have exited.";
delete[] thread_handles_;
// close the shutdown event
CloseHandle(notify_shutdown_);
notify_shutdown_ = NULL;
EnterCriticalSection(&critical_section_);
ThreadMap::iterator threadMapIterator;
// walk through the events and threads and close them all
for (threadMapIterator = thread_map_->begin();
threadMapIterator != thread_map_->end(); threadMapIterator++) {
NEARBY_LOGS(VERBOSE) << "Info: " << __func__ << ": Closing thread handle: "
<< threadMapIterator->second->thread_handle
<< " thread id: "
<< threadMapIterator->second->thread_id
<< " wait_handle: "
<< threadMapIterator->second->wait_handle;
CloseHandle(threadMapIterator->second->wait_handle);
CloseHandle(threadMapIterator->second->thread_handle);
}
LeaveCriticalSection(&critical_section_);
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": Sending the shutdown event.";
ReleaseMemory(); // free any remaining UserPoolData objects
InterlockedDecrement(&ThreadPool::instance_);
pool_state_ = State::Destroyed;
}
int ThreadPool::GetPoolSize() { return pool_size_; }
void ThreadPool::SetPoolSize(int nSize) {
_ASSERT(nSize > 0);
if (nSize <= 0) {
NEARBY_LOGS(ERROR)
<< __func__ << ": 0 or negative value is not a valid thread pool size.";
return;
}
pool_size_ = nSize;
}
HANDLE ThreadPool::GetShutdownHandle() { return notify_shutdown_; }
bool ThreadPool::GetThreadProc(DWORD threadId,
std::unique_ptr<Runner>&& runner) {
// get the first function info in the function list
FunctionList::iterator functionListIterator;
bool haveAnotherRunner = false;
EnterCriticalSection(&critical_section_);
functionListIterator = function_list_->begin();
if (functionListIterator != function_list_->end()) {
runner = std::move(*functionListIterator);
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": popping runner from the front.";
function_list_->pop_front(); // remove the function from the list
haveAnotherRunner = true;
} else {
thread_map_->at(threadId)->free = true;
ResetEvent(thread_map_->at(threadId)->wait_handle);
}
LeaveCriticalSection(&critical_section_);
return haveAnotherRunner;
}
void ThreadPool::FinishNotify(DWORD threadId) {
ThreadMap::iterator threadMapIterator;
EnterCriticalSection(&critical_section_);
threadMapIterator = thread_map_->find(threadId);
if (threadMapIterator == thread_map_->end()) // if search found no elements
{
_ASSERT(!"No matching thread found.");
NEARBY_LOGS(ERROR) << __func__ << ": No matching thread found.";
} else {
thread_map_->at(threadId)->free = true;
if (!function_list_->empty()) {
// there are some more functions that need servicing, lets do that.
// By not doing anything here we are letting the thread go back and
// check the function list and pick up a function and execute it.
thread_map_->at(threadId)->free = false;
} else {
ResetEvent(thread_map_->at(threadId)->wait_handle);
}
}
LeaveCriticalSection(&critical_section_);
}
void ThreadPool::BusyNotify(DWORD threadId) {
ThreadMap::iterator iter;
EnterCriticalSection(&critical_section_);
iter = thread_map_->find(threadId);
if (iter == thread_map_->end()) // if search found no elements
{
_ASSERT(!"No matching thread found.");
} else {
thread_map_->at(threadId)->free = false;
}
LeaveCriticalSection(&critical_section_);
}
bool ThreadPool::Run(std::unique_ptr<Runner> runner) {
if (pool_state_ == State::Destroying || pool_state_ == State::Destroyed)
return false;
_ASSERT(runner != NULL);
AddRunner(std::move(runner));
// See if any threads are free
ThreadMap::iterator iterator;
std::unique_ptr<ThreadData> threadData;
bool freeThreadFound = false;
EnterCriticalSection(&critical_section_);
for (iterator = thread_map_->begin(); iterator != thread_map_->end();
iterator++) {
if (iterator->second->free) {
// here is a free thread, put it to work
iterator->second->free = false;
SetEvent(iterator->second->wait_handle);
// this thread will now call GetThreadProc() and pick up the next
// function in the list.
freeThreadFound = true;
break;
}
}
if (!freeThreadFound && thread_map_->size() < pool_size_) {
// We haven't used up all of our threads, go ahead and spin up another one
DWORD threadId =
CreateThreadPoolThread(&thread_handles_[thread_map_->size()]);
thread_map_->at(threadId)->free = false;
SetEvent(thread_map_->at(threadId)->wait_handle);
}
LeaveCriticalSection(&critical_section_);
return true;
}
void ThreadPool::AddRunner(std::unique_ptr<Runner> runner) {
// add it to the list
runner->thread_pool_ = this;
NEARBY_LOGS(VERBOSE) << "Info: " << __func__
<< ": pushing new runner to the back.";
EnterCriticalSection(&critical_section_);
function_list_->push_back(std::move(runner));
LeaveCriticalSection(&critical_section_);
}
HANDLE ThreadPool::GetWaitHandle(DWORD dwThreadId) {
HANDLE hWait = NULL;
ThreadMap::iterator iter;
EnterCriticalSection(&critical_section_);
iter = thread_map_->find(dwThreadId);
if (iter != thread_map_->end()) // if search found no elements
{
hWait = thread_map_->at(dwThreadId)->wait_handle;
}
LeaveCriticalSection(&critical_section_);
return hWait;
}
bool ThreadPool::CheckThreadStop() {
EnterCriticalSection(&critical_section_);
bool bRet =
(pool_state_ == State::Destroying || pool_state_ == State::Destroyed);
LeaveCriticalSection(&critical_section_);
return bRet;
}
int ThreadPool::GetWorkingThreadCount() {
ThreadMap::iterator iter;
int nCount = 0;
EnterCriticalSection(&critical_section_);
for (iter = thread_map_->begin(); iter != thread_map_->end(); iter++) {
if (function_list_->empty()) {
iter->second->free = true;
}
if (!iter->second->free) {
nCount++;
}
}
LeaveCriticalSection(&critical_section_);
return nCount;
}
State ThreadPool::GetState() { return pool_state_; }
} // namespace windows
} // namespace nearby
} // namespace location