From 8e36ba772a0d4c7556d1b618bc0485c97cae37a7 Mon Sep 17 00:00:00 2001 From: davidxu Date: Sat, 4 Mar 2006 00:18:19 +0000 Subject: Use a thread pool to process notification if sigev_notify_attributes is default and caller does not require dedicated thread. timer needs a dedicated thread to maintain overrun count correctly in notification context. mqueue and aio can use thread pool to do notification concurrently, the thread pool has lifecycle control, some threads will exit if they have idled for a while. --- lib/librt/aio.c | 4 +- lib/librt/mq.c | 6 +- lib/librt/sigev_thread.c | 354 ++++++++++++++++++++++++++++++++++++++++------- lib/librt/sigev_thread.h | 15 +- lib/librt/timer.c | 10 +- 5 files changed, 326 insertions(+), 63 deletions(-) (limited to 'lib') diff --git a/lib/librt/aio.c b/lib/librt/aio.c index 0ea0d51..559e894 100644 --- a/lib/librt/aio.c +++ b/lib/librt/aio.c @@ -59,7 +59,7 @@ extern int __sys_aio_return(struct aiocb *iocb); extern int __sys_aio_error(struct aiocb *iocb); static void -aio_dispatch(struct sigev_node *sn, siginfo_t *si) +aio_dispatch(struct sigev_node *sn) { aio_func f = sn->sn_func; @@ -84,7 +84,7 @@ aio_io(struct aiocb *iocb, int (*sysfunc)(struct aiocb *iocb)) return (-1); } - sn = __sigev_alloc(SI_ASYNCIO, &iocb->aio_sigevent); + sn = __sigev_alloc(SI_ASYNCIO, &iocb->aio_sigevent, NULL, 1); if (sn == NULL) { errno = EAGAIN; return (-1); diff --git a/lib/librt/mq.c b/lib/librt/mq.c index 5950de8..ff46b25 100644 --- a/lib/librt/mq.c +++ b/lib/librt/mq.c @@ -119,7 +119,7 @@ __mq_close(mqd_t mqd) typedef void (*mq_func)(union sigval val); static void -mq_dispatch(struct sigev_node *sn, siginfo_t *si) +mq_dispatch(struct sigev_node *sn) { mq_func f = sn->sn_func; @@ -127,7 +127,7 @@ mq_dispatch(struct sigev_node *sn, siginfo_t *si) * Check generation before calling user function, * this should avoid expired notification. */ - if (sn->sn_gen == si->si_value.sival_int) + if (sn->sn_gen == sn->sn_info.si_value.sival_int) f(sn->sn_value); } @@ -156,7 +156,7 @@ __mq_notify(mqd_t mqd, const struct sigevent *evp) return (-1); } - sn = __sigev_alloc(SI_MESGQ, evp); + sn = __sigev_alloc(SI_MESGQ, evp, mqd->node, 1); if (sn == NULL) { errno = EAGAIN; return (-1); diff --git a/lib/librt/sigev_thread.c b/lib/librt/sigev_thread.c index f942e57..ea86c60 100644 --- a/lib/librt/sigev_thread.c +++ b/lib/librt/sigev_thread.c @@ -31,9 +31,12 @@ #include #include "namespace.h" +#include #include +#include #include #include +#include #include #include #include @@ -43,23 +46,60 @@ #include "sigev_thread.h" +/* Lowest number of worker threads should be kept. */ +#define SIGEV_WORKER_LOW 0 + +/* Highest number of worker threads can be created. */ +#define SIGEV_WORKER_HIGH 20 + +/* How long an idle worker thread should stay. */ +#define SIGEV_WORKER_IDLE 10 + +struct sigev_worker { + LIST_ENTRY(sigev_worker) sw_link; + pthread_cond_t sw_cv; + struct sigev_node *sw_sn; + int sw_flags; + int *sw_readyptr; +}; + +#define SWF_READYQ 1 + LIST_HEAD(sigev_list_head, sigev_node); #define HASH_QUEUES 17 #define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) + static struct sigev_list_head sigev_hash[HASH_QUEUES]; static struct sigev_list_head sigev_all; +static TAILQ_HEAD(, sigev_node) sigev_actq; +static TAILQ_HEAD(, sigev_thread_node) sigev_threads; static int sigev_generation; static pthread_mutex_t *sigev_list_mtx; -static TAILQ_HEAD(,sigev_thread_node) sigev_threads; +static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; +static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; static pthread_mutex_t *sigev_threads_mtx; +static pthread_cond_t *sigev_threads_cv; +static pthread_cond_t *sigev_actq_cv; +static struct sigev_thread_node *sigev_default_thread; +static struct sigev_thread_attr sigev_default_sna; static pthread_attr_t sigev_default_attr; -static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; +static int atfork_registered; +static LIST_HEAD(,sigev_worker) sigev_worker_ready; +static int sigev_worker_count; +static int sigev_worker_start; +static int sigev_worker_high; +static int sigev_worker_low; +static pthread_cond_t *sigev_worker_init_cv; static void __sigev_fork_prepare(void); static void __sigev_fork_parent(void); static void __sigev_fork_child(void); -static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *); +static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *, + struct sigev_thread_node *, int); static void *sigev_service_loop(void *); +static void *sigev_worker_routine(void *); +static void sigev_put(struct sigev_node *); +static void worker_cleanup(void *arg); #pragma weak pthread_create #pragma weak pthread_attr_getschedpolicy @@ -87,7 +127,6 @@ attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna) pthread_attr_getinheritsched(attr, &sna->sna_inherit); pthread_attr_getschedparam(attr, &sched_param); sna->sna_prio = sched_param.sched_priority; - pthread_attr_getscope(attr, &sna->sna_scope); pthread_attr_getstacksize(attr, &sna->sna_stacksize); pthread_attr_getstackaddr(attr, &sna->sna_stackaddr); pthread_attr_getguardsize(attr, &sna->sna_guardsize); @@ -102,32 +141,53 @@ sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b) static __inline int have_threads(void) { - return (pthread_create != NULL); + return (&pthread_create != NULL); } void __sigev_thread_init(void) { - static int notfirst = 0; + static int inited = 0; int i; sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); _pthread_mutex_init(sigev_list_mtx, NULL); sigev_threads_mtx = malloc(sizeof(pthread_mutex_t)); _pthread_mutex_init(sigev_threads_mtx, NULL); + sigev_threads_cv = malloc(sizeof(pthread_cond_t)); + _pthread_cond_init(sigev_threads_cv, NULL); + sigev_actq_cv = malloc(sizeof(pthread_cond_t)); + _pthread_cond_init(sigev_actq_cv, NULL); for (i = 0; i < HASH_QUEUES; ++i) LIST_INIT(&sigev_hash[i]); LIST_INIT(&sigev_all); TAILQ_INIT(&sigev_threads); - if (!notfirst) { + TAILQ_INIT(&sigev_actq); + sigev_default_thread = NULL; + sigev_worker_count = 0; + sigev_worker_start = 0; + LIST_INIT(&sigev_worker_ready); + sigev_worker_high = SIGEV_WORKER_HIGH; + sigev_worker_low = SIGEV_WORKER_LOW; + sigev_worker_init_cv = malloc(sizeof(pthread_cond_t)); + _pthread_cond_init(sigev_worker_init_cv, NULL); + if (atfork_registered == 0) { + pthread_atfork( + __sigev_fork_prepare, + __sigev_fork_parent, + __sigev_fork_child); + atfork_registered = 1; + } + if (!inited) { pthread_attr_init(&sigev_default_attr); - pthread_attr_setscope(&sigev_default_attr, PTHREAD_SCOPE_SYSTEM); + attr2sna(&sigev_default_attr, &sigev_default_sna); + pthread_attr_setscope(&sigev_default_attr, + PTHREAD_SCOPE_SYSTEM); pthread_attr_setdetachstate(&sigev_default_attr, PTHREAD_CREATE_DETACHED); - pthread_atfork(__sigev_fork_prepare, __sigev_fork_parent, - __sigev_fork_child); - notfirst = 1; + inited = 1; } + sigev_default_thread = sigev_thread_create(NULL, NULL, 0); } int @@ -137,24 +197,29 @@ __sigev_check_init(void) return (-1); _pthread_once(&sigev_once, __sigev_thread_init); - return (0); + return (sigev_default_thread != NULL) ? 0 : -1; } -void +static void __sigev_fork_prepare(void) { - __sigev_thread_list_lock(); } -void +static void __sigev_fork_parent(void) { - __sigev_thread_list_unlock(); } -void +static void __sigev_fork_child(void) { + /* + * This is a hack, the thread libraries really should + * check if the handlers were already registered in + * pthread_atfork(). + */ + atfork_registered = 1; + memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once)); __sigev_thread_init(); } @@ -183,7 +248,8 @@ __sigev_thread_list_unlock(void) } struct sigev_node * -__sigev_alloc(int type, const struct sigevent *evp) +__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, + int usethreadpool) { struct sigev_node *sn; @@ -193,7 +259,10 @@ __sigev_alloc(int type, const struct sigevent *evp) sn->sn_func = evp->sigev_notify_function; sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); sn->sn_type = type; - sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes); + if (usethreadpool) + sn->sn_flags |= SNF_THREADPOOL; + sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes, + prev ? prev->sn_tn : NULL, usethreadpool); if (sn->sn_tn == NULL) { free(sn); sn = NULL; @@ -262,10 +331,19 @@ __sigev_delete_node(struct sigev_node *sn) LIST_REMOVE(sn, sn_link); LIST_REMOVE(sn, sn_allist); + __sigev_thread_list_lock(); + if (--sn->sn_tn->tn_refcount == 0) + if (!(sn->sn_flags & SNF_THREADPOOL)) + pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE); + __sigev_thread_list_unlock(); if (sn->sn_flags & SNF_WORKING) sn->sn_flags |= SNF_REMOVED; - else + else { + if (sn->sn_flags & SNF_ACTQ) { + TAILQ_REMOVE(&sigev_actq, sn, sn_actq); + } __sigev_free(sn); + } return (0); } @@ -280,15 +358,13 @@ sigev_get_id(siginfo_t *si) return (si->si_mqd); case SI_ASYNCIO: return (sigev_id_t)si->si_value.sival_ptr; - default: - warnx("%s %s : unknown si_code %d\n", __FILE__, __func__, - si->si_code); } return (-1); } static struct sigev_thread_node * -sigev_thread_create(pthread_attr_t *pattr) +sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev, + int usepool) { struct sigev_thread_node *tn; struct sigev_thread_attr sna; @@ -305,43 +381,54 @@ sigev_thread_create(pthread_attr_t *pattr) attr2sna(pattr, &sna); __sigev_thread_list_lock(); - /* Search a thread matching the required pthread_attr. */ - TAILQ_FOREACH(tn, &sigev_threads, tn_link) { - if (sna.sna_stackaddr == NULL) { - if (sna_eq(&tn->tn_sna, &sna)) - break; - } else { - /* - * Reuse the thread if it has same stack address, - * because two threads can not run on same stack. - */ + + if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) { + prev->tn_refcount++; + __sigev_thread_list_unlock(); + return (prev); + } + + if (sna_eq(&sna, &sigev_default_sna) && usepool && + sigev_default_thread != NULL) { + sigev_default_thread->tn_refcount++; + __sigev_thread_list_unlock(); + return (sigev_default_thread); + } + + tn = NULL; + /* Search a thread matching the required stack address */ + if (sna.sna_stackaddr != NULL) { + TAILQ_FOREACH(tn, &sigev_threads, tn_link) { if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr) break; } } + if (tn != NULL) { + tn->tn_refcount++; __sigev_thread_list_unlock(); return (tn); } tn = malloc(sizeof(*tn)); tn->tn_sna = sna; tn->tn_cur = NULL; + tn->tn_lwpid = -1; + tn->tn_refcount = 1; TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link); sigemptyset(&set); sigaddset(&set, SIGEV_SIGSERVICE); _sigprocmask(SIG_BLOCK, &set, NULL); - _pthread_cond_init(&tn->tn_cv, NULL); ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn); _sigprocmask(SIG_UNBLOCK, &set, NULL); if (ret != 0) { TAILQ_REMOVE(&sigev_threads, tn, tn_link); __sigev_thread_list_unlock(); - _pthread_cond_destroy(&tn->tn_cv); free(tn); tn = NULL; } else { /* wait the thread to get its lwpid */ - _pthread_cond_wait(&tn->tn_cv, sigev_threads_mtx); + while (tn->tn_lwpid == -1) + _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx); __sigev_thread_list_unlock(); } return (tn); @@ -362,6 +449,7 @@ after_dispatch(struct sigev_thread_node *tn) tn->tn_cur = NULL; __sigev_list_unlock(); } + tn->tn_cur = NULL; } /* @@ -380,6 +468,11 @@ thread_cleanup(void *arg) abort(); } +/* + * Main notification dispatch function, the function either + * run user callback by itself or hand off the notifications + * to worker threads depend on flags. + */ static void * sigev_service_loop(void *arg) { @@ -388,12 +481,13 @@ sigev_service_loop(void *arg) struct sigev_thread_node *tn; struct sigev_node *sn; sigev_id_t id; + int ret; tn = arg; thr_self(&tn->tn_lwpid); - __sigev_list_lock(); - _pthread_cond_broadcast(&tn->tn_cv); - __sigev_list_unlock(); + __sigev_thread_list_lock(); + _pthread_cond_broadcast(sigev_threads_cv); + __sigev_thread_list_unlock(); /* * Service thread should not be killed by callback, if user @@ -405,19 +499,33 @@ sigev_service_loop(void *arg) sigaddset(&set, SIGEV_SIGSERVICE); pthread_cleanup_push(thread_cleanup, tn); for (;;) { - if (__predict_false(sigwaitinfo(&set, &si) == -1)) + ret = sigwaitinfo(&set, &si); + __sigev_thread_list_lock(); + if (tn->tn_refcount == 0) { + TAILQ_REMOVE(&sigev_threads, tn, tn_link); + __sigev_thread_list_unlock(); + free(tn); + break; + } + __sigev_thread_list_unlock(); + if (ret == -1) continue; - id = sigev_get_id(&si); __sigev_list_lock(); sn = __sigev_find(si.si_code, id); if (sn != NULL) { - tn->tn_cur = sn; - sn->sn_flags |= SNF_WORKING; - __sigev_list_unlock(); - sn->sn_dispatch(sn, &si); - after_dispatch(tn); - } else { + sn->sn_info = si; + if (!(sn->sn_flags & SNF_THREADPOOL)) { + tn->tn_cur = sn; + sn->sn_flags |= SNF_WORKING; + __sigev_list_unlock(); + sn->sn_dispatch(sn); + after_dispatch(tn); + } else { + assert(!(sn->sn_flags & SNF_ACTQ)); + sigev_put(sn); + } + } else { tn->tn_cur = NULL; __sigev_list_unlock(); } @@ -425,3 +533,153 @@ sigev_service_loop(void *arg) pthread_cleanup_pop(0); return (0); } + +/* + * Hand off notifications to worker threads. + * + * prerequist: sigev list locked. + */ +static void +sigev_put(struct sigev_node *sn) +{ + struct sigev_worker *worker; + pthread_t td; + int ret, ready; + + TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq); + sn->sn_flags |= SNF_ACTQ; + /* + * check if we should add more worker threads unless quota is hit. + */ + if (LIST_EMPTY(&sigev_worker_ready) && + sigev_worker_count + sigev_worker_start < sigev_worker_high) { + sigev_worker_start++; + __sigev_list_unlock(); + worker = malloc(sizeof(*worker)); + _pthread_cond_init(&worker->sw_cv, NULL); + worker->sw_flags = 0; + worker->sw_sn = 0; + worker->sw_readyptr = &ready; + ready = 0; + ret = pthread_create(&td, &sigev_default_attr, + sigev_worker_routine, worker); + if (ret) { + warnc(ret, "%s:%s can not create worker thread", + __FILE__, __func__); + __sigev_list_lock(); + sigev_worker_start--; + __sigev_list_unlock(); + } else { + __sigev_list_lock(); + while (ready == 0) { + _pthread_cond_wait(sigev_worker_init_cv, + sigev_list_mtx); + } + __sigev_list_unlock(); + } + } else { + worker = LIST_FIRST(&sigev_worker_ready); + if (worker) { + LIST_REMOVE(worker, sw_link); + worker->sw_flags &= ~SWF_READYQ; + _pthread_cond_broadcast(&worker->sw_cv); + __sigev_list_unlock(); + } + } +} + +/* + * Background thread to dispatch notification to user code. + * These threads are not bound to any realtime objects. + */ +static void * +sigev_worker_routine(void *arg) +{ + struct sigev_worker *worker; + struct sigev_node *sn; + struct timespec ts; + int ret; + + worker = arg; + __sigev_list_lock(); + sigev_worker_count++; + sigev_worker_start--; + (*(worker->sw_readyptr))++; + LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); + worker->sw_flags |= SWF_READYQ; + _pthread_cond_broadcast(sigev_worker_init_cv); + + for (;;) { + if (worker->sw_flags & SWF_READYQ) { + LIST_REMOVE(worker, sw_link); + worker->sw_flags &= ~SWF_READYQ; + } + + sn = TAILQ_FIRST(&sigev_actq); + if (sn != NULL) { + TAILQ_REMOVE(&sigev_actq, sn, sn_actq); + sn->sn_flags &= ~SNF_ACTQ; + sn->sn_flags |= SNF_WORKING; + __sigev_list_unlock(); + + worker->sw_sn = sn; + pthread_cleanup_push(worker_cleanup, worker); + sn->sn_dispatch(sn); + pthread_cleanup_pop(0); + worker->sw_sn = NULL; + + __sigev_list_lock(); + sn->sn_flags &= ~SNF_WORKING; + if (sn->sn_flags & SNF_REMOVED) + __sigev_free(sn); + else if (sn->sn_flags & SNF_ONESHOT) + __sigev_delete_node(sn); + } else { + LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); + worker->sw_flags |= SWF_READYQ; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += SIGEV_WORKER_IDLE; + ret = _pthread_cond_timedwait(&worker->sw_cv, + sigev_list_mtx, &ts); + if (ret == ETIMEDOUT) { + /* + * If we were timeouted and there is nothing + * to do, exit the thread. + */ + if (TAILQ_EMPTY(&sigev_actq) && + (worker->sw_flags & SWF_READYQ) && + sigev_worker_count > sigev_worker_low) + goto out; + } + } + } +out: + if (worker->sw_flags & SWF_READYQ) { + LIST_REMOVE(worker, sw_link); + worker->sw_flags &= ~SWF_READYQ; + } + sigev_worker_count--; + __sigev_list_unlock(); + _pthread_cond_destroy(&worker->sw_cv); + free(worker); + return (0); +} + +static void +worker_cleanup(void *arg) +{ + struct sigev_worker *worker = arg; + struct sigev_node *sn; + + sn = worker->sw_sn; + __sigev_list_lock(); + sn->sn_flags &= ~SNF_WORKING; + if (sn->sn_flags & SNF_REMOVED) + __sigev_free(sn); + else if (sn->sn_flags & SNF_ONESHOT) + __sigev_delete_node(sn); + sigev_worker_count--; + __sigev_list_unlock(); + _pthread_cond_destroy(&worker->sw_cv); + free(worker); +} diff --git a/lib/librt/sigev_thread.h b/lib/librt/sigev_thread.h index 998af17..89512f9 100644 --- a/lib/librt/sigev_thread.h +++ b/lib/librt/sigev_thread.h @@ -38,11 +38,12 @@ struct sigev_thread_node; struct sigev_node; typedef uintptr_t sigev_id_t; -typedef void (*sigev_dispatch_t)(struct sigev_node *, siginfo_t *); +typedef void (*sigev_dispatch_t)(struct sigev_node *); struct sigev_node { LIST_ENTRY(sigev_node) sn_link; LIST_ENTRY(sigev_node) sn_allist; + TAILQ_ENTRY(sigev_node) sn_actq; int sn_type; sigev_id_t sn_id; sigev_dispatch_t sn_dispatch; @@ -50,6 +51,8 @@ struct sigev_node { void *sn_func; int sn_flags; int sn_gen; + int sn_usethreadpool; + siginfo_t sn_info; struct sigev_thread_node * sn_tn; }; @@ -57,7 +60,6 @@ struct sigev_thread_attr { int sna_policy; int sna_inherit; int sna_prio; - int sna_scope; size_t sna_stacksize; void *sna_stackaddr; size_t sna_guardsize; @@ -68,19 +70,22 @@ struct sigev_thread_node { pthread_t tn_thread; struct sigev_node *tn_cur; struct sigev_thread_attr tn_sna; + int tn_refcount; long tn_lwpid; - pthread_cond_t tn_cv; jmp_buf tn_jbuf; }; #define SNF_WORKING 0x01 #define SNF_REMOVED 0x02 -#define SNF_ONESHOT 0x04 +#define SNF_ONESHOT 0x04 +#define SNF_ACTQ 0x08 +#define SNF_THREADPOOL 0x10 #define SIGEV_SIGSERVICE (SIGTHR+1) int __sigev_check_init(); -struct sigev_node *__sigev_alloc(int, const struct sigevent *); +struct sigev_node *__sigev_alloc(int, const struct sigevent *, + struct sigev_node *, int usethreadpool); struct sigev_node *__sigev_find(int, sigev_id_t); void __sigev_get_sigevent(struct sigev_node *, struct sigevent *, sigev_id_t); diff --git a/lib/librt/timer.c b/lib/librt/timer.c index c5d0671..3db5f5b 100644 --- a/lib/librt/timer.c +++ b/lib/librt/timer.c @@ -66,16 +66,16 @@ __weak_reference(__timer_settime, _timer_settime); __weak_reference(__timer_getoverrun, timer_getoverrun); __weak_reference(__timer_getoverrun, _timer_getoverrun); -typedef void (*timer_func)(union sigval val, int timerid, int overrun); +typedef void (*timer_func)(union sigval val, int overrun); static void -timer_dispatch(struct sigev_node *sn, siginfo_t *si) +timer_dispatch(struct sigev_node *sn) { timer_func f = sn->sn_func; /* I want to avoid expired notification. */ - if (si->si_value.sival_int == sn->sn_gen) - f(sn->sn_value, si->si_timerid, si->si_overrun); + if (sn->sn_info.si_value.sival_int == sn->sn_gen) + f(sn->sn_value, sn->sn_info.si_overrun); } int @@ -108,7 +108,7 @@ __timer_create(clockid_t clockid, struct sigevent *evp, timer_t *timerid) return (-1); } - sn = __sigev_alloc(SI_TIMER, evp); + sn = __sigev_alloc(SI_TIMER, evp, NULL, 0); if (sn == NULL) { errno = EAGAIN; return (-1); -- cgit v1.1