blob: 732dd75375beba6fb8133f190455e348e4295390 [file] [log] [blame]
#include "rar.hpp"
#ifdef RAR_SMP
#include "threadmisc.cpp"
#ifdef _WIN_ALL
// Priority applied to worker threads right after they are created in
// CreateThreads(). The default THREAD_PRIORITY_NORMAL value means that
// no explicit SetThreadPriority call is made for new threads.
int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
#endif
// Prepares an idle pool: resets the queue state, limits the requested
// number of threads and creates the synchronization primitives.
// Worker threads are not started here, AddTask() creates them on first use.
// MaxThreads is the requested number of worker threads.
// If any primitive cannot be created, a message is issued and
// ErrHandler.Exit(RARX_FATAL) is called.
ThreadPool::ThreadPool(uint MaxThreads)
{
  // Nothing is queued, running, created or closing yet.
  QueueTop=0;
  QueueBottom=0;
  ActiveThreads=0;
  ThreadsCreatedCount=0;
  Closing=false;

  // Use at most MaxPoolThreads and at least one thread.
  MaxAllowedThreads=MaxThreads;
  if (MaxAllowedThreads>MaxPoolThreads)
  {
    MaxAllowedThreads=MaxPoolThreads;
  }
  if (MaxAllowedThreads==0)
  {
    MaxAllowedThreads=1;
  }

  // With more threads than queue slots the pool would hang when destroyed,
  // because not all waiting threads would be released.
  if (MaxAllowedThreads>ASIZE(TaskQueue))
  {
    MaxAllowedThreads=ASIZE(TaskQueue);
  }

  bool Success=CriticalSectionCreate(&CritSection);

#ifdef _WIN_ALL
  // Semaphore counts the tasks released to workers, the manual reset event
  // is created in signaled state.
  QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
  NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
  if (QueuedTasksCnt==NULL || NoneActive==NULL)
  {
    Success=false;
  }
#elif defined(_UNIX)
  AnyActive=false;
  QueuedTasksCnt=0;
  // Initialize the primitives one by one and stop at the first failure.
  if (Success)
  {
    Success=pthread_cond_init(&AnyActiveCond,NULL)==0;
  }
  if (Success)
  {
    Success=pthread_mutex_init(&AnyActiveMutex,NULL)==0;
  }
  if (Success)
  {
    Success=pthread_cond_init(&QueuedTasksCntCond,NULL)==0;
  }
  if (Success)
  {
    Success=pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
  }
#endif

  if (!Success)
  {
    ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
    ErrHandler.Exit(RARX_FATAL);
  }
}
// Shuts the pool down: finishes the tasks which are still queued, wakes up
// all worker threads so they can leave their loops, waits for their
// termination and then releases the synchronization primitives.
ThreadPool::~ThreadPool()
{
// Process everything that was added but not started yet.
WaitDone();
// From now on GetQueuedTask() returns false to every thread it wakes up.
Closing=true;
// Release as many units as the queue has slots. The constructor limits
// the number of threads to the queue size, so every thread is woken up.
#ifdef _WIN_ALL
ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
#elif defined(_UNIX)
// Threads still can access QueuedTasksCnt for a short time after WaitDone(),
// so lock is required. We would occasionally hang without it.
pthread_mutex_lock(&QueuedTasksCntMutex);
QueuedTasksCnt+=ASIZE(TaskQueue);
pthread_mutex_unlock(&QueuedTasksCntMutex);
pthread_cond_broadcast(&QueuedTasksCntCond);
#endif
// Only the threads which were really created are waited for and closed.
for(uint I=0;I<ThreadsCreatedCount;I++)
{
#ifdef _WIN_ALL
// Waiting until the thread terminates.
CWaitForSingleObject(ThreadHandles[I]);
#endif
// Close the thread handle. In Unix it results in pthread_join call,
// which also waits for thread termination.
ThreadClose(ThreadHandles[I]);
}
// All threads are closed at this point, so the primitives can be destroyed.
CriticalSectionDelete(&CritSection);
#ifdef _WIN_ALL
CloseHandle(QueuedTasksCnt);
CloseHandle(NoneActive);
#elif defined(_UNIX)
pthread_cond_destroy(&AnyActiveCond);
pthread_mutex_destroy(&AnyActiveMutex);
pthread_cond_destroy(&QueuedTasksCntCond);
pthread_mutex_destroy(&QueuedTasksCntMutex);
#endif
}
void ThreadPool::CreateThreads()
{
for(uint I=0;I<MaxAllowedThreads;I++)
{
ThreadHandles[I] = ThreadCreate(PoolThread, this);
ThreadsCreatedCount++;
#ifdef _WIN_ALL
if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
#endif
}
}
// Thread entry point passed to ThreadCreate() by CreateThreads().
// Param is the ThreadPool object owning the thread. Runs the pool loop
// of that object until it exits and then returns 0.
NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
{
  ThreadPool *Pool=static_cast<ThreadPool*>(Param);
  Pool->PoolThreadLoop();
  return 0;
}
// Body of every worker thread. Fetches tasks with GetQueuedTask() and
// executes them until GetQueuedTask() returns false. After each task
// the active counter is decremented and the thread which brings it to zero
// notifies the waiter in WaitDone().
void ThreadPool::PoolThreadLoop()
{
  QueueEntry Task;
  for (;;)
  {
    if (!GetQueuedTask(&Task))
    {
      break;
    }

    Task.Proc(Task.Param);

    // The counter is shared by all workers, so it is updated under the lock.
    CriticalSectionStart(&CritSection);
    --ActiveThreads;
    const bool LastFinished=ActiveThreads==0;
    if (LastFinished)
    {
#ifdef _WIN_ALL
      SetEvent(NoneActive);
#elif defined(_UNIX)
      // Clear the flag and signal the condition under its own mutex.
      pthread_mutex_lock(&AnyActiveMutex);
      AnyActive=false;
      pthread_cond_signal(&AnyActiveCond);
      pthread_mutex_unlock(&AnyActiveMutex);
#endif
    }
    CriticalSectionEnd(&CritSection);
  }
}
// Blocks until a unit of the queued tasks counter is available and
// consumes it. Then, unless the pool is closing, copies the entry at
// QueueBottom to *Task and advances QueueBottom.
// Returns true if *Task was filled, false if the pool is closing.
bool ThreadPool::GetQueuedTask(QueueEntry *Task)
{
#ifdef _WIN_ALL
  CWaitForSingleObject(QueuedTasksCnt);
#elif defined(_UNIX)
  pthread_mutex_lock(&QueuedTasksCntMutex);
  // Sleep on the condition variable for as long as the counter is zero.
  for (;QueuedTasksCnt==0;)
  {
    cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
  }
  --QueuedTasksCnt;
  pthread_mutex_unlock(&QueuedTasksCntMutex);
#endif

  if (!Closing)
  {
    // Several workers take entries concurrently, so the bottom index
    // is read and moved under the lock.
    CriticalSectionStart(&CritSection);
    *Task=TaskQueue[QueueBottom];
    QueueBottom=(QueueBottom+1) % ASIZE(TaskQueue);
    CriticalSectionEnd(&CritSection);
    return true;
  }

  // The wake up came from the destructor, there is no task to return.
  return false;
}
// Put a task to the queue. Expected to be called from the main thread only,
// which is why no locking is done here. Queued tasks are not executed
// until WaitDone is called.
// Proc is the function to run, Data is the parameter passed to it.
void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
{
  // Worker threads are created when the first task arrives.
  if (ThreadsCreatedCount==0)
  {
    CreateThreads();
  }

  // The queue is full when the slot following QueueTop is QueueBottom.
  // In this case wait until the queue is empty.
  const bool QueueFull=(QueueTop+1) % ASIZE(TaskQueue)==QueueBottom;
  if (QueueFull)
  {
    WaitDone();
  }

  QueueEntry &Slot=TaskQueue[QueueTop];
  Slot.Proc=Proc;
  Slot.Param=Data;
  QueueTop=(QueueTop+1) % ASIZE(TaskQueue);
}
// Start queued tasks and wait until all threads are inactive.
// We assume that it is always called from main thread, when pool threads
// are sleeping yet.
// Returns immediately if the queue is empty.
void ThreadPool::WaitDone()
{
// Number of tasks between QueueBottom and QueueTop in the circular queue.
// We add ASIZE(TaskQueue) for case if TaskQueue array size is not
// a power of two. Negative numbers would not suit our purpose here.
ActiveThreads=(QueueTop+ASIZE(TaskQueue)-QueueBottom) % ASIZE(TaskQueue);
if (ActiveThreads==0)
return;
#ifdef _WIN_ALL
// Reset the event before releasing the tasks, so a task which finishes
// quickly cannot set it before we clear it.
ResetEvent(NoneActive);
// One semaphore unit is released per queued task.
ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
// PoolThreadLoop() sets this event when ActiveThreads drops to zero.
CWaitForSingleObject(NoneActive);
#elif defined(_UNIX)
// Set the flag before waking up threads. PoolThreadLoop() clears it
// when ActiveThreads drops to zero.
AnyActive=true;
// Threads reset AnyActive before accessing QueuedTasksCnt and even
// preceding WaitDone() call does not guarantee that some slow thread
// is not accessing QueuedTasksCnt now. So lock is necessary.
pthread_mutex_lock(&QueuedTasksCntMutex);
QueuedTasksCnt+=ActiveThreads;
pthread_mutex_unlock(&QueuedTasksCntMutex);
pthread_cond_broadcast(&QueuedTasksCntCond);
// Sleep until the flag is cleared. The loop rechecks the flag after
// every return from the condition wait.
pthread_mutex_lock(&AnyActiveMutex);
while (AnyActive)
cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
pthread_mutex_unlock(&AnyActiveMutex);
#endif
}
#endif // RAR_SMP