author     davidxu <davidxu@FreeBSD.org>    2010-12-22 05:01:52 +0000
committer  davidxu <davidxu@FreeBSD.org>    2010-12-22 05:01:52 +0000
commit     437ad27f9c81f522301de0affe1fc5fef37d8828 (patch)
tree       d717e6b5f295c555e1deef8497d4130bec1843cf /lib/libthr/thread
parent     24b08bca030970592bc5241517b0462f603b05b1 (diff)
MFp4:
- Add flags CVWAIT_ABSTIME and CVWAIT_CLOCKID for the umtx kernel-based condition variable; this eliminates an extra system call to get the current time.
- Add the _umtx_op() sub-operation UMTX_OP_NWAKE_PRIVATE to wake up N channels in a single system call.
- Create a userland sleep queue for condition variables. In most cases a thread now waits in this queue, and pthread_cond_signal() defers the wakeup until the mutex is unlocked; this avoids an extra system call and an extra context switch in the window between pthread_cond_signal() and pthread_mutex_unlock().

The changes are part of the process-shared mutex project.
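For context (this sketch is not part of the commit), the deferral targets the common application pattern of signalling while the mutex is still held; the names queue_lock, queue_cv and queue_len below are hypothetical:

#include <pthread.h>

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  queue_cv   = PTHREAD_COND_INITIALIZER;
static int queue_len;

void
producer_enqueue(void)
{
        pthread_mutex_lock(&queue_lock);
        queue_len++;
        /*
         * Signalled while queue_lock is still held: without deferral the
         * woken waiter may be context-switched in only to block again on
         * queue_lock.  With this change libthr records the waiter and
         * issues the actual wakeup from pthread_mutex_unlock().
         */
        pthread_cond_signal(&queue_cv);
        pthread_mutex_unlock(&queue_lock);
}

void
consumer_wait(void)
{
        pthread_mutex_lock(&queue_lock);
        while (queue_len == 0)
                pthread_cond_wait(&queue_cv, &queue_lock);
        queue_len--;
        pthread_mutex_unlock(&queue_lock);
}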
Diffstat (limited to 'lib/libthr/thread')
-rw-r--r--  lib/libthr/thread/Makefile.inc     1
-rw-r--r--  lib/libthr/thread/thr_cond.c     377
-rw-r--r--  lib/libthr/thread/thr_init.c       2
-rw-r--r--  lib/libthr/thread/thr_kern.c      92
-rw-r--r--  lib/libthr/thread/thr_list.c       4
-rw-r--r--  lib/libthr/thread/thr_mutex.c    202
-rw-r--r--  lib/libthr/thread/thr_private.h  115
-rw-r--r--  lib/libthr/thread/thr_umtx.c      53
-rw-r--r--  lib/libthr/thread/thr_umtx.h      11
9 files changed, 666 insertions, 191 deletions
diff --git a/lib/libthr/thread/Makefile.inc b/lib/libthr/thread/Makefile.inc
index 4ce90b5..6d571d4 100644
--- a/lib/libthr/thread/Makefile.inc
+++ b/lib/libthr/thread/Makefile.inc
@@ -45,6 +45,7 @@ SRCS+= \
thr_setschedparam.c \
thr_sig.c \
thr_single_np.c \
+ thr_sleepq.c \
thr_spec.c \
thr_spinlock.c \
thr_stack.c \
diff --git a/lib/libthr/thread/thr_cond.c b/lib/libthr/thread/thr_cond.c
index 03b5cdd..6ec6d4c 100644
--- a/lib/libthr/thread/thr_cond.c
+++ b/lib/libthr/thread/thr_cond.c
@@ -45,7 +45,8 @@ int __pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
static int cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
static int cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime, int cancel);
-static int cond_signal_common(pthread_cond_t *cond, int broadcast);
+static int cond_signal_common(pthread_cond_t *cond);
+static int cond_broadcast_common(pthread_cond_t *cond);
/*
* Double underscore versions are cancellation points. Single underscore
@@ -60,31 +61,31 @@ __weak_reference(_pthread_cond_destroy, pthread_cond_destroy);
__weak_reference(_pthread_cond_signal, pthread_cond_signal);
__weak_reference(_pthread_cond_broadcast, pthread_cond_broadcast);
+#define CV_PSHARED(cvp) (((cvp)->__flags & USYNC_PROCESS_SHARED) != 0)
+
static int
cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr)
{
- pthread_cond_t pcond;
- int rval = 0;
+ struct pthread_cond *cvp;
+ int error = 0;
- if ((pcond = (pthread_cond_t)
+ if ((cvp = (pthread_cond_t)
calloc(1, sizeof(struct pthread_cond))) == NULL) {
- rval = ENOMEM;
+ error = ENOMEM;
} else {
/*
* Initialise the condition variable structure:
*/
if (cond_attr == NULL || *cond_attr == NULL) {
- pcond->c_pshared = 0;
- pcond->c_clockid = CLOCK_REALTIME;
+ cvp->__clock_id = CLOCK_REALTIME;
} else {
- pcond->c_pshared = (*cond_attr)->c_pshared;
- pcond->c_clockid = (*cond_attr)->c_clockid;
+ if ((*cond_attr)->c_pshared)
+ cvp->__flags |= USYNC_PROCESS_SHARED;
+ cvp->__clock_id = (*cond_attr)->c_clockid;
}
- _thr_umutex_init(&pcond->c_lock);
- *cond = pcond;
+ *cond = cvp;
}
- /* Return the completion status: */
- return (rval);
+ return (error);
}
static int
@@ -105,16 +106,16 @@ init_static(struct pthread *thread, pthread_cond_t *cond)
}
#define CHECK_AND_INIT_COND \
- if (__predict_false((cv = (*cond)) <= THR_COND_DESTROYED)) { \
- if (cv == THR_COND_INITIALIZER) { \
+ if (__predict_false((cvp = (*cond)) <= THR_COND_DESTROYED)) { \
+ if (cvp == THR_COND_INITIALIZER) { \
int ret; \
ret = init_static(_get_curthread(), cond); \
if (ret) \
return (ret); \
- } else if (cv == THR_COND_DESTROYED) { \
+ } else if (cvp == THR_COND_DESTROYED) { \
return (EINVAL); \
} \
- cv = *cond; \
+ cvp = *cond; \
}
int
@@ -128,48 +129,24 @@ _pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr)
int
_pthread_cond_destroy(pthread_cond_t *cond)
{
- struct pthread *curthread = _get_curthread();
- struct pthread_cond *cv;
- int rval = 0;
-
- if ((cv = *cond) == THR_COND_INITIALIZER)
- rval = 0;
- else if (cv == THR_COND_DESTROYED)
- rval = EINVAL;
+ struct pthread_cond *cvp;
+ int error = 0;
+
+ if ((cvp = *cond) == THR_COND_INITIALIZER)
+ error = 0;
+ else if (cvp == THR_COND_DESTROYED)
+ error = EINVAL;
else {
- cv = *cond;
- THR_UMUTEX_LOCK(curthread, &cv->c_lock);
+ cvp = *cond;
*cond = THR_COND_DESTROYED;
- THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
/*
* Free the memory allocated for the condition
* variable structure:
*/
- free(cv);
- }
- return (rval);
-}
-
-struct cond_cancel_info
-{
- pthread_mutex_t *mutex;
- pthread_cond_t *cond;
- int count;
-};
-
-static void
-cond_cancel_handler(void *arg)
-{
- struct pthread *curthread = _get_curthread();
- struct cond_cancel_info *info = (struct cond_cancel_info *)arg;
- pthread_cond_t cv;
-
- if (info->cond != NULL) {
- cv = *(info->cond);
- THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
+ free(cvp);
}
- _mutex_cv_lock(info->mutex, info->count);
+ return (error);
}
/*
@@ -181,53 +158,151 @@ cond_cancel_handler(void *arg)
* to be lost.
*/
static int
-cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
+cond_wait_kernel(struct pthread_cond *cvp, struct pthread_mutex *mp,
const struct timespec *abstime, int cancel)
{
struct pthread *curthread = _get_curthread();
- struct timespec ts, ts2, *tsp;
- struct cond_cancel_info info;
- pthread_cond_t cv;
- int ret;
+ int recurse;
+ int error, error2 = 0;
+
+ error = _mutex_cv_detach(mp, &recurse);
+ if (error != 0)
+ return (error);
+
+ if (cancel) {
+ _thr_cancel_enter2(curthread, 0);
+ error = _thr_ucond_wait((struct ucond *)&cvp->__has_kern_waiters,
+ (struct umutex *)&mp->m_lock, abstime,
+ CVWAIT_ABSTIME|CVWAIT_CLOCKID);
+ _thr_cancel_leave(curthread, 0);
+ } else {
+ error = _thr_ucond_wait((struct ucond *)&cvp->__has_kern_waiters,
+ (struct umutex *)&mp->m_lock, abstime,
+ CVWAIT_ABSTIME|CVWAIT_CLOCKID);
+ }
/*
- * If the condition variable is statically initialized,
- * perform the dynamic initialization:
+ * Note that PP mutex and ROBUST mutex may return
+ * interesting error codes.
*/
- CHECK_AND_INIT_COND
-
- cv = *cond;
- THR_UMUTEX_LOCK(curthread, &cv->c_lock);
- ret = _mutex_cv_unlock(mutex, &info.count);
- if (__predict_false(ret != 0)) {
- THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
- return (ret);
+ if (error == 0) {
+ error2 = _mutex_cv_lock(mp, recurse);
+ } else if (error == EINTR || error == ETIMEDOUT) {
+ error2 = _mutex_cv_lock(mp, recurse);
+ if (error2 == 0 && cancel)
+ _thr_testcancel(curthread);
+ if (error == EINTR)
+ error = 0;
+ } else {
+ /* We know that it didn't unlock the mutex. */
+ error2 = _mutex_cv_attach(mp, recurse);
+ if (error2 == 0 && cancel)
+ _thr_testcancel(curthread);
}
+ return (error2 != 0 ? error2 : error);
+}
+
+/*
+ * Thread waits in userland queue whenever possible, when thread
+ * is signaled or broadcasted, it is removed from the queue, and
+ * is saved in curthread's defer_waiters[] buffer, but won't be
+ * woken up until mutex is unlocked.
+ */
+
+static int
+cond_wait_user(struct pthread_cond *cvp, struct pthread_mutex *mp,
+ const struct timespec *abstime, int cancel)
+{
+ struct pthread *curthread = _get_curthread();
+ struct sleepqueue *sq;
+ int recurse;
+ int error;
- info.mutex = mutex;
- info.cond = cond;
+ if (curthread->wchan != NULL)
+ PANIC("thread was already on queue.");
- if (abstime != NULL) {
- clock_gettime(cv->c_clockid, &ts);
- TIMESPEC_SUB(&ts2, abstime, &ts);
- tsp = &ts2;
- } else
- tsp = NULL;
+ if (cancel)
+ _thr_testcancel(curthread);
- if (cancel) {
- THR_CLEANUP_PUSH(curthread, cond_cancel_handler, &info);
- _thr_cancel_enter2(curthread, 0);
- ret = _thr_ucond_wait(&cv->c_kerncv, &cv->c_lock, tsp, 1);
- info.cond = NULL;
- _thr_cancel_leave(curthread, (ret != 0));
- THR_CLEANUP_POP(curthread, 0);
- } else {
- ret = _thr_ucond_wait(&cv->c_kerncv, &cv->c_lock, tsp, 0);
+ _sleepq_lock(cvp);
+ /*
+ * set __has_user_waiters before unlocking mutex, this allows
+ * us to check it without locking in pthread_cond_signal().
+ */
+ cvp->__has_user_waiters = 1;
+ curthread->will_sleep = 1;
+ (void)_mutex_cv_unlock(mp, &recurse);
+ curthread->mutex_obj = mp;
+ _sleepq_add(cvp, curthread);
+ for(;;) {
+ _thr_clear_wake(curthread);
+ _sleepq_unlock(cvp);
+
+ if (cancel) {
+ _thr_cancel_enter2(curthread, 0);
+ error = _thr_sleep(curthread, cvp->__clock_id, abstime);
+ _thr_cancel_leave(curthread, 0);
+ } else {
+ error = _thr_sleep(curthread, cvp->__clock_id, abstime);
+ }
+
+ if (curthread->wchan == NULL) {
+ error = 0;
+ goto out;
+ }
+
+ _sleepq_lock(cvp);
+ if (curthread->wchan == NULL) {
+ error = 0;
+ break;
+ } else if (cancel && SHOULD_CANCEL(curthread)) {
+ sq = _sleepq_lookup(cvp);
+ cvp->__has_user_waiters =
+ _sleepq_remove(sq, curthread);
+ _sleepq_unlock(cvp);
+ curthread->mutex_obj = NULL;
+ _mutex_cv_lock(mp, recurse);
+ if (!THR_IN_CRITICAL(curthread))
+ _pthread_exit(PTHREAD_CANCELED);
+ else /* this should not happen */
+ return (0);
+ } else if (error == ETIMEDOUT) {
+ sq = _sleepq_lookup(cvp);
+ cvp->__has_user_waiters =
+ _sleepq_remove(sq, curthread);
+ break;
+ }
}
- if (ret == EINTR)
- ret = 0;
- _mutex_cv_lock(mutex, info.count);
- return (ret);
+ _sleepq_unlock(cvp);
+out:
+ curthread->mutex_obj = NULL;
+ _mutex_cv_lock(mp, recurse);
+ return (error);
+}
+
+static int
+cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
+ const struct timespec *abstime, int cancel)
+{
+ struct pthread *curthread = _get_curthread();
+ struct pthread_cond *cvp;
+ struct pthread_mutex *mp;
+ int error;
+
+ CHECK_AND_INIT_COND
+
+ mp = *mutex;
+
+ if ((error = _mutex_owned(curthread, mp)) != 0)
+ return (error);
+
+ if (curthread->attr.sched_policy != SCHED_OTHER ||
+ (mp->m_lock.m_flags & (UMUTEX_PRIO_PROTECT|UMUTEX_PRIO_INHERIT|
+ USYNC_PROCESS_SHARED)) != 0 ||
+ (cvp->__flags & USYNC_PROCESS_SHARED) != 0)
+ return cond_wait_kernel(cvp, mp, abstime, cancel);
+ else
+ return cond_wait_user(cvp, mp, abstime, cancel);
}
int
@@ -245,7 +320,7 @@ __pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
}
int
-_pthread_cond_timedwait(pthread_cond_t * cond, pthread_mutex_t * mutex,
+_pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec * abstime)
{
@@ -269,11 +344,15 @@ __pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
}
static int
-cond_signal_common(pthread_cond_t *cond, int broadcast)
+cond_signal_common(pthread_cond_t *cond)
{
struct pthread *curthread = _get_curthread();
- pthread_cond_t cv;
- int ret = 0;
+ struct pthread *td;
+ struct pthread_cond *cvp;
+ struct pthread_mutex *mp;
+ struct sleepqueue *sq;
+ int *waddr;
+ int pshared;
/*
* If the condition variable is statically initialized, perform dynamic
@@ -281,25 +360,123 @@ cond_signal_common(pthread_cond_t *cond, int broadcast)
*/
CHECK_AND_INIT_COND
- THR_UMUTEX_LOCK(curthread, &cv->c_lock);
- if (!broadcast)
- ret = _thr_ucond_signal(&cv->c_kerncv);
- else
- ret = _thr_ucond_broadcast(&cv->c_kerncv);
- THR_UMUTEX_UNLOCK(curthread, &cv->c_lock);
- return (ret);
+ pshared = CV_PSHARED(cvp);
+
+ _thr_ucond_signal((struct ucond *)&cvp->__has_kern_waiters);
+
+ if (pshared || cvp->__has_user_waiters == 0)
+ return (0);
+
+ curthread = _get_curthread();
+ waddr = NULL;
+ _sleepq_lock(cvp);
+ sq = _sleepq_lookup(cvp);
+ if (sq == NULL) {
+ _sleepq_unlock(cvp);
+ return (0);
+ }
+
+ td = _sleepq_first(sq);
+ mp = td->mutex_obj;
+ cvp->__has_user_waiters = _sleepq_remove(sq, td);
+ if (mp->m_owner == curthread) {
+ if (curthread->nwaiter_defer >= MAX_DEFER_WAITERS) {
+ _thr_wake_all(curthread->defer_waiters,
+ curthread->nwaiter_defer);
+ curthread->nwaiter_defer = 0;
+ }
+ curthread->defer_waiters[curthread->nwaiter_defer++] =
+ &td->wake_addr->value;
+ mp->m_flags |= PMUTEX_FLAG_DEFERED;
+ } else {
+ waddr = &td->wake_addr->value;
+ }
+ _sleepq_unlock(cvp);
+ if (waddr != NULL)
+ _thr_set_wake(waddr);
+ return (0);
+}
+
+struct broadcast_arg {
+ struct pthread *curthread;
+ unsigned int *waddrs[MAX_DEFER_WAITERS];
+ int count;
+};
+
+static void
+drop_cb(struct pthread *td, void *arg)
+{
+ struct broadcast_arg *ba = arg;
+ struct pthread_mutex *mp;
+ struct pthread *curthread = ba->curthread;
+
+ mp = td->mutex_obj;
+ if (mp->m_owner == curthread) {
+ if (curthread->nwaiter_defer >= MAX_DEFER_WAITERS) {
+ _thr_wake_all(curthread->defer_waiters,
+ curthread->nwaiter_defer);
+ curthread->nwaiter_defer = 0;
+ }
+ curthread->defer_waiters[curthread->nwaiter_defer++] =
+ &td->wake_addr->value;
+ mp->m_flags |= PMUTEX_FLAG_DEFERED;
+ } else {
+ if (ba->count >= MAX_DEFER_WAITERS) {
+ _thr_wake_all(ba->waddrs, ba->count);
+ ba->count = 0;
+ }
+ ba->waddrs[ba->count++] = &td->wake_addr->value;
+ }
+}
+
+static int
+cond_broadcast_common(pthread_cond_t *cond)
+{
+ int pshared;
+ struct pthread_cond *cvp;
+ struct sleepqueue *sq;
+ struct broadcast_arg ba;
+
+ /*
+ * If the condition variable is statically initialized, perform dynamic
+ * initialization.
+ */
+ CHECK_AND_INIT_COND
+
+ pshared = CV_PSHARED(cvp);
+
+ _thr_ucond_broadcast((struct ucond *)&cvp->__has_kern_waiters);
+
+ if (pshared || cvp->__has_user_waiters == 0)
+ return (0);
+
+ ba.curthread = _get_curthread();
+ ba.count = 0;
+
+ _sleepq_lock(cvp);
+ sq = _sleepq_lookup(cvp);
+ if (sq == NULL) {
+ _sleepq_unlock(cvp);
+ return (0);
+ }
+ _sleepq_drop(sq, drop_cb, &ba);
+ cvp->__has_user_waiters = 0;
+ _sleepq_unlock(cvp);
+ if (ba.count > 0)
+ _thr_wake_all(ba.waddrs, ba.count);
+ return (0);
}
int
_pthread_cond_signal(pthread_cond_t * cond)
{
- return (cond_signal_common(cond, 0));
+ return (cond_signal_common(cond));
}
int
_pthread_cond_broadcast(pthread_cond_t * cond)
{
- return (cond_signal_common(cond, 1));
+ return (cond_broadcast_common(cond));
}
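As a side note (not libthr's internal API), the userland wait path above is built on a per-thread 32-bit wake word: the waiter arms the word, parks until it becomes non-zero, and the waker sets it before issuing the wake. Below is a minimal self-contained sketch of that protocol using the raw umtx system call; libthr itself goes through the _thr_sleep()/_thr_set_wake() wrappers, which add a clock id and an absolute timeout:

#include <sys/types.h>
#include <sys/umtx.h>

/* One 32-bit wake word per waiter; 0 means "still sleeping". */
static unsigned int wake_word;

static void
waiter(void)
{
        wake_word = 0;          /* arm the word before publishing ourselves */
        /* ...add self to the userland sleep queue, drop the mutex... */
        while (wake_word == 0)
                /* Kernel sleeps only while the word still reads 0. */
                _umtx_op(&wake_word, UMTX_OP_WAIT_UINT_PRIVATE, 0,
                    NULL, NULL);
        /* ...re-acquire the mutex and return from the wait... */
}

static void
waker(void)
{
        wake_word = 1;          /* publish the wakeup first */
        _umtx_op(&wake_word, UMTX_OP_WAKE_PRIVATE, 1, NULL, NULL);
}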
diff --git a/lib/libthr/thread/thr_init.c b/lib/libthr/thread/thr_init.c
index b10c227..7e07215 100644
--- a/lib/libthr/thread/thr_init.c
+++ b/lib/libthr/thread/thr_init.c
@@ -444,6 +444,8 @@ init_private(void)
_thr_once_init();
_thr_spinlock_init();
_thr_list_init();
+ _thr_wake_addr_init();
+ _sleepq_init();
/*
* Avoid reinitializing some things if they don't need to be,
diff --git a/lib/libthr/thread/thr_kern.c b/lib/libthr/thread/thr_kern.c
index 3ad33ad..48f7c65 100644
--- a/lib/libthr/thread/thr_kern.c
+++ b/lib/libthr/thread/thr_kern.c
@@ -30,6 +30,7 @@
#include <sys/types.h>
#include <sys/signalvar.h>
#include <sys/rtprio.h>
+#include <sys/mman.h>
#include <pthread.h>
#include "thr_private.h"
@@ -41,6 +42,10 @@
#define DBG_MSG(x...)
#endif
+static struct umutex addr_lock;
+static struct wake_addr *wake_addr_head;
+static struct wake_addr default_wake_addr;
+
/*
* This is called when the first thread (other than the initial
* thread) is created.
@@ -130,3 +135,90 @@ _thr_setscheduler(lwpid_t lwpid, int policy, const struct sched_param *param)
_schedparam_to_rtp(policy, param, &rtp);
return (rtprio_thread(RTP_SET, lwpid, &rtp));
}
+
+void
+_thr_wake_addr_init(void)
+{
+ _thr_umutex_init(&addr_lock);
+ wake_addr_head = NULL;
+}
+
+/*
+ * Allocate wake-address, the memory area is never freed after
+ * allocated, this becauses threads may be referencing it.
+ */
+struct wake_addr *
+_thr_alloc_wake_addr(void)
+{
+ struct pthread *curthread;
+ struct wake_addr *p;
+
+ if (_thr_initial == NULL) {
+ return &default_wake_addr;
+ }
+
+ curthread = _get_curthread();
+
+ THR_LOCK_ACQUIRE(curthread, &addr_lock);
+ if (wake_addr_head == NULL) {
+ unsigned i;
+ unsigned pagesize = getpagesize();
+ struct wake_addr *pp = (struct wake_addr *)
+ mmap(NULL, getpagesize(), PROT_READ|PROT_WRITE,
+ MAP_ANON|MAP_PRIVATE, -1, 0);
+ for (i = 1; i < pagesize/sizeof(struct wake_addr); ++i)
+ pp[i].link = &pp[i+1];
+ pp[i-1].link = NULL;
+ wake_addr_head = &pp[1];
+ p = &pp[0];
+ } else {
+ p = wake_addr_head;
+ wake_addr_head = p->link;
+ }
+ THR_LOCK_RELEASE(curthread, &addr_lock);
+ p->value = 0;
+ return (p);
+}
+
+void
+_thr_release_wake_addr(struct wake_addr *wa)
+{
+ struct pthread *curthread = _get_curthread();
+
+ if (wa == &default_wake_addr)
+ return;
+ THR_LOCK_ACQUIRE(curthread, &addr_lock);
+ wa->link = wake_addr_head;
+ wake_addr_head = wa;
+ THR_LOCK_RELEASE(curthread, &addr_lock);
+}
+
+/* Sleep on thread wakeup address */
+int
+_thr_sleep(struct pthread *curthread, int clockid,
+ const struct timespec *abstime)
+{
+
+ curthread->will_sleep = 0;
+ if (curthread->nwaiter_defer > 0) {
+ _thr_wake_all(curthread->defer_waiters,
+ curthread->nwaiter_defer);
+ curthread->nwaiter_defer = 0;
+ }
+
+ if (curthread->wake_addr->value != 0)
+ return (0);
+
+ return _thr_umtx_timedwait_uint(&curthread->wake_addr->value, 0,
+ clockid, abstime, 0);
+}
+
+void
+_thr_wake_all(unsigned int *waddrs[], int count)
+{
+ int i;
+
+ for (i = 0; i < count; ++i)
+ *waddrs[i] = 1;
+ _umtx_op(waddrs, UMTX_OP_NWAKE_PRIVATE, count, NULL, NULL);
+}
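For reference, a hedged, self-contained sketch of the batched wakeup _thr_wake_all() performs: every address in the array is flagged first, then a single UMTX_OP_NWAKE_PRIVATE call wakes all of them. It assumes a kernel that already implements this operation (the kernel half of this project); the channel names are illustrative:

#include <sys/types.h>
#include <sys/umtx.h>

/*
 * Three private wait channels; waiters would block on them with
 * UMTX_OP_WAIT_UINT_PRIVATE while the word is 0.
 */
static unsigned int chan_a, chan_b, chan_c;

static void
wake_three_channels(void)
{
        unsigned int *waddrs[] = { &chan_a, &chan_b, &chan_c };
        int i, count = 3;

        for (i = 0; i < count; i++)
                *waddrs[i] = 1;         /* mark each channel as woken */
        /* One system call wakes every address in the array. */
        _umtx_op(waddrs, UMTX_OP_NWAKE_PRIVATE, count, NULL, NULL);
}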
diff --git a/lib/libthr/thread/thr_list.c b/lib/libthr/thread/thr_list.c
index 7541fd3..249501c 100644
--- a/lib/libthr/thread/thr_list.c
+++ b/lib/libthr/thread/thr_list.c
@@ -165,6 +165,8 @@ _thr_alloc(struct pthread *curthread)
if (tcb != NULL) {
memset(thread, 0, sizeof(*thread));
thread->tcb = tcb;
+ thread->sleepqueue = _sleepq_alloc();
+ thread->wake_addr = _thr_alloc_wake_addr();
} else {
thr_destroy(curthread, thread);
atomic_fetchadd_int(&total_threads, -1);
@@ -192,6 +194,8 @@ _thr_free(struct pthread *curthread, struct pthread *thread)
}
thread->tcb = NULL;
if ((curthread == NULL) || (free_thread_count >= MAX_CACHED_THREADS)) {
+ _sleepq_free(thread->sleepqueue);
+ _thr_release_wake_addr(thread->wake_addr);
thr_destroy(curthread, thread);
atomic_fetchadd_int(&total_threads, -1);
} else {
diff --git a/lib/libthr/thread/thr_mutex.c b/lib/libthr/thread/thr_mutex.c
index 29f91ec..bd1fc2b 100644
--- a/lib/libthr/thread/thr_mutex.c
+++ b/lib/libthr/thread/thr_mutex.c
@@ -92,7 +92,7 @@ int __pthread_mutex_setyieldloops_np(pthread_mutex_t *mutex, int count);
static int mutex_self_trylock(pthread_mutex_t);
static int mutex_self_lock(pthread_mutex_t,
const struct timespec *abstime);
-static int mutex_unlock_common(pthread_mutex_t *);
+static int mutex_unlock_common(struct pthread_mutex *, int);
static int mutex_lock_sleep(struct pthread *, pthread_mutex_t,
const struct timespec *);
@@ -145,10 +145,9 @@ mutex_init(pthread_mutex_t *mutex,
calloc_cb(1, sizeof(struct pthread_mutex))) == NULL)
return (ENOMEM);
- pmutex->m_type = attr->m_type;
+ pmutex->m_flags = attr->m_type;
pmutex->m_owner = NULL;
pmutex->m_count = 0;
- pmutex->m_refcount = 0;
pmutex->m_spinloops = 0;
pmutex->m_yieldloops = 0;
MUTEX_INIT_LINK(pmutex);
@@ -168,7 +167,7 @@ mutex_init(pthread_mutex_t *mutex,
break;
}
- if (pmutex->m_type == PTHREAD_MUTEX_ADAPTIVE_NP) {
+ if (PMUTEX_TYPE(pmutex->m_flags) == PTHREAD_MUTEX_ADAPTIVE_NP) {
pmutex->m_spinloops =
_thr_spinloops ? _thr_spinloops: MUTEX_ADAPTIVE_SPINS;
pmutex->m_yieldloops = _thr_yieldloops;
@@ -229,7 +228,7 @@ _pthread_mutex_init_calloc_cb(pthread_mutex_t *mutex,
ret = mutex_init(mutex, &attr, calloc_cb);
if (ret == 0)
- (*mutex)->m_private = 1;
+ (*mutex)->m_flags |= PMUTEX_FLAG_PRIVATE;
return (ret);
}
@@ -266,7 +265,7 @@ _pthread_mutex_destroy(pthread_mutex_t *mutex)
} else if (m == THR_MUTEX_DESTROYED) {
ret = EINVAL;
} else {
- if (m->m_owner != NULL || m->m_refcount != 0) {
+ if (m->m_owner != NULL) {
ret = EBUSY;
} else {
*mutex = THR_MUTEX_DESTROYED;
@@ -290,6 +289,17 @@ _pthread_mutex_destroy(pthread_mutex_t *mutex)
TAILQ_INSERT_TAIL(&curthread->pp_mutexq, (m), m_qe);\
} while (0)
+#define DEQUEUE_MUTEX(curthread, m) \
+ (m)->m_owner = NULL; \
+ MUTEX_ASSERT_IS_OWNED(m); \
+ if (__predict_true(((m)->m_lock.m_flags & UMUTEX_PRIO_PROTECT) == 0)) \
+ TAILQ_REMOVE(&curthread->mutexq, (m), m_qe); \
+ else { \
+ TAILQ_REMOVE(&curthread->pp_mutexq, (m), m_qe); \
+ set_inherited_priority(curthread, m); \
+ } \
+ MUTEX_INIT_LINK(m);
+
#define CHECK_AND_INIT_MUTEX \
if (__predict_false((m = *mutex) <= THR_MUTEX_DESTROYED)) { \
if (m == THR_MUTEX_DESTROYED) \
@@ -310,7 +320,7 @@ mutex_trylock_common(pthread_mutex_t *mutex)
int ret;
id = TID(curthread);
- if (m->m_private)
+ if (m->m_flags & PMUTEX_FLAG_PRIVATE)
THR_CRITICAL_ENTER(curthread);
ret = _thr_umutex_trylock(&m->m_lock, id);
if (__predict_true(ret == 0)) {
@@ -318,7 +328,7 @@ mutex_trylock_common(pthread_mutex_t *mutex)
} else if (m->m_owner == curthread) {
ret = mutex_self_trylock(m);
} /* else {} */
- if (ret && m->m_private)
+ if (ret && (m->m_flags & PMUTEX_FLAG_PRIVATE))
THR_CRITICAL_LEAVE(curthread);
return (ret);
}
@@ -403,12 +413,12 @@ done:
static inline int
mutex_lock_common(struct pthread_mutex *m,
- const struct timespec *abstime)
+ const struct timespec *abstime, int cvattach)
{
struct pthread *curthread = _get_curthread();
int ret;
- if (m->m_private)
+ if (!cvattach && m->m_flags & PMUTEX_FLAG_PRIVATE)
THR_CRITICAL_ENTER(curthread);
if (_thr_umutex_trylock2(&m->m_lock, TID(curthread)) == 0) {
ENQUEUE_MUTEX(curthread, m);
@@ -416,7 +426,7 @@ mutex_lock_common(struct pthread_mutex *m,
} else {
ret = mutex_lock_sleep(curthread, m, abstime);
}
- if (ret && m->m_private)
+ if (ret && (m->m_flags & PMUTEX_FLAG_PRIVATE) && !cvattach)
THR_CRITICAL_LEAVE(curthread);
return (ret);
}
@@ -430,7 +440,7 @@ __pthread_mutex_lock(pthread_mutex_t *mutex)
CHECK_AND_INIT_MUTEX
- return (mutex_lock_common(m, NULL));
+ return (mutex_lock_common(m, NULL, 0));
}
int
@@ -442,28 +452,83 @@ __pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime
CHECK_AND_INIT_MUTEX
- return (mutex_lock_common(m, abstime));
+ return (mutex_lock_common(m, abstime, 0));
}
int
-_pthread_mutex_unlock(pthread_mutex_t *m)
+_pthread_mutex_unlock(pthread_mutex_t *mutex)
{
- return (mutex_unlock_common(m));
+ struct pthread_mutex *mp;
+
+ mp = *mutex;
+ return (mutex_unlock_common(mp, 0));
}
int
-_mutex_cv_lock(pthread_mutex_t *mutex, int count)
+_mutex_cv_lock(struct pthread_mutex *m, int count)
{
- struct pthread_mutex *m;
- int ret;
+ int error;
- m = *mutex;
- ret = mutex_lock_common(m, NULL);
- if (ret == 0) {
- m->m_refcount--;
- m->m_count += count;
+ error = mutex_lock_common(m, NULL, 1);
+ if (error == 0)
+ m->m_count = count;
+ return (error);
+}
+
+int
+_mutex_cv_unlock(struct pthread_mutex *m, int *count)
+{
+
+ /*
+ * Clear the count in case this is a recursive mutex.
+ */
+ *count = m->m_count;
+ m->m_count = 0;
+ (void)mutex_unlock_common(m, 1);
+ return (0);
+}
+
+int
+_mutex_cv_attach(struct pthread_mutex *m, int count)
+{
+ struct pthread *curthread = _get_curthread();
+ int error;
+
+ ENQUEUE_MUTEX(curthread, m);
+ m->m_count = count;
+ return (error);
+}
+
+int
+_mutex_cv_detach(struct pthread_mutex *mp, int *recurse)
+{
+ struct pthread *curthread = _get_curthread();
+ int defered;
+ int error;
+
+ if ((error = _mutex_owned(curthread, mp)) != 0)
+ return (error);
+
+ /*
+ * Clear the count in case this is a recursive mutex.
+ */
+ *recurse = mp->m_count;
+ mp->m_count = 0;
+ DEQUEUE_MUTEX(curthread, mp);
+
+ /* Will this happen in real-world ? */
+ if ((mp->m_flags & PMUTEX_FLAG_DEFERED) != 0) {
+ defered = 1;
+ mp->m_flags &= ~PMUTEX_FLAG_DEFERED;
+ } else
+ defered = 0;
+
+ if (defered) {
+ _thr_wake_all(curthread->defer_waiters,
+ curthread->nwaiter_defer);
+ curthread->nwaiter_defer = 0;
}
- return (ret);
+ return (0);
}
static int
@@ -471,7 +536,7 @@ mutex_self_trylock(struct pthread_mutex *m)
{
int ret;
- switch (m->m_type) {
+ switch (PMUTEX_TYPE(m->m_flags)) {
case PTHREAD_MUTEX_ERRORCHECK:
case PTHREAD_MUTEX_NORMAL:
ret = EBUSY;
@@ -500,7 +565,7 @@ mutex_self_lock(struct pthread_mutex *m, const struct timespec *abstime)
struct timespec ts1, ts2;
int ret;
- switch (m->m_type) {
+ switch (PMUTEX_TYPE(m->m_flags)) {
case PTHREAD_MUTEX_ERRORCHECK:
case PTHREAD_MUTEX_ADAPTIVE_NP:
if (abstime) {
@@ -564,13 +629,12 @@ mutex_self_lock(struct pthread_mutex *m, const struct timespec *abstime)
}
static int
-mutex_unlock_common(pthread_mutex_t *mutex)
+mutex_unlock_common(struct pthread_mutex *m, int cv)
{
struct pthread *curthread = _get_curthread();
- struct pthread_mutex *m;
uint32_t id;
+ int defered;
- m = *mutex;
if (__predict_false(m <= THR_MUTEX_DESTROYED)) {
if (m == THR_MUTEX_DESTROYED)
return (EINVAL);
@@ -585,65 +649,26 @@ mutex_unlock_common(pthread_mutex_t *mutex)
id = TID(curthread);
if (__predict_false(
- m->m_type == PTHREAD_MUTEX_RECURSIVE &&
+ PMUTEX_TYPE(m->m_flags) == PTHREAD_MUTEX_RECURSIVE &&
m->m_count > 0)) {
m->m_count--;
} else {
- m->m_owner = NULL;
- /* Remove the mutex from the threads queue. */
- MUTEX_ASSERT_IS_OWNED(m);
- if (__predict_true((m->m_lock.m_flags & UMUTEX_PRIO_PROTECT) == 0))
- TAILQ_REMOVE(&curthread->mutexq, m, m_qe);
- else {
- TAILQ_REMOVE(&curthread->pp_mutexq, m, m_qe);
- set_inherited_priority(curthread, m);
- }
- MUTEX_INIT_LINK(m);
- _thr_umutex_unlock(&m->m_lock, id);
- }
- if (m->m_private)
- THR_CRITICAL_LEAVE(curthread);
- return (0);
-}
-
-int
-_mutex_cv_unlock(pthread_mutex_t *mutex, int *count)
-{
- struct pthread *curthread = _get_curthread();
- struct pthread_mutex *m;
-
- m = *mutex;
- if (__predict_false(m <= THR_MUTEX_DESTROYED)) {
- if (m == THR_MUTEX_DESTROYED)
- return (EINVAL);
- return (EPERM);
- }
+ if (curthread->will_sleep == 0 && (m->m_flags & PMUTEX_FLAG_DEFERED) != 0) {
+ defered = 1;
+ m->m_flags &= ~PMUTEX_FLAG_DEFERED;
+ } else
+ defered = 0;
- /*
- * Check if the running thread is not the owner of the mutex.
- */
- if (__predict_false(m->m_owner != curthread))
- return (EPERM);
+ DEQUEUE_MUTEX(curthread, m);
+ _thr_umutex_unlock(&m->m_lock, id);
- /*
- * Clear the count in case this is a recursive mutex.
- */
- *count = m->m_count;
- m->m_refcount++;
- m->m_count = 0;
- m->m_owner = NULL;
- /* Remove the mutex from the threads queue. */
- MUTEX_ASSERT_IS_OWNED(m);
- if (__predict_true((m->m_lock.m_flags & UMUTEX_PRIO_PROTECT) == 0))
- TAILQ_REMOVE(&curthread->mutexq, m, m_qe);
- else {
- TAILQ_REMOVE(&curthread->pp_mutexq, m, m_qe);
- set_inherited_priority(curthread, m);
+ if (defered) {
+ _thr_wake_all(curthread->defer_waiters,
+ curthread->nwaiter_defer);
+ curthread->nwaiter_defer = 0;
+ }
}
- MUTEX_INIT_LINK(m);
- _thr_umutex_unlock(&m->m_lock, TID(curthread));
-
- if (m->m_private)
+ if (!cv && m->m_flags & PMUTEX_FLAG_PRIVATE)
THR_CRITICAL_LEAVE(curthread);
return (0);
}
@@ -757,3 +782,16 @@ _pthread_mutex_isowned_np(pthread_mutex_t *mutex)
return (0);
return (m->m_owner == _get_curthread());
}
+
+int
+_mutex_owned(struct pthread *curthread, const struct pthread_mutex *mp)
+{
+ if (__predict_false(mp <= THR_MUTEX_DESTROYED)) {
+ if (mp == THR_MUTEX_DESTROYED)
+ return (EINVAL);
+ return (EPERM);
+ }
+ if (mp->m_owner != curthread)
+ return (EPERM);
+ return (0);
+}
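The unlock path above only flushes wakeups that pthread_cond_signal() deferred earlier. A minimal, self-contained sketch of that defer-and-flush pattern follows; the names defer_wake and flush_deferred_wakes are hypothetical, and in libthr the buffer lives in struct pthread (defer_waiters/nwaiter_defer) with _thr_wake_all() as the batched waker:

#define MAX_DEFER       50      /* mirrors MAX_DEFER_WAITERS in the diff */

struct deferred_wakes {
        unsigned int    *addrs[MAX_DEFER];
        int              count;
};

/*
 * Called from the signalling side while the waiter's mutex is still
 * owned: remember the waiter's wake word instead of waking it now.
 */
static void
defer_wake(struct deferred_wakes *dw, unsigned int *waddr,
    void (*wake_all)(unsigned int **, int))
{
        if (dw->count >= MAX_DEFER) {   /* buffer full: flush early */
                wake_all(dw->addrs, dw->count);
                dw->count = 0;
        }
        dw->addrs[dw->count++] = waddr;
}

/* Called from the unlock path, after the lock word has been released. */
static void
flush_deferred_wakes(struct deferred_wakes *dw,
    void (*wake_all)(unsigned int **, int))
{
        if (dw->count > 0) {
                wake_all(dw->addrs, dw->count);
                dw->count = 0;
        }
}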
diff --git a/lib/libthr/thread/thr_private.h b/lib/libthr/thread/thr_private.h
index 7180d12..9df97aa 100644
--- a/lib/libthr/thread/thr_private.h
+++ b/lib/libthr/thread/thr_private.h
@@ -135,18 +135,23 @@ TAILQ_HEAD(mutex_queue, pthread_mutex);
#define THR_RWLOCK_INITIALIZER ((struct pthread_rwlock *)NULL)
#define THR_RWLOCK_DESTROYED ((struct pthread_rwlock *)1)
+#define PMUTEX_FLAG_TYPE_MASK 0x0ff
+#define PMUTEX_FLAG_PRIVATE 0x100
+#define PMUTEX_FLAG_DEFERED 0x200
+#define PMUTEX_TYPE(mtxflags) ((mtxflags) & PMUTEX_FLAG_TYPE_MASK)
+
+#define MAX_DEFER_WAITERS 50
+
struct pthread_mutex {
/*
* Lock for accesses to this structure.
*/
struct umutex m_lock;
- enum pthread_mutextype m_type;
+ int m_flags;
struct pthread *m_owner;
int m_count;
- int m_refcount;
int m_spinloops;
int m_yieldloops;
- int m_private;
/*
* Link for all mutexes a thread currently owns.
*/
@@ -163,10 +168,10 @@ struct pthread_mutex_attr {
{ PTHREAD_MUTEX_DEFAULT, PTHREAD_PRIO_NONE, 0, MUTEX_FLAGS_PRIVATE }
struct pthread_cond {
- struct umutex c_lock;
- struct ucond c_kerncv;
- int c_pshared;
- int c_clockid;
+ __uint32_t __has_user_waiters;
+ __uint32_t __has_kern_waiters;
+ __uint32_t __flags;
+ __uint32_t __clock_id;
};
struct pthread_cond_attr {
@@ -245,6 +250,21 @@ struct pthread_attr {
size_t cpusetsize;
};
+struct wake_addr {
+ struct wake_addr *link;
+ unsigned int value;
+ char pad[12];
+};
+
+struct sleepqueue {
+ TAILQ_HEAD(, pthread) sq_blocked;
+ SLIST_HEAD(, sleepqueue) sq_freeq;
+ LIST_ENTRY(sleepqueue) sq_hash;
+ SLIST_ENTRY(sleepqueue) sq_flink;
+ void *sq_wchan;
+ int sq_type;
+};
+
/*
* Thread creation state attributes.
*/
@@ -356,6 +376,9 @@ struct pthread {
/* Hash queue entry. */
LIST_ENTRY(pthread) hle;
+ /* Sleep queue entry */
+ TAILQ_ENTRY(pthread) wle;
+
/* Threads reference count. */
int refcount;
@@ -482,6 +505,27 @@ struct pthread {
/* Event */
td_event_msg_t event_buf;
+
+ struct wake_addr *wake_addr;
+#define WAKE_ADDR(td) ((td)->wake_addr)
+
+ /* Sleep queue */
+ struct sleepqueue *sleepqueue;
+
+ /* Wait channel */
+ void *wchan;
+
+ /* Referenced mutex. */
+ struct pthread_mutex *mutex_obj;
+
+ /* Thread will sleep. */
+ int will_sleep;
+
+ /* Number of threads deferred. */
+ int nwaiter_defer;
+
+ /* Deferred threads from pthread_cond_signal. */
+ unsigned int *defer_waiters[MAX_DEFER_WAITERS];
};
#define THR_SHOULD_GC(thrd) \
@@ -519,6 +563,12 @@ do { \
_thr_umutex_lock(lck, TID(thrd)); \
} while (0)
+#define THR_LOCK_ACQUIRE_SPIN(thrd, lck) \
+do { \
+ (thrd)->locklevel++; \
+ _thr_umutex_lock_spin(lck, TID(thrd)); \
+} while (0)
+
#ifdef _PTHREADS_INVARIANTS
#define THR_ASSERT_LOCKLEVEL(thrd) \
do { \
@@ -671,8 +721,11 @@ extern struct umutex _thr_event_lock __hidden;
*/
__BEGIN_DECLS
int _thr_setthreaded(int) __hidden;
-int _mutex_cv_lock(pthread_mutex_t *, int count) __hidden;
-int _mutex_cv_unlock(pthread_mutex_t *, int *count) __hidden;
+int _mutex_cv_lock(struct pthread_mutex *, int count) __hidden;
+int _mutex_cv_unlock(struct pthread_mutex *, int *count) __hidden;
+int _mutex_cv_attach(struct pthread_mutex *, int count) __hidden;
+int _mutex_cv_detach(struct pthread_mutex *, int *count) __hidden;
+int _mutex_owned(struct pthread *, const struct pthread_mutex *) __hidden;
int _mutex_reinit(pthread_mutex_t *) __hidden;
void _mutex_fork(struct pthread *curthread) __hidden;
void _libpthread_init(struct pthread *) __hidden;
@@ -797,6 +850,50 @@ _thr_check_init(void)
_libpthread_init(NULL);
}
+struct wake_addr *_thr_alloc_wake_addr(void);
+void _thr_release_wake_addr(struct wake_addr *);
+int _thr_sleep(struct pthread *, int, const struct timespec *);
+
+void _thr_wake_addr_init(void) __hidden;
+
+static inline void
+_thr_clear_wake(struct pthread *td)
+{
+ td->wake_addr->value = 0;
+}
+
+static inline int
+_thr_is_woken(struct pthread *td)
+{
+ return td->wake_addr->value != 0;
+}
+
+static inline void
+_thr_set_wake(unsigned int *waddr)
+{
+ *waddr = 1;
+ _thr_umtx_wake(waddr, INT_MAX, 0);
+}
+
+void _thr_wake_all(unsigned int *waddrs[], int) __hidden;
+
+static inline struct pthread *
+_sleepq_first(struct sleepqueue *sq)
+{
+ return TAILQ_FIRST(&sq->sq_blocked);
+}
+
+void _sleepq_init(void) __hidden;
+struct sleepqueue *_sleepq_alloc(void) __hidden;
+void _sleepq_free(struct sleepqueue *) __hidden;
+void _sleepq_lock(void *) __hidden;
+void _sleepq_unlock(void *) __hidden;
+struct sleepqueue *_sleepq_lookup(void *) __hidden;
+void _sleepq_add(void *, struct pthread *) __hidden;
+int _sleepq_remove(struct sleepqueue *, struct pthread *) __hidden;
+void _sleepq_drop(struct sleepqueue *,
+ void (*cb)(struct pthread *, void *arg), void *) __hidden;
+
struct dl_phdr_info;
void __pthread_cxa_finalize(struct dl_phdr_info *phdr_info);
void _thr_tsd_unload(struct dl_phdr_info *phdr_info) __hidden;
diff --git a/lib/libthr/thread/thr_umtx.c b/lib/libthr/thread/thr_umtx.c
index dabfa35..33c3637 100644
--- a/lib/libthr/thread/thr_umtx.c
+++ b/lib/libthr/thread/thr_umtx.c
@@ -74,6 +74,39 @@ __thr_umutex_lock(struct umutex *mtx, uint32_t id)
return _umtx_op_err(mtx, UMTX_OP_MUTEX_LOCK, 0, 0, 0);
}
+#define SPINLOOPS 1000
+
+int
+__thr_umutex_lock_spin(struct umutex *mtx, uint32_t id)
+{
+ uint32_t owner;
+
+ if (!_thr_is_smp)
+ return __thr_umutex_lock(mtx, id);
+
+ if ((mtx->m_flags & (UMUTEX_PRIO_PROTECT | UMUTEX_PRIO_INHERIT)) == 0) {
+ for (;;) {
+ int count = SPINLOOPS;
+ while (count--) {
+ owner = mtx->m_owner;
+ if ((owner & ~UMUTEX_CONTESTED) == 0) {
+ if (atomic_cmpset_acq_32(
+ &mtx->m_owner,
+ owner, id|owner)) {
+ return (0);
+ }
+ }
+ CPU_SPINWAIT;
+ }
+
+ /* wait in kernel */
+ _umtx_op_err(mtx, UMTX_OP_MUTEX_WAIT, 0, 0, 0);
+ }
+ }
+
+ return _umtx_op_err(mtx, UMTX_OP_MUTEX_LOCK, 0, 0, 0);
+}
+
int
__thr_umutex_timedlock(struct umutex *mtx, uint32_t id,
const struct timespec *ets)
@@ -164,6 +197,26 @@ _thr_umtx_wait_uint(volatile u_int *mtx, u_int id, const struct timespec *timeou
}
int
+_thr_umtx_timedwait_uint(volatile u_int *mtx, u_int id, int clockid,
+ const struct timespec *abstime, int shared)
+{
+ struct timespec ts, ts2, *tsp;
+
+ if (abstime != NULL) {
+ clock_gettime(clockid, &ts);
+ TIMESPEC_SUB(&ts2, abstime, &ts);
+ if (ts2.tv_sec < 0 || ts2.tv_nsec <= 0)
+ return (ETIMEDOUT);
+ tsp = &ts2;
+ } else {
+ tsp = NULL;
+ }
+ return _umtx_op_err(__DEVOLATILE(void *, mtx),
+ shared ? UMTX_OP_WAIT_UINT : UMTX_OP_WAIT_UINT_PRIVATE, id, NULL,
+ tsp);
+}
+
+int
_thr_umtx_wake(volatile void *mtx, int nr_wakeup, int shared)
{
return _umtx_op_err(__DEVOLATILE(void *, mtx), shared ? UMTX_OP_WAKE : UMTX_OP_WAKE_PRIVATE,
diff --git a/lib/libthr/thread/thr_umtx.h b/lib/libthr/thread/thr_umtx.h
index 3f53faf..0a8034b 100644
--- a/lib/libthr/thread/thr_umtx.h
+++ b/lib/libthr/thread/thr_umtx.h
@@ -36,6 +36,7 @@
#define DEFAULT_URWLOCK {0,0,0,0,{0,0,0,0}}
int __thr_umutex_lock(struct umutex *mtx, uint32_t id) __hidden;
+int __thr_umutex_lock_spin(struct umutex *mtx, uint32_t id) __hidden;
int __thr_umutex_timedlock(struct umutex *mtx, uint32_t id,
const struct timespec *timeout) __hidden;
int __thr_umutex_unlock(struct umutex *mtx, uint32_t id) __hidden;
@@ -50,6 +51,8 @@ int _thr_umtx_wait(volatile long *mtx, long exp,
const struct timespec *timeout) __hidden;
int _thr_umtx_wait_uint(volatile u_int *mtx, u_int exp,
const struct timespec *timeout, int shared) __hidden;
+int _thr_umtx_timedwait_uint(volatile u_int *mtx, u_int exp, int clockid,
+ const struct timespec *timeout, int shared) __hidden;
int _thr_umtx_wake(volatile void *mtx, int count, int shared) __hidden;
int _thr_ucond_wait(struct ucond *cv, struct umutex *m,
const struct timespec *timeout, int check_unpaking) __hidden;
@@ -97,6 +100,14 @@ _thr_umutex_lock(struct umutex *mtx, uint32_t id)
}
static inline int
+_thr_umutex_lock_spin(struct umutex *mtx, uint32_t id)
+{
+ if (_thr_umutex_trylock2(mtx, id) == 0)
+ return (0);
+ return (__thr_umutex_lock_spin(mtx, id));
+}
+
+static inline int
_thr_umutex_timedlock(struct umutex *mtx, uint32_t id,
const struct timespec *timeout)
{