| #define _GNU_SOURCE |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <stdint.h> |
| #include <limits.h> |
| #ifndef WIN32 |
| #include <sys/queue.h> |
| #endif |
| |
| #include <unistd.h> |
| #include <pthread.h> |
| |
| #include <event2/event.h> |
| #include <event2/thread.h> |
| |
| #include "internal.h" |
| #include "evhtp/thread.h" |
| |
| typedef struct evthr_cmd evthr_cmd_t; |
| typedef struct evthr_pool_slist evthr_pool_slist_t; |
| |
| struct evthr_cmd { |
| uint8_t stop; |
| void * args; |
| evthr_cb cb; |
| } __attribute__((packed)); |
| |
| TAILQ_HEAD(evthr_pool_slist, evthr); |
| |
| struct evthr_pool { |
| #ifdef EVTHR_SHARED_PIPE |
| int rdr; |
| int wdr; |
| #endif |
| int nthreads; |
| evthr_pool_slist_t threads; |
| }; |
| |
| struct evthr { |
| int rdr; |
| int wdr; |
| char err; |
| ev_t * event; |
| evbase_t * evbase; |
| pthread_mutex_t lock; |
| pthread_t * thr; |
| evthr_init_cb init_cb; |
| evthr_init_cb exit_cb; |
| void * arg; |
| void * aux; |
| |
| #ifdef EVTHR_SHARED_PIPE |
| int pool_rdr; |
| struct event * shared_pool_ev; |
| #endif |
| TAILQ_ENTRY(evthr) next; |
| }; |
| |
| #define _evthr_read(thr, cmd, sock) \ |
| (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0 |
| |
| static void |
| _evthr_read_cmd(evutil_socket_t sock, short which, void * args) { |
| evthr_t * thread; |
| evthr_cmd_t cmd; |
| int stopped; |
| |
| if (!(thread = (evthr_t *)args)) { |
| return; |
| } |
| |
| stopped = 0; |
| |
| if (evhtp_likely(_evthr_read(thread, &cmd, sock) == 1)) { |
| stopped = cmd.stop; |
| |
| if (evhtp_likely(cmd.cb != NULL)) { |
| (cmd.cb)(thread, cmd.args, thread->arg); |
| } |
| } |
| |
| if (evhtp_unlikely(stopped == 1)) { |
| event_base_loopbreak(thread->evbase); |
| } |
| |
| return; |
| } /* _evthr_read_cmd */ |
| |
| static void * |
| _evthr_loop(void * args) { |
| evthr_t * thread; |
| |
| if (!(thread = (evthr_t *)args)) { |
| return NULL; |
| } |
| |
| if (thread == NULL || thread->thr == NULL) { |
| pthread_exit(NULL); |
| } |
| |
| thread->evbase = event_base_new(); |
| thread->event = event_new(thread->evbase, thread->rdr, |
| EV_READ | EV_PERSIST, _evthr_read_cmd, args); |
| |
| event_add(thread->event, NULL); |
| |
| #ifdef EVTHR_SHARED_PIPE |
| if (thread->pool_rdr > 0) { |
| thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr, |
| EV_READ | EV_PERSIST, _evthr_read_cmd, args); |
| event_add(thread->shared_pool_ev, NULL); |
| } |
| #endif |
| |
| pthread_mutex_lock(&thread->lock); |
| if (thread->init_cb != NULL) { |
| (thread->init_cb)(thread, thread->arg); |
| } |
| pthread_mutex_unlock(&thread->lock); |
| |
| event_base_loop(thread->evbase, 0); |
| |
| pthread_mutex_lock(&thread->lock); |
| if (thread->exit_cb != NULL) { |
| (thread->exit_cb)(thread, thread->arg); |
| } |
| pthread_mutex_unlock(&thread->lock); |
| |
| if (thread->err == 1) { |
| fprintf(stderr, "FATAL ERROR!\n"); |
| } |
| |
| pthread_exit(NULL); |
| } /* _evthr_loop */ |
| |
| evthr_res |
| evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) { |
| evthr_cmd_t cmd = { |
| .cb = cb, |
| .args = arg, |
| .stop = 0 |
| }; |
| |
| if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) { |
| return EVTHR_RES_RETRY; |
| } |
| |
| return EVTHR_RES_OK; |
| } |
| |
| evthr_res |
| evthr_stop(evthr_t * thread) { |
| evthr_cmd_t cmd = { |
| .cb = NULL, |
| .args = NULL, |
| .stop = 1 |
| }; |
| |
| if (send(thread->wdr, &cmd, sizeof(evthr_cmd_t), 0) < 0) { |
| return EVTHR_RES_RETRY; |
| } |
| |
| pthread_join(*thread->thr, NULL); |
| return EVTHR_RES_OK; |
| } |
| |
| evbase_t * |
| evthr_get_base(evthr_t * thr) { |
| return thr ? thr->evbase : NULL; |
| } |
| |
| void |
| evthr_set_aux(evthr_t * thr, void * aux) { |
| if (thr) { |
| thr->aux = aux; |
| } |
| } |
| |
| void * |
| evthr_get_aux(evthr_t * thr) { |
| return thr ? thr->aux : NULL; |
| } |
| |
| int |
| evthr_set_initcb(evthr_t * thr, evthr_init_cb cb) { |
| if (thr == NULL) { |
| return -1; |
| } |
| |
| thr->init_cb = cb; |
| |
| return 01; |
| } |
| |
| int |
| evthr_set_exitcb(evthr_t * thr, evthr_exit_cb cb) { |
| if (thr == NULL) { |
| return -1; |
| } |
| |
| thr->exit_cb = cb; |
| |
| return 0; |
| } |
| |
| static evthr_t * |
| _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args) { |
| evthr_t * thread; |
| int fds[2]; |
| |
| if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) { |
| return NULL; |
| } |
| |
| evutil_make_socket_nonblocking(fds[0]); |
| evutil_make_socket_nonblocking(fds[1]); |
| |
| if (!(thread = calloc(sizeof(evthr_t), 1))) { |
| return NULL; |
| } |
| |
| thread->thr = malloc(sizeof(pthread_t)); |
| thread->arg = args; |
| thread->rdr = fds[0]; |
| thread->wdr = fds[1]; |
| |
| evthr_set_initcb(thread, init_cb); |
| evthr_set_exitcb(thread, exit_cb); |
| |
| if (pthread_mutex_init(&thread->lock, NULL)) { |
| evthr_free(thread); |
| return NULL; |
| } |
| |
| return thread; |
| } /* evthr_new */ |
| |
| evthr_t * |
| evthr_new(evthr_init_cb init_cb, void * args) { |
| return _evthr_new(init_cb, NULL, args); |
| } |
| |
| evthr_t * |
| evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args) { |
| return _evthr_new(init_cb, exit_cb, args); |
| } |
| |
| int |
| evthr_start(evthr_t * thread) { |
| if (thread == NULL || thread->thr == NULL) { |
| return -1; |
| } |
| |
| if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) { |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| void |
| evthr_free(evthr_t * thread) { |
| if (thread == NULL) { |
| return; |
| } |
| |
| if (thread->rdr > 0) { |
| close(thread->rdr); |
| } |
| |
| if (thread->wdr > 0) { |
| close(thread->wdr); |
| } |
| |
| if (thread->thr) { |
| free(thread->thr); |
| } |
| |
| if (thread->event) { |
| event_free(thread->event); |
| } |
| |
| if (thread->evbase) { |
| event_base_free(thread->evbase); |
| } |
| |
| free(thread); |
| } /* evthr_free */ |
| |
| void |
| evthr_pool_free(evthr_pool_t * pool) { |
| evthr_t * thread; |
| evthr_t * save; |
| |
| if (pool == NULL) { |
| return; |
| } |
| |
| TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) { |
| TAILQ_REMOVE(&pool->threads, thread, next); |
| |
| evthr_free(thread); |
| } |
| |
| free(pool); |
| } |
| |
| evthr_res |
| evthr_pool_stop(evthr_pool_t * pool) { |
| evthr_t * thr; |
| evthr_t * save; |
| |
| if (pool == NULL) { |
| return EVTHR_RES_FATAL; |
| } |
| |
| TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) { |
| evthr_stop(thr); |
| } |
| |
| return EVTHR_RES_OK; |
| } |
| |
| evthr_res |
| evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) { |
| #ifdef EVTHR_SHARED_PIPE |
| evthr_cmd_t cmd = { |
| .cb = cb, |
| .args = arg, |
| .stop = 0 |
| }; |
| |
| if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) { |
| return EVTHR_RES_RETRY; |
| } |
| |
| return EVTHR_RES_OK; |
| #else |
| evthr_t * thr = NULL; |
| |
| if (pool == NULL) { |
| return EVTHR_RES_FATAL; |
| } |
| |
| if (cb == NULL) { |
| return EVTHR_RES_NOCB; |
| } |
| |
| thr = TAILQ_FIRST(&pool->threads); |
| |
| TAILQ_REMOVE(&pool->threads, thr, next); |
| TAILQ_INSERT_TAIL(&pool->threads, thr, next); |
| |
| |
| return evthr_defer(thr, cb, arg); |
| #endif |
| } /* evthr_pool_defer */ |
| |
| static evthr_pool_t * |
| _evthr_pool_new(int nthreads, |
| evthr_init_cb init_cb, |
| evthr_exit_cb exit_cb, |
| void * shared) { |
| evthr_pool_t * pool; |
| int i; |
| |
| #ifdef EVTHR_SHARED_PIPE |
| int fds[2]; |
| #endif |
| |
| if (nthreads == 0) { |
| return NULL; |
| } |
| |
| if (!(pool = calloc(sizeof(evthr_pool_t), 1))) { |
| return NULL; |
| } |
| |
| pool->nthreads = nthreads; |
| TAILQ_INIT(&pool->threads); |
| |
| #ifdef EVTHR_SHARED_PIPE |
| if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) { |
| return NULL; |
| } |
| |
| evutil_make_socket_nonblocking(fds[0]); |
| evutil_make_socket_nonblocking(fds[1]); |
| |
| pool->rdr = fds[0]; |
| pool->wdr = fds[1]; |
| #endif |
| |
| for (i = 0; i < nthreads; i++) { |
| evthr_t * thread; |
| |
| if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) { |
| evthr_pool_free(pool); |
| return NULL; |
| } |
| |
| #ifdef EVTHR_SHARED_PIPE |
| thread->pool_rdr = fds[0]; |
| #endif |
| |
| TAILQ_INSERT_TAIL(&pool->threads, thread, next); |
| } |
| |
| return pool; |
| } /* _evthr_pool_new */ |
| |
| evthr_pool_t * |
| evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) { |
| return _evthr_pool_new(nthreads, init_cb, NULL, shared); |
| } |
| |
| evthr_pool_t * |
| evthr_pool_wexit_new(int nthreads, |
| evthr_init_cb init_cb, |
| evthr_exit_cb exit_cb, void * shared) { |
| return _evthr_pool_new(nthreads, init_cb, exit_cb, shared); |
| } |
| |
| int |
| evthr_pool_start(evthr_pool_t * pool) { |
| evthr_t * evthr = NULL; |
| |
| if (pool == NULL) { |
| return -1; |
| } |
| |
| TAILQ_FOREACH(evthr, &pool->threads, next) { |
| if (evthr_start(evthr) < 0) { |
| return -1; |
| } |
| |
| usleep(5000); |
| } |
| |
| return 0; |
| } |