From ea50d71feb02a78d4d5fa746a26ca7ddc6e8cb19 Mon Sep 17 00:00:00 2001 From: peter Date: Thu, 28 Aug 2008 02:25:51 +0000 Subject: Stage 1 of sendmail dist tree flattening. contrib/sendmail/contrib prevents doing this in one pass. --- contrib/sendmail/libmilter/worker.c | 792 ------------------------------------ 1 file changed, 792 deletions(-) delete mode 100644 contrib/sendmail/libmilter/worker.c (limited to 'contrib/sendmail/libmilter/worker.c') diff --git a/contrib/sendmail/libmilter/worker.c b/contrib/sendmail/libmilter/worker.c deleted file mode 100644 index 0402678..0000000 --- a/contrib/sendmail/libmilter/worker.c +++ /dev/null @@ -1,792 +0,0 @@ -/* - * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers. - * All rights reserved. - * - * By using this file, you agree to the terms and conditions set - * forth in the LICENSE file which can be found at the top level of - * the sendmail distribution. - * - * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris - * Jose-Marcio.Martins@ensmp.fr - */ - -#include -SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $") - -#include "libmilter.h" - -#if _FFR_WORKERS_POOL - -typedef struct taskmgr_S taskmgr_T; - -#define TM_SIGNATURE 0x23021957 - -struct taskmgr_S -{ - long tm_signature; /* has the controller been initialized */ - sthread_t tm_tid; /* thread id of controller */ - smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ - - int tm_nb_workers; /* number of workers in the pool */ - int tm_nb_idle; /* number of workers waiting */ - - int tm_p[2]; /* poll control pipe */ - - smutex_t tm_w_mutex; /* linked list access mutex */ - scond_t tm_w_cond; /* */ -}; - -static taskmgr_T Tskmgr = {0}; - -#define WRK_CTX_HEAD Tskmgr.tm_ctx_head - -#define RD_PIPE (Tskmgr.tm_p[0]) -#define WR_PIPE (Tskmgr.tm_p[1]) - -#define PIPE_SEND_SIGNAL() \ - do \ - { \ - char evt = 0x5a; \ - int fd = WR_PIPE; \ - if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ - smi_log(SMI_LOG_ERR, \ - "Error writing to event pipe: %s", \ - sm_errstring(errno)); \ - } while (0) - -#ifndef USE_PIPE_WAKE_POLL -# define USE_PIPE_WAKE_POLL 1 -#endif /* USE_PIPE_WAKE_POLL */ - -/* poll check periodicity (default 10000 - 10 s) */ -#define POLL_TIMEOUT 10000 - -/* worker conditional wait timeout (default 10 s) */ -#define COND_TIMEOUT 10 - -/* functions */ -static int mi_close_session __P((SMFICTX_PTR)); - -static void *mi_worker __P((void *)); -static void *mi_pool_controller __P((void *)); - -static int mi_list_add_ctx __P((SMFICTX_PTR)); -static int mi_list_del_ctx __P((SMFICTX_PTR)); - -/* -** periodicity of cleaning up old sessions (timedout) -** sessions list will be checked to find old inactive -** sessions each DT_CHECK_OLD_SESSIONS sec -*/ - -#define DT_CHECK_OLD_SESSIONS 600 - -#ifndef OLD_SESSION_TIMEOUT -# define OLD_SESSION_TIMEOUT ctx->ctx_timeout -#endif /* OLD_SESSION_TIMEOUT */ - -/* session states - with respect to the pool of workers */ -#define WKST_INIT 0 /* initial state */ -#define WKST_READY_TO_RUN 1 /* command ready do be read */ -#define WKST_RUNNING 2 /* session running on a worker */ -#define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ -#define WKST_WAITING 4 /* waiting for new command */ -#define WKST_CLOSING 5 /* session finished */ - -#ifndef MIN_WORKERS -# define MIN_WORKERS 2 /* minimum number of threads to keep around */ -#endif - -#define MIN_IDLE 1 /* minimum number of idle threads */ - - -/* -** Macros for threads and mutex management -*/ - -#define TASKMGR_LOCK() \ - do \ - { \ - if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ - smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ - } while (0) - -#define TASKMGR_UNLOCK() \ - do \ - { \ - if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ - smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ - } while (0) - -#define TASKMGR_COND_WAIT() \ - scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) - -#define TASKMGR_COND_SIGNAL() \ - do \ - { \ - if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ - smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ - } while (0) - -#define LAUNCH_WORKER(ctx) \ - do \ - { \ - int r; \ - sthread_t tid; \ - \ - if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ - smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ - sm_errstring(r)); \ - } while (0) - -#if POOL_DEBUG -# define POOL_LEV_DPRINTF(lev, x) \ - do { \ - if ((lev) < ctx->ctx_dbg) \ - sm_dprintf x; \ - } while (0) -#else /* POOL_DEBUG */ -# define POOL_LEV_DPRINTF(lev, x) -#endif /* POOL_DEBUG */ - -/* -** MI_START_SESSION -- Start a session in the pool of workers -** -** Parameters: -** ctx -- context structure -** -** Returns: -** MI_SUCCESS/MI_FAILURE -*/ - -int -mi_start_session(ctx) - SMFICTX_PTR ctx; -{ - static long id = 0; - - SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); - SM_ASSERT(ctx != NULL); - POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); - TASKMGR_LOCK(); - - if (mi_list_add_ctx(ctx) != MI_SUCCESS) - { - TASKMGR_UNLOCK(); - return MI_FAILURE; - } - - ctx->ctx_sid = id++; - - /* if there is an idle worker, signal it, otherwise start new worker */ - if (Tskmgr.tm_nb_idle > 0) - { - ctx->ctx_wstate = WKST_READY_TO_RUN; - TASKMGR_COND_SIGNAL(); - } - else - { - ctx->ctx_wstate = WKST_RUNNING; - LAUNCH_WORKER(ctx); - } - TASKMGR_UNLOCK(); - return MI_SUCCESS; -} - -/* -** MI_CLOSE_SESSION -- Close a session and clean up data structures -** -** Parameters: -** ctx -- context structure -** -** Returns: -** MI_SUCCESS/MI_FAILURE -*/ - -static int -mi_close_session(ctx) - SMFICTX_PTR ctx; -{ - SM_ASSERT(ctx != NULL); - - (void) mi_list_del_ctx(ctx); - if (ValidSocket(ctx->ctx_sd)) - { - (void) closesocket(ctx->ctx_sd); - ctx->ctx_sd = INVALID_SOCKET; - } - if (ctx->ctx_reply != NULL) - { - free(ctx->ctx_reply); - ctx->ctx_reply = NULL; - } - if (ctx->ctx_privdata != NULL) - { - smi_log(SMI_LOG_WARN, "%s: private data not NULL", - ctx->ctx_smfi->xxfi_name); - } - mi_clr_macros(ctx, 0); - free(ctx); - - return MI_SUCCESS; -} - -/* -** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller -** Must be called before starting sessions. -** -** Parameters: -** none -** -** Returns: -** MI_SUCCESS/MI_FAILURE -*/ - -int -mi_pool_controller_init() -{ - sthread_t tid; - int r, i; - - if (Tskmgr.tm_signature == TM_SIGNATURE) - return MI_SUCCESS; - - SM_TAILQ_INIT(&WRK_CTX_HEAD); - Tskmgr.tm_tid = (sthread_t) -1; - Tskmgr.tm_nb_workers = 0; - Tskmgr.tm_nb_idle = 0; - - if (pipe(Tskmgr.tm_p) != 0) - { - smi_log(SMI_LOG_ERR, "can't create event pipe: %s", - sm_errstring(r)); - return MI_FAILURE; - } - - POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); - - (void) smutex_init(&Tskmgr.tm_w_mutex); - (void) scond_init(&Tskmgr.tm_w_cond); - - /* Launch the pool controller */ - if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) - { - smi_log(SMI_LOG_ERR, "can't create controller thread: %s", - sm_errstring(r)); - return MI_FAILURE; - } - Tskmgr.tm_tid = tid; - Tskmgr.tm_signature = TM_SIGNATURE; - - /* Create the pool of workers */ - for (i = 0; i < MIN_WORKERS; i++) - { - if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) - { - smi_log(SMI_LOG_ERR, "can't create workers crew: %s", - sm_errstring(r)); - return MI_FAILURE; - } - } - - return MI_SUCCESS; -} - -/* -** MI_POOL_CONTROLLER -- manage the pool of workers -** This thread must be running when listener begins -** starting sessions -** -** Parameters: -** arg -- unused -** -** Returns: -** NULL -** -** Control flow: -** for (;;) -** Look for timed out sessions -** Select sessions to wait for sendmail command -** Poll set of file descriptors -** if timeout -** continue -** For each file descriptor ready -** launch new thread if no worker available -** else -** signal waiting worker -*/ - -/* Poll structure array (pollfd) size step */ -#define PFD_STEP 256 - -#define WAIT_FD(i) (pfd[i].fd) -#define WAITFN "POLL" - -static void * -mi_pool_controller(arg) - void *arg; -{ - struct pollfd *pfd = NULL; - int dim_pfd = 0; - bool rebuild_set = true; - int pcnt = 0; /* error count for poll() failures */ - - Tskmgr.tm_tid = sthread_get_id(); - if (pthread_detach(Tskmgr.tm_tid) != 0) - { - smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); - return NULL; - } - - pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); - if (pfd == NULL) - { - smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", - sm_errstring(errno)); - return NULL; - } - dim_pfd = PFD_STEP; - - for (;;) - { - SMFICTX_PTR ctx; - int nfd, rfd, i; - time_t now; - time_t lastcheck; - - POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); - - if (mi_stop() != MILTER_CONT) - break; - - TASKMGR_LOCK(); - - now = time(NULL); - - /* check for timed out sessions? */ - if (lastcheck + DT_CHECK_OLD_SESSIONS < now) - { - SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) - { - if (ctx->ctx_wstate == WKST_WAITING) - { - if (ctx->ctx_wait == 0) - { - ctx->ctx_wait = now; - continue; - } - - /* if session timed out, close it */ - if (ctx->ctx_wait + OLD_SESSION_TIMEOUT - < now) - { - sfsistat (*fi_close) __P((SMFICTX *)); - - POOL_LEV_DPRINTF(4, - ("Closing old connection: sd=%d id=%d", - ctx->ctx_sd, - ctx->ctx_sid)); - - if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) - (void) (*fi_close)(ctx); - - mi_close_session(ctx); - ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); - continue; - } - } - } - lastcheck = now; - } - - if (rebuild_set) - { - /* - ** Initialize poll set. - ** Insert into the poll set the file descriptors of - ** all sessions waiting for a command from sendmail. - */ - - nfd = 0; - - /* begin with worker pipe */ - pfd[nfd].fd = RD_PIPE; - pfd[nfd].events = MI_POLL_RD_FLAGS; - pfd[nfd].revents = 0; - nfd++; - - SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) - { - /* - ** update ctx_wait - start of wait moment - - ** for timeout - */ - - if (ctx->ctx_wstate == WKST_READY_TO_WAIT) - ctx->ctx_wait = now; - - /* add the session to the pollfd array? */ - if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || - (ctx->ctx_wstate == WKST_WAITING)) - { - /* - ** Resize the pollfd array if it - ** isn't large enough. - */ - - if (nfd >= dim_pfd) - { - struct pollfd *tpfd; - size_t new; - - new = (dim_pfd + PFD_STEP) * - sizeof(*tpfd); - tpfd = (struct pollfd *) - realloc(pfd, new); - if (tpfd != NULL) - { - pfd = tpfd; - dim_pfd += PFD_STEP; - } - else - { - smi_log(SMI_LOG_ERR, - "Failed to realloc pollfd array:%s", - sm_errstring(errno)); - } - } - - /* add the session to pollfd array */ - if (nfd < dim_pfd) - { - ctx->ctx_wstate = WKST_WAITING; - pfd[nfd].fd = ctx->ctx_sd; - pfd[nfd].events = MI_POLL_RD_FLAGS; - pfd[nfd].revents = 0; - nfd++; - } - } - } - } - - TASKMGR_UNLOCK(); - - /* Everything is ready, let's wait for an event */ - rfd = poll(pfd, nfd, POLL_TIMEOUT); - - POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", - WAITFN, now, nfd)); - - /* timeout */ - if (rfd == 0) - continue; - - rebuild_set = true; - - /* error */ - if (rfd < 0) - { - if (errno == EINTR) - continue; - pcnt++; - smi_log(SMI_LOG_ERR, - "%s() failed (%s), %s", - WAITFN, sm_errstring(errno), - pcnt >= MAX_FAILS_S ? "abort" : "try again"); - - if (pcnt >= MAX_FAILS_S) - goto err; - } - pcnt = 0; - - /* something happened */ - for (i = 0; i < nfd; i++) - { - if (pfd[i].revents == 0) - continue; - - POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", - WAITFN, i, nfd, - WAIT_FD(i))); - - /* has a worker signaled an end of task ? */ - if (WAIT_FD(i) == RD_PIPE) - { - char evt = 0; - int r = 0; - - POOL_LEV_DPRINTF(4, - ("PIPE WILL READ evt = %08X %08X", - pfd[i].events, pfd[i].revents)); - - if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) - { - r = read(RD_PIPE, &evt, sizeof(evt)); - if (r == sizeof(evt)) - { - /* Do nothing */ - } - } - - POOL_LEV_DPRINTF(4, - ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", - i, RD_PIPE, r, evt)); - - if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) - { - /* Exception handling */ - } - continue; - } - - /* no ! sendmail wants to send a command */ - SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) - { - if (ctx->ctx_wstate != WKST_WAITING) - continue; - - POOL_LEV_DPRINTF(4, - ("Checking context sd=%d - fd=%d ", - ctx->ctx_sd , WAIT_FD(i))); - - if (ctx->ctx_sd == pfd[i].fd) - { - TASKMGR_LOCK(); - - POOL_LEV_DPRINTF(4, - ("TASK: found %d for fd[%d]=%d", - ctx->ctx_sid, i, WAIT_FD(i))); - - if (Tskmgr.tm_nb_idle > 0) - { - ctx->ctx_wstate = WKST_READY_TO_RUN; - TASKMGR_COND_SIGNAL(); - } - else - { - ctx->ctx_wstate = WKST_RUNNING; - LAUNCH_WORKER(ctx); - } - TASKMGR_UNLOCK(); - break; - } - } - - POOL_LEV_DPRINTF(4, - ("TASK %s FOUND - Checking PIPE for fd[%d]", - ctx != NULL ? "" : "NOT", WAIT_FD(i))); - } - } - - err: - if (pfd != NULL) - free(pfd); - - Tskmgr.tm_signature = 0; - for (;;) - { - SMFICTX_PTR ctx; - - ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); - if (ctx == NULL) - break; - mi_close_session(ctx); - } - - (void) smutex_destroy(&Tskmgr.tm_w_mutex); - (void) scond_destroy(&Tskmgr.tm_w_cond); - - return NULL; -} - -/* -** Look for a task ready to run. -** Value of ctx is NULL or a pointer to a task ready to run. -*/ - -#define GET_TASK_READY_TO_RUN() \ - SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ - { \ - if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ - { \ - ctx->ctx_wstate = WKST_RUNNING; \ - break; \ - } \ - } - -/* -** MI_WORKER -- worker thread -** executes tasks distributed by the mi_pool_controller -** or by mi_start_session -** -** Parameters: -** arg -- pointer to context structure -** -** Returns: -** NULL pointer -*/ - -static void * -mi_worker(arg) - void *arg; -{ - SMFICTX_PTR ctx; - bool done; - sthread_t t_id; - int r; - - ctx = (SMFICTX_PTR) arg; - done = false; - if (ctx != NULL) - ctx->ctx_wstate = WKST_RUNNING; - - t_id = sthread_get_id(); - if (pthread_detach(t_id) != 0) - { - smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); - if (ctx != NULL) - ctx->ctx_wstate = WKST_READY_TO_RUN; - return NULL; - } - - TASKMGR_LOCK(); - Tskmgr.tm_nb_workers++; - TASKMGR_UNLOCK(); - - while (!done) - { - if (mi_stop() != MILTER_CONT) - break; - - /* let's handle next task... */ - if (ctx != NULL) - { - int res; - - POOL_LEV_DPRINTF(4, - ("worker %d: new task -> let's handle it", - t_id)); - res = mi_engine(ctx); - POOL_LEV_DPRINTF(4, - ("worker %d: mi_engine returned %d", t_id, res)); - - TASKMGR_LOCK(); - if (res != MI_CONTINUE) - { - ctx->ctx_wstate = WKST_CLOSING; - - /* - ** Delete context from linked list of - ** sessions and close session. - */ - - mi_close_session(ctx); - } - else - { - ctx->ctx_wstate = WKST_READY_TO_WAIT; - - POOL_LEV_DPRINTF(4, - ("writing to event pipe...")); - - /* - ** Signal task controller to add new session - ** to poll set. - */ - - PIPE_SEND_SIGNAL(); - } - TASKMGR_UNLOCK(); - ctx = NULL; - - } - - /* check if there is any task waiting to be served */ - TASKMGR_LOCK(); - - GET_TASK_READY_TO_RUN(); - - /* Got a task? */ - if (ctx != NULL) - { - TASKMGR_UNLOCK(); - continue; - } - - /* - ** if not, let's check if there is enough idle workers - ** if yes: quit - */ - - if (Tskmgr.tm_nb_workers > MIN_WORKERS && - Tskmgr.tm_nb_idle > MIN_IDLE) - done = true; - - POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, - Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); - - if (done) - { - POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); - Tskmgr.tm_nb_workers--; - TASKMGR_UNLOCK(); - continue; - } - - /* - ** if no task ready to run, wait for another one - */ - - Tskmgr.tm_nb_idle++; - TASKMGR_COND_WAIT(); - Tskmgr.tm_nb_idle--; - - /* look for a task */ - GET_TASK_READY_TO_RUN(); - - TASKMGR_UNLOCK(); - } - return NULL; -} - -/* -** MI_LIST_ADD_CTX -- add new session to linked list -** -** Parameters: -** ctx -- context structure -** -** Returns: -** MI_FAILURE/MI_SUCCESS -*/ - -static int -mi_list_add_ctx(ctx) - SMFICTX_PTR ctx; -{ - SM_ASSERT(ctx != NULL); - SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); - return MI_SUCCESS; -} - -/* -** MI_LIST_DEL_CTX -- remove session from linked list when finished -** -** Parameters: -** ctx -- context structure -** -** Returns: -** MI_FAILURE/MI_SUCCESS -*/ - -static int -mi_list_del_ctx(ctx) - SMFICTX_PTR ctx; -{ - SM_ASSERT(ctx != NULL); - if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) - return MI_FAILURE; - - SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); - return MI_SUCCESS; -} -#endif /* _FFR_WORKERS_POOL */ -- cgit v1.1