blob: 4ce8ec53a220d992cd2036f2a1358826a1cbd5e9 [file] [log] [blame]
#include <stdlib.h>
#include "workQueue.h"
#include "mt.h"
struct workQueue {
int posR, posW, size;
int status;
pthread_mutex_t lock;
pthread_cond_t cond;
void *buf[];
};
/*
* FUNCTION: workQueueAlloc
* USE: Create and return a work queue
* PARAMS: numSlots - how many slots the queue should have
* RETURN: The workqueue or NULL on error
* NOTES:
*/
struct workQueue* workQueueAlloc(int numSlots)
{
struct workQueue* q;
numSlots++; /* one always unused */
q = malloc(sizeof(struct workQueue) + sizeof(void* [numSlots]));
if (q) {
q->posR = 0;
q->posW = 0;
q->status = 0;
q->size = numSlots;
if (pthread_mutex_init(&q->lock, NULL)) {
free(q);
q = NULL;
}
else if (pthread_cond_init(&q->cond, NULL)) {
pthread_mutex_destroy(&q->lock);
free(q);
q = NULL;
}
}
return q;
}
/*
* FUNCTION: workQueueFree
* USE: Free a workqueue
* PARAMS: workQueue - the workQueue
* freeCbk - callback to call on each existing request
* RETURN: NONE
* NOTES: Your job to make sure nobody's waiting on this queue when you do this
*/
void workQueueFree(struct workQueue* q, void (*freeCbk)(void*))
{
pthread_mutex_lock(&q->lock);
while (q->posR != q->posW) {
if(freeCbk)
freeCbk(q->buf[q->posR]);
if (++q->posR == q->size)
q->posR = 0;
}
pthread_mutex_unlock(&q->lock);
pthread_mutex_destroy(&q->lock);
pthread_cond_destroy(&q->cond);
free(q);
}
/*
* FUNCTION: workQueueGet
* USE: Get some work from the queue
* PARAMS: workQueue - the workQueue
* workP - work pointer will be written here
* RETURN: Current status (0 for none, else whatever
* workQueueWakeAll has set it to)
* NOTES: Will block untill work is available or woken explicitly
*/
int workQueueGet(struct workQueue* q, void** workP)
{
int ret = 0;
pthread_mutex_lock(&q->lock);
while (q->posR == q->posW && !q->status) {
pthread_cond_wait(&q->cond, &q->lock);
}
if (q->status)
ret = q->status;
else {
*workP = q->buf[q->posR++];
if (q->posR == q->size)
q->posR = 0;
}
pthread_mutex_unlock(&q->lock);
return ret;
}
/*
* FUNCTION: workQueuePut
* USE: Put some work into the queue
* PARAMS: workQueue - the workQueue
* work - the work to enqueue
* RETURN: true if all OK, false if queue is out of space
* NOTES:
*/
bool workQueuePut(struct workQueue *q, void* work)
{
int newIdx;
bool ret = true;
pthread_mutex_lock(&q->lock);
newIdx = q->posW + 1;
if (newIdx == q->size)
newIdx = 0;
if (newIdx == q->posR) //list is full
ret = false;
else {
q->buf[q->posW] = work;
q->posW = newIdx;
}
pthread_cond_signal(&q->cond);
pthread_mutex_unlock(&q->lock);
return ret;
}
/*
* FUNCTION: workQueueWakeAll
* USE: Wake all workers blocked on the queue
* PARAMS: workQueue - the workQueue
* status - the status to broadcast, NONZERO ONLY!
* to restore the queue to working state, call with zero
* RETURN: NONE
* NOTES: Workers will be woken. It is up to you to make sure they know why
*/
void workQueueWakeAll(struct workQueue* q, int status)
{
pthread_mutex_lock(&q->lock);
q->status = status;
pthread_cond_broadcast(&q->cond);
pthread_mutex_unlock(&q->lock);
}