diff options
Diffstat (limited to 'lib/libthr/thread/thr_cond.c')
-rw-r--r-- | lib/libthr/thread/thr_cond.c | 377 |
1 files changed, 277 insertions, 100 deletions
diff --git a/lib/libthr/thread/thr_cond.c b/lib/libthr/thread/thr_cond.c index 03b5cdd..6ec6d4c 100644 --- a/lib/libthr/thread/thr_cond.c +++ b/lib/libthr/thread/thr_cond.c @@ -45,7 +45,8 @@ int __pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, static int cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); static int cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime, int cancel); -static int cond_signal_common(pthread_cond_t *cond, int broadcast); +static int cond_signal_common(pthread_cond_t *cond); +static int cond_broadcast_common(pthread_cond_t *cond); /* * Double underscore versions are cancellation points. Single underscore @@ -60,31 +61,31 @@ __weak_reference(_pthread_cond_destroy, pthread_cond_destroy); __weak_reference(_pthread_cond_signal, pthread_cond_signal); __weak_reference(_pthread_cond_broadcast, pthread_cond_broadcast); +#define CV_PSHARED(cvp) (((cvp)->__flags & USYNC_PROCESS_SHARED) != 0) + static int cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr) { - pthread_cond_t pcond; - int rval = 0; + struct pthread_cond *cvp; + int error = 0; - if ((pcond = (pthread_cond_t) + if ((cvp = (pthread_cond_t) calloc(1, sizeof(struct pthread_cond))) == NULL) { - rval = ENOMEM; + error = ENOMEM; } else { /* * Initialise the condition variable structure: */ if (cond_attr == NULL || *cond_attr == NULL) { - pcond->c_pshared = 0; - pcond->c_clockid = CLOCK_REALTIME; + cvp->__clock_id = CLOCK_REALTIME; } else { - pcond->c_pshared = (*cond_attr)->c_pshared; - pcond->c_clockid = (*cond_attr)->c_clockid; + if ((*cond_attr)->c_pshared) + cvp->__flags |= USYNC_PROCESS_SHARED; + cvp->__clock_id = (*cond_attr)->c_clockid; } - _thr_umutex_init(&pcond->c_lock); - *cond = pcond; + *cond = cvp; } - /* Return the completion status: */ - return (rval); + return (error); } static int @@ -105,16 +106,16 @@ init_static(struct pthread *thread, pthread_cond_t *cond) } #define CHECK_AND_INIT_COND \ - if (__predict_false((cv = (*cond)) <= THR_COND_DESTROYED)) { \ - if (cv == THR_COND_INITIALIZER) { \ + if (__predict_false((cvp = (*cond)) <= THR_COND_DESTROYED)) { \ + if (cvp == THR_COND_INITIALIZER) { \ int ret; \ ret = init_static(_get_curthread(), cond); \ if (ret) \ return (ret); \ - } else if (cv == THR_COND_DESTROYED) { \ + } else if (cvp == THR_COND_DESTROYED) { \ return (EINVAL); \ } \ - cv = *cond; \ + cvp = *cond; \ } int @@ -128,48 +129,24 @@ _pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr) int _pthread_cond_destroy(pthread_cond_t *cond) { - struct pthread *curthread = _get_curthread(); - struct pthread_cond *cv; - int rval = 0; - - if ((cv = *cond) == THR_COND_INITIALIZER) - rval = 0; - else if (cv == THR_COND_DESTROYED) - rval = EINVAL; + struct pthread_cond *cvp; + int error = 0; + + if ((cvp = *cond) == THR_COND_INITIALIZER) + error = 0; + else if (cvp == THR_COND_DESTROYED) + error = EINVAL; else { - cv = *cond; - THR_UMUTEX_LOCK(curthread, &cv->c_lock); + cvp = *cond; *cond = THR_COND_DESTROYED; - THR_UMUTEX_UNLOCK(curthread, &cv->c_lock); /* * Free the memory allocated for the condition * variable structure: */ - free(cv); - } - return (rval); -} - -struct cond_cancel_info -{ - pthread_mutex_t *mutex; - pthread_cond_t *cond; - int count; -}; - -static void -cond_cancel_handler(void *arg) -{ - struct pthread *curthread = _get_curthread(); - struct cond_cancel_info *info = (struct cond_cancel_info *)arg; - pthread_cond_t cv; - - if (info->cond != NULL) { - cv = *(info->cond); - THR_UMUTEX_UNLOCK(curthread, &cv->c_lock); + free(cvp); } - _mutex_cv_lock(info->mutex, info->count); + return (error); } /* @@ -181,53 +158,151 @@ cond_cancel_handler(void *arg) * to be lost. */ static int -cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex, +cond_wait_kernel(struct pthread_cond *cvp, struct pthread_mutex *mp, const struct timespec *abstime, int cancel) { struct pthread *curthread = _get_curthread(); - struct timespec ts, ts2, *tsp; - struct cond_cancel_info info; - pthread_cond_t cv; - int ret; + int recurse; + int error, error2 = 0; + + error = _mutex_cv_detach(mp, &recurse); + if (error != 0) + return (error); + + if (cancel) { + _thr_cancel_enter2(curthread, 0); + error = _thr_ucond_wait((struct ucond *)&cvp->__has_kern_waiters, + (struct umutex *)&mp->m_lock, abstime, + CVWAIT_ABSTIME|CVWAIT_CLOCKID); + _thr_cancel_leave(curthread, 0); + } else { + error = _thr_ucond_wait((struct ucond *)&cvp->__has_kern_waiters, + (struct umutex *)&mp->m_lock, abstime, + CVWAIT_ABSTIME|CVWAIT_CLOCKID); + } /* - * If the condition variable is statically initialized, - * perform the dynamic initialization: + * Note that PP mutex and ROBUST mutex may return + * interesting error codes. */ - CHECK_AND_INIT_COND - - cv = *cond; - THR_UMUTEX_LOCK(curthread, &cv->c_lock); - ret = _mutex_cv_unlock(mutex, &info.count); - if (__predict_false(ret != 0)) { - THR_UMUTEX_UNLOCK(curthread, &cv->c_lock); - return (ret); + if (error == 0) { + error2 = _mutex_cv_lock(mp, recurse); + } else if (error == EINTR || error == ETIMEDOUT) { + error2 = _mutex_cv_lock(mp, recurse); + if (error2 == 0 && cancel) + _thr_testcancel(curthread); + if (error == EINTR) + error = 0; + } else { + /* We know that it didn't unlock the mutex. */ + error2 = _mutex_cv_attach(mp, recurse); + if (error2 == 0 && cancel) + _thr_testcancel(curthread); } + return (error2 != 0 ? error2 : error); +} + +/* + * Thread waits in userland queue whenever possible, when thread + * is signaled or broadcasted, it is removed from the queue, and + * is saved in curthread's defer_waiters[] buffer, but won't be + * woken up until mutex is unlocked. + */ + +static int +cond_wait_user(struct pthread_cond *cvp, struct pthread_mutex *mp, + const struct timespec *abstime, int cancel) +{ + struct pthread *curthread = _get_curthread(); + struct sleepqueue *sq; + int recurse; + int error; - info.mutex = mutex; - info.cond = cond; + if (curthread->wchan != NULL) + PANIC("thread was already on queue."); - if (abstime != NULL) { - clock_gettime(cv->c_clockid, &ts); - TIMESPEC_SUB(&ts2, abstime, &ts); - tsp = &ts2; - } else - tsp = NULL; + if (cancel) + _thr_testcancel(curthread); - if (cancel) { - THR_CLEANUP_PUSH(curthread, cond_cancel_handler, &info); - _thr_cancel_enter2(curthread, 0); - ret = _thr_ucond_wait(&cv->c_kerncv, &cv->c_lock, tsp, 1); - info.cond = NULL; - _thr_cancel_leave(curthread, (ret != 0)); - THR_CLEANUP_POP(curthread, 0); - } else { - ret = _thr_ucond_wait(&cv->c_kerncv, &cv->c_lock, tsp, 0); + _sleepq_lock(cvp); + /* + * set __has_user_waiters before unlocking mutex, this allows + * us to check it without locking in pthread_cond_signal(). + */ + cvp->__has_user_waiters = 1; + curthread->will_sleep = 1; + (void)_mutex_cv_unlock(mp, &recurse); + curthread->mutex_obj = mp; + _sleepq_add(cvp, curthread); + for(;;) { + _thr_clear_wake(curthread); + _sleepq_unlock(cvp); + + if (cancel) { + _thr_cancel_enter2(curthread, 0); + error = _thr_sleep(curthread, cvp->__clock_id, abstime); + _thr_cancel_leave(curthread, 0); + } else { + error = _thr_sleep(curthread, cvp->__clock_id, abstime); + } + + if (curthread->wchan == NULL) { + error = 0; + goto out; + } + + _sleepq_lock(cvp); + if (curthread->wchan == NULL) { + error = 0; + break; + } else if (cancel && SHOULD_CANCEL(curthread)) { + sq = _sleepq_lookup(cvp); + cvp->__has_user_waiters = + _sleepq_remove(sq, curthread); + _sleepq_unlock(cvp); + curthread->mutex_obj = NULL; + _mutex_cv_lock(mp, recurse); + if (!THR_IN_CRITICAL(curthread)) + _pthread_exit(PTHREAD_CANCELED); + else /* this should not happen */ + return (0); + } else if (error == ETIMEDOUT) { + sq = _sleepq_lookup(cvp); + cvp->__has_user_waiters = + _sleepq_remove(sq, curthread); + break; + } } - if (ret == EINTR) - ret = 0; - _mutex_cv_lock(mutex, info.count); - return (ret); + _sleepq_unlock(cvp); +out: + curthread->mutex_obj = NULL; + _mutex_cv_lock(mp, recurse); + return (error); +} + +static int +cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex, + const struct timespec *abstime, int cancel) +{ + struct pthread *curthread = _get_curthread(); + struct pthread_cond *cvp; + struct pthread_mutex *mp; + int error; + + CHECK_AND_INIT_COND + + mp = *mutex; + + if ((error = _mutex_owned(curthread, mp)) != 0) + return (error); + + if (curthread->attr.sched_policy != SCHED_OTHER || + (mp->m_lock.m_flags & (UMUTEX_PRIO_PROTECT|UMUTEX_PRIO_INHERIT| + USYNC_PROCESS_SHARED)) != 0 || + (cvp->__flags & USYNC_PROCESS_SHARED) != 0) + return cond_wait_kernel(cvp, mp, abstime, cancel); + else + return cond_wait_user(cvp, mp, abstime, cancel); } int @@ -245,7 +320,7 @@ __pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) } int -_pthread_cond_timedwait(pthread_cond_t * cond, pthread_mutex_t * mutex, +_pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec * abstime) { @@ -269,11 +344,15 @@ __pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, } static int -cond_signal_common(pthread_cond_t *cond, int broadcast) +cond_signal_common(pthread_cond_t *cond) { struct pthread *curthread = _get_curthread(); - pthread_cond_t cv; - int ret = 0; + struct pthread *td; + struct pthread_cond *cvp; + struct pthread_mutex *mp; + struct sleepqueue *sq; + int *waddr; + int pshared; /* * If the condition variable is statically initialized, perform dynamic @@ -281,25 +360,123 @@ cond_signal_common(pthread_cond_t *cond, int broadcast) */ CHECK_AND_INIT_COND - THR_UMUTEX_LOCK(curthread, &cv->c_lock); - if (!broadcast) - ret = _thr_ucond_signal(&cv->c_kerncv); - else - ret = _thr_ucond_broadcast(&cv->c_kerncv); - THR_UMUTEX_UNLOCK(curthread, &cv->c_lock); - return (ret); + pshared = CV_PSHARED(cvp); + + _thr_ucond_signal((struct ucond *)&cvp->__has_kern_waiters); + + if (pshared || cvp->__has_user_waiters == 0) + return (0); + + curthread = _get_curthread(); + waddr = NULL; + _sleepq_lock(cvp); + sq = _sleepq_lookup(cvp); + if (sq == NULL) { + _sleepq_unlock(cvp); + return (0); + } + + td = _sleepq_first(sq); + mp = td->mutex_obj; + cvp->__has_user_waiters = _sleepq_remove(sq, td); + if (mp->m_owner == curthread) { + if (curthread->nwaiter_defer >= MAX_DEFER_WAITERS) { + _thr_wake_all(curthread->defer_waiters, + curthread->nwaiter_defer); + curthread->nwaiter_defer = 0; + } + curthread->defer_waiters[curthread->nwaiter_defer++] = + &td->wake_addr->value; + mp->m_flags |= PMUTEX_FLAG_DEFERED; + } else { + waddr = &td->wake_addr->value; + } + _sleepq_unlock(cvp); + if (waddr != NULL) + _thr_set_wake(waddr); + return (0); +} + +struct broadcast_arg { + struct pthread *curthread; + unsigned int *waddrs[MAX_DEFER_WAITERS]; + int count; +}; + +static void +drop_cb(struct pthread *td, void *arg) +{ + struct broadcast_arg *ba = arg; + struct pthread_mutex *mp; + struct pthread *curthread = ba->curthread; + + mp = td->mutex_obj; + if (mp->m_owner == curthread) { + if (curthread->nwaiter_defer >= MAX_DEFER_WAITERS) { + _thr_wake_all(curthread->defer_waiters, + curthread->nwaiter_defer); + curthread->nwaiter_defer = 0; + } + curthread->defer_waiters[curthread->nwaiter_defer++] = + &td->wake_addr->value; + mp->m_flags |= PMUTEX_FLAG_DEFERED; + } else { + if (ba->count >= MAX_DEFER_WAITERS) { + _thr_wake_all(ba->waddrs, ba->count); + ba->count = 0; + } + ba->waddrs[ba->count++] = &td->wake_addr->value; + } +} + +static int +cond_broadcast_common(pthread_cond_t *cond) +{ + int pshared; + struct pthread_cond *cvp; + struct sleepqueue *sq; + struct broadcast_arg ba; + + /* + * If the condition variable is statically initialized, perform dynamic + * initialization. + */ + CHECK_AND_INIT_COND + + pshared = CV_PSHARED(cvp); + + _thr_ucond_broadcast((struct ucond *)&cvp->__has_kern_waiters); + + if (pshared || cvp->__has_user_waiters == 0) + return (0); + + ba.curthread = _get_curthread(); + ba.count = 0; + + _sleepq_lock(cvp); + sq = _sleepq_lookup(cvp); + if (sq == NULL) { + _sleepq_unlock(cvp); + return (0); + } + _sleepq_drop(sq, drop_cb, &ba); + cvp->__has_user_waiters = 0; + _sleepq_unlock(cvp); + if (ba.count > 0) + _thr_wake_all(ba.waddrs, ba.count); + return (0); } int _pthread_cond_signal(pthread_cond_t * cond) { - return (cond_signal_common(cond, 0)); + return (cond_signal_common(cond)); } int _pthread_cond_broadcast(pthread_cond_t * cond) { - return (cond_signal_common(cond, 1)); + return (cond_broadcast_common(cond)); } |