pthread_cond_signal might wake multiple threads

Close #4759
This commit is contained in:
topjohnwu 2021-10-19 21:32:37 -07:00
parent 8276a0775d
commit fe41df87bb

View File

@ -18,8 +18,8 @@ static pthread_cond_t send_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
static pthread_cond_t recv_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP; static pthread_cond_t recv_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
// The following variables should be guarded by lock // The following variables should be guarded by lock
static int available_threads = 0; static int idle_threads = 0;
static int active_threads = 0; static int total_threads = 0;
static function<void()> pending_task; static function<void()> pending_task;
static void operator+=(timespec &a, const timespec &b) { static void operator+=(timespec &a, const timespec &b) {
@ -40,8 +40,8 @@ static void reset_pool() {
send_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP; send_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
pthread_cond_destroy(&recv_task); pthread_cond_destroy(&recv_task);
recv_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP; recv_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
available_threads = 0; idle_threads = 0;
active_threads = 0; total_threads = 0;
pending_task = nullptr; pending_task = nullptr;
} }
@ -58,7 +58,7 @@ static void *thread_pool_loop(void * const is_core_pool) {
function<void()> local_task; function<void()> local_task;
{ {
mutex_guard g(lock); mutex_guard g(lock);
++available_threads; ++idle_threads;
if (!pending_task) { if (!pending_task) {
if (is_core_pool) { if (is_core_pool) {
pthread_cond_wait(&send_task, &lock); pthread_cond_wait(&send_task, &lock);
@ -68,17 +68,20 @@ static void *thread_pool_loop(void * const is_core_pool) {
ts += { THREAD_IDLE_MAX_SEC, 0 }; ts += { THREAD_IDLE_MAX_SEC, 0 };
if (pthread_cond_timedwait(&send_task, &lock, &ts) == ETIMEDOUT) { if (pthread_cond_timedwait(&send_task, &lock, &ts) == ETIMEDOUT) {
// Terminate thread after max idle time // Terminate thread after max idle time
--available_threads; --idle_threads;
--active_threads; --total_threads;
return nullptr; return nullptr;
} }
} }
} }
local_task.swap(pending_task); if (pending_task) {
pthread_cond_signal(&recv_task); local_task.swap(pending_task);
--available_threads; pthread_cond_signal(&recv_task);
}
--idle_threads;
} }
local_task(); if (local_task)
local_task();
if (getpid() == gettid()) if (getpid() == gettid())
exit(0); exit(0);
} }
@ -87,9 +90,9 @@ static void *thread_pool_loop(void * const is_core_pool) {
void exec_task(function<void()> &&task) { void exec_task(function<void()> &&task) {
mutex_guard g(lock); mutex_guard g(lock);
pending_task.swap(task); pending_task.swap(task);
if (available_threads == 0) { if (idle_threads == 0) {
++active_threads; ++total_threads;
long is_core_pool = active_threads <= CORE_POOL_SIZE; long is_core_pool = total_threads <= CORE_POOL_SIZE;
new_daemon_thread(thread_pool_loop, (void *) is_core_pool); new_daemon_thread(thread_pool_loop, (void *) is_core_pool);
} else { } else {
pthread_cond_signal(&send_task); pthread_cond_signal(&send_task);