diff options
author | glebius <glebius@FreeBSD.org> | 2016-01-11 01:09:50 +0000 |
---|---|---|
committer | glebius <glebius@FreeBSD.org> | 2016-01-11 01:09:50 +0000 |
commit | c41fbaf9bc76f0b7650e448fbd88d0a9815f47fd (patch) | |
tree | 3b45c8e63a37b132f0fe10c570b8302052c547de /contrib/ntp/libntp/work_thread.c | |
parent | 3066f138bc3bf99f5cf6ec9412903d206c329cc7 (diff) | |
download | FreeBSD-src-c41fbaf9bc76f0b7650e448fbd88d0a9815f47fd.zip FreeBSD-src-c41fbaf9bc76f0b7650e448fbd88d0a9815f47fd.tar.gz |
Merge r293423, r293469:
ntp 4.2.8p5
Relnotes: yes
Diffstat (limited to 'contrib/ntp/libntp/work_thread.c')
-rw-r--r-- | contrib/ntp/libntp/work_thread.c | 591 |
1 files changed, 365 insertions, 226 deletions
diff --git a/contrib/ntp/libntp/work_thread.c b/contrib/ntp/libntp/work_thread.c index 38d8747..49e90c1 100644 --- a/contrib/ntp/libntp/work_thread.c +++ b/contrib/ntp/libntp/work_thread.c @@ -32,16 +32,20 @@ #define THREAD_MINSTACKSIZE (64U * 1024) #endif -#ifndef DEVOLATILE -#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var)) -#endif - #ifdef SYS_WINNT + # define thread_exit(c) _endthreadex(c) -# define tickle_sem SetEvent +# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) +u_int WINAPI blocking_thread(void *); +static BOOL same_os_sema(const sem_ref obj, void * osobj); + #else + # define thread_exit(c) pthread_exit((void*)(size_t)(c)) # define tickle_sem sem_post +void * blocking_thread(void *); +static void block_thread_signals(sigset_t *); + #endif #ifdef WORK_PIPE @@ -54,18 +58,10 @@ static void start_blocking_thread(blocking_child *); static void start_blocking_thread_internal(blocking_child *); static void prepare_child_sems(blocking_child *); static int wait_for_sem(sem_ref, struct timespec *); -static void ensure_workitems_empty_slot(blocking_child *); -static void ensure_workresp_empty_slot(blocking_child *); +static int ensure_workitems_empty_slot(blocking_child *); +static int ensure_workresp_empty_slot(blocking_child *); static int queue_req_pointer(blocking_child *, blocking_pipe_header *); static void cleanup_after_child(blocking_child *); -#ifdef SYS_WINNT -u_int WINAPI blocking_thread(void *); -#else -void * blocking_thread(void *); -#endif -#ifndef SYS_WINNT -static void block_thread_signals(sigset_t *); -#endif void @@ -76,7 +72,9 @@ exit_worker( thread_exit(exitcode); /* see #define thread_exit */ } - +/* -------------------------------------------------------------------- + * sleep for a given time or until the wakup semaphore is tickled. + */ int worker_sleep( blocking_child * c, @@ -98,9 +96,7 @@ worker_sleep( } # endif until.tv_sec += seconds; - do { - rc = wait_for_sem(c->wake_scheduled_sleep, &until); - } while (-1 == rc && EINTR == errno); + rc = wait_for_sem(c->wake_scheduled_sleep, &until); if (0 == rc) return -1; if (-1 == rc && ETIMEDOUT == errno) @@ -110,6 +106,9 @@ worker_sleep( } +/* -------------------------------------------------------------------- + * Wake up a worker that takes a nap. + */ void interrupt_worker_sleep(void) { @@ -124,65 +123,79 @@ interrupt_worker_sleep(void) } } - -static void +/* -------------------------------------------------------------------- + * Make sure there is an empty slot at the head of the request + * queue. Tell if the queue is currently empty. + */ +static int ensure_workitems_empty_slot( blocking_child *c ) { - const size_t each = sizeof(blocking_children[0]->workitems[0]); - size_t new_alloc; - size_t old_octets; - size_t new_octets; - void * nonvol_workitems; - - - if (c->workitems != NULL && - NULL == c->workitems[c->next_workitem]) - return; - - new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; - old_octets = c->workitems_alloc * each; - new_octets = new_alloc * each; - nonvol_workitems = DEVOLATILE(void *, c->workitems); - c->workitems = erealloc_zero(nonvol_workitems, new_octets, - old_octets); - if (0 == c->next_workitem) - c->next_workitem = c->workitems_alloc; - c->workitems_alloc = new_alloc; + /* + ** !!! PRECONDITION: caller holds access lock! + ** + ** This simply tries to increase the size of the buffer if it + ** becomes full. The resize operation does *not* maintain the + ** order of requests, but that should be irrelevant since the + ** processing is considered asynchronous anyway. + ** + ** Return if the buffer is currently empty. + */ + + static const size_t each = + sizeof(blocking_children[0]->workitems[0]); + + size_t new_alloc; + size_t slots_used; + + slots_used = c->head_workitem - c->tail_workitem; + if (slots_used >= c->workitems_alloc) { + new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; + c->workitems = erealloc(c->workitems, new_alloc * each); + c->tail_workitem = 0; + c->head_workitem = c->workitems_alloc; + c->workitems_alloc = new_alloc; + } + return (0 == slots_used); } - -static void +/* -------------------------------------------------------------------- + * Make sure there is an empty slot at the head of the response + * queue. Tell if the queue is currently empty. + */ +static int ensure_workresp_empty_slot( blocking_child *c ) { - const size_t each = sizeof(blocking_children[0]->responses[0]); - size_t new_alloc; - size_t old_octets; - size_t new_octets; - void * nonvol_responses; - - if (c->responses != NULL && - NULL == c->responses[c->next_response]) - return; - - new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; - old_octets = c->responses_alloc * each; - new_octets = new_alloc * each; - nonvol_responses = DEVOLATILE(void *, c->responses); - c->responses = erealloc_zero(nonvol_responses, new_octets, - old_octets); - if (0 == c->next_response) - c->next_response = c->responses_alloc; - c->responses_alloc = new_alloc; + /* + ** !!! PRECONDITION: caller holds access lock! + ** + ** Works like the companion function above. + */ + + static const size_t each = + sizeof(blocking_children[0]->responses[0]); + + size_t new_alloc; + size_t slots_used; + + slots_used = c->head_response - c->tail_response; + if (slots_used >= c->responses_alloc) { + new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; + c->responses = erealloc(c->responses, new_alloc * each); + c->tail_response = 0; + c->head_response = c->responses_alloc; + c->responses_alloc = new_alloc; + } + return (0 == slots_used); } -/* +/* -------------------------------------------------------------------- * queue_req_pointer() - append a work item or idle exit request to - * blocking_workitems[]. + * blocking_workitems[]. Employ proper locking. */ static int queue_req_pointer( @@ -190,21 +203,28 @@ queue_req_pointer( blocking_pipe_header * hdr ) { - c->workitems[c->next_workitem] = hdr; - c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc; + size_t qhead; + + /* >>>> ACCESS LOCKING STARTS >>>> */ + wait_for_sem(c->accesslock, NULL); + ensure_workitems_empty_slot(c); + qhead = c->head_workitem; + c->workitems[qhead % c->workitems_alloc] = hdr; + c->head_workitem = 1 + qhead; + tickle_sem(c->accesslock); + /* <<<< ACCESS LOCKING ENDS <<<< */ - /* - * We only want to signal the wakeup event if the child is - * blocking on it, which is indicated by setting the blocking - * event. Wait with zero timeout to test. - */ - /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */ - tickle_sem(c->blocking_req_ready); + /* queue consumer wake-up notification */ + tickle_sem(c->workitems_pending); return 0; } - +/* -------------------------------------------------------------------- + * API function to make sure a worker is running, a proper private copy + * of the data is made, the data eneterd into the queue and the worker + * is signalled. + */ int send_blocking_req_internal( blocking_child * c, @@ -223,12 +243,8 @@ send_blocking_req_internal( return 1; /* failure */ payload_octets = hdr->octets - sizeof(*hdr); - ensure_workitems_empty_slot(c); - if (NULL == c->thread_ref) { - ensure_workresp_empty_slot(c); + if (NULL == c->thread_ref) start_blocking_thread(c); - } - threadcopy = emalloc(hdr->octets); memcpy(threadcopy, hdr, sizeof(*hdr)); memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); @@ -236,43 +252,41 @@ send_blocking_req_internal( return queue_req_pointer(c, threadcopy); } - +/* -------------------------------------------------------------------- + * Wait for the 'incoming queue no longer empty' signal, lock the shared + * structure and dequeue an item. + */ blocking_pipe_header * receive_blocking_req_internal( blocking_child * c ) { blocking_pipe_header * req; - int rc; + size_t qhead, qtail; - /* - * Child blocks here when idle. SysV semaphores maintain a - * count and release from sem_wait() only when it reaches 0. - * Windows auto-reset events are simpler, and multiple SetEvent - * calls before any thread waits result in a single wakeup. - * On Windows, the child drains all workitems each wakeup, while - * with SysV semaphores wait_sem() is used before each item. - */ -#ifdef SYS_WINNT - while (NULL == c->workitems[c->next_workeritem]) { - /* !!!! SetEvent(c->child_is_blocking); */ - rc = wait_for_sem(c->blocking_req_ready, NULL); - INSIST(0 == rc); - /* !!!! ResetEvent(c->child_is_blocking); */ - } -#else + req = NULL; do { - rc = wait_for_sem(c->blocking_req_ready, NULL); - } while (-1 == rc && EINTR == errno); - INSIST(0 == rc); -#endif + /* wait for tickle from the producer side */ + wait_for_sem(c->workitems_pending, NULL); + + /* >>>> ACCESS LOCKING STARTS >>>> */ + wait_for_sem(c->accesslock, NULL); + qhead = c->head_workitem; + do { + qtail = c->tail_workitem; + if (qhead == qtail) + break; + c->tail_workitem = qtail + 1; + qtail %= c->workitems_alloc; + req = c->workitems[qtail]; + c->workitems[qtail] = NULL; + } while (NULL == req); + tickle_sem(c->accesslock); + /* <<<< ACCESS LOCKING ENDS <<<< */ + + } while (NULL == req); - req = c->workitems[c->next_workeritem]; INSIST(NULL != req); - c->workitems[c->next_workeritem] = NULL; - c->next_workeritem = (1 + c->next_workeritem) % - c->workitems_alloc; - if (CHILD_EXIT_REQ == req) { /* idled out */ send_blocking_resp_internal(c, CHILD_GONE_RESP); req = NULL; @@ -281,44 +295,74 @@ receive_blocking_req_internal( return req; } - +/* -------------------------------------------------------------------- + * Push a response into the return queue and eventually tickle the + * receiver. + */ int send_blocking_resp_internal( blocking_child * c, blocking_pipe_header * resp ) { - ensure_workresp_empty_slot(c); - - c->responses[c->next_response] = resp; - c->next_response = (1 + c->next_response) % c->responses_alloc; - -#ifdef WORK_PIPE - write(c->resp_write_pipe, "", 1); -#else - tickle_sem(c->blocking_response_ready); -#endif - + size_t qhead; + int empty; + + /* >>>> ACCESS LOCKING STARTS >>>> */ + wait_for_sem(c->accesslock, NULL); + empty = ensure_workresp_empty_slot(c); + qhead = c->head_response; + c->responses[qhead % c->responses_alloc] = resp; + c->head_response = 1 + qhead; + tickle_sem(c->accesslock); + /* <<<< ACCESS LOCKING ENDS <<<< */ + + /* queue consumer wake-up notification */ + if (empty) + { +# ifdef WORK_PIPE + write(c->resp_write_pipe, "", 1); +# else + tickle_sem(c->responses_pending); +# endif + } return 0; } #ifndef WORK_PIPE + +/* -------------------------------------------------------------------- + * Check if a (Windows-)hanndle to a semaphore is actually the same we + * are using inside the sema wrapper. + */ +static BOOL +same_os_sema( + const sem_ref obj, + void* osh + ) +{ + return obj && osh && (obj->shnd == (HANDLE)osh); +} + +/* -------------------------------------------------------------------- + * Find the shared context that associates to an OS handle and make sure + * the data is dequeued and processed. + */ void handle_blocking_resp_sem( void * context ) { - HANDLE ready; blocking_child * c; u_int idx; - ready = (HANDLE)context; c = NULL; for (idx = 0; idx < blocking_children_alloc; idx++) { c = blocking_children[idx]; - if (c != NULL && c->thread_ref != NULL && - ready == c->blocking_response_ready) + if (c != NULL && + c->thread_ref != NULL && + same_os_sema(c->responses_pending, context)) break; } if (idx < blocking_children_alloc) @@ -326,26 +370,41 @@ handle_blocking_resp_sem( } #endif /* !WORK_PIPE */ - +/* -------------------------------------------------------------------- + * Fetch the next response from the return queue. In case of signalling + * via pipe, make sure the pipe is flushed, too. + */ blocking_pipe_header * receive_blocking_resp_internal( blocking_child * c ) { blocking_pipe_header * removed; + size_t qhead, qtail, slot; + #ifdef WORK_PIPE int rc; char scratch[32]; - do { + do rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); - } while (-1 == rc && EINTR == errno); + while (-1 == rc && EINTR == errno); #endif - removed = c->responses[c->next_workresp]; + + /* >>>> ACCESS LOCKING STARTS >>>> */ + wait_for_sem(c->accesslock, NULL); + qhead = c->head_response; + qtail = c->tail_response; + for (removed = NULL; !removed && (qhead != qtail); ++qtail) { + slot = qtail % c->responses_alloc; + removed = c->responses[slot]; + c->responses[slot] = NULL; + } + c->tail_response = qtail; + tickle_sem(c->accesslock); + /* <<<< ACCESS LOCKING ENDS <<<< */ + if (NULL != removed) { - c->responses[c->next_workresp] = NULL; - c->next_workresp = (1 + c->next_workresp) % - c->responses_alloc; DEBUG_ENSURE(CHILD_GONE_RESP == removed || BLOCKING_RESP_MAGIC == removed->magic_sig); } @@ -357,7 +416,9 @@ receive_blocking_resp_internal( return removed; } - +/* -------------------------------------------------------------------- + * Light up a new worker. + */ static void start_blocking_thread( blocking_child * c @@ -370,40 +431,45 @@ start_blocking_thread( start_blocking_thread_internal(c); } - +/* -------------------------------------------------------------------- + * Create a worker thread. There are several differences between POSIX + * and Windows, of course -- most notably the Windows thread is no + * detached thread, and we keep the handle around until we want to get + * rid of the thread. The notification scheme also differs: Windows + * makes use of semaphores in both directions, POSIX uses a pipe for + * integration with 'select()' or alike. + */ static void start_blocking_thread_internal( blocking_child * c ) #ifdef SYS_WINNT { - thr_ref blocking_child_thread; - u_int blocking_thread_id; BOOL resumed; - (*addremove_io_semaphore)(c->blocking_response_ready, FALSE); - blocking_child_thread = + c->thread_ref = NULL; + (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); + c->thr_table[0].thnd = (HANDLE)_beginthreadex( NULL, 0, &blocking_thread, c, CREATE_SUSPENDED, - &blocking_thread_id); + NULL); - if (NULL == blocking_child_thread) { + if (NULL == c->thr_table[0].thnd) { msyslog(LOG_ERR, "start blocking thread failed: %m"); exit(-1); } - c->thread_id = blocking_thread_id; - c->thread_ref = blocking_child_thread; /* remember the thread priority is only within the process class */ - if (!SetThreadPriority(blocking_child_thread, + if (!SetThreadPriority(c->thr_table[0].thnd, THREAD_PRIORITY_BELOW_NORMAL)) msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); - resumed = ResumeThread(blocking_child_thread); + resumed = ResumeThread(c->thr_table[0].thnd); DEBUG_INSIST(resumed); + c->thread_ref = &c->thr_table[0]; } #else /* pthreads start_blocking_thread_internal() follows */ { @@ -419,6 +485,8 @@ start_blocking_thread_internal( size_t stacksize; sigset_t saved_sig_mask; + c->thread_ref = NULL; + # ifdef NEED_PTHREAD_INIT /* * from lib/isc/unix/app.c: @@ -475,7 +543,7 @@ start_blocking_thread_internal( #endif c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); block_thread_signals(&saved_sig_mask); - rc = pthread_create(c->thread_ref, &thr_attr, + rc = pthread_create(&c->thr_table[0], &thr_attr, &blocking_thread, c); saved_errno = errno; pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); @@ -485,11 +553,11 @@ start_blocking_thread_internal( msyslog(LOG_ERR, "pthread_create() blocking child: %m"); exit(1); } + c->thread_ref = &c->thr_table[0]; } #endif - -/* +/* -------------------------------------------------------------------- * block_thread_signals() * * Temporarily block signals used by ntpd main thread, so that signal @@ -538,61 +606,101 @@ block_thread_signals( #endif /* !SYS_WINNT */ -/* +/* -------------------------------------------------------------------- + * Create & destroy semaphores. This is sufficiently different between + * POSIX and Windows to warrant wrapper functions and close enough to + * use the concept of synchronization via semaphore for all platforms. + */ +static sem_ref +create_sema( + sema_type* semptr, + u_int inival, + u_int maxval) +{ +#ifdef SYS_WINNT + + long svini, svmax; + if (NULL != semptr) { + svini = (inival < LONG_MAX) + ? (long)inival : LONG_MAX; + svmax = (maxval < LONG_MAX && maxval > 0) + ? (long)maxval : LONG_MAX; + semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); + if (NULL == semptr->shnd) + semptr = NULL; + } + +#else + + (void)maxval; + if (semptr && sem_init(semptr, FALSE, inival)) + semptr = NULL; + +#endif + + return semptr; +} + +/* ------------------------------------------------------------------ */ +static sem_ref +delete_sema( + sem_ref obj) +{ + +# ifdef SYS_WINNT + + if (obj) { + if (obj->shnd) + CloseHandle(obj->shnd); + obj->shnd = NULL; + } + +# else + + if (obj) + sem_destroy(obj); + +# endif + + return NULL; +} + +/* -------------------------------------------------------------------- * prepare_child_sems() * - * create sync events (semaphores) - * child_is_blocking initially unset - * blocking_req_ready initially unset + * create sync & access semaphores * - * Child waits for blocking_req_ready to be set after - * setting child_is_blocking. blocking_req_ready and - * blocking_response_ready are auto-reset, so wake one - * waiter and become unset (unsignalled) in one operation. + * All semaphores are cleared, only the access semaphore has 1 unit. + * Childs wait on 'workitems_pending', then grabs 'sema_access' + * and dequeues jobs. When done, 'sema_access' is given one unit back. + * + * The producer grabs 'sema_access', manages the queue, restores + * 'sema_access' and puts one unit into 'workitems_pending'. + * + * The story goes the same for the response queue. */ static void prepare_child_sems( blocking_child *c ) -#ifdef SYS_WINNT -{ - if (NULL == c->blocking_req_ready) { - /* manual reset using ResetEvent() */ - /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */ - /* auto reset - one thread released from wait each set */ - c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL); - c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL); - c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL); - } else { - /* !!!! ResetEvent(c->child_is_blocking); */ - /* ResetEvent(c->blocking_req_ready); */ - /* ResetEvent(c->blocking_response_ready); */ - /* ResetEvent(c->wake_scheduled_sleep); */ - } -} -#else /* pthreads prepare_child_sems() follows */ { - size_t octets; - - if (NULL == c->blocking_req_ready) { - octets = sizeof(*c->blocking_req_ready); - octets += sizeof(*c->wake_scheduled_sleep); - /* !!!! octets += sizeof(*c->child_is_blocking); */ - c->blocking_req_ready = emalloc_zero(octets);; - c->wake_scheduled_sleep = 1 + c->blocking_req_ready; - /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */ - } else { - sem_destroy(c->blocking_req_ready); - sem_destroy(c->wake_scheduled_sleep); - /* !!!! sem_destroy(c->child_is_blocking); */ - } - sem_init(c->blocking_req_ready, FALSE, 0); - sem_init(c->wake_scheduled_sleep, FALSE, 0); - /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */ + c->accesslock = create_sema(&c->sem_table[0], 1, 1); + c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); + c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); +# ifndef WORK_PIPE + c->responses_pending = create_sema(&c->sem_table[3], 0, 0); +# endif } -#endif - +/* -------------------------------------------------------------------- + * wait for semaphore. Where the wait can be interrupted, it will + * internally resume -- When this function returns, there is either no + * semaphore at all, a timeout occurred, or the caller could + * successfully take a token from the semaphore. + * + * For untimed wait, not checking the result of this function at all is + * definitely an option. + */ static int wait_for_sem( sem_ref sem, @@ -605,6 +713,11 @@ wait_for_sem( DWORD msec; DWORD rc; + if (!(sem && sem->shnd)) { + errno = EINVAL; + return -1; + } + if (NULL == timeout) { msec = INFINITE; } else { @@ -619,7 +732,7 @@ wait_for_sem( msec += delta.tv_nsec / (1000 * 1000); } } - rc = WaitForSingleObject(sem, msec); + rc = WaitForSingleObject(sem->shnd, msec); if (WAIT_OBJECT_0 == rc) return 0; if (WAIT_TIMEOUT == rc) { @@ -632,24 +745,28 @@ wait_for_sem( } #else /* pthreads wait_for_sem() follows */ { - int rc; - - if (NULL == timeout) - rc = sem_wait(sem); + int rc = -1; + + if (sem) do { + if (NULL == timeout) + rc = sem_wait(sem); + else + rc = sem_timedwait(sem, timeout); + } while (rc == -1 && errno == EINTR); else - rc = sem_timedwait(sem, timeout); - + errno = EINVAL; + return rc; } #endif - -/* - * blocking_thread - thread functions have WINAPI calling convention +/* -------------------------------------------------------------------- + * blocking_thread - thread functions have WINAPI (aka 'stdcall') + * calling conventions under Windows and POSIX-defined signature + * otherwise. */ #ifdef SYS_WINNT -u_int -WINAPI +u_int WINAPI #else void * #endif @@ -666,20 +783,28 @@ blocking_thread( return 0; } - -/* +/* -------------------------------------------------------------------- * req_child_exit() runs in the parent. + * + * This function is called from from the idle timer, too, and possibly + * without a thread being there any longer. Since we have folded up our + * tent in that case and all the semaphores are already gone, we simply + * ignore this request in this case. + * + * Since the existence of the semaphores is controlled exclusively by + * the parent, there's no risk of data race here. */ int req_child_exit( blocking_child *c ) { - return queue_req_pointer(c, CHILD_EXIT_REQ); + return (c->accesslock) + ? queue_req_pointer(c, CHILD_EXIT_REQ) + : 0; } - -/* +/* -------------------------------------------------------------------- * cleanup_after_child() runs in parent. */ static void @@ -687,17 +812,27 @@ cleanup_after_child( blocking_child * c ) { - u_int idx; - DEBUG_INSIST(!c->reusable); -#ifdef SYS_WINNT - INSIST(CloseHandle(c->thread_ref)); -#else - free(c->thread_ref); -#endif + +# ifdef SYS_WINNT + /* The thread was not created in detached state, so we better + * clean up. + */ + if (c->thread_ref && c->thread_ref->thnd) { + WaitForSingleObject(c->thread_ref->thnd, INFINITE); + INSIST(CloseHandle(c->thread_ref->thnd)); + c->thread_ref->thnd = NULL; + } +# endif c->thread_ref = NULL; - c->thread_id = 0; -#ifdef WORK_PIPE + + /* remove semaphores and (if signalling vi IO) pipes */ + + c->accesslock = delete_sema(c->accesslock); + c->workitems_pending = delete_sema(c->workitems_pending); + c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); + +# ifdef WORK_PIPE DEBUG_INSIST(-1 != c->resp_read_pipe); DEBUG_INSIST(-1 != c->resp_write_pipe); (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); @@ -705,18 +840,22 @@ cleanup_after_child( close(c->resp_read_pipe); c->resp_write_pipe = -1; c->resp_read_pipe = -1; -#else - DEBUG_INSIST(NULL != c->blocking_response_ready); - (*addremove_io_semaphore)(c->blocking_response_ready, TRUE); -#endif - for (idx = 0; idx < c->workitems_alloc; idx++) - c->workitems[idx] = NULL; - c->next_workitem = 0; - c->next_workeritem = 0; - for (idx = 0; idx < c->responses_alloc; idx++) - c->responses[idx] = NULL; - c->next_response = 0; - c->next_workresp = 0; +# else + DEBUG_INSIST(NULL != c->responses_pending); + (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); + c->responses_pending = delete_sema(c->responses_pending); +# endif + + /* Is it necessary to check if there are pending requests and + * responses? If so, and if there are, what to do with them? + */ + + /* re-init buffer index sequencers */ + c->head_workitem = 0; + c->tail_workitem = 0; + c->head_response = 0; + c->tail_response = 0; + c->reusable = TRUE; } |