summaryrefslogtreecommitdiffstats
path: root/contrib/ntp/libntp/work_thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/ntp/libntp/work_thread.c')
-rw-r--r--contrib/ntp/libntp/work_thread.c591
1 files changed, 365 insertions, 226 deletions
diff --git a/contrib/ntp/libntp/work_thread.c b/contrib/ntp/libntp/work_thread.c
index 38d8747..49e90c1 100644
--- a/contrib/ntp/libntp/work_thread.c
+++ b/contrib/ntp/libntp/work_thread.c
@@ -32,16 +32,20 @@
#define THREAD_MINSTACKSIZE (64U * 1024)
#endif
-#ifndef DEVOLATILE
-#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
-#endif
-
#ifdef SYS_WINNT
+
# define thread_exit(c) _endthreadex(c)
-# define tickle_sem SetEvent
+# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
+u_int WINAPI blocking_thread(void *);
+static BOOL same_os_sema(const sem_ref obj, void * osobj);
+
#else
+
# define thread_exit(c) pthread_exit((void*)(size_t)(c))
# define tickle_sem sem_post
+void * blocking_thread(void *);
+static void block_thread_signals(sigset_t *);
+
#endif
#ifdef WORK_PIPE
@@ -54,18 +58,10 @@ static void start_blocking_thread(blocking_child *);
static void start_blocking_thread_internal(blocking_child *);
static void prepare_child_sems(blocking_child *);
static int wait_for_sem(sem_ref, struct timespec *);
-static void ensure_workitems_empty_slot(blocking_child *);
-static void ensure_workresp_empty_slot(blocking_child *);
+static int ensure_workitems_empty_slot(blocking_child *);
+static int ensure_workresp_empty_slot(blocking_child *);
static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
static void cleanup_after_child(blocking_child *);
-#ifdef SYS_WINNT
-u_int WINAPI blocking_thread(void *);
-#else
-void * blocking_thread(void *);
-#endif
-#ifndef SYS_WINNT
-static void block_thread_signals(sigset_t *);
-#endif
void
@@ -76,7 +72,9 @@ exit_worker(
thread_exit(exitcode); /* see #define thread_exit */
}
-
+/* --------------------------------------------------------------------
+ * sleep for a given time or until the wakup semaphore is tickled.
+ */
int
worker_sleep(
blocking_child * c,
@@ -98,9 +96,7 @@ worker_sleep(
}
# endif
until.tv_sec += seconds;
- do {
- rc = wait_for_sem(c->wake_scheduled_sleep, &until);
- } while (-1 == rc && EINTR == errno);
+ rc = wait_for_sem(c->wake_scheduled_sleep, &until);
if (0 == rc)
return -1;
if (-1 == rc && ETIMEDOUT == errno)
@@ -110,6 +106,9 @@ worker_sleep(
}
+/* --------------------------------------------------------------------
+ * Wake up a worker that takes a nap.
+ */
void
interrupt_worker_sleep(void)
{
@@ -124,65 +123,79 @@ interrupt_worker_sleep(void)
}
}
-
-static void
+/* --------------------------------------------------------------------
+ * Make sure there is an empty slot at the head of the request
+ * queue. Tell if the queue is currently empty.
+ */
+static int
ensure_workitems_empty_slot(
blocking_child *c
)
{
- const size_t each = sizeof(blocking_children[0]->workitems[0]);
- size_t new_alloc;
- size_t old_octets;
- size_t new_octets;
- void * nonvol_workitems;
-
-
- if (c->workitems != NULL &&
- NULL == c->workitems[c->next_workitem])
- return;
-
- new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
- old_octets = c->workitems_alloc * each;
- new_octets = new_alloc * each;
- nonvol_workitems = DEVOLATILE(void *, c->workitems);
- c->workitems = erealloc_zero(nonvol_workitems, new_octets,
- old_octets);
- if (0 == c->next_workitem)
- c->next_workitem = c->workitems_alloc;
- c->workitems_alloc = new_alloc;
+ /*
+ ** !!! PRECONDITION: caller holds access lock!
+ **
+ ** This simply tries to increase the size of the buffer if it
+ ** becomes full. The resize operation does *not* maintain the
+ ** order of requests, but that should be irrelevant since the
+ ** processing is considered asynchronous anyway.
+ **
+ ** Return if the buffer is currently empty.
+ */
+
+ static const size_t each =
+ sizeof(blocking_children[0]->workitems[0]);
+
+ size_t new_alloc;
+ size_t slots_used;
+
+ slots_used = c->head_workitem - c->tail_workitem;
+ if (slots_used >= c->workitems_alloc) {
+ new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
+ c->workitems = erealloc(c->workitems, new_alloc * each);
+ c->tail_workitem = 0;
+ c->head_workitem = c->workitems_alloc;
+ c->workitems_alloc = new_alloc;
+ }
+ return (0 == slots_used);
}
-
-static void
+/* --------------------------------------------------------------------
+ * Make sure there is an empty slot at the head of the response
+ * queue. Tell if the queue is currently empty.
+ */
+static int
ensure_workresp_empty_slot(
blocking_child *c
)
{
- const size_t each = sizeof(blocking_children[0]->responses[0]);
- size_t new_alloc;
- size_t old_octets;
- size_t new_octets;
- void * nonvol_responses;
-
- if (c->responses != NULL &&
- NULL == c->responses[c->next_response])
- return;
-
- new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
- old_octets = c->responses_alloc * each;
- new_octets = new_alloc * each;
- nonvol_responses = DEVOLATILE(void *, c->responses);
- c->responses = erealloc_zero(nonvol_responses, new_octets,
- old_octets);
- if (0 == c->next_response)
- c->next_response = c->responses_alloc;
- c->responses_alloc = new_alloc;
+ /*
+ ** !!! PRECONDITION: caller holds access lock!
+ **
+ ** Works like the companion function above.
+ */
+
+ static const size_t each =
+ sizeof(blocking_children[0]->responses[0]);
+
+ size_t new_alloc;
+ size_t slots_used;
+
+ slots_used = c->head_response - c->tail_response;
+ if (slots_used >= c->responses_alloc) {
+ new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
+ c->responses = erealloc(c->responses, new_alloc * each);
+ c->tail_response = 0;
+ c->head_response = c->responses_alloc;
+ c->responses_alloc = new_alloc;
+ }
+ return (0 == slots_used);
}
-/*
+/* --------------------------------------------------------------------
* queue_req_pointer() - append a work item or idle exit request to
- * blocking_workitems[].
+ * blocking_workitems[]. Employ proper locking.
*/
static int
queue_req_pointer(
@@ -190,21 +203,28 @@ queue_req_pointer(
blocking_pipe_header * hdr
)
{
- c->workitems[c->next_workitem] = hdr;
- c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
+ size_t qhead;
+
+ /* >>>> ACCESS LOCKING STARTS >>>> */
+ wait_for_sem(c->accesslock, NULL);
+ ensure_workitems_empty_slot(c);
+ qhead = c->head_workitem;
+ c->workitems[qhead % c->workitems_alloc] = hdr;
+ c->head_workitem = 1 + qhead;
+ tickle_sem(c->accesslock);
+ /* <<<< ACCESS LOCKING ENDS <<<< */
- /*
- * We only want to signal the wakeup event if the child is
- * blocking on it, which is indicated by setting the blocking
- * event. Wait with zero timeout to test.
- */
- /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
- tickle_sem(c->blocking_req_ready);
+ /* queue consumer wake-up notification */
+ tickle_sem(c->workitems_pending);
return 0;
}
-
+/* --------------------------------------------------------------------
+ * API function to make sure a worker is running, a proper private copy
+ * of the data is made, the data eneterd into the queue and the worker
+ * is signalled.
+ */
int
send_blocking_req_internal(
blocking_child * c,
@@ -223,12 +243,8 @@ send_blocking_req_internal(
return 1; /* failure */
payload_octets = hdr->octets - sizeof(*hdr);
- ensure_workitems_empty_slot(c);
- if (NULL == c->thread_ref) {
- ensure_workresp_empty_slot(c);
+ if (NULL == c->thread_ref)
start_blocking_thread(c);
- }
-
threadcopy = emalloc(hdr->octets);
memcpy(threadcopy, hdr, sizeof(*hdr));
memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
@@ -236,43 +252,41 @@ send_blocking_req_internal(
return queue_req_pointer(c, threadcopy);
}
-
+/* --------------------------------------------------------------------
+ * Wait for the 'incoming queue no longer empty' signal, lock the shared
+ * structure and dequeue an item.
+ */
blocking_pipe_header *
receive_blocking_req_internal(
blocking_child * c
)
{
blocking_pipe_header * req;
- int rc;
+ size_t qhead, qtail;
- /*
- * Child blocks here when idle. SysV semaphores maintain a
- * count and release from sem_wait() only when it reaches 0.
- * Windows auto-reset events are simpler, and multiple SetEvent
- * calls before any thread waits result in a single wakeup.
- * On Windows, the child drains all workitems each wakeup, while
- * with SysV semaphores wait_sem() is used before each item.
- */
-#ifdef SYS_WINNT
- while (NULL == c->workitems[c->next_workeritem]) {
- /* !!!! SetEvent(c->child_is_blocking); */
- rc = wait_for_sem(c->blocking_req_ready, NULL);
- INSIST(0 == rc);
- /* !!!! ResetEvent(c->child_is_blocking); */
- }
-#else
+ req = NULL;
do {
- rc = wait_for_sem(c->blocking_req_ready, NULL);
- } while (-1 == rc && EINTR == errno);
- INSIST(0 == rc);
-#endif
+ /* wait for tickle from the producer side */
+ wait_for_sem(c->workitems_pending, NULL);
+
+ /* >>>> ACCESS LOCKING STARTS >>>> */
+ wait_for_sem(c->accesslock, NULL);
+ qhead = c->head_workitem;
+ do {
+ qtail = c->tail_workitem;
+ if (qhead == qtail)
+ break;
+ c->tail_workitem = qtail + 1;
+ qtail %= c->workitems_alloc;
+ req = c->workitems[qtail];
+ c->workitems[qtail] = NULL;
+ } while (NULL == req);
+ tickle_sem(c->accesslock);
+ /* <<<< ACCESS LOCKING ENDS <<<< */
+
+ } while (NULL == req);
- req = c->workitems[c->next_workeritem];
INSIST(NULL != req);
- c->workitems[c->next_workeritem] = NULL;
- c->next_workeritem = (1 + c->next_workeritem) %
- c->workitems_alloc;
-
if (CHILD_EXIT_REQ == req) { /* idled out */
send_blocking_resp_internal(c, CHILD_GONE_RESP);
req = NULL;
@@ -281,44 +295,74 @@ receive_blocking_req_internal(
return req;
}
-
+/* --------------------------------------------------------------------
+ * Push a response into the return queue and eventually tickle the
+ * receiver.
+ */
int
send_blocking_resp_internal(
blocking_child * c,
blocking_pipe_header * resp
)
{
- ensure_workresp_empty_slot(c);
-
- c->responses[c->next_response] = resp;
- c->next_response = (1 + c->next_response) % c->responses_alloc;
-
-#ifdef WORK_PIPE
- write(c->resp_write_pipe, "", 1);
-#else
- tickle_sem(c->blocking_response_ready);
-#endif
-
+ size_t qhead;
+ int empty;
+
+ /* >>>> ACCESS LOCKING STARTS >>>> */
+ wait_for_sem(c->accesslock, NULL);
+ empty = ensure_workresp_empty_slot(c);
+ qhead = c->head_response;
+ c->responses[qhead % c->responses_alloc] = resp;
+ c->head_response = 1 + qhead;
+ tickle_sem(c->accesslock);
+ /* <<<< ACCESS LOCKING ENDS <<<< */
+
+ /* queue consumer wake-up notification */
+ if (empty)
+ {
+# ifdef WORK_PIPE
+ write(c->resp_write_pipe, "", 1);
+# else
+ tickle_sem(c->responses_pending);
+# endif
+ }
return 0;
}
#ifndef WORK_PIPE
+
+/* --------------------------------------------------------------------
+ * Check if a (Windows-)hanndle to a semaphore is actually the same we
+ * are using inside the sema wrapper.
+ */
+static BOOL
+same_os_sema(
+ const sem_ref obj,
+ void* osh
+ )
+{
+ return obj && osh && (obj->shnd == (HANDLE)osh);
+}
+
+/* --------------------------------------------------------------------
+ * Find the shared context that associates to an OS handle and make sure
+ * the data is dequeued and processed.
+ */
void
handle_blocking_resp_sem(
void * context
)
{
- HANDLE ready;
blocking_child * c;
u_int idx;
- ready = (HANDLE)context;
c = NULL;
for (idx = 0; idx < blocking_children_alloc; idx++) {
c = blocking_children[idx];
- if (c != NULL && c->thread_ref != NULL &&
- ready == c->blocking_response_ready)
+ if (c != NULL &&
+ c->thread_ref != NULL &&
+ same_os_sema(c->responses_pending, context))
break;
}
if (idx < blocking_children_alloc)
@@ -326,26 +370,41 @@ handle_blocking_resp_sem(
}
#endif /* !WORK_PIPE */
-
+/* --------------------------------------------------------------------
+ * Fetch the next response from the return queue. In case of signalling
+ * via pipe, make sure the pipe is flushed, too.
+ */
blocking_pipe_header *
receive_blocking_resp_internal(
blocking_child * c
)
{
blocking_pipe_header * removed;
+ size_t qhead, qtail, slot;
+
#ifdef WORK_PIPE
int rc;
char scratch[32];
- do {
+ do
rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
- } while (-1 == rc && EINTR == errno);
+ while (-1 == rc && EINTR == errno);
#endif
- removed = c->responses[c->next_workresp];
+
+ /* >>>> ACCESS LOCKING STARTS >>>> */
+ wait_for_sem(c->accesslock, NULL);
+ qhead = c->head_response;
+ qtail = c->tail_response;
+ for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
+ slot = qtail % c->responses_alloc;
+ removed = c->responses[slot];
+ c->responses[slot] = NULL;
+ }
+ c->tail_response = qtail;
+ tickle_sem(c->accesslock);
+ /* <<<< ACCESS LOCKING ENDS <<<< */
+
if (NULL != removed) {
- c->responses[c->next_workresp] = NULL;
- c->next_workresp = (1 + c->next_workresp) %
- c->responses_alloc;
DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
BLOCKING_RESP_MAGIC == removed->magic_sig);
}
@@ -357,7 +416,9 @@ receive_blocking_resp_internal(
return removed;
}
-
+/* --------------------------------------------------------------------
+ * Light up a new worker.
+ */
static void
start_blocking_thread(
blocking_child * c
@@ -370,40 +431,45 @@ start_blocking_thread(
start_blocking_thread_internal(c);
}
-
+/* --------------------------------------------------------------------
+ * Create a worker thread. There are several differences between POSIX
+ * and Windows, of course -- most notably the Windows thread is no
+ * detached thread, and we keep the handle around until we want to get
+ * rid of the thread. The notification scheme also differs: Windows
+ * makes use of semaphores in both directions, POSIX uses a pipe for
+ * integration with 'select()' or alike.
+ */
static void
start_blocking_thread_internal(
blocking_child * c
)
#ifdef SYS_WINNT
{
- thr_ref blocking_child_thread;
- u_int blocking_thread_id;
BOOL resumed;
- (*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
- blocking_child_thread =
+ c->thread_ref = NULL;
+ (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
+ c->thr_table[0].thnd =
(HANDLE)_beginthreadex(
NULL,
0,
&blocking_thread,
c,
CREATE_SUSPENDED,
- &blocking_thread_id);
+ NULL);
- if (NULL == blocking_child_thread) {
+ if (NULL == c->thr_table[0].thnd) {
msyslog(LOG_ERR, "start blocking thread failed: %m");
exit(-1);
}
- c->thread_id = blocking_thread_id;
- c->thread_ref = blocking_child_thread;
/* remember the thread priority is only within the process class */
- if (!SetThreadPriority(blocking_child_thread,
+ if (!SetThreadPriority(c->thr_table[0].thnd,
THREAD_PRIORITY_BELOW_NORMAL))
msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
- resumed = ResumeThread(blocking_child_thread);
+ resumed = ResumeThread(c->thr_table[0].thnd);
DEBUG_INSIST(resumed);
+ c->thread_ref = &c->thr_table[0];
}
#else /* pthreads start_blocking_thread_internal() follows */
{
@@ -419,6 +485,8 @@ start_blocking_thread_internal(
size_t stacksize;
sigset_t saved_sig_mask;
+ c->thread_ref = NULL;
+
# ifdef NEED_PTHREAD_INIT
/*
* from lib/isc/unix/app.c:
@@ -475,7 +543,7 @@ start_blocking_thread_internal(
#endif
c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
block_thread_signals(&saved_sig_mask);
- rc = pthread_create(c->thread_ref, &thr_attr,
+ rc = pthread_create(&c->thr_table[0], &thr_attr,
&blocking_thread, c);
saved_errno = errno;
pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
@@ -485,11 +553,11 @@ start_blocking_thread_internal(
msyslog(LOG_ERR, "pthread_create() blocking child: %m");
exit(1);
}
+ c->thread_ref = &c->thr_table[0];
}
#endif
-
-/*
+/* --------------------------------------------------------------------
* block_thread_signals()
*
* Temporarily block signals used by ntpd main thread, so that signal
@@ -538,61 +606,101 @@ block_thread_signals(
#endif /* !SYS_WINNT */
-/*
+/* --------------------------------------------------------------------
+ * Create & destroy semaphores. This is sufficiently different between
+ * POSIX and Windows to warrant wrapper functions and close enough to
+ * use the concept of synchronization via semaphore for all platforms.
+ */
+static sem_ref
+create_sema(
+ sema_type* semptr,
+ u_int inival,
+ u_int maxval)
+{
+#ifdef SYS_WINNT
+
+ long svini, svmax;
+ if (NULL != semptr) {
+ svini = (inival < LONG_MAX)
+ ? (long)inival : LONG_MAX;
+ svmax = (maxval < LONG_MAX && maxval > 0)
+ ? (long)maxval : LONG_MAX;
+ semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
+ if (NULL == semptr->shnd)
+ semptr = NULL;
+ }
+
+#else
+
+ (void)maxval;
+ if (semptr && sem_init(semptr, FALSE, inival))
+ semptr = NULL;
+
+#endif
+
+ return semptr;
+}
+
+/* ------------------------------------------------------------------ */
+static sem_ref
+delete_sema(
+ sem_ref obj)
+{
+
+# ifdef SYS_WINNT
+
+ if (obj) {
+ if (obj->shnd)
+ CloseHandle(obj->shnd);
+ obj->shnd = NULL;
+ }
+
+# else
+
+ if (obj)
+ sem_destroy(obj);
+
+# endif
+
+ return NULL;
+}
+
+/* --------------------------------------------------------------------
* prepare_child_sems()
*
- * create sync events (semaphores)
- * child_is_blocking initially unset
- * blocking_req_ready initially unset
+ * create sync & access semaphores
*
- * Child waits for blocking_req_ready to be set after
- * setting child_is_blocking. blocking_req_ready and
- * blocking_response_ready are auto-reset, so wake one
- * waiter and become unset (unsignalled) in one operation.
+ * All semaphores are cleared, only the access semaphore has 1 unit.
+ * Childs wait on 'workitems_pending', then grabs 'sema_access'
+ * and dequeues jobs. When done, 'sema_access' is given one unit back.
+ *
+ * The producer grabs 'sema_access', manages the queue, restores
+ * 'sema_access' and puts one unit into 'workitems_pending'.
+ *
+ * The story goes the same for the response queue.
*/
static void
prepare_child_sems(
blocking_child *c
)
-#ifdef SYS_WINNT
-{
- if (NULL == c->blocking_req_ready) {
- /* manual reset using ResetEvent() */
- /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
- /* auto reset - one thread released from wait each set */
- c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
- c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
- c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
- } else {
- /* !!!! ResetEvent(c->child_is_blocking); */
- /* ResetEvent(c->blocking_req_ready); */
- /* ResetEvent(c->blocking_response_ready); */
- /* ResetEvent(c->wake_scheduled_sleep); */
- }
-}
-#else /* pthreads prepare_child_sems() follows */
{
- size_t octets;
-
- if (NULL == c->blocking_req_ready) {
- octets = sizeof(*c->blocking_req_ready);
- octets += sizeof(*c->wake_scheduled_sleep);
- /* !!!! octets += sizeof(*c->child_is_blocking); */
- c->blocking_req_ready = emalloc_zero(octets);;
- c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
- /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
- } else {
- sem_destroy(c->blocking_req_ready);
- sem_destroy(c->wake_scheduled_sleep);
- /* !!!! sem_destroy(c->child_is_blocking); */
- }
- sem_init(c->blocking_req_ready, FALSE, 0);
- sem_init(c->wake_scheduled_sleep, FALSE, 0);
- /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
+ c->accesslock = create_sema(&c->sem_table[0], 1, 1);
+ c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
+ c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
+# ifndef WORK_PIPE
+ c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
+# endif
}
-#endif
-
+/* --------------------------------------------------------------------
+ * wait for semaphore. Where the wait can be interrupted, it will
+ * internally resume -- When this function returns, there is either no
+ * semaphore at all, a timeout occurred, or the caller could
+ * successfully take a token from the semaphore.
+ *
+ * For untimed wait, not checking the result of this function at all is
+ * definitely an option.
+ */
static int
wait_for_sem(
sem_ref sem,
@@ -605,6 +713,11 @@ wait_for_sem(
DWORD msec;
DWORD rc;
+ if (!(sem && sem->shnd)) {
+ errno = EINVAL;
+ return -1;
+ }
+
if (NULL == timeout) {
msec = INFINITE;
} else {
@@ -619,7 +732,7 @@ wait_for_sem(
msec += delta.tv_nsec / (1000 * 1000);
}
}
- rc = WaitForSingleObject(sem, msec);
+ rc = WaitForSingleObject(sem->shnd, msec);
if (WAIT_OBJECT_0 == rc)
return 0;
if (WAIT_TIMEOUT == rc) {
@@ -632,24 +745,28 @@ wait_for_sem(
}
#else /* pthreads wait_for_sem() follows */
{
- int rc;
-
- if (NULL == timeout)
- rc = sem_wait(sem);
+ int rc = -1;
+
+ if (sem) do {
+ if (NULL == timeout)
+ rc = sem_wait(sem);
+ else
+ rc = sem_timedwait(sem, timeout);
+ } while (rc == -1 && errno == EINTR);
else
- rc = sem_timedwait(sem, timeout);
-
+ errno = EINVAL;
+
return rc;
}
#endif
-
-/*
- * blocking_thread - thread functions have WINAPI calling convention
+/* --------------------------------------------------------------------
+ * blocking_thread - thread functions have WINAPI (aka 'stdcall')
+ * calling conventions under Windows and POSIX-defined signature
+ * otherwise.
*/
#ifdef SYS_WINNT
-u_int
-WINAPI
+u_int WINAPI
#else
void *
#endif
@@ -666,20 +783,28 @@ blocking_thread(
return 0;
}
-
-/*
+/* --------------------------------------------------------------------
* req_child_exit() runs in the parent.
+ *
+ * This function is called from from the idle timer, too, and possibly
+ * without a thread being there any longer. Since we have folded up our
+ * tent in that case and all the semaphores are already gone, we simply
+ * ignore this request in this case.
+ *
+ * Since the existence of the semaphores is controlled exclusively by
+ * the parent, there's no risk of data race here.
*/
int
req_child_exit(
blocking_child *c
)
{
- return queue_req_pointer(c, CHILD_EXIT_REQ);
+ return (c->accesslock)
+ ? queue_req_pointer(c, CHILD_EXIT_REQ)
+ : 0;
}
-
-/*
+/* --------------------------------------------------------------------
* cleanup_after_child() runs in parent.
*/
static void
@@ -687,17 +812,27 @@ cleanup_after_child(
blocking_child * c
)
{
- u_int idx;
-
DEBUG_INSIST(!c->reusable);
-#ifdef SYS_WINNT
- INSIST(CloseHandle(c->thread_ref));
-#else
- free(c->thread_ref);
-#endif
+
+# ifdef SYS_WINNT
+ /* The thread was not created in detached state, so we better
+ * clean up.
+ */
+ if (c->thread_ref && c->thread_ref->thnd) {
+ WaitForSingleObject(c->thread_ref->thnd, INFINITE);
+ INSIST(CloseHandle(c->thread_ref->thnd));
+ c->thread_ref->thnd = NULL;
+ }
+# endif
c->thread_ref = NULL;
- c->thread_id = 0;
-#ifdef WORK_PIPE
+
+ /* remove semaphores and (if signalling vi IO) pipes */
+
+ c->accesslock = delete_sema(c->accesslock);
+ c->workitems_pending = delete_sema(c->workitems_pending);
+ c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
+
+# ifdef WORK_PIPE
DEBUG_INSIST(-1 != c->resp_read_pipe);
DEBUG_INSIST(-1 != c->resp_write_pipe);
(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
@@ -705,18 +840,22 @@ cleanup_after_child(
close(c->resp_read_pipe);
c->resp_write_pipe = -1;
c->resp_read_pipe = -1;
-#else
- DEBUG_INSIST(NULL != c->blocking_response_ready);
- (*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
-#endif
- for (idx = 0; idx < c->workitems_alloc; idx++)
- c->workitems[idx] = NULL;
- c->next_workitem = 0;
- c->next_workeritem = 0;
- for (idx = 0; idx < c->responses_alloc; idx++)
- c->responses[idx] = NULL;
- c->next_response = 0;
- c->next_workresp = 0;
+# else
+ DEBUG_INSIST(NULL != c->responses_pending);
+ (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
+ c->responses_pending = delete_sema(c->responses_pending);
+# endif
+
+ /* Is it necessary to check if there are pending requests and
+ * responses? If so, and if there are, what to do with them?
+ */
+
+ /* re-init buffer index sequencers */
+ c->head_workitem = 0;
+ c->tail_workitem = 0;
+ c->head_response = 0;
+ c->tail_response = 0;
+
c->reusable = TRUE;
}
OpenPOWER on IntegriCloud