| /*************************************************************************** |
| * _ _ ____ _ |
| * Project ___| | | | _ \| | |
| * / __| | | | |_) | | |
| * | (__| |_| | _ <| |___ |
| * \___|\___/|_| \_\_____| |
| * |
| * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. |
| * |
| * This software is licensed as described in the file COPYING, which |
| * you should have received as part of this distribution. The terms |
| * are also available at https://curl.se/docs/copyright.html. |
| * |
| * You may opt to use, copy, modify, merge, publish, distribute and/or sell |
| * copies of the Software, and permit persons to whom the Software is |
| * furnished to do so, under the terms of the COPYING file. |
| * |
| * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY |
| * KIND, either express or implied. |
| * |
| * SPDX-License-Identifier: curl |
| * |
| ***************************************************************************/ |
| #include "curl_setup.h" |
| |
| #ifdef USE_THREADS |
| |
| #include "llist.h" |
| #include "curl_threads.h" |
| #include "curlx/timeval.h" |
| #include "thrdpool.h" |
| #ifdef CURLVERBOSE |
| #include "curl_trc.h" |
| #include "urldata.h" |
| #endif |
| |
| |
| struct thrdslot { |
| struct Curl_llist_node node; |
| struct curl_thrdpool *tpool; |
| curl_thread_t thread; |
| curl_cond_t await; |
| struct curltime starttime; |
| const char *work_description; |
| timediff_t work_timeout_ms; |
| uint32_t id; |
| BIT(running); |
| BIT(idle); |
| }; |
| |
| struct curl_thrdpool { |
| char *name; |
| uint64_t refcount; |
| curl_mutex_t lock; |
| curl_cond_t await; |
| struct Curl_llist slots; |
| struct Curl_llist zombies; |
| Curl_thrdpool_take_item_cb *fn_take; |
| Curl_thrdpool_process_item_cb *fn_process; |
| Curl_thrdpool_return_item_cb *fn_return; |
| void *fn_user_data; |
| CURLcode fatal_err; |
| uint32_t min_threads; |
| uint32_t max_threads; |
| uint32_t idle_time_ms; |
| uint32_t next_id; |
| BIT(aborted); |
| BIT(detached); |
| }; |
| |
| static void thrdpool_join_zombies(struct curl_thrdpool *tpool); |
| static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked); |
| |
| static void thrdslot_destroy(struct thrdslot *tslot) |
| { |
| DEBUGASSERT(tslot->thread == curl_thread_t_null); |
| DEBUGASSERT(!tslot->running); |
| Curl_cond_destroy(&tslot->await); |
| curlx_free(tslot); |
| } |
| |
| static void thrdslot_done(struct thrdslot *tslot) |
| { |
| struct curl_thrdpool *tpool = tslot->tpool; |
| |
| DEBUGASSERT(Curl_node_llist(&tslot->node) == &tpool->slots); |
| Curl_node_remove(&tslot->node); |
| tslot->running = FALSE; |
| Curl_llist_append(&tpool->zombies, tslot, &tslot->node); |
| Curl_cond_signal(&tpool->await); |
| } |
| |
| static CURL_THREAD_RETURN_T CURL_STDCALL thrdslot_run(void *arg) |
| { |
| struct thrdslot *tslot = arg; |
| struct curl_thrdpool *tpool = tslot->tpool; |
| void *item; |
| |
| Curl_mutex_acquire(&tpool->lock); |
| DEBUGASSERT(Curl_node_llist(&tslot->node) == &tpool->slots); |
| for(;;) { |
| while(!tpool->aborted) { |
| tslot->work_description = NULL; |
| tslot->work_timeout_ms = 0; |
| item = tpool->fn_take(tpool->fn_user_data, &tslot->work_description, |
| &tslot->work_timeout_ms); |
| if(!item) |
| break; |
| tslot->starttime = curlx_now(); |
| tslot->idle = FALSE; |
| Curl_mutex_release(&tpool->lock); |
| |
| tpool->fn_process(item); |
| |
| Curl_mutex_acquire(&tpool->lock); |
| tslot->work_description = NULL; |
| tpool->fn_return(item, tpool->aborted ? NULL : tpool->fn_user_data); |
| } |
| |
| if(tpool->aborted || |
| (Curl_llist_count(&tpool->slots) > tpool->max_threads)) |
| goto out; |
| |
| tslot->idle = TRUE; |
| tslot->starttime = curlx_now(); |
| thrdpool_join_zombies(tpool); |
| Curl_cond_signal(&tpool->await); |
| /* Only wait with idle timeout when we are above the minimum |
| * number of threads. Otherwise short idle timeouts will keep |
| * on activating threads that have no means to shut down. */ |
| if((tpool->idle_time_ms > 0) && |
| (Curl_llist_count(&tpool->slots) > tpool->min_threads)) { |
| CURLcode r = Curl_cond_timedwait(&tslot->await, &tpool->lock, |
| tpool->idle_time_ms); |
| if((r == CURLE_OPERATION_TIMEDOUT) && |
| (Curl_llist_count(&tpool->slots) > tpool->min_threads)) { |
| goto out; |
| } |
| } |
| else { |
| Curl_cond_wait(&tslot->await, &tpool->lock); |
| } |
| } |
| |
| out: |
| thrdslot_done(tslot); |
| if(!thrdpool_unlink(tslot->tpool, TRUE)) { |
| /* tpool not destroyed */ |
| Curl_mutex_release(&tpool->lock); |
| } |
| return 0; |
| } |
| |
| static CURLcode thrdslot_start(struct curl_thrdpool *tpool) |
| { |
| struct thrdslot *tslot; |
| CURLcode result = CURLE_OUT_OF_MEMORY; |
| |
| tslot = curlx_calloc(1, sizeof(*tslot)); |
| if(!tslot) |
| goto out; |
| tslot->id = tpool->next_id++; |
| tslot->tpool = tpool; |
| tslot->thread = curl_thread_t_null; |
| Curl_cond_init(&tslot->await); |
| |
| tpool->refcount++; |
| tslot->running = TRUE; |
| tslot->thread = Curl_thread_create(thrdslot_run, tslot); |
| if(tslot->thread == curl_thread_t_null) { /* never started */ |
| tslot->running = FALSE; |
| thrdpool_unlink(tpool, TRUE); |
| result = CURLE_FAILED_INIT; |
| goto out; |
| } |
| |
| Curl_llist_append(&tpool->slots, tslot, &tslot->node); |
| tslot = NULL; |
| result = CURLE_OK; |
| |
| out: |
| if(tslot) |
| thrdslot_destroy(tslot); |
| return result; |
| } |
| |
| static void thrdpool_wake_all(struct curl_thrdpool *tpool) |
| { |
| struct Curl_llist_node *e; |
| for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { |
| struct thrdslot *tslot = Curl_node_elem(e); |
| Curl_cond_signal(&tslot->await); |
| } |
| } |
| |
| static void thrdpool_join_zombies(struct curl_thrdpool *tpool) |
| { |
| struct Curl_llist_node *e; |
| |
| for(e = Curl_llist_head(&tpool->zombies); e; |
| e = Curl_llist_head(&tpool->zombies)) { |
| struct thrdslot *tslot = Curl_node_elem(e); |
| |
| Curl_node_remove(&tslot->node); |
| if(tslot->thread != curl_thread_t_null) { |
| Curl_mutex_release(&tpool->lock); |
| Curl_thread_join(&tslot->thread); |
| Curl_mutex_acquire(&tpool->lock); |
| tslot->thread = curl_thread_t_null; |
| } |
| thrdslot_destroy(tslot); |
| } |
| } |
| |
| static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked) |
| { |
| DEBUGASSERT(tpool->refcount); |
| if(tpool->refcount) |
| tpool->refcount--; |
| if(tpool->refcount) |
| return FALSE; |
| |
| /* no more references, free */ |
| DEBUGASSERT(tpool->aborted); |
| thrdpool_join_zombies(tpool); |
| if(locked) |
| Curl_mutex_release(&tpool->lock); |
| curlx_free(tpool->name); |
| Curl_cond_destroy(&tpool->await); |
| Curl_mutex_destroy(&tpool->lock); |
| curlx_free(tpool); |
| return TRUE; |
| } |
| |
| static CURLcode thrdpool_signal(struct curl_thrdpool *tpool, |
| uint32_t nthreads) |
| { |
| struct Curl_llist_node *e, *n; |
| CURLcode result = CURLE_OK; |
| |
| DEBUGASSERT(!tpool->aborted); |
| thrdpool_join_zombies(tpool); |
| |
| for(e = Curl_llist_head(&tpool->slots); e && nthreads; e = n) { |
| struct thrdslot *tslot = Curl_node_elem(e); |
| n = Curl_node_next(e); |
| if(tslot->idle) { |
| Curl_cond_signal(&tslot->await); |
| --nthreads; |
| } |
| else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) { |
| /* starting thread, queries for work soon. */ |
| --nthreads; |
| } |
| } |
| |
| while(nthreads && !result && |
| Curl_llist_count(&tpool->slots) < tpool->max_threads) { |
| result = thrdslot_start(tpool); |
| if(result) |
| break; |
| --nthreads; |
| } |
| |
| return result; |
| } |
| |
| CURLcode Curl_thrdpool_set_props(struct curl_thrdpool *tpool, |
| uint32_t min_threads, |
| uint32_t max_threads, |
| uint32_t idle_time_ms) |
| { |
| CURLcode result = CURLE_OK; |
| size_t running; |
| |
| if(!max_threads || (min_threads > max_threads)) |
| return CURLE_BAD_FUNCTION_ARGUMENT; |
| |
| Curl_mutex_acquire(&tpool->lock); |
| tpool->min_threads = min_threads; |
| tpool->max_threads = max_threads; |
| tpool->idle_time_ms = idle_time_ms; |
| running = Curl_llist_count(&tpool->slots); |
| if(tpool->min_threads > running) { |
| result = thrdpool_signal(tpool, tpool->min_threads - (uint32_t)running); |
| } |
| Curl_mutex_release(&tpool->lock); |
| |
| return result; |
| } |
| |
| CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool, |
| const char *name, |
| uint32_t min_threads, |
| uint32_t max_threads, |
| uint32_t idle_time_ms, |
| Curl_thrdpool_take_item_cb *fn_take, |
| Curl_thrdpool_process_item_cb *fn_process, |
| Curl_thrdpool_return_item_cb *fn_return, |
| void *user_data) |
| { |
| struct curl_thrdpool *tpool; |
| CURLcode result = CURLE_OUT_OF_MEMORY; |
| |
| tpool = curlx_calloc(1, sizeof(*tpool)); |
| if(!tpool) |
| goto out; |
| tpool->refcount = 1; |
| |
| Curl_mutex_init(&tpool->lock); |
| Curl_cond_init(&tpool->await); |
| Curl_llist_init(&tpool->slots, NULL); |
| Curl_llist_init(&tpool->zombies, NULL); |
| tpool->fn_take = fn_take; |
| tpool->fn_process = fn_process; |
| tpool->fn_return = fn_return; |
| tpool->fn_user_data = user_data; |
| |
| tpool->name = curlx_strdup(name); |
| if(!tpool->name) |
| goto out; |
| |
| result = Curl_thrdpool_set_props(tpool, min_threads, max_threads, |
| idle_time_ms); |
| |
| out: |
| if(result && tpool) { |
| tpool->aborted = TRUE; |
| thrdpool_unlink(tpool, FALSE); |
| tpool = NULL; |
| } |
| *ptpool = tpool; |
| return result; |
| } |
| |
| void Curl_thrdpool_destroy(struct curl_thrdpool *tpool, bool join) |
| { |
| Curl_mutex_acquire(&tpool->lock); |
| |
| tpool->aborted = TRUE; |
| |
| while(join && Curl_llist_count(&tpool->slots)) { |
| thrdpool_wake_all(tpool); |
| Curl_cond_wait(&tpool->await, &tpool->lock); |
| } |
| |
| thrdpool_join_zombies(tpool); |
| |
| /* detach all still running threads */ |
| if(Curl_llist_count(&tpool->slots)) { |
| struct Curl_llist_node *e; |
| for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { |
| struct thrdslot *tslot = Curl_node_elem(e); |
| if(tslot->thread != curl_thread_t_null) |
| Curl_thread_destroy(&tslot->thread); |
| } |
| tpool->detached = TRUE; |
| } |
| |
| if(!thrdpool_unlink(tpool, TRUE)) { |
| /* tpool not destroyed */ |
| Curl_mutex_release(&tpool->lock); |
| } |
| } |
| |
| CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads) |
| { |
| CURLcode result; |
| |
| Curl_mutex_acquire(&tpool->lock); |
| result = thrdpool_signal(tpool, nthreads); |
| Curl_mutex_release(&tpool->lock); |
| return result; |
| } |
| |
| static bool thrdpool_all_idle(struct curl_thrdpool *tpool) |
| { |
| struct Curl_llist_node *e; |
| for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { |
| struct thrdslot *tslot = Curl_node_elem(e); |
| if(!tslot->idle) |
| return FALSE; |
| } |
| return TRUE; |
| } |
| |
| CURLcode Curl_thrdpool_await_idle(struct curl_thrdpool *tpool, |
| uint32_t timeout_ms) |
| { |
| CURLcode result = CURLE_OK; |
| struct curltime end = { 0 }; |
| |
| Curl_mutex_acquire(&tpool->lock); |
| DEBUGASSERT(!tpool->aborted); |
| if(tpool->aborted) { |
| result = CURLE_FAILED_INIT; |
| goto out; |
| } |
| |
| while(!thrdpool_all_idle(tpool)) { |
| if(timeout_ms) { |
| timediff_t remain_ms; |
| CURLcode r; |
| |
| if(!end.tv_sec && !end.tv_usec) { |
| end = curlx_now(); |
| end.tv_sec += (time_t)(timeout_ms / 1000); |
| end.tv_usec += (int)(timeout_ms % 1000) * 1000; |
| if(end.tv_usec >= 1000000) { |
| end.tv_sec++; |
| end.tv_usec -= 1000000; |
| } |
| } |
| remain_ms = curlx_timediff_ms(curlx_now(), end); |
| if(remain_ms <= 0) |
| r = CURLE_OPERATION_TIMEDOUT; |
| else |
| r = Curl_cond_timedwait(&tpool->await, &tpool->lock, |
| (uint32_t)remain_ms); |
| if(r == CURLE_OPERATION_TIMEDOUT) { |
| result = r; |
| break; |
| } |
| } |
| else { |
| Curl_cond_wait(&tpool->await, &tpool->lock); |
| } |
| } |
| |
| out: |
| thrdpool_join_zombies(tpool); |
| Curl_mutex_release(&tpool->lock); |
| return result; |
| } |
| |
| #ifdef CURLVERBOSE |
| void Curl_thrdpool_trace(struct curl_thrdpool *tpool, |
| struct Curl_easy *data) |
| { |
| struct curl_trc_feat *feat = &Curl_trc_feat_threads; |
| if(Curl_trc_ft_is_verbose(data, feat)) { |
| struct Curl_llist_node *e; |
| struct curltime now = curlx_now(); |
| |
| Curl_mutex_acquire(&tpool->lock); |
| if(!Curl_llist_count(&tpool->slots)) { |
| Curl_trc_feat_infof(data, feat, "[TPOOL-%s] no threads running", |
| tpool->name); |
| } |
| for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { |
| struct thrdslot *tslot = Curl_node_elem(e); |
| timediff_t elapsed_ms = curlx_ptimediff_ms(&now, &tslot->starttime); |
| if(!tslot->running) { |
| Curl_trc_feat_infof(data, feat, "[TPOOL-%s] [%u]: not running", |
| tpool->name, tslot->id); |
| } |
| else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) { |
| Curl_trc_feat_infof(data, feat, "[TPOOL-%s] [%u]: starting...", |
| tpool->name, tslot->id); |
| } |
| else if(tslot->idle) { |
| Curl_trc_feat_infof(data, feat, "[TPOOL-%s] [%u]: idle for %" |
| FMT_TIMEDIFF_T "ms", |
| tpool->name, tslot->id, elapsed_ms); |
| } |
| else { |
| timediff_t remain_ms = tslot->work_timeout_ms ? |
| (tslot->work_timeout_ms - elapsed_ms) : 0; |
| Curl_trc_feat_infof(data, feat, "[TPOOL-%s] [%u]: busy %" |
| FMT_TIMEDIFF_T "ms, timeout in %" FMT_TIMEDIFF_T |
| "ms: %s", |
| tpool->name, tslot->id, elapsed_ms, remain_ms, |
| tslot->work_description); |
| } |
| } |
| Curl_mutex_release(&tpool->lock); |
| } |
| } |
| #endif |
| |
| #endif /* USE_THREADS */ |