diff options
Diffstat (limited to 'lib/librt/sigev_thread.c')
-rw-r--r-- | lib/librt/sigev_thread.c | 537 |
1 files changed, 159 insertions, 378 deletions
diff --git a/lib/librt/sigev_thread.c b/lib/librt/sigev_thread.c index ea86c60..998c08e 100644 --- a/lib/librt/sigev_thread.c +++ b/lib/librt/sigev_thread.c @@ -31,12 +31,10 @@ #include <machine/atomic.h> #include "namespace.h" -#include <assert.h> #include <err.h> #include <errno.h> #include <ucontext.h> #include <sys/thr.h> -#include <sys/time.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -46,96 +44,59 @@ #include "sigev_thread.h" -/* Lowest number of worker threads should be kept. */ -#define SIGEV_WORKER_LOW 0 - -/* Highest number of worker threads can be created. */ -#define SIGEV_WORKER_HIGH 20 - -/* How long an idle worker thread should stay. */ -#define SIGEV_WORKER_IDLE 10 - -struct sigev_worker { - LIST_ENTRY(sigev_worker) sw_link; - pthread_cond_t sw_cv; - struct sigev_node *sw_sn; - int sw_flags; - int *sw_readyptr; -}; - -#define SWF_READYQ 1 - LIST_HEAD(sigev_list_head, sigev_node); #define HASH_QUEUES 17 #define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) static struct sigev_list_head sigev_hash[HASH_QUEUES]; static struct sigev_list_head sigev_all; -static TAILQ_HEAD(, sigev_node) sigev_actq; -static TAILQ_HEAD(, sigev_thread_node) sigev_threads; +static LIST_HEAD(,sigev_thread) sigev_threads; static int sigev_generation; static pthread_mutex_t *sigev_list_mtx; static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; -static pthread_mutex_t *sigev_threads_mtx; -static pthread_cond_t *sigev_threads_cv; -static pthread_cond_t *sigev_actq_cv; -static struct sigev_thread_node *sigev_default_thread; -static struct sigev_thread_attr sigev_default_sna; +static struct sigev_thread *sigev_default_thread; static pthread_attr_t sigev_default_attr; static int atfork_registered; -static LIST_HEAD(,sigev_worker) sigev_worker_ready; -static int sigev_worker_count; -static int sigev_worker_start; -static int sigev_worker_high; -static int sigev_worker_low; -static pthread_cond_t *sigev_worker_init_cv; - -static void __sigev_fork_prepare(void); -static void __sigev_fork_parent(void); -static void __sigev_fork_child(void); -static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *, - struct sigev_thread_node *, int); -static void *sigev_service_loop(void *); -static void *sigev_worker_routine(void *); -static void sigev_put(struct sigev_node *); -static void worker_cleanup(void *arg); + +static void __sigev_fork_prepare(void); +static void __sigev_fork_parent(void); +static void __sigev_fork_child(void); +static struct sigev_thread *sigev_thread_create(int); +static void *sigev_service_loop(void *); +static void *worker_routine(void *); +static void worker_cleanup(void *); #pragma weak pthread_create -#pragma weak pthread_attr_getschedpolicy -#pragma weak pthread_attr_getinheritsched -#pragma weak pthread_attr_getschedparam -#pragma weak pthread_attr_getscope -#pragma weak pthread_attr_getstacksize -#pragma weak pthread_attr_getstackaddr -#pragma weak pthread_attr_getguardsize -#pragma weak pthread_attr_init -#pragma weak pthread_attr_setscope -#pragma weak pthread_attr_setdetachstate -#pragma weak pthread_atfork -#pragma weak _pthread_once -#pragma weak pthread_cleanup_push -#pragma weak pthread_cleanup_pop -#pragma weak pthread_setcancelstate - -static __inline void -attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna) -{ - struct sched_param sched_param; - - pthread_attr_getschedpolicy(attr, &sna->sna_policy); - pthread_attr_getinheritsched(attr, &sna->sna_inherit); - pthread_attr_getschedparam(attr, &sched_param); - sna->sna_prio = sched_param.sched_priority; - pthread_attr_getstacksize(attr, &sna->sna_stacksize); - pthread_attr_getstackaddr(attr, &sna->sna_stackaddr); - pthread_attr_getguardsize(attr, &sna->sna_guardsize); -} -static __inline int -sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b) +static void +attrcopy(pthread_attr_t *src, pthread_attr_t *dst) { - return memcmp(a, b, sizeof(*a)) == 0; + struct sched_param sched; + void *a; + size_t u; + int v; + + _pthread_attr_getschedpolicy(src, &v); + _pthread_attr_setschedpolicy(dst, v); + + _pthread_attr_getinheritsched(src, &v); + _pthread_attr_setinheritsched(dst, v); + + _pthread_attr_getschedparam(src, &sched); + _pthread_attr_setschedparam(dst, &sched); + + _pthread_attr_getscope(src, &v); + _pthread_attr_setscope(dst, v); + + _pthread_attr_getstacksize(src, &u); + _pthread_attr_setstacksize(dst, u); + + _pthread_attr_getstackaddr(src, &a); + _pthread_attr_setstackaddr(src, a); + + _pthread_attr_getguardsize(src, &u); + _pthread_attr_setguardsize(dst, u); } static __inline int @@ -148,46 +109,36 @@ void __sigev_thread_init(void) { static int inited = 0; + pthread_mutexattr_t mattr; int i; + _pthread_mutexattr_init(&mattr); + _pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL); sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); - _pthread_mutex_init(sigev_list_mtx, NULL); - sigev_threads_mtx = malloc(sizeof(pthread_mutex_t)); - _pthread_mutex_init(sigev_threads_mtx, NULL); - sigev_threads_cv = malloc(sizeof(pthread_cond_t)); - _pthread_cond_init(sigev_threads_cv, NULL); - sigev_actq_cv = malloc(sizeof(pthread_cond_t)); - _pthread_cond_init(sigev_actq_cv, NULL); + _pthread_mutex_init(sigev_list_mtx, &mattr); + _pthread_mutexattr_destroy(&mattr); + for (i = 0; i < HASH_QUEUES; ++i) LIST_INIT(&sigev_hash[i]); LIST_INIT(&sigev_all); - TAILQ_INIT(&sigev_threads); - TAILQ_INIT(&sigev_actq); + LIST_INIT(&sigev_threads); sigev_default_thread = NULL; - sigev_worker_count = 0; - sigev_worker_start = 0; - LIST_INIT(&sigev_worker_ready); - sigev_worker_high = SIGEV_WORKER_HIGH; - sigev_worker_low = SIGEV_WORKER_LOW; - sigev_worker_init_cv = malloc(sizeof(pthread_cond_t)); - _pthread_cond_init(sigev_worker_init_cv, NULL); if (atfork_registered == 0) { - pthread_atfork( + _pthread_atfork( __sigev_fork_prepare, __sigev_fork_parent, __sigev_fork_child); atfork_registered = 1; } if (!inited) { - pthread_attr_init(&sigev_default_attr); - attr2sna(&sigev_default_attr, &sigev_default_sna); - pthread_attr_setscope(&sigev_default_attr, + _pthread_attr_init(&sigev_default_attr); + _pthread_attr_setscope(&sigev_default_attr, PTHREAD_SCOPE_SYSTEM); - pthread_attr_setdetachstate(&sigev_default_attr, + _pthread_attr_setdetachstate(&sigev_default_attr, PTHREAD_CREATE_DETACHED); inited = 1; } - sigev_default_thread = sigev_thread_create(NULL, NULL, 0); + sigev_default_thread = sigev_thread_create(0); } int @@ -223,49 +174,46 @@ __sigev_fork_child(void) __sigev_thread_init(); } -int +void __sigev_list_lock(void) { - return _pthread_mutex_lock(sigev_list_mtx); + _pthread_mutex_lock(sigev_list_mtx); } -int +void __sigev_list_unlock(void) { - return _pthread_mutex_unlock(sigev_list_mtx); -} - -int -__sigev_thread_list_lock(void) -{ - return _pthread_mutex_lock(sigev_threads_mtx); -} - -int -__sigev_thread_list_unlock(void) -{ - return _pthread_mutex_unlock(sigev_threads_mtx); + _pthread_mutex_unlock(sigev_list_mtx); } struct sigev_node * __sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, - int usethreadpool) + int usedefault) { struct sigev_node *sn; sn = calloc(1, sizeof(*sn)); if (sn != NULL) { sn->sn_value = evp->sigev_value; - sn->sn_func = evp->sigev_notify_function; - sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); - sn->sn_type = type; - if (usethreadpool) - sn->sn_flags |= SNF_THREADPOOL; - sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes, - prev ? prev->sn_tn : NULL, usethreadpool); - if (sn->sn_tn == NULL) { - free(sn); - sn = NULL; + sn->sn_func = evp->sigev_notify_function; + sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); + sn->sn_type = type; + _pthread_attr_init(&sn->sn_attr); + _pthread_attr_setdetachstate(&sn->sn_attr, PTHREAD_CREATE_DETACHED); + if (evp->sigev_notify_attributes) + attrcopy(evp->sigev_notify_attributes, &sn->sn_attr); + if (prev) { + __sigev_list_lock(); + prev->sn_tn->tn_refcount++; + __sigev_list_unlock(); + sn->sn_tn = prev->sn_tn; + } else { + sn->sn_tn = sigev_thread_create(usedefault); + if (sn->sn_tn == NULL) { + _pthread_attr_destroy(&sn->sn_attr); + free(sn); + sn = NULL; + } } } return (sn); @@ -276,11 +224,11 @@ __sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp, sigev_id_t id) { /* - * Build a new sigevent, and tell kernel to deliver SIGEV_SIGSERVICE + * Build a new sigevent, and tell kernel to deliver SIGSERVICE * signal to the new thread. */ newevp->sigev_notify = SIGEV_THREAD_ID; - newevp->sigev_signo = SIGEV_SIGSERVICE; + newevp->sigev_signo = SIGSERVICE; newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid; newevp->sigev_value.sival_ptr = (void *)id; } @@ -288,6 +236,7 @@ __sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp, void __sigev_free(struct sigev_node *sn) { + _pthread_attr_destroy(&sn->sn_attr); free(sn); } @@ -310,7 +259,6 @@ __sigev_register(struct sigev_node *sn) int chain = HASH(sn->sn_type, sn->sn_id); LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link); - LIST_INSERT_HEAD(&sigev_all, sn, sn_allist); return (0); } @@ -329,26 +277,17 @@ int __sigev_delete_node(struct sigev_node *sn) { LIST_REMOVE(sn, sn_link); - LIST_REMOVE(sn, sn_allist); - __sigev_thread_list_lock(); if (--sn->sn_tn->tn_refcount == 0) - if (!(sn->sn_flags & SNF_THREADPOOL)) - pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE); - __sigev_thread_list_unlock(); + _pthread_kill(sn->sn_tn->tn_thread, SIGSERVICE); if (sn->sn_flags & SNF_WORKING) sn->sn_flags |= SNF_REMOVED; - else { - if (sn->sn_flags & SNF_ACTQ) { - TAILQ_REMOVE(&sigev_actq, sn, sn_actq); - } + else __sigev_free(sn); - } return (0); } -static -sigev_id_t +static sigev_id_t sigev_get_id(siginfo_t *si) { switch(si->si_code) { @@ -362,324 +301,166 @@ sigev_get_id(siginfo_t *si) return (-1); } -static struct sigev_thread_node * -sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev, - int usepool) +static struct sigev_thread * +sigev_thread_create(int usedefault) { - struct sigev_thread_node *tn; - struct sigev_thread_attr sna; + struct sigev_thread *tn; sigset_t set; int ret; - if (pattr == NULL) - pattr = &sigev_default_attr; - else { - pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM); - pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED); - } - - attr2sna(pattr, &sna); - - __sigev_thread_list_lock(); - - if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) { - prev->tn_refcount++; - __sigev_thread_list_unlock(); - return (prev); - } - - if (sna_eq(&sna, &sigev_default_sna) && usepool && - sigev_default_thread != NULL) { + if (usedefault && sigev_default_thread) { + __sigev_list_lock(); sigev_default_thread->tn_refcount++; - __sigev_thread_list_unlock(); - return (sigev_default_thread); - } - - tn = NULL; - /* Search a thread matching the required stack address */ - if (sna.sna_stackaddr != NULL) { - TAILQ_FOREACH(tn, &sigev_threads, tn_link) { - if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr) - break; - } + __sigev_list_unlock(); + return (sigev_default_thread); } - if (tn != NULL) { - tn->tn_refcount++; - __sigev_thread_list_unlock(); - return (tn); - } tn = malloc(sizeof(*tn)); - tn->tn_sna = sna; tn->tn_cur = NULL; tn->tn_lwpid = -1; tn->tn_refcount = 1; - TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link); + _pthread_cond_init(&tn->tn_cv, NULL); + + /* for debug */ + __sigev_list_lock(); + LIST_INSERT_HEAD(&sigev_threads, tn, tn_link); + __sigev_list_unlock(); + sigemptyset(&set); - sigaddset(&set, SIGEV_SIGSERVICE); + sigaddset(&set, SIGSERVICE); + _sigprocmask(SIG_BLOCK, &set, NULL); - ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn); + ret = pthread_create(&tn->tn_thread, &sigev_default_attr, + sigev_service_loop, tn); _sigprocmask(SIG_UNBLOCK, &set, NULL); + if (ret != 0) { - TAILQ_REMOVE(&sigev_threads, tn, tn_link); - __sigev_thread_list_unlock(); + __sigev_list_lock(); + LIST_REMOVE(tn, tn_link); + __sigev_list_unlock(); free(tn); tn = NULL; } else { /* wait the thread to get its lwpid */ - while (tn->tn_lwpid == -1) - _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx); - __sigev_thread_list_unlock(); - } - return (tn); -} -static void -after_dispatch(struct sigev_thread_node *tn) -{ - struct sigev_node *sn; - - if ((sn = tn->tn_cur) != NULL) { __sigev_list_lock(); - sn->sn_flags &= ~SNF_WORKING; - if (sn->sn_flags & SNF_REMOVED) - __sigev_free(sn); - else if (sn->sn_flags & SNF_ONESHOT) - __sigev_delete_node(sn); - tn->tn_cur = NULL; + while (tn->tn_lwpid == -1) + _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx); __sigev_list_unlock(); } - tn->tn_cur = NULL; -} - -/* - * This function is called if user callback calls - * pthread_exit() or pthread_cancel() for the thread. - */ -static void -thread_cleanup(void *arg) -{ - struct sigev_thread_node *tn = arg; - - fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from " - "SIGEV_THREAD is undefined."); - after_dispatch(tn); - /* longjmp(tn->tn_jbuf, 1); */ - abort(); + return (tn); } /* - * Main notification dispatch function, the function either - * run user callback by itself or hand off the notifications - * to worker threads depend on flags. + * The thread receives notification from kernel and creates + * a thread to call user callback function. */ static void * sigev_service_loop(void *arg) { + static int failure; + siginfo_t si; sigset_t set; - struct sigev_thread_node *tn; + struct sigev_thread *tn; struct sigev_node *sn; sigev_id_t id; + pthread_t td; int ret; tn = arg; thr_self(&tn->tn_lwpid); - __sigev_thread_list_lock(); - _pthread_cond_broadcast(sigev_threads_cv); - __sigev_thread_list_unlock(); + __sigev_list_lock(); + _pthread_cond_broadcast(&tn->tn_cv); + __sigev_list_unlock(); /* * Service thread should not be killed by callback, if user * attempts to do so, the thread will be restarted. */ - setjmp(tn->tn_jbuf); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); sigemptyset(&set); - sigaddset(&set, SIGEV_SIGSERVICE); - pthread_cleanup_push(thread_cleanup, tn); + sigaddset(&set, SIGSERVICE); for (;;) { ret = sigwaitinfo(&set, &si); - __sigev_thread_list_lock(); + + __sigev_list_lock(); if (tn->tn_refcount == 0) { - TAILQ_REMOVE(&sigev_threads, tn, tn_link); - __sigev_thread_list_unlock(); + LIST_REMOVE(tn, tn_link); + __sigev_list_unlock(); free(tn); break; } - __sigev_thread_list_unlock(); - if (ret == -1) + + if (ret == -1) { + __sigev_list_unlock(); continue; + } + id = sigev_get_id(&si); - __sigev_list_lock(); sn = __sigev_find(si.si_code, id); - if (sn != NULL) { - sn->sn_info = si; - if (!(sn->sn_flags & SNF_THREADPOOL)) { - tn->tn_cur = sn; - sn->sn_flags |= SNF_WORKING; - __sigev_list_unlock(); - sn->sn_dispatch(sn); - after_dispatch(tn); - } else { - assert(!(sn->sn_flags & SNF_ACTQ)); - sigev_put(sn); - } - } else { - tn->tn_cur = NULL; + if (sn == NULL) { __sigev_list_unlock(); + continue; } - } - pthread_cleanup_pop(0); - return (0); -} + + sn->sn_info = si; + if (sn->sn_flags & SNF_SYNC) + tn->tn_cur = sn; + else + tn->tn_cur = NULL; + sn->sn_flags |= SNF_WORKING; + __sigev_list_unlock(); -/* - * Hand off notifications to worker threads. - * - * prerequist: sigev list locked. - */ -static void -sigev_put(struct sigev_node *sn) -{ - struct sigev_worker *worker; - pthread_t td; - int ret, ready; + ret = pthread_create(&td, &sn->sn_attr, worker_routine, sn); + if (ret != 0) { + if (failure++ < 5) + warnc(ret, "%s:%s failed to create thread.\n", + __FILE__, __func__); - TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq); - sn->sn_flags |= SNF_ACTQ; - /* - * check if we should add more worker threads unless quota is hit. - */ - if (LIST_EMPTY(&sigev_worker_ready) && - sigev_worker_count + sigev_worker_start < sigev_worker_high) { - sigev_worker_start++; - __sigev_list_unlock(); - worker = malloc(sizeof(*worker)); - _pthread_cond_init(&worker->sw_cv, NULL); - worker->sw_flags = 0; - worker->sw_sn = 0; - worker->sw_readyptr = &ready; - ready = 0; - ret = pthread_create(&td, &sigev_default_attr, - sigev_worker_routine, worker); - if (ret) { - warnc(ret, "%s:%s can not create worker thread", - __FILE__, __func__); __sigev_list_lock(); - sigev_worker_start--; + sn->sn_flags &= ~SNF_WORKING; + if (sn->sn_flags & SNF_REMOVED) + __sigev_free(sn); __sigev_list_unlock(); - } else { + } else if (tn->tn_cur) { __sigev_list_lock(); - while (ready == 0) { - _pthread_cond_wait(sigev_worker_init_cv, - sigev_list_mtx); - } - __sigev_list_unlock(); - } - } else { - worker = LIST_FIRST(&sigev_worker_ready); - if (worker) { - LIST_REMOVE(worker, sw_link); - worker->sw_flags &= ~SWF_READYQ; - _pthread_cond_broadcast(&worker->sw_cv); + while (tn->tn_cur) + _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx); __sigev_list_unlock(); } } + return (0); } /* - * Background thread to dispatch notification to user code. - * These threads are not bound to any realtime objects. + * newly created worker thread to call user callback function. */ static void * -sigev_worker_routine(void *arg) +worker_routine(void *arg) { - struct sigev_worker *worker; - struct sigev_node *sn; - struct timespec ts; - int ret; + struct sigev_node *sn = arg; - worker = arg; - __sigev_list_lock(); - sigev_worker_count++; - sigev_worker_start--; - (*(worker->sw_readyptr))++; - LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); - worker->sw_flags |= SWF_READYQ; - _pthread_cond_broadcast(sigev_worker_init_cv); - - for (;;) { - if (worker->sw_flags & SWF_READYQ) { - LIST_REMOVE(worker, sw_link); - worker->sw_flags &= ~SWF_READYQ; - } + _pthread_cleanup_push(worker_cleanup, sn); + sn->sn_dispatch(sn); + _pthread_cleanup_pop(1); - sn = TAILQ_FIRST(&sigev_actq); - if (sn != NULL) { - TAILQ_REMOVE(&sigev_actq, sn, sn_actq); - sn->sn_flags &= ~SNF_ACTQ; - sn->sn_flags |= SNF_WORKING; - __sigev_list_unlock(); - - worker->sw_sn = sn; - pthread_cleanup_push(worker_cleanup, worker); - sn->sn_dispatch(sn); - pthread_cleanup_pop(0); - worker->sw_sn = NULL; - - __sigev_list_lock(); - sn->sn_flags &= ~SNF_WORKING; - if (sn->sn_flags & SNF_REMOVED) - __sigev_free(sn); - else if (sn->sn_flags & SNF_ONESHOT) - __sigev_delete_node(sn); - } else { - LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); - worker->sw_flags |= SWF_READYQ; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += SIGEV_WORKER_IDLE; - ret = _pthread_cond_timedwait(&worker->sw_cv, - sigev_list_mtx, &ts); - if (ret == ETIMEDOUT) { - /* - * If we were timeouted and there is nothing - * to do, exit the thread. - */ - if (TAILQ_EMPTY(&sigev_actq) && - (worker->sw_flags & SWF_READYQ) && - sigev_worker_count > sigev_worker_low) - goto out; - } - } - } -out: - if (worker->sw_flags & SWF_READYQ) { - LIST_REMOVE(worker, sw_link); - worker->sw_flags &= ~SWF_READYQ; - } - sigev_worker_count--; - __sigev_list_unlock(); - _pthread_cond_destroy(&worker->sw_cv); - free(worker); return (0); } +/* clean up a notification after dispatch. */ static void worker_cleanup(void *arg) { - struct sigev_worker *worker = arg; - struct sigev_node *sn; + struct sigev_node *sn = arg; - sn = worker->sw_sn; __sigev_list_lock(); - sn->sn_flags &= ~SNF_WORKING; + if (sn->sn_flags & SNF_SYNC) { + sn->sn_tn->tn_cur = NULL; + _pthread_cond_broadcast(&sn->sn_tn->tn_cv); + } if (sn->sn_flags & SNF_REMOVED) __sigev_free(sn); - else if (sn->sn_flags & SNF_ONESHOT) - __sigev_delete_node(sn); - sigev_worker_count--; + else + sn->sn_flags &= ~SNF_WORKING; __sigev_list_unlock(); - _pthread_cond_destroy(&worker->sw_cv); - free(worker); } |