summaryrefslogtreecommitdiffstats
path: root/contrib/sendmail/libmilter/worker.c
diff options
context:
space:
mode:
authorpeter <peter@FreeBSD.org>2008-08-28 02:25:51 +0000
committerpeter <peter@FreeBSD.org>2008-08-28 02:25:51 +0000
commitea50d71feb02a78d4d5fa746a26ca7ddc6e8cb19 (patch)
treedaf40952cf309641cc6c7d987989fd2abce2d758 /contrib/sendmail/libmilter/worker.c
parenta2b986fa722f9860a6c56bb5cc724b7e2937d1b7 (diff)
downloadFreeBSD-src-ea50d71feb02a78d4d5fa746a26ca7ddc6e8cb19.zip
FreeBSD-src-ea50d71feb02a78d4d5fa746a26ca7ddc6e8cb19.tar.gz
Stage 1 of sendmail dist tree flattening. contrib/sendmail/contrib
prevents doing this in one pass.
Diffstat (limited to 'contrib/sendmail/libmilter/worker.c')
-rw-r--r--contrib/sendmail/libmilter/worker.c792
1 files changed, 0 insertions, 792 deletions
diff --git a/contrib/sendmail/libmilter/worker.c b/contrib/sendmail/libmilter/worker.c
deleted file mode 100644
index 0402678..0000000
--- a/contrib/sendmail/libmilter/worker.c
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers.
- * All rights reserved.
- *
- * By using this file, you agree to the terms and conditions set
- * forth in the LICENSE file which can be found at the top level of
- * the sendmail distribution.
- *
- * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
- * Jose-Marcio.Martins@ensmp.fr
- */
-
-#include <sm/gen.h>
-SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $")
-
-#include "libmilter.h"
-
-#if _FFR_WORKERS_POOL
-
-typedef struct taskmgr_S taskmgr_T;
-
-#define TM_SIGNATURE 0x23021957
-
-struct taskmgr_S
-{
- long tm_signature; /* has the controller been initialized */
- sthread_t tm_tid; /* thread id of controller */
- smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
-
- int tm_nb_workers; /* number of workers in the pool */
- int tm_nb_idle; /* number of workers waiting */
-
- int tm_p[2]; /* poll control pipe */
-
- smutex_t tm_w_mutex; /* linked list access mutex */
- scond_t tm_w_cond; /* */
-};
-
-static taskmgr_T Tskmgr = {0};
-
-#define WRK_CTX_HEAD Tskmgr.tm_ctx_head
-
-#define RD_PIPE (Tskmgr.tm_p[0])
-#define WR_PIPE (Tskmgr.tm_p[1])
-
-#define PIPE_SEND_SIGNAL() \
- do \
- { \
- char evt = 0x5a; \
- int fd = WR_PIPE; \
- if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
- smi_log(SMI_LOG_ERR, \
- "Error writing to event pipe: %s", \
- sm_errstring(errno)); \
- } while (0)
-
-#ifndef USE_PIPE_WAKE_POLL
-# define USE_PIPE_WAKE_POLL 1
-#endif /* USE_PIPE_WAKE_POLL */
-
-/* poll check periodicity (default 10000 - 10 s) */
-#define POLL_TIMEOUT 10000
-
-/* worker conditional wait timeout (default 10 s) */
-#define COND_TIMEOUT 10
-
-/* functions */
-static int mi_close_session __P((SMFICTX_PTR));
-
-static void *mi_worker __P((void *));
-static void *mi_pool_controller __P((void *));
-
-static int mi_list_add_ctx __P((SMFICTX_PTR));
-static int mi_list_del_ctx __P((SMFICTX_PTR));
-
-/*
-** periodicity of cleaning up old sessions (timedout)
-** sessions list will be checked to find old inactive
-** sessions each DT_CHECK_OLD_SESSIONS sec
-*/
-
-#define DT_CHECK_OLD_SESSIONS 600
-
-#ifndef OLD_SESSION_TIMEOUT
-# define OLD_SESSION_TIMEOUT ctx->ctx_timeout
-#endif /* OLD_SESSION_TIMEOUT */
-
-/* session states - with respect to the pool of workers */
-#define WKST_INIT 0 /* initial state */
-#define WKST_READY_TO_RUN 1 /* command ready do be read */
-#define WKST_RUNNING 2 /* session running on a worker */
-#define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
-#define WKST_WAITING 4 /* waiting for new command */
-#define WKST_CLOSING 5 /* session finished */
-
-#ifndef MIN_WORKERS
-# define MIN_WORKERS 2 /* minimum number of threads to keep around */
-#endif
-
-#define MIN_IDLE 1 /* minimum number of idle threads */
-
-
-/*
-** Macros for threads and mutex management
-*/
-
-#define TASKMGR_LOCK() \
- do \
- { \
- if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
- smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
- } while (0)
-
-#define TASKMGR_UNLOCK() \
- do \
- { \
- if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
- smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
- } while (0)
-
-#define TASKMGR_COND_WAIT() \
- scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
-
-#define TASKMGR_COND_SIGNAL() \
- do \
- { \
- if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
- smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
- } while (0)
-
-#define LAUNCH_WORKER(ctx) \
- do \
- { \
- int r; \
- sthread_t tid; \
- \
- if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
- smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
- sm_errstring(r)); \
- } while (0)
-
-#if POOL_DEBUG
-# define POOL_LEV_DPRINTF(lev, x) \
- do { \
- if ((lev) < ctx->ctx_dbg) \
- sm_dprintf x; \
- } while (0)
-#else /* POOL_DEBUG */
-# define POOL_LEV_DPRINTF(lev, x)
-#endif /* POOL_DEBUG */
-
-/*
-** MI_START_SESSION -- Start a session in the pool of workers
-**
-** Parameters:
-** ctx -- context structure
-**
-** Returns:
-** MI_SUCCESS/MI_FAILURE
-*/
-
-int
-mi_start_session(ctx)
- SMFICTX_PTR ctx;
-{
- static long id = 0;
-
- SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
- SM_ASSERT(ctx != NULL);
- POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
- TASKMGR_LOCK();
-
- if (mi_list_add_ctx(ctx) != MI_SUCCESS)
- {
- TASKMGR_UNLOCK();
- return MI_FAILURE;
- }
-
- ctx->ctx_sid = id++;
-
- /* if there is an idle worker, signal it, otherwise start new worker */
- if (Tskmgr.tm_nb_idle > 0)
- {
- ctx->ctx_wstate = WKST_READY_TO_RUN;
- TASKMGR_COND_SIGNAL();
- }
- else
- {
- ctx->ctx_wstate = WKST_RUNNING;
- LAUNCH_WORKER(ctx);
- }
- TASKMGR_UNLOCK();
- return MI_SUCCESS;
-}
-
-/*
-** MI_CLOSE_SESSION -- Close a session and clean up data structures
-**
-** Parameters:
-** ctx -- context structure
-**
-** Returns:
-** MI_SUCCESS/MI_FAILURE
-*/
-
-static int
-mi_close_session(ctx)
- SMFICTX_PTR ctx;
-{
- SM_ASSERT(ctx != NULL);
-
- (void) mi_list_del_ctx(ctx);
- if (ValidSocket(ctx->ctx_sd))
- {
- (void) closesocket(ctx->ctx_sd);
- ctx->ctx_sd = INVALID_SOCKET;
- }
- if (ctx->ctx_reply != NULL)
- {
- free(ctx->ctx_reply);
- ctx->ctx_reply = NULL;
- }
- if (ctx->ctx_privdata != NULL)
- {
- smi_log(SMI_LOG_WARN, "%s: private data not NULL",
- ctx->ctx_smfi->xxfi_name);
- }
- mi_clr_macros(ctx, 0);
- free(ctx);
-
- return MI_SUCCESS;
-}
-
-/*
-** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
-** Must be called before starting sessions.
-**
-** Parameters:
-** none
-**
-** Returns:
-** MI_SUCCESS/MI_FAILURE
-*/
-
-int
-mi_pool_controller_init()
-{
- sthread_t tid;
- int r, i;
-
- if (Tskmgr.tm_signature == TM_SIGNATURE)
- return MI_SUCCESS;
-
- SM_TAILQ_INIT(&WRK_CTX_HEAD);
- Tskmgr.tm_tid = (sthread_t) -1;
- Tskmgr.tm_nb_workers = 0;
- Tskmgr.tm_nb_idle = 0;
-
- if (pipe(Tskmgr.tm_p) != 0)
- {
- smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
- sm_errstring(r));
- return MI_FAILURE;
- }
-
- POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
-
- (void) smutex_init(&Tskmgr.tm_w_mutex);
- (void) scond_init(&Tskmgr.tm_w_cond);
-
- /* Launch the pool controller */
- if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
- {
- smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
- sm_errstring(r));
- return MI_FAILURE;
- }
- Tskmgr.tm_tid = tid;
- Tskmgr.tm_signature = TM_SIGNATURE;
-
- /* Create the pool of workers */
- for (i = 0; i < MIN_WORKERS; i++)
- {
- if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
- {
- smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
- sm_errstring(r));
- return MI_FAILURE;
- }
- }
-
- return MI_SUCCESS;
-}
-
-/*
-** MI_POOL_CONTROLLER -- manage the pool of workers
-** This thread must be running when listener begins
-** starting sessions
-**
-** Parameters:
-** arg -- unused
-**
-** Returns:
-** NULL
-**
-** Control flow:
-** for (;;)
-** Look for timed out sessions
-** Select sessions to wait for sendmail command
-** Poll set of file descriptors
-** if timeout
-** continue
-** For each file descriptor ready
-** launch new thread if no worker available
-** else
-** signal waiting worker
-*/
-
-/* Poll structure array (pollfd) size step */
-#define PFD_STEP 256
-
-#define WAIT_FD(i) (pfd[i].fd)
-#define WAITFN "POLL"
-
-static void *
-mi_pool_controller(arg)
- void *arg;
-{
- struct pollfd *pfd = NULL;
- int dim_pfd = 0;
- bool rebuild_set = true;
- int pcnt = 0; /* error count for poll() failures */
-
- Tskmgr.tm_tid = sthread_get_id();
- if (pthread_detach(Tskmgr.tm_tid) != 0)
- {
- smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
- return NULL;
- }
-
- pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
- if (pfd == NULL)
- {
- smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
- sm_errstring(errno));
- return NULL;
- }
- dim_pfd = PFD_STEP;
-
- for (;;)
- {
- SMFICTX_PTR ctx;
- int nfd, rfd, i;
- time_t now;
- time_t lastcheck;
-
- POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
-
- if (mi_stop() != MILTER_CONT)
- break;
-
- TASKMGR_LOCK();
-
- now = time(NULL);
-
- /* check for timed out sessions? */
- if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
- {
- SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
- {
- if (ctx->ctx_wstate == WKST_WAITING)
- {
- if (ctx->ctx_wait == 0)
- {
- ctx->ctx_wait = now;
- continue;
- }
-
- /* if session timed out, close it */
- if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
- < now)
- {
- sfsistat (*fi_close) __P((SMFICTX *));
-
- POOL_LEV_DPRINTF(4,
- ("Closing old connection: sd=%d id=%d",
- ctx->ctx_sd,
- ctx->ctx_sid));
-
- if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
- (void) (*fi_close)(ctx);
-
- mi_close_session(ctx);
- ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
- continue;
- }
- }
- }
- lastcheck = now;
- }
-
- if (rebuild_set)
- {
- /*
- ** Initialize poll set.
- ** Insert into the poll set the file descriptors of
- ** all sessions waiting for a command from sendmail.
- */
-
- nfd = 0;
-
- /* begin with worker pipe */
- pfd[nfd].fd = RD_PIPE;
- pfd[nfd].events = MI_POLL_RD_FLAGS;
- pfd[nfd].revents = 0;
- nfd++;
-
- SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
- {
- /*
- ** update ctx_wait - start of wait moment -
- ** for timeout
- */
-
- if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
- ctx->ctx_wait = now;
-
- /* add the session to the pollfd array? */
- if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
- (ctx->ctx_wstate == WKST_WAITING))
- {
- /*
- ** Resize the pollfd array if it
- ** isn't large enough.
- */
-
- if (nfd >= dim_pfd)
- {
- struct pollfd *tpfd;
- size_t new;
-
- new = (dim_pfd + PFD_STEP) *
- sizeof(*tpfd);
- tpfd = (struct pollfd *)
- realloc(pfd, new);
- if (tpfd != NULL)
- {
- pfd = tpfd;
- dim_pfd += PFD_STEP;
- }
- else
- {
- smi_log(SMI_LOG_ERR,
- "Failed to realloc pollfd array:%s",
- sm_errstring(errno));
- }
- }
-
- /* add the session to pollfd array */
- if (nfd < dim_pfd)
- {
- ctx->ctx_wstate = WKST_WAITING;
- pfd[nfd].fd = ctx->ctx_sd;
- pfd[nfd].events = MI_POLL_RD_FLAGS;
- pfd[nfd].revents = 0;
- nfd++;
- }
- }
- }
- }
-
- TASKMGR_UNLOCK();
-
- /* Everything is ready, let's wait for an event */
- rfd = poll(pfd, nfd, POLL_TIMEOUT);
-
- POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
- WAITFN, now, nfd));
-
- /* timeout */
- if (rfd == 0)
- continue;
-
- rebuild_set = true;
-
- /* error */
- if (rfd < 0)
- {
- if (errno == EINTR)
- continue;
- pcnt++;
- smi_log(SMI_LOG_ERR,
- "%s() failed (%s), %s",
- WAITFN, sm_errstring(errno),
- pcnt >= MAX_FAILS_S ? "abort" : "try again");
-
- if (pcnt >= MAX_FAILS_S)
- goto err;
- }
- pcnt = 0;
-
- /* something happened */
- for (i = 0; i < nfd; i++)
- {
- if (pfd[i].revents == 0)
- continue;
-
- POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
- WAITFN, i, nfd,
- WAIT_FD(i)));
-
- /* has a worker signaled an end of task ? */
- if (WAIT_FD(i) == RD_PIPE)
- {
- char evt = 0;
- int r = 0;
-
- POOL_LEV_DPRINTF(4,
- ("PIPE WILL READ evt = %08X %08X",
- pfd[i].events, pfd[i].revents));
-
- if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
- {
- r = read(RD_PIPE, &evt, sizeof(evt));
- if (r == sizeof(evt))
- {
- /* Do nothing */
- }
- }
-
- POOL_LEV_DPRINTF(4,
- ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
- i, RD_PIPE, r, evt));
-
- if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
- {
- /* Exception handling */
- }
- continue;
- }
-
- /* no ! sendmail wants to send a command */
- SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
- {
- if (ctx->ctx_wstate != WKST_WAITING)
- continue;
-
- POOL_LEV_DPRINTF(4,
- ("Checking context sd=%d - fd=%d ",
- ctx->ctx_sd , WAIT_FD(i)));
-
- if (ctx->ctx_sd == pfd[i].fd)
- {
- TASKMGR_LOCK();
-
- POOL_LEV_DPRINTF(4,
- ("TASK: found %d for fd[%d]=%d",
- ctx->ctx_sid, i, WAIT_FD(i)));
-
- if (Tskmgr.tm_nb_idle > 0)
- {
- ctx->ctx_wstate = WKST_READY_TO_RUN;
- TASKMGR_COND_SIGNAL();
- }
- else
- {
- ctx->ctx_wstate = WKST_RUNNING;
- LAUNCH_WORKER(ctx);
- }
- TASKMGR_UNLOCK();
- break;
- }
- }
-
- POOL_LEV_DPRINTF(4,
- ("TASK %s FOUND - Checking PIPE for fd[%d]",
- ctx != NULL ? "" : "NOT", WAIT_FD(i)));
- }
- }
-
- err:
- if (pfd != NULL)
- free(pfd);
-
- Tskmgr.tm_signature = 0;
- for (;;)
- {
- SMFICTX_PTR ctx;
-
- ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
- if (ctx == NULL)
- break;
- mi_close_session(ctx);
- }
-
- (void) smutex_destroy(&Tskmgr.tm_w_mutex);
- (void) scond_destroy(&Tskmgr.tm_w_cond);
-
- return NULL;
-}
-
-/*
-** Look for a task ready to run.
-** Value of ctx is NULL or a pointer to a task ready to run.
-*/
-
-#define GET_TASK_READY_TO_RUN() \
- SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
- { \
- if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
- { \
- ctx->ctx_wstate = WKST_RUNNING; \
- break; \
- } \
- }
-
-/*
-** MI_WORKER -- worker thread
-** executes tasks distributed by the mi_pool_controller
-** or by mi_start_session
-**
-** Parameters:
-** arg -- pointer to context structure
-**
-** Returns:
-** NULL pointer
-*/
-
-static void *
-mi_worker(arg)
- void *arg;
-{
- SMFICTX_PTR ctx;
- bool done;
- sthread_t t_id;
- int r;
-
- ctx = (SMFICTX_PTR) arg;
- done = false;
- if (ctx != NULL)
- ctx->ctx_wstate = WKST_RUNNING;
-
- t_id = sthread_get_id();
- if (pthread_detach(t_id) != 0)
- {
- smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
- if (ctx != NULL)
- ctx->ctx_wstate = WKST_READY_TO_RUN;
- return NULL;
- }
-
- TASKMGR_LOCK();
- Tskmgr.tm_nb_workers++;
- TASKMGR_UNLOCK();
-
- while (!done)
- {
- if (mi_stop() != MILTER_CONT)
- break;
-
- /* let's handle next task... */
- if (ctx != NULL)
- {
- int res;
-
- POOL_LEV_DPRINTF(4,
- ("worker %d: new task -> let's handle it",
- t_id));
- res = mi_engine(ctx);
- POOL_LEV_DPRINTF(4,
- ("worker %d: mi_engine returned %d", t_id, res));
-
- TASKMGR_LOCK();
- if (res != MI_CONTINUE)
- {
- ctx->ctx_wstate = WKST_CLOSING;
-
- /*
- ** Delete context from linked list of
- ** sessions and close session.
- */
-
- mi_close_session(ctx);
- }
- else
- {
- ctx->ctx_wstate = WKST_READY_TO_WAIT;
-
- POOL_LEV_DPRINTF(4,
- ("writing to event pipe..."));
-
- /*
- ** Signal task controller to add new session
- ** to poll set.
- */
-
- PIPE_SEND_SIGNAL();
- }
- TASKMGR_UNLOCK();
- ctx = NULL;
-
- }
-
- /* check if there is any task waiting to be served */
- TASKMGR_LOCK();
-
- GET_TASK_READY_TO_RUN();
-
- /* Got a task? */
- if (ctx != NULL)
- {
- TASKMGR_UNLOCK();
- continue;
- }
-
- /*
- ** if not, let's check if there is enough idle workers
- ** if yes: quit
- */
-
- if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
- Tskmgr.tm_nb_idle > MIN_IDLE)
- done = true;
-
- POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
- Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
-
- if (done)
- {
- POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
- Tskmgr.tm_nb_workers--;
- TASKMGR_UNLOCK();
- continue;
- }
-
- /*
- ** if no task ready to run, wait for another one
- */
-
- Tskmgr.tm_nb_idle++;
- TASKMGR_COND_WAIT();
- Tskmgr.tm_nb_idle--;
-
- /* look for a task */
- GET_TASK_READY_TO_RUN();
-
- TASKMGR_UNLOCK();
- }
- return NULL;
-}
-
-/*
-** MI_LIST_ADD_CTX -- add new session to linked list
-**
-** Parameters:
-** ctx -- context structure
-**
-** Returns:
-** MI_FAILURE/MI_SUCCESS
-*/
-
-static int
-mi_list_add_ctx(ctx)
- SMFICTX_PTR ctx;
-{
- SM_ASSERT(ctx != NULL);
- SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
- return MI_SUCCESS;
-}
-
-/*
-** MI_LIST_DEL_CTX -- remove session from linked list when finished
-**
-** Parameters:
-** ctx -- context structure
-**
-** Returns:
-** MI_FAILURE/MI_SUCCESS
-*/
-
-static int
-mi_list_del_ctx(ctx)
- SMFICTX_PTR ctx;
-{
- SM_ASSERT(ctx != NULL);
- if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
- return MI_FAILURE;
-
- SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
- return MI_SUCCESS;
-}
-#endif /* _FFR_WORKERS_POOL */
OpenPOWER on IntegriCloud