author     davidxu <davidxu@FreeBSD.org>    2010-12-22 05:01:52 +0000
committer  davidxu <davidxu@FreeBSD.org>    2010-12-22 05:01:52 +0000
commit     437ad27f9c81f522301de0affe1fc5fef37d8828 (patch)
tree       d717e6b5f295c555e1deef8497d4130bec1843cf /lib/libthr/thread
parent     24b08bca030970592bc5241517b0462f603b05b1 (diff)
MFp4:
- Add flags CVWAIT_ABSTIME and CVWAIT_CLOCKID for the umtx kernel-based
  condition variable; this eliminates an extra system call to fetch the
  current time before each timed wait (see the sketch below).
- Add sub-operation UMTX_OP_NWAKE_PRIVATE to wake up N wait channels in a
  single system call (a sketch follows the diffstat). Create a userland
  sleep queue for condition variables; in most cases a thread now waits
  in this queue, and pthread_cond_signal defers the wakeup until the
  mutex is unlocked. This avoids an extra system call and an extra
  context switch in the window between pthread_cond_signal and
  pthread_mutex_unlock.
These changes are part of the process-shared mutex project.
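
A hypothetical before/after comparison of the first point (illustration
only, not part of the patch), written against the public _umtx_op(2)
interface; UMTX_OP_CV_WAIT, CVWAIT_ABSTIME and CVWAIT_CLOCKID come from
<sys/umtx.h>. Without the new flags, userland has to turn the absolute
deadline into a relative timeout itself; with them, the kernel evaluates
the absolute deadline against the clock recorded in the ucond:

    /* Hypothetical sketch, not the patch's code. */
    #include <sys/types.h>
    #include <sys/umtx.h>
    #include <time.h>

    /* Before: convert the absolute deadline to a relative timeout,
     * costing one clock_gettime() system call per timed wait. */
    static int
    cvwait_before(struct ucond *kcv, struct umutex *lock, int clockid,
        const struct timespec *abstime)
    {
        struct timespec now, rel;

        clock_gettime(clockid, &now);           /* the extra system call */
        rel.tv_sec = abstime->tv_sec - now.tv_sec;
        rel.tv_nsec = abstime->tv_nsec - now.tv_nsec;
        if (rel.tv_nsec < 0) {
            rel.tv_sec--;
            rel.tv_nsec += 1000000000;
        }
        return (_umtx_op(kcv, UMTX_OP_CV_WAIT, 0, lock, &rel));
    }

    /* After: pass the absolute deadline straight through; the kernel
     * interprets it against the clock named by the ucond's c_clockid. */
    static int
    cvwait_after(struct ucond *kcv, struct umutex *lock,
        const struct timespec *abstime)
    {
        return (_umtx_op(kcv, UMTX_OP_CV_WAIT,
            CVWAIT_ABSTIME | CVWAIT_CLOCKID, lock,
            __DECONST(void *, abstime)));
    }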
Diffstat (limited to 'lib/libthr/thread')
 -rw-r--r--  lib/libthr/thread/Makefile.inc   |   1
 -rw-r--r--  lib/libthr/thread/thr_cond.c     | 377
 -rw-r--r--  lib/libthr/thread/thr_init.c     |   2
 -rw-r--r--  lib/libthr/thread/thr_kern.c     |  92
 -rw-r--r--  lib/libthr/thread/thr_list.c     |   4
 -rw-r--r--  lib/libthr/thread/thr_mutex.c    | 202
 -rw-r--r--  lib/libthr/thread/thr_private.h  | 115
 -rw-r--r--  lib/libthr/thread/thr_umtx.c     |  53
 -rw-r--r--  lib/libthr/thread/thr_umtx.h     |  11
 9 files changed, 666 insertions, 191 deletions
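
Before the diff itself, a condensed restatement of the batched-wakeup
primitive the second point introduces. This mirrors _thr_wake_all() from
the patch: every waiter's wake word is published first, then one
_umtx_op(2) call with UMTX_OP_NWAKE_PRIVATE wakes all of the wait
channels, where the old code would have issued one UMTX_OP_WAKE_PRIVATE
call per waiter:

    /* Condensed restatement of _thr_wake_all() from the patch. */
    #include <sys/types.h>
    #include <sys/umtx.h>

    static void
    wake_batch(unsigned int *waddrs[], int count)
    {
        int i;

        /* Publish the wakeup flag for every waiter first... */
        for (i = 0; i < count; ++i)
            *waddrs[i] = 1;
        /* ...then wake all their wait channels with one system call. */
        _umtx_op(waddrs, UMTX_OP_NWAKE_PRIVATE, count, NULL, NULL);
    }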
diff --git a/lib/libthr/thread/Makefile.inc b/lib/libthr/thread/Makefile.inc
index 4ce90b5..6d571d4 100644
--- a/lib/libthr/thread/Makefile.inc
+++ b/lib/libthr/thread/Makefile.inc
@@ -45,6 +45,7 @@ SRCS+= \
 	thr_setschedparam.c \
 	thr_sig.c \
 	thr_single_np.c \
+	thr_sleepq.c \
 	thr_spec.c \
 	thr_spinlock.c \
 	thr_stack.c \
diff --git a/lib/libthr/thread/thr_cond.c b/lib/libthr/thread/thr_cond.c
index 03b5cdd..6ec6d4c 100644
--- a/lib/libthr/thread/thr_cond.c
+++ b/lib/libthr/thread/thr_cond.c
@@ -45,7 +45,8 @@ int __pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
 static int cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
 static int cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
 	const struct timespec *abstime, int cancel);
-static int cond_signal_common(pthread_cond_t *cond, int broadcast);
+static int cond_signal_common(pthread_cond_t *cond);
+static int cond_broadcast_common(pthread_cond_t *cond);
 
 /*
  * Double underscore versions are cancellation points.  Single underscore
@@ -60,31 +61,31 @@ __weak_reference(_pthread_cond_destroy, pthread_cond_destroy);
 __weak_reference(_pthread_cond_signal, pthread_cond_signal);
 __weak_reference(_pthread_cond_broadcast, pthread_cond_broadcast);
 
+#define CV_PSHARED(cvp)	(((cvp)->__flags & USYNC_PROCESS_SHARED) != 0)
+
 static int
 cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr)
 {
-	pthread_cond_t	pcond;
-	int	rval = 0;
+	struct pthread_cond	*cvp;
+	int	error = 0;
 
-	if ((pcond = (pthread_cond_t)
+	if ((cvp = (pthread_cond_t)
 	    calloc(1, sizeof(struct pthread_cond))) == NULL) {
-		rval = ENOMEM;
+		error = ENOMEM;
 	} else {
 		/*
 		 * Initialise the condition variable structure:
 		 */
 		if (cond_attr == NULL || *cond_attr == NULL) {
-			pcond->c_pshared = 0;
-			pcond->c_clockid = CLOCK_REALTIME;
+			cvp->__clock_id = CLOCK_REALTIME;
 		} else {
-			pcond->c_pshared = (*cond_attr)->c_pshared;
-			pcond->c_clockid = (*cond_attr)->c_clockid;
+			if ((*cond_attr)->c_pshared)
+				cvp->__flags |= USYNC_PROCESS_SHARED;
+			cvp->__clock_id = (*cond_attr)->c_clockid;
 		}
-		_thr_umutex_init(&pcond->c_lock);
-		*cond = pcond;
+		*cond = cvp;
 	}
-	/* Return the completion status: */
-	return (rval);
+	return (error);
 }
 
 static int
@@ -105,16 +106,16 @@ init_static(struct pthread *thread, pthread_cond_t *cond)
 }
 
 #define CHECK_AND_INIT_COND						\
-	if (__predict_false((cv = (*cond)) <= THR_COND_DESTROYED)) {	\
-		if (cv == THR_COND_INITIALIZER) {			\
+	if (__predict_false((cvp = (*cond)) <= THR_COND_DESTROYED)) {	\
+		if (cvp == THR_COND_INITIALIZER) {			\
 			int ret;					\
 			ret = init_static(_get_curthread(), cond);	\
 			if (ret)					\
 				return (ret);				\
-		} else if (cv == THR_COND_DESTROYED) {			\
+		} else if (cvp == THR_COND_DESTROYED) {			\
 			return (EINVAL);				\
 		}							\
-		cv = *cond;						\
+		cvp = *cond;						\
 	}
 
 int
@@ -128,48 +129,24 @@ _pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr)
 int
 _pthread_cond_destroy(pthread_cond_t *cond)
 {
-	struct pthread *curthread = _get_curthread();
-	struct pthread_cond *cv;
-	int rval = 0;
-
-	if ((cv = *cond) == THR_COND_INITIALIZER)
-		rval = 0;
-	else if (cv == THR_COND_DESTROYED)
-		rval = EINVAL;
+	struct pthread_cond *cvp;
+	int error = 0;
+
+	if ((cvp = *cond) == THR_COND_INITIALIZER)
+		error = 0;
+	else if (cvp == THR_COND_DESTROYED)
+		error = EINVAL;
 	else {
-		cv = *cond;
-		THR_UMUTEX_LOCK(curthread, &cv->c_lock);
+		cvp = *cond;
 		*cond = THR_COND_DESTROYED;
-		THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
 
 		/*
 		 * Free the memory allocated for the condition
 		 * variable structure:
 		 */
-		free(cv);
-	}
-	return (rval);
-}
-
-struct cond_cancel_info
-{
-	pthread_mutex_t	*mutex;
-	pthread_cond_t	*cond;
-	int		count;
-};
-
-static void
-cond_cancel_handler(void *arg)
-{
-	struct pthread *curthread = _get_curthread();
-	struct cond_cancel_info *info = (struct cond_cancel_info *)arg;
-	pthread_cond_t cv;
-
-	if (info->cond != NULL) {
-		cv = *(info->cond);
-		THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
+		free(cvp);
 	}
-	_mutex_cv_lock(info->mutex, info->count);
+	return (error);
 }
 
 /*
@@ -181,53 +158,151 @@ cond_cancel_handler(void *arg)
  * to be lost.
  */
 static int
-cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
+cond_wait_kernel(struct pthread_cond *cvp, struct pthread_mutex *mp,
 	const struct timespec *abstime, int cancel)
 {
 	struct pthread	*curthread = _get_curthread();
-	struct timespec ts, ts2, *tsp;
-	struct cond_cancel_info info;
-	pthread_cond_t	cv;
-	int		ret;
+	int	recurse;
+	int	error, error2 = 0;
+
+	error = _mutex_cv_detach(mp, &recurse);
+	if (error != 0)
+		return (error);
+
+	if (cancel) {
+		_thr_cancel_enter2(curthread, 0);
+		error = _thr_ucond_wait((struct ucond *)&cvp->__has_kern_waiters,
+			(struct umutex *)&mp->m_lock, abstime,
+			CVWAIT_ABSTIME|CVWAIT_CLOCKID);
+		_thr_cancel_leave(curthread, 0);
+	} else {
+		error = _thr_ucond_wait((struct ucond *)&cvp->__has_kern_waiters,
+			(struct umutex *)&mp->m_lock, abstime,
+			CVWAIT_ABSTIME|CVWAIT_CLOCKID);
+	}
 
 	/*
-	 * If the condition variable is statically initialized,
-	 * perform the dynamic initialization:
+	 * Note that PP mutex and ROBUST mutex may return
+	 * interesting error codes.
 	 */
-	CHECK_AND_INIT_COND
-
-	cv = *cond;
-	THR_UMUTEX_LOCK(curthread, &cv->c_lock);
-	ret = _mutex_cv_unlock(mutex, &info.count);
-	if (__predict_false(ret != 0)) {
-		THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
-		return (ret);
+	if (error == 0) {
+		error2 = _mutex_cv_lock(mp, recurse);
+	} else if (error == EINTR || error == ETIMEDOUT) {
+		error2 = _mutex_cv_lock(mp, recurse);
+		if (error2 == 0 && cancel)
+			_thr_testcancel(curthread);
+		if (error == EINTR)
+			error = 0;
+	} else {
+		/* We know that it didn't unlock the mutex. */
+		error2 = _mutex_cv_attach(mp, recurse);
+		if (error2 == 0 && cancel)
+			_thr_testcancel(curthread);
 	}
+	return (error2 != 0 ? error2 : error);
+}
+
+/*
+ * Thread waits in userland queue whenever possible, when thread
+ * is signaled or broadcasted, it is removed from the queue, and
+ * is saved in curthread's defer_waiters[] buffer, but won't be
+ * woken up until mutex is unlocked.
+ */
+
+static int
+cond_wait_user(struct pthread_cond *cvp, struct pthread_mutex *mp,
+	const struct timespec *abstime, int cancel)
+{
+	struct pthread	*curthread = _get_curthread();
+	struct sleepqueue *sq;
+	int	recurse;
+	int	error;
 
-	info.mutex = mutex;
-	info.cond  = cond;
+	if (curthread->wchan != NULL)
+		PANIC("thread was already on queue.");
 
-	if (abstime != NULL) {
-		clock_gettime(cv->c_clockid, &ts);
-		TIMESPEC_SUB(&ts2, abstime, &ts);
-		tsp = &ts2;
-	} else
-		tsp = NULL;
+	if (cancel)
+		_thr_testcancel(curthread);
 
-	if (cancel) {
-		THR_CLEANUP_PUSH(curthread, cond_cancel_handler, &info);
-		_thr_cancel_enter2(curthread, 0);
-		ret = _thr_ucond_wait(&cv->c_kerncv, &cv->c_lock, tsp, 1);
-		info.cond = NULL;
-		_thr_cancel_leave(curthread, (ret != 0));
-		THR_CLEANUP_POP(curthread, 0);
-	} else {
-		ret = _thr_ucond_wait(&cv->c_kerncv, &cv->c_lock, tsp, 0);
+	_sleepq_lock(cvp);
+	/*
+	 * set __has_user_waiters before unlocking mutex, this allows
+	 * us to check it without locking in pthread_cond_signal().
+	 */
+	cvp->__has_user_waiters = 1;
+	curthread->will_sleep = 1;
+	(void)_mutex_cv_unlock(mp, &recurse);
+	curthread->mutex_obj = mp;
+	_sleepq_add(cvp, curthread);
+	for(;;) {
+		_thr_clear_wake(curthread);
+		_sleepq_unlock(cvp);
+
+		if (cancel) {
+			_thr_cancel_enter2(curthread, 0);
+			error = _thr_sleep(curthread, cvp->__clock_id, abstime);
+			_thr_cancel_leave(curthread, 0);
+		} else {
+			error = _thr_sleep(curthread, cvp->__clock_id, abstime);
+		}
+
+		if (curthread->wchan == NULL) {
+			error = 0;
+			goto out;
+		}
+
+		_sleepq_lock(cvp);
+		if (curthread->wchan == NULL) {
+			error = 0;
			break;
+		} else if (cancel && SHOULD_CANCEL(curthread)) {
+			sq = _sleepq_lookup(cvp);
+			cvp->__has_user_waiters =
+				_sleepq_remove(sq, curthread);
+			_sleepq_unlock(cvp);
+			curthread->mutex_obj = NULL;
+			_mutex_cv_lock(mp, recurse);
+			if (!THR_IN_CRITICAL(curthread))
+				_pthread_exit(PTHREAD_CANCELED);
+			else /* this should not happen */
+				return (0);
+		} else if (error == ETIMEDOUT) {
+			sq = _sleepq_lookup(cvp);
+			cvp->__has_user_waiters =
+				_sleepq_remove(sq, curthread);
+			break;
+		}
 	}
-	if (ret == EINTR)
-		ret = 0;
-	_mutex_cv_lock(mutex, info.count);
-	return (ret);
+	_sleepq_unlock(cvp);
+out:
+	curthread->mutex_obj = NULL;
+	_mutex_cv_lock(mp, recurse);
+	return (error);
+}
+
+static int
+cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
+	const struct timespec *abstime, int cancel)
+{
+	struct pthread	*curthread = _get_curthread();
+	struct pthread_cond *cvp;
+	struct pthread_mutex *mp;
+	int	error;
+
+	CHECK_AND_INIT_COND
+
+	mp = *mutex;
+
+	if ((error = _mutex_owned(curthread, mp)) != 0)
+		return (error);
+
+	if (curthread->attr.sched_policy != SCHED_OTHER ||
+	    (mp->m_lock.m_flags & (UMUTEX_PRIO_PROTECT|UMUTEX_PRIO_INHERIT|
+		USYNC_PROCESS_SHARED)) != 0 ||
+	    (cvp->__flags & USYNC_PROCESS_SHARED) != 0)
+		return cond_wait_kernel(cvp, mp, abstime, cancel);
+	else
+		return cond_wait_user(cvp, mp, abstime, cancel);
 }
 
 int
@@ -245,7 +320,7 @@ __pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
 }
 
 int
-_pthread_cond_timedwait(pthread_cond_t * cond, pthread_mutex_t * mutex,
+_pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
		       const struct timespec * abstime)
 {
 
@@ -269,11 +344,15 @@ __pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
 }
 
 static int
-cond_signal_common(pthread_cond_t *cond, int broadcast)
+cond_signal_common(pthread_cond_t *cond)
 {
 	struct pthread	*curthread = _get_curthread();
-	pthread_cond_t	cv;
-	int		ret = 0;
+	struct pthread *td;
+	struct pthread_cond *cvp;
+	struct pthread_mutex *mp;
+	struct sleepqueue *sq;
+	int	*waddr;
+	int	pshared;
 
 	/*
 	 * If the condition variable is statically initialized, perform dynamic
@@ -281,25 +360,123 @@ cond_signal_common(pthread_cond_t *cond, int broadcast)
 	 */
 	CHECK_AND_INIT_COND
 
-	THR_UMUTEX_LOCK(curthread, &cv->c_lock);
-	if (!broadcast)
-		ret = _thr_ucond_signal(&cv->c_kerncv);
-	else
-		ret = _thr_ucond_broadcast(&cv->c_kerncv);
-	THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
-	return (ret);
+	pshared = CV_PSHARED(cvp);
+
+	_thr_ucond_signal((struct ucond *)&cvp->__has_kern_waiters);
+
+	if (pshared || cvp->__has_user_waiters == 0)
+		return (0);
+
+	curthread = _get_curthread();
+	waddr = NULL;
+	_sleepq_lock(cvp);
+	sq = _sleepq_lookup(cvp);
+	if (sq == NULL) {
+		_sleepq_unlock(cvp);
+		return (0);
+	}
+
+	td = _sleepq_first(sq);
+	mp = td->mutex_obj;
+	cvp->__has_user_waiters = _sleepq_remove(sq, td);
+	if (mp->m_owner == curthread) {
+		if (curthread->nwaiter_defer >= MAX_DEFER_WAITERS) {
+			_thr_wake_all(curthread->defer_waiters,
+				curthread->nwaiter_defer);
+			curthread->nwaiter_defer = 0;
+		}
+		curthread->defer_waiters[curthread->nwaiter_defer++] =
+			&td->wake_addr->value;
+		mp->m_flags |= PMUTEX_FLAG_DEFERED;
+	} else {
+		waddr = &td->wake_addr->value;
+	}
+	_sleepq_unlock(cvp);
+	if (waddr != NULL)
+		_thr_set_wake(waddr);
+	return (0);
+}
+
+struct broadcast_arg {
+	struct pthread *curthread;
+	unsigned int *waddrs[MAX_DEFER_WAITERS];
+	int count;
+};
+
+static void
+drop_cb(struct pthread *td, void *arg)
+{
+	struct broadcast_arg *ba = arg;
+	struct pthread_mutex *mp;
+	struct pthread *curthread = ba->curthread;
+
+	mp = td->mutex_obj;
+	if (mp->m_owner == curthread) {
+		if (curthread->nwaiter_defer >= MAX_DEFER_WAITERS) {
+			_thr_wake_all(curthread->defer_waiters,
+				curthread->nwaiter_defer);
+			curthread->nwaiter_defer = 0;
+		}
+		curthread->defer_waiters[curthread->nwaiter_defer++] =
+			&td->wake_addr->value;
+		mp->m_flags |= PMUTEX_FLAG_DEFERED;
+	} else {
+		if (ba->count >= MAX_DEFER_WAITERS) {
+			_thr_wake_all(ba->waddrs, ba->count);
+			ba->count = 0;
+		}
+		ba->waddrs[ba->count++] = &td->wake_addr->value;
+	}
+}
+
+static int
+cond_broadcast_common(pthread_cond_t *cond)
+{
+	int    pshared;
+	struct pthread_cond *cvp;
+	struct sleepqueue *sq;
+	struct broadcast_arg ba;
+
+	/*
+	 * If the condition variable is statically initialized, perform dynamic
+	 * initialization.
+	 */
+	CHECK_AND_INIT_COND
+
+	pshared = CV_PSHARED(cvp);
+
+	_thr_ucond_broadcast((struct ucond *)&cvp->__has_kern_waiters);
+
+	if (pshared || cvp->__has_user_waiters == 0)
+		return (0);
+
+	ba.curthread = _get_curthread();
+	ba.count = 0;
+
+	_sleepq_lock(cvp);
+	sq = _sleepq_lookup(cvp);
+	if (sq == NULL) {
+		_sleepq_unlock(cvp);
+		return (0);
+	}
+	_sleepq_drop(sq, drop_cb, &ba);
+	cvp->__has_user_waiters = 0;
+	_sleepq_unlock(cvp);
+	if (ba.count > 0)
+		_thr_wake_all(ba.waddrs, ba.count);
+	return (0);
 }
 
 int
 _pthread_cond_signal(pthread_cond_t * cond)
 {
 
-	return (cond_signal_common(cond, 0));
+	return (cond_signal_common(cond));
 }
 
 int
 _pthread_cond_broadcast(pthread_cond_t * cond)
 {
 
-	return (cond_signal_common(cond, 1));
+	return (cond_broadcast_common(cond));
 }
diff --git a/lib/libthr/thread/thr_init.c b/lib/libthr/thread/thr_init.c
index b10c227..7e07215 100644
--- a/lib/libthr/thread/thr_init.c
+++ b/lib/libthr/thread/thr_init.c
@@ -444,6 +444,8 @@ init_private(void)
 	_thr_once_init();
 	_thr_spinlock_init();
 	_thr_list_init();
+	_thr_wake_addr_init();
+	_sleepq_init();
 
 	/*
 	 * Avoid reinitializing some things if they don't need to be,
diff --git a/lib/libthr/thread/thr_kern.c b/lib/libthr/thread/thr_kern.c
index 3ad33ad..48f7c65 100644
--- a/lib/libthr/thread/thr_kern.c
+++ b/lib/libthr/thread/thr_kern.c
@@ -30,6 +30,7 @@
 #include <sys/types.h>
 #include <sys/signalvar.h>
 #include <sys/rtprio.h>
+#include <sys/mman.h>
 #include <pthread.h>
 
 #include "thr_private.h"
@@ -41,6 +42,10 @@
 #define DBG_MSG(x...)
 #endif
 
+static struct umutex	addr_lock;
+static struct wake_addr *wake_addr_head;
+static struct wake_addr default_wake_addr;
+
 /*
  * This is called when the first thread (other than the initial
  * thread) is created.
@@ -130,3 +135,90 @@ _thr_setscheduler(lwpid_t lwpid, int policy, const struct sched_param *param)
 	_schedparam_to_rtp(policy, param, &rtp);
 	return (rtprio_thread(RTP_SET, lwpid, &rtp));
 }
+
+void
+_thr_wake_addr_init(void)
+{
+	_thr_umutex_init(&addr_lock);
+	wake_addr_head = NULL;
+}
+
+/*
+ * Allocate wake-address, the memory area is never freed after
+ * allocated, this becauses threads may be referencing it.
+ */
+struct wake_addr *
+_thr_alloc_wake_addr(void)
+{
+	struct pthread *curthread;
+	struct wake_addr *p;
+
+	if (_thr_initial == NULL) {
+		return &default_wake_addr;
+	}
+
+	curthread = _get_curthread();
+
+	THR_LOCK_ACQUIRE(curthread, &addr_lock);
+	if (wake_addr_head == NULL) {
+		unsigned i;
+		unsigned pagesize = getpagesize();
+		struct wake_addr *pp = (struct wake_addr *)
+			mmap(NULL, getpagesize(), PROT_READ|PROT_WRITE,
+			MAP_ANON|MAP_PRIVATE, -1, 0);
+		for (i = 1; i < pagesize/sizeof(struct wake_addr); ++i)
+			pp[i].link = &pp[i+1];
+		pp[i-1].link = NULL;
+		wake_addr_head = &pp[1];
+		p = &pp[0];
+	} else {
+		p = wake_addr_head;
+		wake_addr_head = p->link;
+	}
+	THR_LOCK_RELEASE(curthread, &addr_lock);
+	p->value = 0;
+	return (p);
+}
+
+void
+_thr_release_wake_addr(struct wake_addr *wa)
+{
+	struct pthread *curthread = _get_curthread();
+
+	if (wa == &default_wake_addr)
+		return;
+	THR_LOCK_ACQUIRE(curthread, &addr_lock);
+	wa->link = wake_addr_head;
+	wake_addr_head = wa;
+	THR_LOCK_RELEASE(curthread, &addr_lock);
+}
+
+/* Sleep on thread wakeup address */
+int
+_thr_sleep(struct pthread *curthread, int clockid,
+	const struct timespec *abstime)
+{
+
+	curthread->will_sleep = 0;
+	if (curthread->nwaiter_defer > 0) {
+		_thr_wake_all(curthread->defer_waiters,
+			curthread->nwaiter_defer);
+		curthread->nwaiter_defer = 0;
+	}
+
+	if (curthread->wake_addr->value != 0)
+		return (0);
+
+	return _thr_umtx_timedwait_uint(&curthread->wake_addr->value, 0,
		clockid, abstime, 0);
+}
+
+void
+_thr_wake_all(unsigned int *waddrs[], int count)
+{
+	int i;
+
+	for (i = 0; i < count; ++i)
+		*waddrs[i] = 1;
+	_umtx_op(waddrs, UMTX_OP_NWAKE_PRIVATE, count, NULL, NULL);
+}
diff --git a/lib/libthr/thread/thr_list.c b/lib/libthr/thread/thr_list.c
index 7541fd3..249501c 100644
--- a/lib/libthr/thread/thr_list.c
+++ b/lib/libthr/thread/thr_list.c
@@ -165,6 +165,8 @@ _thr_alloc(struct pthread *curthread)
 		if (tcb != NULL) {
 			memset(thread, 0, sizeof(*thread));
 			thread->tcb = tcb;
+			thread->sleepqueue = _sleepq_alloc();
+			thread->wake_addr = _thr_alloc_wake_addr();
 		} else {
 			thr_destroy(curthread, thread);
 			atomic_fetchadd_int(&total_threads, -1);
@@ -192,6 +194,8 @@ _thr_free(struct pthread *curthread, struct pthread *thread)
 	}
 	thread->tcb = NULL;
 	if ((curthread == NULL) || (free_thread_count >= MAX_CACHED_THREADS)) {
+		_sleepq_free(thread->sleepqueue);
+		_thr_release_wake_addr(thread->wake_addr);
 		thr_destroy(curthread, thread);
 		atomic_fetchadd_int(&total_threads, -1);
 	} else {
diff --git a/lib/libthr/thread/thr_mutex.c b/lib/libthr/thread/thr_mutex.c
index 29f91ec..bd1fc2b 100644
--- a/lib/libthr/thread/thr_mutex.c
+++ b/lib/libthr/thread/thr_mutex.c
@@ -92,7 +92,7 @@ int	__pthread_mutex_setyieldloops_np(pthread_mutex_t *mutex, int count);
 static int	mutex_self_trylock(pthread_mutex_t);
 static int	mutex_self_lock(pthread_mutex_t,
 			const struct timespec *abstime);
-static int	mutex_unlock_common(pthread_mutex_t *);
+static int	mutex_unlock_common(struct pthread_mutex *, int);
 static int	mutex_lock_sleep(struct pthread *, pthread_mutex_t,
 			const struct timespec *);
 
@@ -145,10 +145,9 @@ mutex_init(pthread_mutex_t *mutex,
 	    calloc_cb(1, sizeof(struct pthread_mutex))) == NULL)
 		return (ENOMEM);
 
-	pmutex->m_type = attr->m_type;
+	pmutex->m_flags = attr->m_type;
 	pmutex->m_owner = NULL;
 	pmutex->m_count = 0;
-	pmutex->m_refcount = 0;
 	pmutex->m_spinloops = 0;
 	pmutex->m_yieldloops = 0;
 	MUTEX_INIT_LINK(pmutex);
@@ -168,7 +167,7 @@ mutex_init(pthread_mutex_t *mutex,
 		break;
 	}
 
-	if (pmutex->m_type == PTHREAD_MUTEX_ADAPTIVE_NP) {
+	if (PMUTEX_TYPE(pmutex->m_flags) == PTHREAD_MUTEX_ADAPTIVE_NP) {
 		pmutex->m_spinloops =
 		    _thr_spinloops ? _thr_spinloops: MUTEX_ADAPTIVE_SPINS;
 		pmutex->m_yieldloops = _thr_yieldloops;
@@ -229,7 +228,7 @@ _pthread_mutex_init_calloc_cb(pthread_mutex_t *mutex,
 	ret = mutex_init(mutex, &attr, calloc_cb);
 	if (ret == 0)
-		(*mutex)->m_private = 1;
+		(*mutex)->m_flags |= PMUTEX_FLAG_PRIVATE;
 	return (ret);
 }
@@ -266,7 +265,7 @@ _pthread_mutex_destroy(pthread_mutex_t *mutex)
 	} else if (m == THR_MUTEX_DESTROYED) {
 		ret = EINVAL;
 	} else {
-		if (m->m_owner != NULL || m->m_refcount != 0) {
+		if (m->m_owner != NULL) {
 			ret = EBUSY;
 		} else {
 			*mutex = THR_MUTEX_DESTROYED;
@@ -290,6 +289,17 @@ _pthread_mutex_destroy(pthread_mutex_t *mutex)
 		TAILQ_INSERT_TAIL(&curthread->pp_mutexq, (m), m_qe);\
 } while (0)
 
+#define DEQUEUE_MUTEX(curthread, m)					\
+	(m)->m_owner = NULL;						\
+	MUTEX_ASSERT_IS_OWNED(m);					\
+	if (__predict_true(((m)->m_lock.m_flags & UMUTEX_PRIO_PROTECT) == 0)) \
+		TAILQ_REMOVE(&curthread->mutexq, (m), m_qe);		\
+	else {								\
+		TAILQ_REMOVE(&curthread->pp_mutexq, (m), m_qe);		\
+		set_inherited_priority(curthread, m);			\
+	}								\
+	MUTEX_INIT_LINK(m);
+
 #define CHECK_AND_INIT_MUTEX						\
 	if (__predict_false((m = *mutex) <= THR_MUTEX_DESTROYED)) {	\
 		if (m == THR_MUTEX_DESTROYED)				\
@@ -310,7 +320,7 @@ mutex_trylock_common(pthread_mutex_t *mutex)
 	int ret;
 
 	id = TID(curthread);
-	if (m->m_private)
+	if (m->m_flags & PMUTEX_FLAG_PRIVATE)
 		THR_CRITICAL_ENTER(curthread);
 	ret = _thr_umutex_trylock(&m->m_lock, id);
 	if (__predict_true(ret == 0)) {
@@ -318,7 +328,7 @@ mutex_trylock_common(pthread_mutex_t *mutex)
 	} else if (m->m_owner == curthread) {
 		ret = mutex_self_trylock(m);
 	} /* else {} */
-	if (ret && m->m_private)
+	if (ret && (m->m_flags & PMUTEX_FLAG_PRIVATE))
 		THR_CRITICAL_LEAVE(curthread);
 	return (ret);
 }
@@ -403,12 +413,12 @@ done:
 
 static inline int
 mutex_lock_common(struct pthread_mutex *m,
-	const struct timespec *abstime)
+	const struct timespec *abstime, int cvattach)
 {
 	struct pthread *curthread  = _get_curthread();
 	int ret;
 
-	if (m->m_private)
+	if (!cvattach && m->m_flags & PMUTEX_FLAG_PRIVATE)
 		THR_CRITICAL_ENTER(curthread);
 	if (_thr_umutex_trylock2(&m->m_lock, TID(curthread)) == 0) {
 		ENQUEUE_MUTEX(curthread, m);
@@ -416,7 +426,7 @@ mutex_lock_common(struct pthread_mutex *m,
 	} else {
 		ret = mutex_lock_sleep(curthread, m, abstime);
 	}
-	if (ret && m->m_private)
+	if (ret && (m->m_flags & PMUTEX_FLAG_PRIVATE) && !cvattach)
 		THR_CRITICAL_LEAVE(curthread);
 	return (ret);
 }
@@ -430,7 +440,7 @@ __pthread_mutex_lock(pthread_mutex_t *mutex)
 
 	CHECK_AND_INIT_MUTEX
 
-	return (mutex_lock_common(m, NULL));
+	return (mutex_lock_common(m, NULL, 0));
 }
 
 int
@@ -442,28 +452,83 @@ __pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime
 
 	CHECK_AND_INIT_MUTEX
 
-	return (mutex_lock_common(m, abstime));
+	return (mutex_lock_common(m, abstime, 0));
 }
 
 int
-_pthread_mutex_unlock(pthread_mutex_t *m)
+_pthread_mutex_unlock(pthread_mutex_t *mutex)
 {
-	return (mutex_unlock_common(m));
+	struct pthread_mutex *mp;
+
+	mp = *mutex;
+	return (mutex_unlock_common(mp, 0));
 }
 
 int
-_mutex_cv_lock(pthread_mutex_t *mutex, int count)
+_mutex_cv_lock(struct pthread_mutex *m, int count)
 {
-	struct pthread_mutex *m;
-	int	ret;
+	int	error;
 
-	m = *mutex;
-	ret = mutex_lock_common(m, NULL);
-	if (ret == 0) {
-		m->m_refcount--;
-		m->m_count += count;
+	error = mutex_lock_common(m, NULL, 1);
+	if (error == 0)
+		m->m_count = count;
+	return (error);
+}
+
+int
+_mutex_cv_unlock(struct pthread_mutex *m, int *count)
+{
+
+	/*
+	 * Clear the count in case this is a recursive mutex.
+	 */
+	*count = m->m_count;
+	m->m_count = 0;
+	(void)mutex_unlock_common(m, 1);
+	return (0);
+}
+
+int
+_mutex_cv_attach(struct pthread_mutex *m, int count)
+{
+	struct pthread *curthread = _get_curthread();
+	int	error;
+
+	ENQUEUE_MUTEX(curthread, m);
+	m->m_count = count;
+	return (error);
+}
+
+int
+_mutex_cv_detach(struct pthread_mutex *mp, int *recurse)
+{
+	struct pthread *curthread = _get_curthread();
+	int	defered;
+	int	error;
+
+	if ((error = _mutex_owned(curthread, mp)) != 0)
+		return (error);
+
+	/*
+	 * Clear the count in case this is a recursive mutex.
	 */
+	*recurse = mp->m_count;
+	mp->m_count = 0;
+	DEQUEUE_MUTEX(curthread, mp);
+
+	/* Will this happen in real-world ? */
+	if ((mp->m_flags & PMUTEX_FLAG_DEFERED) != 0) {
+		defered = 1;
+		mp->m_flags &= ~PMUTEX_FLAG_DEFERED;
+	} else
+		defered = 0;
+
+	if (defered)  {
+		_thr_wake_all(curthread->defer_waiters,
+			curthread->nwaiter_defer);
+		curthread->nwaiter_defer = 0;
 	}
-	return (ret);
+	return (0);
 }
 
 static int
@@ -471,7 +536,7 @@ mutex_self_trylock(struct pthread_mutex *m)
 {
 	int	ret;
 
-	switch (m->m_type) {
+	switch (PMUTEX_TYPE(m->m_flags)) {
 	case PTHREAD_MUTEX_ERRORCHECK:
 	case PTHREAD_MUTEX_NORMAL:
 		ret = EBUSY;
@@ -500,7 +565,7 @@ mutex_self_lock(struct pthread_mutex *m, const struct timespec *abstime)
 	struct timespec	ts1, ts2;
 	int	ret;
 
-	switch (m->m_type) {
+	switch (PMUTEX_TYPE(m->m_flags)) {
 	case PTHREAD_MUTEX_ERRORCHECK:
 	case PTHREAD_MUTEX_ADAPTIVE_NP:
 		if (abstime) {
@@ -564,13 +629,12 @@ mutex_self_lock(struct pthread_mutex *m, const struct timespec *abstime)
 }
 
 static int
-mutex_unlock_common(pthread_mutex_t *mutex)
+mutex_unlock_common(struct pthread_mutex *m, int cv)
 {
 	struct pthread *curthread = _get_curthread();
-	struct pthread_mutex *m;
 	uint32_t id;
+	int defered;
 
-	m = *mutex;
 	if (__predict_false(m <= THR_MUTEX_DESTROYED)) {
 		if (m == THR_MUTEX_DESTROYED)
 			return (EINVAL);
@@ -585,65 +649,26 @@ mutex_unlock_common(pthread_mutex_t *mutex)
 	id = TID(curthread);
 
 	if (__predict_false(
-		m->m_type == PTHREAD_MUTEX_RECURSIVE &&
+		PMUTEX_TYPE(m->m_flags) == PTHREAD_MUTEX_RECURSIVE &&
 		m->m_count > 0)) {
 		m->m_count--;
 	} else {
-		m->m_owner = NULL;
-		/* Remove the mutex from the threads queue. */
-		MUTEX_ASSERT_IS_OWNED(m);
-		if (__predict_true((m->m_lock.m_flags & UMUTEX_PRIO_PROTECT) == 0))
-			TAILQ_REMOVE(&curthread->mutexq, m, m_qe);
-		else {
-			TAILQ_REMOVE(&curthread->pp_mutexq, m, m_qe);
-			set_inherited_priority(curthread, m);
-		}
-		MUTEX_INIT_LINK(m);
-		_thr_umutex_unlock(&m->m_lock, id);
-	}
-	if (m->m_private)
-		THR_CRITICAL_LEAVE(curthread);
-	return (0);
-}
-
-int
-_mutex_cv_unlock(pthread_mutex_t *mutex, int *count)
-{
-	struct pthread *curthread = _get_curthread();
-	struct pthread_mutex *m;
-
-	m = *mutex;
-	if (__predict_false(m <= THR_MUTEX_DESTROYED)) {
-		if (m == THR_MUTEX_DESTROYED)
-			return (EINVAL);
-		return (EPERM);
-	}
+		if (curthread->will_sleep == 0 && (m->m_flags & PMUTEX_FLAG_DEFERED) != 0) {
+			defered = 1;
+			m->m_flags &= ~PMUTEX_FLAG_DEFERED;
+		} else
+			defered = 0;
 
-	/*
-	 * Check if the running thread is not the owner of the mutex.
-	 */
-	if (__predict_false(m->m_owner != curthread))
-		return (EPERM);
+		DEQUEUE_MUTEX(curthread, m);
+		_thr_umutex_unlock(&m->m_lock, id);
 
-	/*
-	 * Clear the count in case this is a recursive mutex.
-	 */
-	*count = m->m_count;
-	m->m_refcount++;
-	m->m_count = 0;
-	m->m_owner = NULL;
-	/* Remove the mutex from the threads queue. */
-	MUTEX_ASSERT_IS_OWNED(m);
-	if (__predict_true((m->m_lock.m_flags & UMUTEX_PRIO_PROTECT) == 0))
-		TAILQ_REMOVE(&curthread->mutexq, m, m_qe);
-	else {
-		TAILQ_REMOVE(&curthread->pp_mutexq, m, m_qe);
-		set_inherited_priority(curthread, m);
+		if (defered)  {
+			_thr_wake_all(curthread->defer_waiters,
+				curthread->nwaiter_defer);
+			curthread->nwaiter_defer = 0;
+		}
 	}
-	MUTEX_INIT_LINK(m);
-	_thr_umutex_unlock(&m->m_lock, TID(curthread));
-
-	if (m->m_private)
+	if (!cv && m->m_flags & PMUTEX_FLAG_PRIVATE)
 		THR_CRITICAL_LEAVE(curthread);
 	return (0);
 }
@@ -757,3 +782,16 @@ _pthread_mutex_isowned_np(pthread_mutex_t *mutex)
 		return (0);
 	return (m->m_owner == _get_curthread());
 }
+
+int
+_mutex_owned(struct pthread *curthread, const struct pthread_mutex *mp)
+{
+	if (__predict_false(mp <= THR_MUTEX_DESTROYED)) {
+		if (mp == THR_MUTEX_DESTROYED)
+			return (EINVAL);
+		return (EPERM);
+	}
+	if (mp->m_owner != curthread)
+		return (EPERM);
+	return (0);
+}
diff --git a/lib/libthr/thread/thr_private.h b/lib/libthr/thread/thr_private.h
index 7180d12..9df97aa 100644
--- a/lib/libthr/thread/thr_private.h
+++ b/lib/libthr/thread/thr_private.h
@@ -135,18 +135,23 @@ TAILQ_HEAD(mutex_queue, pthread_mutex);
 #define	THR_RWLOCK_INITIALIZER		((struct pthread_rwlock *)NULL)
 #define	THR_RWLOCK_DESTROYED		((struct pthread_rwlock *)1)
 
+#define PMUTEX_FLAG_TYPE_MASK	0x0ff
+#define PMUTEX_FLAG_PRIVATE	0x100
+#define PMUTEX_FLAG_DEFERED	0x200
+#define PMUTEX_TYPE(mtxflags)	((mtxflags) & PMUTEX_FLAG_TYPE_MASK)
+
+#define MAX_DEFER_WAITERS	50
+
 struct pthread_mutex {
 	/*
 	 * Lock for accesses to this structure.
 	 */
 	struct umutex			m_lock;
-	enum pthread_mutextype		m_type;
+	int				m_flags;
 	struct pthread			*m_owner;
 	int				m_count;
-	int				m_refcount;
 	int				m_spinloops;
 	int				m_yieldloops;
-	int				m_private;
 	/*
 	 * Link for all mutexes a thread currently owns.
 	 */
@@ -163,10 +168,10 @@ struct pthread_mutex_attr {
 	{ PTHREAD_MUTEX_DEFAULT, PTHREAD_PRIO_NONE, 0, MUTEX_FLAGS_PRIVATE }
 
 struct pthread_cond {
-	struct umutex	c_lock;
-	struct ucond	c_kerncv;
-	int		c_pshared;
-	int		c_clockid;
+	__uint32_t	__has_user_waiters;
+	__uint32_t	__has_kern_waiters;
+	__uint32_t	__flags;
+	__uint32_t	__clock_id;
 };
 
 struct pthread_cond_attr {
@@ -245,6 +250,21 @@ struct pthread_attr {
 	size_t	cpusetsize;
 };
 
+struct wake_addr {
+	struct wake_addr *link;
+	unsigned int	value;
+	char		pad[12];
+};
+
+struct sleepqueue {
+	TAILQ_HEAD(, pthread)	sq_blocked;
+	SLIST_HEAD(, sleepqueue) sq_freeq;
+	LIST_ENTRY(sleepqueue)	sq_hash;
+	SLIST_ENTRY(sleepqueue) sq_flink;
+	void			*sq_wchan;
+	int			sq_type;
+};
+
 /*
  * Thread creation state attributes.
 */
@@ -356,6 +376,9 @@ struct pthread {
 	/* Hash queue entry. */
 	LIST_ENTRY(pthread)		hle;
 
+	/* Sleep queue entry */
+	TAILQ_ENTRY(pthread)		wle;
+
 	/* Threads reference count. */
 	int				refcount;
 
@@ -482,6 +505,27 @@ struct pthread {
 
 	/* Event */
 	td_event_msg_t			event_buf;
+
+	struct wake_addr		*wake_addr;
+#define WAKE_ADDR(td)		((td)->wake_addr)
+
+	/* Sleep queue */
+	struct	sleepqueue		*sleepqueue;
+
+	/* Wait channel */
+	void				*wchan;
+
+	/* Referenced mutex. */
+	struct pthread_mutex		*mutex_obj;
+
+	/* Thread will sleep. */
+	int				will_sleep;
+
+	/* Number of threads deferred. */
+	int				nwaiter_defer;
+
+	/* Deferred threads from pthread_cond_signal. */
+	unsigned int			*defer_waiters[MAX_DEFER_WAITERS];
 };
 
 #define THR_SHOULD_GC(thrd) 						\
@@ -519,6 +563,12 @@ do {					\
 	_thr_umutex_lock(lck, TID(thrd));	\
 } while (0)
 
+#define	THR_LOCK_ACQUIRE_SPIN(thrd, lck)	\
+do {						\
+	(thrd)->locklevel++;			\
+	_thr_umutex_lock_spin(lck, TID(thrd));	\
+} while (0)
+
 #ifdef	_PTHREADS_INVARIANTS
 #define	THR_ASSERT_LOCKLEVEL(thrd)		\
 do {						\
@@ -671,8 +721,11 @@ extern struct umutex	_thr_event_lock __hidden;
 */
 __BEGIN_DECLS
 int	_thr_setthreaded(int) __hidden;
-int	_mutex_cv_lock(pthread_mutex_t *, int count) __hidden;
-int	_mutex_cv_unlock(pthread_mutex_t *, int *count) __hidden;
+int	_mutex_cv_lock(struct pthread_mutex *, int count) __hidden;
+int	_mutex_cv_unlock(struct pthread_mutex *, int *count) __hidden;
+int	_mutex_cv_attach(struct pthread_mutex *, int count) __hidden;
+int	_mutex_cv_detach(struct pthread_mutex *, int *count) __hidden;
+int	_mutex_owned(struct pthread *, const struct pthread_mutex *) __hidden;
 int	_mutex_reinit(pthread_mutex_t *) __hidden;
 void	_mutex_fork(struct pthread *curthread) __hidden;
 void	_libpthread_init(struct pthread *) __hidden;
@@ -797,6 +850,50 @@ _thr_check_init(void)
 		_libpthread_init(NULL);
 }
 
+struct wake_addr *_thr_alloc_wake_addr(void);
+void	_thr_release_wake_addr(struct wake_addr *);
+int	_thr_sleep(struct pthread *, int, const struct timespec *);
+
+void _thr_wake_addr_init(void) __hidden;
+
+static inline void
+_thr_clear_wake(struct pthread *td)
+{
+	td->wake_addr->value = 0;
+}
+
+static inline int
+_thr_is_woken(struct pthread *td)
+{
+	return td->wake_addr->value != 0;
+}
+
+static inline void
+_thr_set_wake(unsigned int *waddr)
+{
+	*waddr = 1;
+	_thr_umtx_wake(waddr, INT_MAX, 0);
+}
+
+void _thr_wake_all(unsigned int *waddrs[], int) __hidden;
+
+static inline struct pthread *
+_sleepq_first(struct sleepqueue *sq)
+{
+	return TAILQ_FIRST(&sq->sq_blocked);
+}
+
+void	_sleepq_init(void) __hidden;
+struct	sleepqueue *_sleepq_alloc(void) __hidden;
+void	_sleepq_free(struct sleepqueue *) __hidden;
+void	_sleepq_lock(void *) __hidden;
+void	_sleepq_unlock(void *) __hidden;
+struct	sleepqueue *_sleepq_lookup(void *) __hidden;
+void	_sleepq_add(void *, struct pthread *) __hidden;
+int	_sleepq_remove(struct sleepqueue *, struct pthread *) __hidden;
+void	_sleepq_drop(struct sleepqueue *,
+		void (*cb)(struct pthread *, void *arg), void *) __hidden;
+
 struct dl_phdr_info;
 void __pthread_cxa_finalize(struct dl_phdr_info *phdr_info);
 void _thr_tsd_unload(struct dl_phdr_info *phdr_info) __hidden;
diff --git a/lib/libthr/thread/thr_umtx.c b/lib/libthr/thread/thr_umtx.c
index dabfa35..33c3637 100644
--- a/lib/libthr/thread/thr_umtx.c
+++ b/lib/libthr/thread/thr_umtx.c
@@ -74,6 +74,39 @@ __thr_umutex_lock(struct umutex *mtx, uint32_t id)
 	return	_umtx_op_err(mtx, UMTX_OP_MUTEX_LOCK, 0, 0, 0);
 }
 
+#define SPINLOOPS 1000
+
+int
+__thr_umutex_lock_spin(struct umutex *mtx, uint32_t id)
+{
+	uint32_t owner;
+
+	if (!_thr_is_smp)
+		return __thr_umutex_lock(mtx, id);
+
+	if ((mtx->m_flags & (UMUTEX_PRIO_PROTECT | UMUTEX_PRIO_INHERIT)) == 0) {
+		for (;;) {
+			int count = SPINLOOPS;
+			while (count--) {
+				owner = mtx->m_owner;
+				if ((owner & ~UMUTEX_CONTESTED) == 0) {
+					if (atomic_cmpset_acq_32(
+					    &mtx->m_owner,
+					    owner, id|owner)) {
+						return (0);
+					}
+				}
+				CPU_SPINWAIT;
+			}
+
+			/* wait in kernel */
+			_umtx_op_err(mtx, UMTX_OP_MUTEX_WAIT, 0, 0, 0);
+		}
+	}
+
+	return	_umtx_op_err(mtx, UMTX_OP_MUTEX_LOCK, 0, 0, 0);
+}
+
 int
 __thr_umutex_timedlock(struct umutex *mtx, uint32_t id,
 	const struct timespec *ets)
@@ -164,6 +197,26 @@ _thr_umtx_wait_uint(volatile u_int *mtx, u_int id, const struct timespec *timeou
 }
 
 int
+_thr_umtx_timedwait_uint(volatile u_int *mtx, u_int id, int clockid,
+	const struct timespec *abstime, int shared)
+{
+	struct timespec ts, ts2, *tsp;
+
+	if (abstime != NULL) {
+		clock_gettime(clockid, &ts);
+		TIMESPEC_SUB(&ts2, abstime, &ts);
+		if (ts2.tv_sec < 0 || ts2.tv_nsec <= 0)
+			return (ETIMEDOUT);
+		tsp = &ts2;
+	} else {
+		tsp = NULL;
+	}
+	return _umtx_op_err(__DEVOLATILE(void *, mtx),
+		shared ? UMTX_OP_WAIT_UINT : UMTX_OP_WAIT_UINT_PRIVATE, id, NULL,
+		tsp);
+}
+
+int
 _thr_umtx_wake(volatile void *mtx, int nr_wakeup, int shared)
 {
 	return _umtx_op_err(__DEVOLATILE(void *, mtx), shared ? UMTX_OP_WAKE : UMTX_OP_WAKE_PRIVATE,
diff --git a/lib/libthr/thread/thr_umtx.h b/lib/libthr/thread/thr_umtx.h
index 3f53faf..0a8034b 100644
--- a/lib/libthr/thread/thr_umtx.h
+++ b/lib/libthr/thread/thr_umtx.h
@@ -36,6 +36,7 @@
 #define DEFAULT_URWLOCK {0,0,0,0,{0,0,0,0}}
 
 int __thr_umutex_lock(struct umutex *mtx, uint32_t id) __hidden;
+int __thr_umutex_lock_spin(struct umutex *mtx, uint32_t id) __hidden;
 int __thr_umutex_timedlock(struct umutex *mtx, uint32_t id,
 	const struct timespec *timeout) __hidden;
 int __thr_umutex_unlock(struct umutex *mtx, uint32_t id) __hidden;
@@ -50,6 +51,8 @@ int _thr_umtx_wait(volatile long *mtx, long exp,
 	const struct timespec *timeout) __hidden;
 int _thr_umtx_wait_uint(volatile u_int *mtx, u_int exp,
 	const struct timespec *timeout, int shared) __hidden;
+int _thr_umtx_timedwait_uint(volatile u_int *mtx, u_int exp, int clockid,
+	const struct timespec *timeout, int shared) __hidden;
 int _thr_umtx_wake(volatile void *mtx, int count, int shared) __hidden;
 int _thr_ucond_wait(struct ucond *cv, struct umutex *m,
 	const struct timespec *timeout, int check_unpaking) __hidden;
@@ -97,6 +100,14 @@ _thr_umutex_lock(struct umutex *mtx, uint32_t id)
 }
 
 static inline int
+_thr_umutex_lock_spin(struct umutex *mtx, uint32_t id)
+{
+	if (_thr_umutex_trylock2(mtx, id) == 0)
+		return (0);
+	return (__thr_umutex_lock_spin(mtx, id));
+}
+
+static inline int
 _thr_umutex_timedlock(struct umutex *mtx, uint32_t id,
 	const struct timespec *timeout)
 {
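
Two usage-level notes on the diff above. First, the application pattern
the deferred-wakeup path optimizes is plain POSIX code (nothing below is
specific to this patch): signaling while the mutex is held. After this
commit, pthread_cond_signal() merely moves the waiter from the userland
sleep queue into the signaler's defer_waiters[] buffer; the kernel
wakeup is issued by pthread_mutex_unlock(), so the waiter is not woken
only to block again on the still-held mutex:

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    static int ready;

    void
    producer(void)
    {
        pthread_mutex_lock(&lock);
        ready = 1;
        pthread_cond_signal(&cond);     /* wakeup is only deferred here... */
        pthread_mutex_unlock(&lock);    /* ...and actually issued here */
    }

    void
    consumer(void)
    {
        pthread_mutex_lock(&lock);
        while (!ready)
            pthread_cond_wait(&cond, &lock);
        pthread_mutex_unlock(&lock);
    }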
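
Second, a self-contained sketch of the per-thread wake-address handshake
that _thr_sleep() and _thr_set_wake() implement, written directly
against _umtx_op(2) rather than the tree's wrappers (an illustrative
simplification, not the patch's code). UMTX_OP_WAIT_UINT_PRIVATE
re-checks in the kernel that the word still equals the expected value
before sleeping, so a wakeup that lands between the userland check and
the system call cannot be lost:

    #include <sys/types.h>
    #include <sys/umtx.h>
    #include <limits.h>

    static volatile u_int wake_value;

    /* Sleeper: arm the word, re-check it, then park until it is nonzero. */
    static void
    sleeper(void)
    {
        wake_value = 0;                 /* arm the wait */
        /* ...enqueue on the userland sleep queue, drop locks... */
        if (wake_value == 0)            /* kernel re-validates this too */
            _umtx_op(__DEVOLATILE(void *, &wake_value),
                UMTX_OP_WAIT_UINT_PRIVATE, 0, NULL, NULL);
    }

    /* Waker: publish the wakeup first, then kick the wait channel. */
    static void
    waker(void)
    {
        wake_value = 1;
        _umtx_op(__DEVOLATILE(void *, &wake_value),
            UMTX_OP_WAKE_PRIVATE, INT_MAX, NULL, NULL);
    }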