Diffstat (limited to 'contrib/sendmail/libmilter/worker.c')
-rw-r--r--	contrib/sendmail/libmilter/worker.c	792
1 file changed, 792 insertions(+), 0 deletions(-)
diff --git a/contrib/sendmail/libmilter/worker.c b/contrib/sendmail/libmilter/worker.c
new file mode 100644
index 0000000..0402678
--- /dev/null
+++ b/contrib/sendmail/libmilter/worker.c
@@ -0,0 +1,792 @@
+/*
+ * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers.
+ * All rights reserved.
+ *
+ * By using this file, you agree to the terms and conditions set
+ * forth in the LICENSE file which can be found at the top level of
+ * the sendmail distribution.
+ *
+ * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
+ * Jose-Marcio.Martins@ensmp.fr
+ */
+
+#include <sm/gen.h>
+SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $")
+
+#include "libmilter.h"
+
+#if _FFR_WORKERS_POOL
+
+typedef struct taskmgr_S taskmgr_T;
+
+#define TM_SIGNATURE 0x23021957
+
+struct taskmgr_S
+{
+ long tm_signature; /* has the controller been initialized */
+ sthread_t tm_tid; /* thread id of controller */
+ smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
+
+ int tm_nb_workers; /* number of workers in the pool */
+ int tm_nb_idle; /* number of workers waiting */
+
+ int tm_p[2]; /* poll control pipe */
+
+ smutex_t tm_w_mutex; /* linked list access mutex */
+	scond_t tm_w_cond; /* workers' wait condition */
+};
+
+static taskmgr_T Tskmgr = {0};
+
+#define WRK_CTX_HEAD Tskmgr.tm_ctx_head
+
+#define RD_PIPE (Tskmgr.tm_p[0])
+#define WR_PIPE (Tskmgr.tm_p[1])
+
+#define PIPE_SEND_SIGNAL() \
+ do \
+ { \
+ char evt = 0x5a; \
+ int fd = WR_PIPE; \
+ if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
+ smi_log(SMI_LOG_ERR, \
+ "Error writing to event pipe: %s", \
+ sm_errstring(errno)); \
+ } while (0)
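+
+/*
+**  The pipe pair implements a self-pipe wake-up: a worker writes one
+**  byte (PIPE_SEND_SIGNAL) and the controller, which keeps RD_PIPE in
+**  its poll set, wakes up and rebuilds that set.  A minimal sketch of
+**  the reader side (the real code is in mi_pool_controller() below):
+**
+**	char evt;
+**
+**	if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
+**		(void) read(RD_PIPE, &evt, sizeof(evt));
+*/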
+
+#ifndef USE_PIPE_WAKE_POLL
+# define USE_PIPE_WAKE_POLL 1
+#endif /* USE_PIPE_WAKE_POLL */
+
+/* poll check periodicity, in milliseconds (default 10000 - 10 s) */
+#define POLL_TIMEOUT 10000
+
+/* worker condition-wait timeout, in seconds (default 10 s) */
+#define COND_TIMEOUT 10
+
+/* functions */
+static int mi_close_session __P((SMFICTX_PTR));
+
+static void *mi_worker __P((void *));
+static void *mi_pool_controller __P((void *));
+
+static int mi_list_add_ctx __P((SMFICTX_PTR));
+static int mi_list_del_ctx __P((SMFICTX_PTR));
+
+/*
+**  Periodicity of cleaning up timed-out sessions: the list of
+**  sessions is scanned for old inactive sessions every
+**  DT_CHECK_OLD_SESSIONS seconds.
+*/
+
+#define DT_CHECK_OLD_SESSIONS 600
+
+#ifndef OLD_SESSION_TIMEOUT
+# define OLD_SESSION_TIMEOUT ctx->ctx_timeout
+#endif /* OLD_SESSION_TIMEOUT */
+
+/* session states - with respect to the pool of workers */
+#define WKST_INIT 0 /* initial state */
+#define WKST_READY_TO_RUN 1 /* command ready to be read */
+#define WKST_RUNNING 2 /* session running on a worker */
+#define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
+#define WKST_WAITING 4 /* waiting for new command */
+#define WKST_CLOSING 5 /* session finished */
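+
+/*
+**  Typical transitions, as driven by the code below:
+**
+**	INIT -> READY_TO_RUN or RUNNING		(mi_start_session)
+**	RUNNING -> READY_TO_WAIT -> WAITING	(worker done, controller polls)
+**	WAITING -> READY_TO_RUN or RUNNING	(new command arrives)
+**	RUNNING -> CLOSING			(mi_engine ends the session)
+*/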
+
+#ifndef MIN_WORKERS
+# define MIN_WORKERS 2 /* minimum number of threads to keep around */
+#endif
+
+#define MIN_IDLE 1 /* minimum number of idle threads */
+
+
+/*
+** Macros for threads and mutex management
+*/
+
+#define TASKMGR_LOCK() \
+ do \
+ { \
+ if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
+ smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
+ } while (0)
+
+#define TASKMGR_UNLOCK() \
+ do \
+ { \
+ if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
+ smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
+ } while (0)
+
+#define TASKMGR_COND_WAIT() \
+ scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
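+
+/*
+**  Note: the wait is bounded by COND_TIMEOUT, so an idle worker wakes
+**  up periodically even without a signal, re-checks mi_stop() and the
+**  idle-worker accounting in mi_worker(), and can exit on its own.
+*/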
+
+#define TASKMGR_COND_SIGNAL() \
+ do \
+ { \
+ if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
+ smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
+ } while (0)
+
+#define LAUNCH_WORKER(ctx) \
+ do \
+ { \
+ int r; \
+ sthread_t tid; \
+ \
+ if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
+ smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
+ sm_errstring(r)); \
+ } while (0)
+
+#if POOL_DEBUG
+# define POOL_LEV_DPRINTF(lev, x) \
+ do { \
+ if ((lev) < ctx->ctx_dbg) \
+ sm_dprintf x; \
+ } while (0)
+#else /* POOL_DEBUG */
+# define POOL_LEV_DPRINTF(lev, x)
+#endif /* POOL_DEBUG */
+
+/*
+** MI_START_SESSION -- Start a session in the pool of workers
+**
+** Parameters:
+** ctx -- context structure
+**
+** Returns:
+** MI_SUCCESS/MI_FAILURE
+*/
+
+int
+mi_start_session(ctx)
+ SMFICTX_PTR ctx;
+{
+ static long id = 0;
+
+ SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
+ SM_ASSERT(ctx != NULL);
+ POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
+ TASKMGR_LOCK();
+
+ if (mi_list_add_ctx(ctx) != MI_SUCCESS)
+ {
+ TASKMGR_UNLOCK();
+ return MI_FAILURE;
+ }
+
+ ctx->ctx_sid = id++;
+
+ /* if there is an idle worker, signal it, otherwise start new worker */
+ if (Tskmgr.tm_nb_idle > 0)
+ {
+ ctx->ctx_wstate = WKST_READY_TO_RUN;
+ TASKMGR_COND_SIGNAL();
+ }
+ else
+ {
+ ctx->ctx_wstate = WKST_RUNNING;
+ LAUNCH_WORKER(ctx);
+ }
+ TASKMGR_UNLOCK();
+ return MI_SUCCESS;
+}
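+
+/*
+**  Illustrative call site (a sketch, not code from this file; the
+**  actual caller is the listener built with -D_FFR_WORKERS_POOL):
+**
+**	SMFICTX_PTR ctx = ...;	** freshly accepted connection **
+**
+**	if (mi_start_session(ctx) != MI_SUCCESS)
+**	{
+**		(void) closesocket(ctx->ctx_sd);
+**		free(ctx);
+**	}
+*/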
+
+/*
+** MI_CLOSE_SESSION -- Close a session and clean up data structures
+**
+** Parameters:
+** ctx -- context structure
+**
+** Returns:
+** MI_SUCCESS/MI_FAILURE
+*/
+
+static int
+mi_close_session(ctx)
+ SMFICTX_PTR ctx;
+{
+ SM_ASSERT(ctx != NULL);
+
+ (void) mi_list_del_ctx(ctx);
+ if (ValidSocket(ctx->ctx_sd))
+ {
+ (void) closesocket(ctx->ctx_sd);
+ ctx->ctx_sd = INVALID_SOCKET;
+ }
+ if (ctx->ctx_reply != NULL)
+ {
+ free(ctx->ctx_reply);
+ ctx->ctx_reply = NULL;
+ }
+ if (ctx->ctx_privdata != NULL)
+ {
+ smi_log(SMI_LOG_WARN, "%s: private data not NULL",
+ ctx->ctx_smfi->xxfi_name);
+ }
+ mi_clr_macros(ctx, 0);
+ free(ctx);
+
+ return MI_SUCCESS;
+}
+
+/*
+**  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
+** Must be called before starting sessions.
+**
+** Parameters:
+** none
+**
+** Returns:
+** MI_SUCCESS/MI_FAILURE
+*/
+
+int
+mi_pool_controller_init()
+{
+ sthread_t tid;
+ int r, i;
+
+ if (Tskmgr.tm_signature == TM_SIGNATURE)
+ return MI_SUCCESS;
+
+ SM_TAILQ_INIT(&WRK_CTX_HEAD);
+ Tskmgr.tm_tid = (sthread_t) -1;
+ Tskmgr.tm_nb_workers = 0;
+ Tskmgr.tm_nb_idle = 0;
+
+ if (pipe(Tskmgr.tm_p) != 0)
+ {
+ smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
+ sm_errstring(r));
+ return MI_FAILURE;
+ }
+
+ POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
+
+ (void) smutex_init(&Tskmgr.tm_w_mutex);
+ (void) scond_init(&Tskmgr.tm_w_cond);
+
+ /* Launch the pool controller */
+ if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
+ {
+ smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
+ sm_errstring(r));
+ return MI_FAILURE;
+ }
+ Tskmgr.tm_tid = tid;
+ Tskmgr.tm_signature = TM_SIGNATURE;
+
+ /* Create the pool of workers */
+ for (i = 0; i < MIN_WORKERS; i++)
+ {
+ if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
+ {
+ smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
+ sm_errstring(r));
+ return MI_FAILURE;
+ }
+ }
+
+ return MI_SUCCESS;
+}
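+
+/*
+**  Expected initialization order (a sketch; the exact call site is an
+**  assumption, it is not part of this file):
+**
+**	if (mi_pool_controller_init() != MI_SUCCESS)
+**		return MI_FAILURE;	** no pool, no sessions **
+**	... accept connections, then call mi_start_session(ctx) ...
+*/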
+
+/*
+** MI_POOL_CONTROLLER -- manage the pool of workers
+**		This thread must be running when the listener begins
+**		starting sessions.
+**
+** Parameters:
+** arg -- unused
+**
+** Returns:
+** NULL
+**
+** Control flow:
+** for (;;)
+** Look for timed out sessions
+** Select sessions to wait for sendmail command
+** Poll set of file descriptors
+** if timeout
+** continue
+** For each file descriptor ready
+** launch new thread if no worker available
+** else
+** signal waiting worker
+*/
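+
+/*
+**  Layout of the poll set built below: slot 0 is always the wake-up
+**  pipe, followed by one slot per session in WKST_WAITING state:
+**
+**	pfd[0].fd = RD_PIPE;		** worker wake-up channel **
+**	pfd[1].fd = ctx->ctx_sd;	** first waiting session  **
+**	...				** grows by PFD_STEP slots **
+*/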
+
+/* Poll structure array (pollfd) size step */
+#define PFD_STEP 256
+
+#define WAIT_FD(i) (pfd[i].fd)
+#define WAITFN "POLL"
+
+static void *
+mi_pool_controller(arg)
+ void *arg;
+{
+ struct pollfd *pfd = NULL;
+ int dim_pfd = 0;
+ bool rebuild_set = true;
+	int pcnt = 0; /* error count for poll() failures */
+	time_t lastcheck;
+
+ Tskmgr.tm_tid = sthread_get_id();
+ if (pthread_detach(Tskmgr.tm_tid) != 0)
+ {
+ smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
+ return NULL;
+ }
+
+ pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
+ if (pfd == NULL)
+ {
+ smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
+ sm_errstring(errno));
+ return NULL;
+ }
+	dim_pfd = PFD_STEP;
+	lastcheck = time(NULL);
+
+ for (;;)
+ {
+		SMFICTX_PTR ctx;
+		int nfd, rfd, i;
+		time_t now;
+
+ POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
+
+ if (mi_stop() != MILTER_CONT)
+ break;
+
+ TASKMGR_LOCK();
+
+ now = time(NULL);
+
+		/* time to check for timed-out sessions? */
+ if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
+ {
+ SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
+ {
+ if (ctx->ctx_wstate == WKST_WAITING)
+ {
+ if (ctx->ctx_wait == 0)
+ {
+ ctx->ctx_wait = now;
+ continue;
+ }
+
+ /* if session timed out, close it */
+ if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
+ < now)
+ {
+ sfsistat (*fi_close) __P((SMFICTX *));
+
+ POOL_LEV_DPRINTF(4,
+ ("Closing old connection: sd=%d id=%d",
+ ctx->ctx_sd,
+ ctx->ctx_sid));
+
+ if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
+ (void) (*fi_close)(ctx);
+
+ mi_close_session(ctx);
+ ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
+ continue;
+ }
+ }
+ }
+ lastcheck = now;
+ }
+
+ if (rebuild_set)
+ {
+ /*
+ ** Initialize poll set.
+ ** Insert into the poll set the file descriptors of
+ ** all sessions waiting for a command from sendmail.
+ */
+
+ nfd = 0;
+
+ /* begin with worker pipe */
+ pfd[nfd].fd = RD_PIPE;
+ pfd[nfd].events = MI_POLL_RD_FLAGS;
+ pfd[nfd].revents = 0;
+ nfd++;
+
+ SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
+ {
+ /*
+ ** update ctx_wait - start of wait moment -
+ ** for timeout
+ */
+
+ if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
+ ctx->ctx_wait = now;
+
+ /* add the session to the pollfd array? */
+ if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
+ (ctx->ctx_wstate == WKST_WAITING))
+ {
+ /*
+ ** Resize the pollfd array if it
+ ** isn't large enough.
+ */
+
+ if (nfd >= dim_pfd)
+ {
+ struct pollfd *tpfd;
+ size_t new;
+
+ new = (dim_pfd + PFD_STEP) *
+ sizeof(*tpfd);
+ tpfd = (struct pollfd *)
+ realloc(pfd, new);
+ if (tpfd != NULL)
+ {
+ pfd = tpfd;
+ dim_pfd += PFD_STEP;
+ }
+ else
+ {
+ smi_log(SMI_LOG_ERR,
+ "Failed to realloc pollfd array:%s",
+ sm_errstring(errno));
+ }
+ }
+
+ /* add the session to pollfd array */
+ if (nfd < dim_pfd)
+ {
+ ctx->ctx_wstate = WKST_WAITING;
+ pfd[nfd].fd = ctx->ctx_sd;
+ pfd[nfd].events = MI_POLL_RD_FLAGS;
+ pfd[nfd].revents = 0;
+ nfd++;
+ }
+ }
+ }
+ }
+
+ TASKMGR_UNLOCK();
+
+ /* Everything is ready, let's wait for an event */
+ rfd = poll(pfd, nfd, POLL_TIMEOUT);
+
+ POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
+ WAITFN, now, nfd));
+
+ /* timeout */
+ if (rfd == 0)
+ continue;
+
+ rebuild_set = true;
+
+ /* error */
+ if (rfd < 0)
+ {
+ if (errno == EINTR)
+ continue;
+ pcnt++;
+ smi_log(SMI_LOG_ERR,
+ "%s() failed (%s), %s",
+ WAITFN, sm_errstring(errno),
+ pcnt >= MAX_FAILS_S ? "abort" : "try again");
+
+			if (pcnt >= MAX_FAILS_S)
+				goto err;
+			continue;
+		}
+ pcnt = 0;
+
+ /* something happened */
+ for (i = 0; i < nfd; i++)
+ {
+ if (pfd[i].revents == 0)
+ continue;
+
+ POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
+ WAITFN, i, nfd,
+ WAIT_FD(i)));
+
+			/* has a worker signaled an end of task? */
+ if (WAIT_FD(i) == RD_PIPE)
+ {
+ char evt = 0;
+ int r = 0;
+
+ POOL_LEV_DPRINTF(4,
+ ("PIPE WILL READ evt = %08X %08X",
+ pfd[i].events, pfd[i].revents));
+
+ if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
+ {
+ r = read(RD_PIPE, &evt, sizeof(evt));
+ if (r == sizeof(evt))
+ {
+ /* Do nothing */
+ }
+ }
+
+ POOL_LEV_DPRINTF(4,
+ ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
+ i, RD_PIPE, r, evt));
+
+ if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
+ {
+ /* Exception handling */
+ }
+ continue;
+ }
+
+		/* no! sendmail wants to send a command */
+ SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
+ {
+ if (ctx->ctx_wstate != WKST_WAITING)
+ continue;
+
+ POOL_LEV_DPRINTF(4,
+ ("Checking context sd=%d - fd=%d ",
+				ctx->ctx_sd, WAIT_FD(i)));
+
+ if (ctx->ctx_sd == pfd[i].fd)
+ {
+ TASKMGR_LOCK();
+
+ POOL_LEV_DPRINTF(4,
+ ("TASK: found %d for fd[%d]=%d",
+ ctx->ctx_sid, i, WAIT_FD(i)));
+
+ if (Tskmgr.tm_nb_idle > 0)
+ {
+ ctx->ctx_wstate = WKST_READY_TO_RUN;
+ TASKMGR_COND_SIGNAL();
+ }
+ else
+ {
+ ctx->ctx_wstate = WKST_RUNNING;
+ LAUNCH_WORKER(ctx);
+ }
+ TASKMGR_UNLOCK();
+ break;
+ }
+ }
+
+ POOL_LEV_DPRINTF(4,
+ ("TASK %s FOUND - Checking PIPE for fd[%d]",
+ ctx != NULL ? "" : "NOT", WAIT_FD(i)));
+ }
+ }
+
+ err:
+ if (pfd != NULL)
+ free(pfd);
+
+ Tskmgr.tm_signature = 0;
+ for (;;)
+ {
+ SMFICTX_PTR ctx;
+
+ ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
+ if (ctx == NULL)
+ break;
+ mi_close_session(ctx);
+ }
+
+ (void) smutex_destroy(&Tskmgr.tm_w_mutex);
+ (void) scond_destroy(&Tskmgr.tm_w_cond);
+
+ return NULL;
+}
+
+/*
+**  Look for a task ready to run.
+**  Must be called with Tskmgr.tm_w_mutex held.
+**  On return, ctx is NULL or points to a task now marked WKST_RUNNING.
+*/
+
+#define GET_TASK_READY_TO_RUN() \
+ SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
+ { \
+ if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
+ { \
+ ctx->ctx_wstate = WKST_RUNNING; \
+ break; \
+ } \
+ }
+
+/*
+** MI_WORKER -- worker thread
+** executes tasks distributed by the mi_pool_controller
+** or by mi_start_session
+**
+** Parameters:
+** arg -- pointer to context structure
+**
+** Returns:
+** NULL pointer
+*/
+
+static void *
+mi_worker(arg)
+ void *arg;
+{
+ SMFICTX_PTR ctx;
+ bool done;
+ sthread_t t_id;
+ int r;
+
+ ctx = (SMFICTX_PTR) arg;
+ done = false;
+ if (ctx != NULL)
+ ctx->ctx_wstate = WKST_RUNNING;
+
+ t_id = sthread_get_id();
+ if (pthread_detach(t_id) != 0)
+ {
+ smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
+ if (ctx != NULL)
+ ctx->ctx_wstate = WKST_READY_TO_RUN;
+ return NULL;
+ }
+
+ TASKMGR_LOCK();
+ Tskmgr.tm_nb_workers++;
+ TASKMGR_UNLOCK();
+
+ while (!done)
+ {
+ if (mi_stop() != MILTER_CONT)
+ break;
+
+ /* let's handle next task... */
+ if (ctx != NULL)
+ {
+ int res;
+
+ POOL_LEV_DPRINTF(4,
+ ("worker %d: new task -> let's handle it",
+ t_id));
+ res = mi_engine(ctx);
+ POOL_LEV_DPRINTF(4,
+ ("worker %d: mi_engine returned %d", t_id, res));
+
+ TASKMGR_LOCK();
+ if (res != MI_CONTINUE)
+ {
+ ctx->ctx_wstate = WKST_CLOSING;
+
+ /*
+ ** Delete context from linked list of
+ ** sessions and close session.
+ */
+
+ mi_close_session(ctx);
+ }
+ else
+ {
+ ctx->ctx_wstate = WKST_READY_TO_WAIT;
+
+ POOL_LEV_DPRINTF(4,
+ ("writing to event pipe..."));
+
+ /*
+ ** Signal task controller to add new session
+ ** to poll set.
+ */
+
+ PIPE_SEND_SIGNAL();
+ }
+ TASKMGR_UNLOCK();
+ ctx = NULL;
+
+ }
+
+ /* check if there is any task waiting to be served */
+ TASKMGR_LOCK();
+
+ GET_TASK_READY_TO_RUN();
+
+ /* Got a task? */
+ if (ctx != NULL)
+ {
+ TASKMGR_UNLOCK();
+ continue;
+ }
+
+		/*
+		**  If not, check whether there are already enough idle
+		**  workers; if so, this one can quit.
+		*/
+
+ if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
+ Tskmgr.tm_nb_idle > MIN_IDLE)
+ done = true;
+
+ POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
+ Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
+
+ if (done)
+ {
+ POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
+ Tskmgr.tm_nb_workers--;
+ TASKMGR_UNLOCK();
+ continue;
+ }
+
+		/*
+		**  No task ready to run: wait until one shows up.
+		*/
+
+ Tskmgr.tm_nb_idle++;
+ TASKMGR_COND_WAIT();
+ Tskmgr.tm_nb_idle--;
+
+ /* look for a task */
+ GET_TASK_READY_TO_RUN();
+
+ TASKMGR_UNLOCK();
+ }
+ return NULL;
+}
+
+/*
+** MI_LIST_ADD_CTX -- add new session to linked list
+**
+** Parameters:
+** ctx -- context structure
+**
+** Returns:
+** MI_FAILURE/MI_SUCCESS
+*/
+
+static int
+mi_list_add_ctx(ctx)
+ SMFICTX_PTR ctx;
+{
+ SM_ASSERT(ctx != NULL);
+ SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
+ return MI_SUCCESS;
+}
+
+/*
+** MI_LIST_DEL_CTX -- remove session from linked list when finished
+**
+** Parameters:
+** ctx -- context structure
+**
+** Returns:
+** MI_FAILURE/MI_SUCCESS
+*/
+
+static int
+mi_list_del_ctx(ctx)
+ SMFICTX_PTR ctx;
+{
+ SM_ASSERT(ctx != NULL);
+ if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
+ return MI_FAILURE;
+
+ SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
+ return MI_SUCCESS;
+}
+#endif /* _FFR_WORKERS_POOL */