Allow fork in thread pool

This commit is contained in:
topjohnwu 2021-10-17 04:24:25 -07:00
parent e8ae103d5f
commit 6f54c57647
3 changed files with 43 additions and 11 deletions

View File

@ -68,6 +68,18 @@ void unregister_poll(int fd, bool auto_close) {
} }
} }
void clear_poll() {
if (poll_fds) {
for (auto &poll_fd : *poll_fds) {
close(poll_fd.fd);
}
}
delete poll_fds;
delete poll_map;
poll_fds = nullptr;
poll_map = nullptr;
}
static void poll_ctrl_handler(pollfd *pfd) { static void poll_ctrl_handler(pollfd *pfd) {
int code = read_int(pfd->fd); int code = read_int(pfd->fd);
switch (code) { switch (code) {

View File

@ -6,30 +6,48 @@
using namespace std; using namespace std;
#ifndef PTHREAD_COND_INITIALIZER_MONOTONIC_NP
#define PTHREAD_COND_INITIALIZER_MONOTONIC_NP { { 1 << 1 } }
#endif
#define THREAD_IDLE_MAX_SEC 60 #define THREAD_IDLE_MAX_SEC 60
#define CORE_POOL_SIZE 3 #define CORE_POOL_SIZE 3
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t send_task = PTHREAD_COND_INITIALIZER; static pthread_cond_t send_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
static pthread_cond_t recv_task = PTHREAD_COND_INITIALIZER; static pthread_cond_t recv_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
// The following variables should be guarded by lock // The following variables should be guarded by lock
static int available_threads = 0; static int available_threads = 0;
static int active_threads = 0; static int active_threads = 0;
static function<void()> pending_task; static function<void()> pending_task;
static void operator+=(timeval &a, const timeval &b) { static void operator+=(timespec &a, const timespec &b) {
a.tv_sec += b.tv_sec; a.tv_sec += b.tv_sec;
a.tv_usec += b.tv_usec; a.tv_nsec += b.tv_nsec;
if (a.tv_usec >= 1000000) { if (a.tv_nsec >= 1000000000L) {
a.tv_sec++; a.tv_sec++;
a.tv_usec -= 1000000; a.tv_nsec -= 1000000000L;
} }
} }
static timespec to_ts(const timeval &tv) { return { tv.tv_sec, tv.tv_usec * 1000 }; } static void reset_pool() {
clear_poll();
pthread_mutex_unlock(&lock);
pthread_mutex_destroy(&lock);
pthread_mutex_init(&lock, nullptr);
pthread_cond_destroy(&send_task);
send_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
pthread_cond_destroy(&recv_task);
recv_task = PTHREAD_COND_INITIALIZER_MONOTONIC_NP;
available_threads = 0;
active_threads = 0;
pending_task = nullptr;
}
static void *thread_pool_loop(void * const is_core_pool) { static void *thread_pool_loop(void * const is_core_pool) {
pthread_atfork(nullptr, nullptr, &reset_pool);
// Block all signals // Block all signals
sigset_t mask; sigset_t mask;
sigfillset(&mask); sigfillset(&mask);
@ -45,10 +63,9 @@ static void *thread_pool_loop(void * const is_core_pool) {
if (is_core_pool) { if (is_core_pool) {
pthread_cond_wait(&send_task, &lock); pthread_cond_wait(&send_task, &lock);
} else { } else {
timeval tv; timespec ts;
gettimeofday(&tv, nullptr); clock_gettime(CLOCK_MONOTONIC, &ts);
tv += { THREAD_IDLE_MAX_SEC, 0 }; ts += { THREAD_IDLE_MAX_SEC, 0 };
auto ts = to_ts(tv);
if (pthread_cond_timedwait(&send_task, &lock, &ts) == ETIMEDOUT) { if (pthread_cond_timedwait(&send_task, &lock, &ts) == ETIMEDOUT) {
// Terminate thread after max idle time // Terminate thread after max idle time
--available_threads; --available_threads;
@ -62,6 +79,8 @@ static void *thread_pool_loop(void * const is_core_pool) {
--available_threads; --available_threads;
} }
local_task(); local_task();
if (getpid() == gettid())
exit(0);
} }
} }

View File

@ -49,6 +49,7 @@ int connect_daemon(bool create = false);
using poll_callback = void(*)(pollfd*); using poll_callback = void(*)(pollfd*);
void register_poll(const pollfd *pfd, poll_callback callback); void register_poll(const pollfd *pfd, poll_callback callback);
void unregister_poll(int fd, bool auto_close); void unregister_poll(int fd, bool auto_close);
void clear_poll();
// Thread pool // Thread pool
void exec_task(std::function<void()> &&task); void exec_task(std::function<void()> &&task);