diff options
-rw-r--r-- | lib/librt/aio.c | 4 | ||||
-rw-r--r-- | lib/librt/mq.c | 6 | ||||
-rw-r--r-- | lib/librt/sigev_thread.c | 354 | ||||
-rw-r--r-- | lib/librt/sigev_thread.h | 15 | ||||
-rw-r--r-- | lib/librt/timer.c | 10 |
5 files changed, 326 insertions, 63 deletions
diff --git a/lib/librt/aio.c b/lib/librt/aio.c index 0ea0d51..559e894 100644 --- a/lib/librt/aio.c +++ b/lib/librt/aio.c @@ -59,7 +59,7 @@ extern int __sys_aio_return(struct aiocb *iocb); extern int __sys_aio_error(struct aiocb *iocb); static void -aio_dispatch(struct sigev_node *sn, siginfo_t *si) +aio_dispatch(struct sigev_node *sn) { aio_func f = sn->sn_func; @@ -84,7 +84,7 @@ aio_io(struct aiocb *iocb, int (*sysfunc)(struct aiocb *iocb)) return (-1); } - sn = __sigev_alloc(SI_ASYNCIO, &iocb->aio_sigevent); + sn = __sigev_alloc(SI_ASYNCIO, &iocb->aio_sigevent, NULL, 1); if (sn == NULL) { errno = EAGAIN; return (-1); diff --git a/lib/librt/mq.c b/lib/librt/mq.c index 5950de8..ff46b25 100644 --- a/lib/librt/mq.c +++ b/lib/librt/mq.c @@ -119,7 +119,7 @@ __mq_close(mqd_t mqd) typedef void (*mq_func)(union sigval val); static void -mq_dispatch(struct sigev_node *sn, siginfo_t *si) +mq_dispatch(struct sigev_node *sn) { mq_func f = sn->sn_func; @@ -127,7 +127,7 @@ mq_dispatch(struct sigev_node *sn, siginfo_t *si) * Check generation before calling user function, * this should avoid expired notification. */ - if (sn->sn_gen == si->si_value.sival_int) + if (sn->sn_gen == sn->sn_info.si_value.sival_int) f(sn->sn_value); } @@ -156,7 +156,7 @@ __mq_notify(mqd_t mqd, const struct sigevent *evp) return (-1); } - sn = __sigev_alloc(SI_MESGQ, evp); + sn = __sigev_alloc(SI_MESGQ, evp, mqd->node, 1); if (sn == NULL) { errno = EAGAIN; return (-1); diff --git a/lib/librt/sigev_thread.c b/lib/librt/sigev_thread.c index f942e57..ea86c60 100644 --- a/lib/librt/sigev_thread.c +++ b/lib/librt/sigev_thread.c @@ -31,9 +31,12 @@ #include <machine/atomic.h> #include "namespace.h" +#include <assert.h> #include <err.h> +#include <errno.h> #include <ucontext.h> #include <sys/thr.h> +#include <sys/time.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -43,23 +46,60 @@ #include "sigev_thread.h" +/* Lowest number of worker threads should be kept. */ +#define SIGEV_WORKER_LOW 0 + +/* Highest number of worker threads can be created. */ +#define SIGEV_WORKER_HIGH 20 + +/* How long an idle worker thread should stay. */ +#define SIGEV_WORKER_IDLE 10 + +struct sigev_worker { + LIST_ENTRY(sigev_worker) sw_link; + pthread_cond_t sw_cv; + struct sigev_node *sw_sn; + int sw_flags; + int *sw_readyptr; +}; + +#define SWF_READYQ 1 + LIST_HEAD(sigev_list_head, sigev_node); #define HASH_QUEUES 17 #define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) + static struct sigev_list_head sigev_hash[HASH_QUEUES]; static struct sigev_list_head sigev_all; +static TAILQ_HEAD(, sigev_node) sigev_actq; +static TAILQ_HEAD(, sigev_thread_node) sigev_threads; static int sigev_generation; static pthread_mutex_t *sigev_list_mtx; -static TAILQ_HEAD(,sigev_thread_node) sigev_threads; +static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; +static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; static pthread_mutex_t *sigev_threads_mtx; +static pthread_cond_t *sigev_threads_cv; +static pthread_cond_t *sigev_actq_cv; +static struct sigev_thread_node *sigev_default_thread; +static struct sigev_thread_attr sigev_default_sna; static pthread_attr_t sigev_default_attr; -static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; +static int atfork_registered; +static LIST_HEAD(,sigev_worker) sigev_worker_ready; +static int sigev_worker_count; +static int sigev_worker_start; +static int sigev_worker_high; +static int sigev_worker_low; +static pthread_cond_t *sigev_worker_init_cv; static void __sigev_fork_prepare(void); static void __sigev_fork_parent(void); static void __sigev_fork_child(void); -static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *); +static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *, + struct sigev_thread_node *, int); static void *sigev_service_loop(void *); +static void *sigev_worker_routine(void *); +static void sigev_put(struct sigev_node *); +static void worker_cleanup(void *arg); #pragma weak pthread_create #pragma weak pthread_attr_getschedpolicy @@ -87,7 +127,6 @@ attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna) pthread_attr_getinheritsched(attr, &sna->sna_inherit); pthread_attr_getschedparam(attr, &sched_param); sna->sna_prio = sched_param.sched_priority; - pthread_attr_getscope(attr, &sna->sna_scope); pthread_attr_getstacksize(attr, &sna->sna_stacksize); pthread_attr_getstackaddr(attr, &sna->sna_stackaddr); pthread_attr_getguardsize(attr, &sna->sna_guardsize); @@ -102,32 +141,53 @@ sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b) static __inline int have_threads(void) { - return (pthread_create != NULL); + return (&pthread_create != NULL); } void __sigev_thread_init(void) { - static int notfirst = 0; + static int inited = 0; int i; sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); _pthread_mutex_init(sigev_list_mtx, NULL); sigev_threads_mtx = malloc(sizeof(pthread_mutex_t)); _pthread_mutex_init(sigev_threads_mtx, NULL); + sigev_threads_cv = malloc(sizeof(pthread_cond_t)); + _pthread_cond_init(sigev_threads_cv, NULL); + sigev_actq_cv = malloc(sizeof(pthread_cond_t)); + _pthread_cond_init(sigev_actq_cv, NULL); for (i = 0; i < HASH_QUEUES; ++i) LIST_INIT(&sigev_hash[i]); LIST_INIT(&sigev_all); TAILQ_INIT(&sigev_threads); - if (!notfirst) { + TAILQ_INIT(&sigev_actq); + sigev_default_thread = NULL; + sigev_worker_count = 0; + sigev_worker_start = 0; + LIST_INIT(&sigev_worker_ready); + sigev_worker_high = SIGEV_WORKER_HIGH; + sigev_worker_low = SIGEV_WORKER_LOW; + sigev_worker_init_cv = malloc(sizeof(pthread_cond_t)); + _pthread_cond_init(sigev_worker_init_cv, NULL); + if (atfork_registered == 0) { + pthread_atfork( + __sigev_fork_prepare, + __sigev_fork_parent, + __sigev_fork_child); + atfork_registered = 1; + } + if (!inited) { pthread_attr_init(&sigev_default_attr); - pthread_attr_setscope(&sigev_default_attr, PTHREAD_SCOPE_SYSTEM); + attr2sna(&sigev_default_attr, &sigev_default_sna); + pthread_attr_setscope(&sigev_default_attr, + PTHREAD_SCOPE_SYSTEM); pthread_attr_setdetachstate(&sigev_default_attr, PTHREAD_CREATE_DETACHED); - pthread_atfork(__sigev_fork_prepare, __sigev_fork_parent, - __sigev_fork_child); - notfirst = 1; + inited = 1; } + sigev_default_thread = sigev_thread_create(NULL, NULL, 0); } int @@ -137,24 +197,29 @@ __sigev_check_init(void) return (-1); _pthread_once(&sigev_once, __sigev_thread_init); - return (0); + return (sigev_default_thread != NULL) ? 0 : -1; } -void +static void __sigev_fork_prepare(void) { - __sigev_thread_list_lock(); } -void +static void __sigev_fork_parent(void) { - __sigev_thread_list_unlock(); } -void +static void __sigev_fork_child(void) { + /* + * This is a hack, the thread libraries really should + * check if the handlers were already registered in + * pthread_atfork(). + */ + atfork_registered = 1; + memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once)); __sigev_thread_init(); } @@ -183,7 +248,8 @@ __sigev_thread_list_unlock(void) } struct sigev_node * -__sigev_alloc(int type, const struct sigevent *evp) +__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, + int usethreadpool) { struct sigev_node *sn; @@ -193,7 +259,10 @@ __sigev_alloc(int type, const struct sigevent *evp) sn->sn_func = evp->sigev_notify_function; sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); sn->sn_type = type; - sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes); + if (usethreadpool) + sn->sn_flags |= SNF_THREADPOOL; + sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes, + prev ? prev->sn_tn : NULL, usethreadpool); if (sn->sn_tn == NULL) { free(sn); sn = NULL; @@ -262,10 +331,19 @@ __sigev_delete_node(struct sigev_node *sn) LIST_REMOVE(sn, sn_link); LIST_REMOVE(sn, sn_allist); + __sigev_thread_list_lock(); + if (--sn->sn_tn->tn_refcount == 0) + if (!(sn->sn_flags & SNF_THREADPOOL)) + pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE); + __sigev_thread_list_unlock(); if (sn->sn_flags & SNF_WORKING) sn->sn_flags |= SNF_REMOVED; - else + else { + if (sn->sn_flags & SNF_ACTQ) { + TAILQ_REMOVE(&sigev_actq, sn, sn_actq); + } __sigev_free(sn); + } return (0); } @@ -280,15 +358,13 @@ sigev_get_id(siginfo_t *si) return (si->si_mqd); case SI_ASYNCIO: return (sigev_id_t)si->si_value.sival_ptr; - default: - warnx("%s %s : unknown si_code %d\n", __FILE__, __func__, - si->si_code); } return (-1); } static struct sigev_thread_node * -sigev_thread_create(pthread_attr_t *pattr) +sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev, + int usepool) { struct sigev_thread_node *tn; struct sigev_thread_attr sna; @@ -305,43 +381,54 @@ sigev_thread_create(pthread_attr_t *pattr) attr2sna(pattr, &sna); __sigev_thread_list_lock(); - /* Search a thread matching the required pthread_attr. */ - TAILQ_FOREACH(tn, &sigev_threads, tn_link) { - if (sna.sna_stackaddr == NULL) { - if (sna_eq(&tn->tn_sna, &sna)) - break; - } else { - /* - * Reuse the thread if it has same stack address, - * because two threads can not run on same stack. - */ + + if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) { + prev->tn_refcount++; + __sigev_thread_list_unlock(); + return (prev); + } + + if (sna_eq(&sna, &sigev_default_sna) && usepool && + sigev_default_thread != NULL) { + sigev_default_thread->tn_refcount++; + __sigev_thread_list_unlock(); + return (sigev_default_thread); + } + + tn = NULL; + /* Search a thread matching the required stack address */ + if (sna.sna_stackaddr != NULL) { + TAILQ_FOREACH(tn, &sigev_threads, tn_link) { if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr) break; } } + if (tn != NULL) { + tn->tn_refcount++; __sigev_thread_list_unlock(); return (tn); } tn = malloc(sizeof(*tn)); tn->tn_sna = sna; tn->tn_cur = NULL; + tn->tn_lwpid = -1; + tn->tn_refcount = 1; TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link); sigemptyset(&set); sigaddset(&set, SIGEV_SIGSERVICE); _sigprocmask(SIG_BLOCK, &set, NULL); - _pthread_cond_init(&tn->tn_cv, NULL); ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn); _sigprocmask(SIG_UNBLOCK, &set, NULL); if (ret != 0) { TAILQ_REMOVE(&sigev_threads, tn, tn_link); __sigev_thread_list_unlock(); - _pthread_cond_destroy(&tn->tn_cv); free(tn); tn = NULL; } else { /* wait the thread to get its lwpid */ - _pthread_cond_wait(&tn->tn_cv, sigev_threads_mtx); + while (tn->tn_lwpid == -1) + _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx); __sigev_thread_list_unlock(); } return (tn); @@ -362,6 +449,7 @@ after_dispatch(struct sigev_thread_node *tn) tn->tn_cur = NULL; __sigev_list_unlock(); } + tn->tn_cur = NULL; } /* @@ -380,6 +468,11 @@ thread_cleanup(void *arg) abort(); } +/* + * Main notification dispatch function, the function either + * run user callback by itself or hand off the notifications + * to worker threads depend on flags. + */ static void * sigev_service_loop(void *arg) { @@ -388,12 +481,13 @@ sigev_service_loop(void *arg) struct sigev_thread_node *tn; struct sigev_node *sn; sigev_id_t id; + int ret; tn = arg; thr_self(&tn->tn_lwpid); - __sigev_list_lock(); - _pthread_cond_broadcast(&tn->tn_cv); - __sigev_list_unlock(); + __sigev_thread_list_lock(); + _pthread_cond_broadcast(sigev_threads_cv); + __sigev_thread_list_unlock(); /* * Service thread should not be killed by callback, if user @@ -405,19 +499,33 @@ sigev_service_loop(void *arg) sigaddset(&set, SIGEV_SIGSERVICE); pthread_cleanup_push(thread_cleanup, tn); for (;;) { - if (__predict_false(sigwaitinfo(&set, &si) == -1)) + ret = sigwaitinfo(&set, &si); + __sigev_thread_list_lock(); + if (tn->tn_refcount == 0) { + TAILQ_REMOVE(&sigev_threads, tn, tn_link); + __sigev_thread_list_unlock(); + free(tn); + break; + } + __sigev_thread_list_unlock(); + if (ret == -1) continue; - id = sigev_get_id(&si); __sigev_list_lock(); sn = __sigev_find(si.si_code, id); if (sn != NULL) { - tn->tn_cur = sn; - sn->sn_flags |= SNF_WORKING; - __sigev_list_unlock(); - sn->sn_dispatch(sn, &si); - after_dispatch(tn); - } else { + sn->sn_info = si; + if (!(sn->sn_flags & SNF_THREADPOOL)) { + tn->tn_cur = sn; + sn->sn_flags |= SNF_WORKING; + __sigev_list_unlock(); + sn->sn_dispatch(sn); + after_dispatch(tn); + } else { + assert(!(sn->sn_flags & SNF_ACTQ)); + sigev_put(sn); + } + } else { tn->tn_cur = NULL; __sigev_list_unlock(); } @@ -425,3 +533,153 @@ sigev_service_loop(void *arg) pthread_cleanup_pop(0); return (0); } + +/* + * Hand off notifications to worker threads. + * + * prerequist: sigev list locked. + */ +static void +sigev_put(struct sigev_node *sn) +{ + struct sigev_worker *worker; + pthread_t td; + int ret, ready; + + TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq); + sn->sn_flags |= SNF_ACTQ; + /* + * check if we should add more worker threads unless quota is hit. + */ + if (LIST_EMPTY(&sigev_worker_ready) && + sigev_worker_count + sigev_worker_start < sigev_worker_high) { + sigev_worker_start++; + __sigev_list_unlock(); + worker = malloc(sizeof(*worker)); + _pthread_cond_init(&worker->sw_cv, NULL); + worker->sw_flags = 0; + worker->sw_sn = 0; + worker->sw_readyptr = &ready; + ready = 0; + ret = pthread_create(&td, &sigev_default_attr, + sigev_worker_routine, worker); + if (ret) { + warnc(ret, "%s:%s can not create worker thread", + __FILE__, __func__); + __sigev_list_lock(); + sigev_worker_start--; + __sigev_list_unlock(); + } else { + __sigev_list_lock(); + while (ready == 0) { + _pthread_cond_wait(sigev_worker_init_cv, + sigev_list_mtx); + } + __sigev_list_unlock(); + } + } else { + worker = LIST_FIRST(&sigev_worker_ready); + if (worker) { + LIST_REMOVE(worker, sw_link); + worker->sw_flags &= ~SWF_READYQ; + _pthread_cond_broadcast(&worker->sw_cv); + __sigev_list_unlock(); + } + } +} + +/* + * Background thread to dispatch notification to user code. + * These threads are not bound to any realtime objects. + */ +static void * +sigev_worker_routine(void *arg) +{ + struct sigev_worker *worker; + struct sigev_node *sn; + struct timespec ts; + int ret; + + worker = arg; + __sigev_list_lock(); + sigev_worker_count++; + sigev_worker_start--; + (*(worker->sw_readyptr))++; + LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); + worker->sw_flags |= SWF_READYQ; + _pthread_cond_broadcast(sigev_worker_init_cv); + + for (;;) { + if (worker->sw_flags & SWF_READYQ) { + LIST_REMOVE(worker, sw_link); + worker->sw_flags &= ~SWF_READYQ; + } + + sn = TAILQ_FIRST(&sigev_actq); + if (sn != NULL) { + TAILQ_REMOVE(&sigev_actq, sn, sn_actq); + sn->sn_flags &= ~SNF_ACTQ; + sn->sn_flags |= SNF_WORKING; + __sigev_list_unlock(); + + worker->sw_sn = sn; + pthread_cleanup_push(worker_cleanup, worker); + sn->sn_dispatch(sn); + pthread_cleanup_pop(0); + worker->sw_sn = NULL; + + __sigev_list_lock(); + sn->sn_flags &= ~SNF_WORKING; + if (sn->sn_flags & SNF_REMOVED) + __sigev_free(sn); + else if (sn->sn_flags & SNF_ONESHOT) + __sigev_delete_node(sn); + } else { + LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); + worker->sw_flags |= SWF_READYQ; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += SIGEV_WORKER_IDLE; + ret = _pthread_cond_timedwait(&worker->sw_cv, + sigev_list_mtx, &ts); + if (ret == ETIMEDOUT) { + /* + * If we were timeouted and there is nothing + * to do, exit the thread. + */ + if (TAILQ_EMPTY(&sigev_actq) && + (worker->sw_flags & SWF_READYQ) && + sigev_worker_count > sigev_worker_low) + goto out; + } + } + } +out: + if (worker->sw_flags & SWF_READYQ) { + LIST_REMOVE(worker, sw_link); + worker->sw_flags &= ~SWF_READYQ; + } + sigev_worker_count--; + __sigev_list_unlock(); + _pthread_cond_destroy(&worker->sw_cv); + free(worker); + return (0); +} + +static void +worker_cleanup(void *arg) +{ + struct sigev_worker *worker = arg; + struct sigev_node *sn; + + sn = worker->sw_sn; + __sigev_list_lock(); + sn->sn_flags &= ~SNF_WORKING; + if (sn->sn_flags & SNF_REMOVED) + __sigev_free(sn); + else if (sn->sn_flags & SNF_ONESHOT) + __sigev_delete_node(sn); + sigev_worker_count--; + __sigev_list_unlock(); + _pthread_cond_destroy(&worker->sw_cv); + free(worker); +} diff --git a/lib/librt/sigev_thread.h b/lib/librt/sigev_thread.h index 998af17..89512f9 100644 --- a/lib/librt/sigev_thread.h +++ b/lib/librt/sigev_thread.h @@ -38,11 +38,12 @@ struct sigev_thread_node; struct sigev_node; typedef uintptr_t sigev_id_t; -typedef void (*sigev_dispatch_t)(struct sigev_node *, siginfo_t *); +typedef void (*sigev_dispatch_t)(struct sigev_node *); struct sigev_node { LIST_ENTRY(sigev_node) sn_link; LIST_ENTRY(sigev_node) sn_allist; + TAILQ_ENTRY(sigev_node) sn_actq; int sn_type; sigev_id_t sn_id; sigev_dispatch_t sn_dispatch; @@ -50,6 +51,8 @@ struct sigev_node { void *sn_func; int sn_flags; int sn_gen; + int sn_usethreadpool; + siginfo_t sn_info; struct sigev_thread_node * sn_tn; }; @@ -57,7 +60,6 @@ struct sigev_thread_attr { int sna_policy; int sna_inherit; int sna_prio; - int sna_scope; size_t sna_stacksize; void *sna_stackaddr; size_t sna_guardsize; @@ -68,19 +70,22 @@ struct sigev_thread_node { pthread_t tn_thread; struct sigev_node *tn_cur; struct sigev_thread_attr tn_sna; + int tn_refcount; long tn_lwpid; - pthread_cond_t tn_cv; jmp_buf tn_jbuf; }; #define SNF_WORKING 0x01 #define SNF_REMOVED 0x02 -#define SNF_ONESHOT 0x04 +#define SNF_ONESHOT 0x04 +#define SNF_ACTQ 0x08 +#define SNF_THREADPOOL 0x10 #define SIGEV_SIGSERVICE (SIGTHR+1) int __sigev_check_init(); -struct sigev_node *__sigev_alloc(int, const struct sigevent *); +struct sigev_node *__sigev_alloc(int, const struct sigevent *, + struct sigev_node *, int usethreadpool); struct sigev_node *__sigev_find(int, sigev_id_t); void __sigev_get_sigevent(struct sigev_node *, struct sigevent *, sigev_id_t); diff --git a/lib/librt/timer.c b/lib/librt/timer.c index c5d0671..3db5f5b 100644 --- a/lib/librt/timer.c +++ b/lib/librt/timer.c @@ -66,16 +66,16 @@ __weak_reference(__timer_settime, _timer_settime); __weak_reference(__timer_getoverrun, timer_getoverrun); __weak_reference(__timer_getoverrun, _timer_getoverrun); -typedef void (*timer_func)(union sigval val, int timerid, int overrun); +typedef void (*timer_func)(union sigval val, int overrun); static void -timer_dispatch(struct sigev_node *sn, siginfo_t *si) +timer_dispatch(struct sigev_node *sn) { timer_func f = sn->sn_func; /* I want to avoid expired notification. */ - if (si->si_value.sival_int == sn->sn_gen) - f(sn->sn_value, si->si_timerid, si->si_overrun); + if (sn->sn_info.si_value.sival_int == sn->sn_gen) + f(sn->sn_value, sn->sn_info.si_overrun); } int @@ -108,7 +108,7 @@ __timer_create(clockid_t clockid, struct sigevent *evp, timer_t *timerid) return (-1); } - sn = __sigev_alloc(SI_TIMER, evp); + sn = __sigev_alloc(SI_TIMER, evp, NULL, 0); if (sn == NULL) { errno = EAGAIN; return (-1); |