summaryrefslogtreecommitdiffstats
path: root/contrib/sendmail/src/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/sendmail/src/queue.c')
-rw-r--r--contrib/sendmail/src/queue.c6910
1 files changed, 5855 insertions, 1055 deletions
diff --git a/contrib/sendmail/src/queue.c b/contrib/sendmail/src/queue.c
index aeed7f9..6a2da9c 100644
--- a/contrib/sendmail/src/queue.c
+++ b/contrib/sendmail/src/queue.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 1998-2001 Sendmail, Inc. and its suppliers.
+ * Copyright (c) 1998-2002 Sendmail, Inc. and its suppliers.
* All rights reserved.
* Copyright (c) 1983, 1995-1997 Eric P. Allman. All rights reserved.
* Copyright (c) 1988, 1993
@@ -11,28 +11,41 @@
*
*/
-
#include <sendmail.h>
-#ifndef lint
-# if QUEUE
-static char id[] = "@(#)$Id: queue.c,v 8.343.4.62 2001/07/20 00:53:01 gshapiro Exp $ (with queueing)";
-# else /* QUEUE */
-static char id[] = "@(#)$Id: queue.c,v 8.343.4.62 2001/07/20 00:53:01 gshapiro Exp $ (without queueing)";
-# endif /* QUEUE */
-#endif /* ! lint */
+SM_RCSID("@(#)$Id: queue.c,v 8.834 2002/01/08 23:04:58 ca Exp $")
+
+#include <dirent.h>
+
+#if SM_CONF_SHM
+# include <sm/shm.h>
+#endif /* SM_CONF_SHM */
-# include <dirent.h>
+# define RELEASE_QUEUE (void) 0
+# define ST_INODE(st) (st).st_ino
-#if QUEUE
-# if _FFR_QUEUEDELAY
-# define QF_VERSION 5 /* version number of this queue format */
+/*
+** Historical notes:
+** QF_VERSION==4 was sendmail 8.10/8.11 without _FFR_QUEUEDELAY
+** QF_VERSION==5 was sendmail 8.10/8.11 with _FFR_QUEUEDELAY
+*/
+
+#if _FFR_QUEUEDELAY
+# define QF_VERSION 7 /* version number of this queue format */
static time_t queuedelay __P((ENVELOPE *));
-# else /* _FFR_QUEUEDELAY */
-# define QF_VERSION 4 /* version number of this queue format */
-# define queuedelay(e) MinQueueAge
-# endif /* _FFR_QUEUEDELAY */
+#define queuedelay_qfver_unsupported(qfver) false
+#else /* _FFR_QUEUEDELAY */
+# define QF_VERSION 6 /* version number of this queue format */
+# define queuedelay(e) MinQueueAge
+#define queuedelay_qfver_unsupported(qfver) ((qfver) == 5 || (qfver) == 7)
+#endif /* _FFR_QUEUEDELAY */
+#if _FFR_QUARANTINE
+static char queue_letter __P((ENVELOPE *, int));
+static bool quarantine_queue_item __P((int, int, ENVELOPE *, char *));
+#endif /* _FFR_QUARANTINE */
+
+/* Naming convention: qgrp: index of queue group, qg: QUEUEGROUP */
/*
** Work queue.
@@ -45,28 +58,214 @@ struct work
bool w_lock; /* is message locked? */
bool w_tooyoung; /* is it too young to run? */
long w_pri; /* priority of message, see below */
- time_t w_ctime; /* creation time of message */
+ time_t w_ctime; /* creation time */
+ time_t w_mtime; /* modification time */
+ int w_qgrp; /* queue group located in */
+ int w_qdir; /* queue directory located in */
struct work *w_next; /* next in queue */
};
typedef struct work WORK;
-static WORK *WorkQ; /* queue of things to be done */
+static WORK *WorkQ; /* queue of things to be done */
+static int NumWorkGroups; /* number of work groups */
+
+/*
+** use of DoQueueRun:
+** NumQueue: indicates that a queue run is needed, look at individual bits
+** 0 - NumQueue-1: indicates that a queue run for this queue group
+** is needed.
+*/
+
+static BITMAP256 volatile DoQueueRun; /* non-interrupt time queue run needed */
+
+/*
+** Work group definition structure.
+** Each work group contains one or more queue groups. This is done
+** to manage the number of queue group runners active at the same time
+** to be within the constraints of MaxQueueChildren (if it is set).
+** The number of queue groups that can be run on the next work run
+** is kept track of. The queue groups are run in a round robin.
+*/
+
+struct workgrp
+{
+ int wg_numqgrp; /* number of queue groups in work grp */
+ int wg_runners; /* total runners */
+ int wg_curqgrp; /* current queue group */
+ QUEUEGRP **wg_qgs; /* array of queue groups */
+ int wg_maxact; /* max # of active runners */
+ time_t wg_lowqintvl; /* lowest queue interval */
+ int wg_restart; /* needs restarting? */
+ int wg_restartcnt; /* count of times restarted */
+};
+
+typedef struct workgrp WORKGRP;
+
+static WORKGRP volatile WorkGrp[MAXWORKGROUPS + 1]; /* work groups */
+
+#if SM_HEAP_CHECK
+static SM_DEBUG_T DebugLeakQ = SM_DEBUG_INITIALIZER("leak_q",
+ "@(#)$Debug: leak_q - trace memory leaks during queue processing $");
+#endif /* SM_HEAP_CHECK */
-static void grow_wlist __P((int));
-static int orderq __P((int, bool));
-static void printctladdr __P((ADDRESS *, FILE *));
-static int print_single_queue __P((int));
-static bool readqf __P((ENVELOPE *));
-static void runqueueevent __P((void));
-static int run_single_queue __P((int, bool, bool));
+/*
+** We use EmptyString instead of "" to avoid
+** 'zero-length format string' warnings from gcc
+*/
+
+static const char EmptyString[] = "";
+
+static void grow_wlist __P((int, int));
+static int multiqueue_cache __P((char *, int, QUEUEGRP *, int, unsigned int *));
+static int gatherq __P((int, int, bool, bool *, bool *));
+static int sortq __P((int));
+static void printctladdr __P((ADDRESS *, SM_FILE_T *));
+static bool readqf __P((ENVELOPE *, bool));
+static void restart_work_group __P((int));
+static void runner_work __P((ENVELOPE *, int, bool, int, int));
+static void schedule_queue_runs __P((bool, int));
static char *strrev __P((char *));
-static ADDRESS *setctluser __P((char *, int));
+static ADDRESS *setctluser __P((char *, int, ENVELOPE *));
+#if _FFR_RHS
+static int sm_strshufflecmp __P((char *, char *));
+static void init_shuffle_alphabet __P(());
+#endif /* _FFR_RHS */
static int workcmpf0();
static int workcmpf1();
static int workcmpf2();
static int workcmpf3();
static int workcmpf4();
+static int workcmpf5();
+static int workcmpf6();
+#if _FFR_RHS
+static int workcmpf7();
+#endif /* _FFR_RHS */
+
+#if RANDOMSHIFT
+# define get_rand_mod(m) ((get_random() >> RANDOMSHIFT) % (m))
+#else /* RANDOMSHIFT */
+# define get_rand_mod(m) (get_random() % (m))
+#endif /* RANDOMSHIFT */
+
+/*
+** File system definition.
+** Used to keep track of how much free space is available
+** on a file system in which one or more queue directories reside.
+*/
+
+typedef struct filesys_shared FILESYS;
+
+struct filesys_shared
+{
+ dev_t fs_dev; /* unique device id */
+ long fs_avail; /* number of free blocks available */
+ long fs_blksize; /* block size, in bytes */
+};
+
+/* probably kept in shared memory */
+static FILESYS FileSys[MAXFILESYS]; /* queue file systems */
+static char *FSPath[MAXFILESYS]; /* pathnames for file systems */
+
+#if SM_CONF_SHM
+
+/*
+** Shared memory data
+**
+** Current layout:
+** size -- size of shared memory segment
+** pid -- pid of owner, should be a unique id to avoid misinterpretations
+** by other processes.
+** tag -- should be a unique id to avoid misinterpretations by others.
+** idea: hash over configuration data that will be stored here.
+** NumFileSys -- number of file systems.
+** FileSys -- (arrary of) structure for used file systems.
+** RSATmpCnt -- counter for number of uses of ephemeral RSA key.
+** QShm -- (array of) structure for information about queue directories.
+*/
+
+/*
+** Queue data in shared memory
+*/
+
+typedef struct queue_shared QUEUE_SHM_T;
+
+struct queue_shared
+{
+ int qs_entries; /* number of entries */
+ /* XXX more to follow? */
+};
+
+static void *Pshm; /* pointer to shared memory */
+static FILESYS *PtrFileSys; /* pointer to queue file system array */
+int ShmId = SM_SHM_NO_ID; /* shared memory id */
+static QUEUE_SHM_T *QShm; /* pointer to shared queue data */
+
+# define SHM_OFF_PID(p) (((char *) (p)) + sizeof(int))
+# define SHM_OFF_TAG(p) (((char *) (p)) + sizeof(pid_t) + sizeof(int))
+# define SHM_OFF_HEAD (sizeof(pid_t) + sizeof(int) * 2)
+
+/* how to access FileSys */
+# define FILE_SYS(i) (PtrFileSys[i])
+
+/* first entry is a tag, for now just the size */
+# define OFF_FILE_SYS(p) (((char *) (p)) + SHM_OFF_HEAD)
+
+/* offset for PNumFileSys */
+# define OFF_NUM_FILE_SYS(p) (((char *) (p)) + SHM_OFF_HEAD + sizeof(FileSys))
+
+/* offset for PRSATmpCnt */
+# define OFF_RSA_TMP_CNT(p) (((char *) (p)) + SHM_OFF_HEAD + sizeof(FileSys) + sizeof(int))
+int *PRSATmpCnt;
+
+/* offset for queue_shm */
+# define OFF_QUEUE_SHM(p) (((char *) (p)) + SHM_OFF_HEAD + sizeof(FileSys) + sizeof(int) * 2)
+
+#define QSHM_ENTRIES(i) QShm[i].qs_entries
+
+/* basic size of shared memory segment */
+# define SM_T_SIZE (SHM_OFF_HEAD + sizeof(FileSys) + sizeof(int) * 2)
+
+static unsigned int hash_q __P((char *, unsigned int));
+
+/*
+** HASH_Q -- simple hash function
+**
+** Parameters:
+** p -- string to hash.
+** h -- hash start value (from previous run).
+**
+** Returns:
+** hash value.
+*/
+
+static unsigned int
+hash_q(p, h)
+ char *p;
+ unsigned int h;
+{
+ int c, d;
+
+ while (*p != '\0')
+ {
+ d = *p++;
+ c = d;
+ c ^= c<<6;
+ h += (c<<11) ^ (c>>1);
+ h ^= (d<<14) + (d<<7) + (d<<4) + d;
+ }
+ return h;
+}
+
+#else /* SM_CONF_SHM */
+# define FILE_SYS(i) FileSys[i]
+#endif /* SM_CONF_SHM */
+
+/* access to the various components of file system data */
+#define FILE_SYS_NAME(i) FSPath[i]
+#define FILE_SYS_AVAIL(i) FILE_SYS(i).fs_avail
+#define FILE_SYS_BLKSIZE(i) FILE_SYS(i).fs_blksize
+#define FILE_SYS_DEV(i) FILE_SYS(i).fs_dev
/*
** Current qf file field assignments:
@@ -75,9 +274,10 @@ static int workcmpf4();
** B body type
** C controlling user
** D data file name
+** d data file directory name (added in 8.12)
** E error recipient
** F flag bits
-** G queue delay algorithm
+** G queue delay algorithm (_FFR_QUEUEDELAY)
** H header
** I data file's inode number
** K time of last delivery attempt
@@ -85,43 +285,44 @@ static int workcmpf4();
** M message (obsolete)
** N number of delivery attempts
** P message priority
+** q quarantine reason (_FFR_QUARANTINE)
** Q original recipient (ORCPT=)
+** r final recipient (Final-Recipient: DSN field)
** R recipient
** S sender
** T init time
** V queue file version
-** X character set (_FFR_SAVE_CHARSET)
-** Y current delay
+** X free (was: character set if _FFR_SAVE_CHARSET)
+** Y current delay (_FFR_QUEUEDELAY)
** Z original envelope id from ESMTP
+** ! deliver by (added in 8.12)
** $ define macro
** . terminate file
*/
- /*
+/*
** QUEUEUP -- queue a message up for future transmission.
**
** Parameters:
** e -- the envelope to queue up.
-** announce -- if TRUE, tell when you are queueing up.
+** announce -- if true, tell when you are queueing up.
+** msync -- if true, then fsync() if SuperSafe interactive mode.
**
** Returns:
** none.
**
** Side Effects:
-** The current request are saved in a control file.
+** The current request is saved in a control file.
** The queue file is left locked.
*/
-# define TEMPQF_LETTER 'T'
-# define LOSEQF_LETTER 'Q'
-
void
-queueup(e, announce)
+queueup(e, announce, msync)
register ENVELOPE *e;
bool announce;
+ bool msync;
{
- char *qf;
- register FILE *tfp;
+ register SM_FILE_T *tfp;
register HDR *h;
register ADDRESS *q;
int tfd = -1;
@@ -130,7 +331,9 @@ queueup(e, announce)
register char *p;
MAILER nullmailer;
MCI mcibuf;
+ char qf[MAXPATHLEN];
char tf[MAXPATHLEN];
+ char df[MAXPATHLEN];
char buf[MAXLINE];
/*
@@ -138,36 +341,28 @@ queueup(e, announce)
*/
newid = (e->e_id == NULL) || !bitset(EF_INQUEUE, e->e_flags);
-
- /* if newid, queuename will create a locked qf file in e->lockfp */
- (void) strlcpy(tf, queuename(e, 't'), sizeof tf);
+ (void) sm_strlcpy(tf, queuename(e, NEWQFL_LETTER), sizeof tf);
tfp = e->e_lockfp;
if (tfp == NULL)
- newid = FALSE;
+ newid = false;
- /* if newid, just write the qf file directly (instead of tf file) */
+ /* if newid, write the queue file directly (instead of temp file) */
if (!newid)
{
- int flags;
-
- flags = O_CREAT|O_WRONLY|O_EXCL;
+ const int flags = O_CREAT|O_WRONLY|O_EXCL;
/* get a locked tf file */
for (i = 0; i < 128; i++)
{
if (tfd < 0)
{
-#if _FFR_QUEUE_FILE_MODE
- MODE_T oldumask;
+ MODE_T oldumask = 0;
if (bitset(S_IWGRP, QueueFileMode))
oldumask = umask(002);
tfd = open(tf, flags, QueueFileMode);
if (bitset(S_IWGRP, QueueFileMode))
(void) umask(oldumask);
-#else /* _FFR_QUEUE_FILE_MODE */
- tfd = open(tf, flags, FileMode);
-#endif /* _FFR_QUEUE_FILE_MODE */
if (tfd < 0)
{
@@ -176,7 +371,8 @@ queueup(e, announce)
if (LogLevel > 0 && (i % 32) == 0)
sm_syslog(LOG_ALERT, e->e_id,
"queueup: cannot create %s, uid=%d: %s",
- tf, geteuid(), errstring(errno));
+ tf, geteuid(),
+ sm_errstring(errno));
}
}
if (tfd >= 0)
@@ -186,7 +382,7 @@ queueup(e, announce)
else if (LogLevel > 0 && (i % 32) == 0)
sm_syslog(LOG_ALERT, e->e_id,
"queueup: cannot lock %s: %s",
- tf, errstring(errno));
+ tf, sm_errstring(errno));
if ((i % 32) == 31)
{
(void) close(tfd);
@@ -202,11 +398,13 @@ queueup(e, announce)
else
(void) sleep(i % 32);
}
- if (tfd < 0 || (tfp = fdopen(tfd, "w")) == NULL)
+ if (tfd < 0 || (tfp = sm_io_open(SmFtStdiofd, SM_TIME_DEFAULT,
+ (void *) &tfd, SM_IO_WRONLY,
+ NULL)) == NULL)
{
int save_errno = errno;
- printopenfds(TRUE);
+ printopenfds(true);
errno = save_errno;
syserr("!queueup: cannot create queue temp file %s, uid=%d",
tf, geteuid());
@@ -214,146 +412,192 @@ queueup(e, announce)
}
if (tTd(40, 1))
- dprintf("\n>>>>> queueing %s/qf%s%s >>>>>\n",
- qid_printqueue(e->e_queuedir), e->e_id,
- newid ? " (new id)" : "");
+ sm_dprintf("\n>>>>> queueing %s/%s%s >>>>>\n",
+ qid_printqueue(e->e_qgrp, e->e_qdir),
+ queuename(e, ANYQFL_LETTER),
+ newid ? " (new id)" : "");
if (tTd(40, 3))
{
- dprintf(" e_flags=");
+ sm_dprintf(" e_flags=");
printenvflags(e);
}
if (tTd(40, 32))
{
- dprintf(" sendq=");
- printaddr(e->e_sendqueue, TRUE);
+ sm_dprintf(" sendq=");
+ printaddr(e->e_sendqueue, true);
}
if (tTd(40, 9))
{
- dprintf(" tfp=");
- dumpfd(fileno(tfp), TRUE, FALSE);
- dprintf(" lockfp=");
+ sm_dprintf(" tfp=");
+ dumpfd(sm_io_getinfo(tfp, SM_IO_WHAT_FD, NULL), true, false);
+ sm_dprintf(" lockfp=");
if (e->e_lockfp == NULL)
- dprintf("NULL\n");
+ sm_dprintf("NULL\n");
else
- dumpfd(fileno(e->e_lockfp), TRUE, FALSE);
+ dumpfd(sm_io_getinfo(e->e_lockfp, SM_IO_WHAT_FD, NULL),
+ true, false);
}
/*
** If there is no data file yet, create one.
*/
+ (void) sm_strlcpy(df, queuename(e, DATAFL_LETTER), sizeof df);
if (bitset(EF_HAS_DF, e->e_flags))
{
- if (e->e_dfp != NULL && bfcommit(e->e_dfp) < 0)
+ if (e->e_dfp != NULL &&
+ SuperSafe != SAFE_REALLY &&
+ sm_io_setinfo(e->e_dfp, SM_BF_COMMIT, NULL) < 0 &&
+ errno != EINVAL)
+ {
syserr("!queueup: cannot commit data file %s, uid=%d",
- queuename(e, 'd'), geteuid());
+ queuename(e, DATAFL_LETTER), geteuid());
+ }
+ if (e->e_dfp != NULL &&
+ SuperSafe == SAFE_INTERACTIVE && msync)
+ {
+ if (tTd(40,32))
+ sm_syslog(LOG_INFO, e->e_id,
+ "queueup: fsync(e->e_dfp)");
+
+ if (fsync(sm_io_getinfo(e->e_dfp, SM_IO_WHAT_FD,
+ NULL)) < 0)
+ {
+ if (newid)
+ syserr("!552 Error writing data file %s",
+ df);
+ else
+ syserr("!452 Error writing data file %s",
+ df);
+ }
+ }
}
else
{
int dfd;
- register FILE *dfp = NULL;
- char dfname[MAXPATHLEN];
+ MODE_T oldumask = 0;
+ register SM_FILE_T *dfp = NULL;
struct stat stbuf;
- if (e->e_dfp != NULL && bftest(e->e_dfp))
+ if (e->e_dfp != NULL &&
+ sm_io_getinfo(e->e_dfp, SM_IO_WHAT_ISTYPE, BF_FILE_TYPE))
syserr("committing over bf file");
- (void) strlcpy(dfname, queuename(e, 'd'), sizeof dfname);
-#if _FFR_QUEUE_FILE_MODE
- {
- MODE_T oldumask;
-
- if (bitset(S_IWGRP, QueueFileMode))
- oldumask = umask(002);
- dfd = open(dfname, O_WRONLY|O_CREAT|O_TRUNC,
- QueueFileMode);
- if (bitset(S_IWGRP, QueueFileMode))
- (void) umask(oldumask);
- }
-#else /* _FFR_QUEUE_FILE_MODE */
- dfd = open(dfname, O_WRONLY|O_CREAT|O_TRUNC, FileMode);
-#endif /* _FFR_QUEUE_FILE_MODE */
- if (dfd < 0 || (dfp = fdopen(dfd, "w")) == NULL)
+ if (bitset(S_IWGRP, QueueFileMode))
+ oldumask = umask(002);
+ dfd = open(df, O_WRONLY|O_CREAT|O_TRUNC, QueueFileMode);
+ if (bitset(S_IWGRP, QueueFileMode))
+ (void) umask(oldumask);
+ if (dfd < 0 || (dfp = sm_io_open(SmFtStdiofd, SM_TIME_DEFAULT,
+ (void *) &dfd, SM_IO_WRONLY,
+ NULL)) == NULL)
syserr("!queueup: cannot create data temp file %s, uid=%d",
- dfname, geteuid());
+ df, geteuid());
if (fstat(dfd, &stbuf) < 0)
e->e_dfino = -1;
else
{
e->e_dfdev = stbuf.st_dev;
- e->e_dfino = stbuf.st_ino;
+ e->e_dfino = ST_INODE(stbuf);
}
e->e_flags |= EF_HAS_DF;
memset(&mcibuf, '\0', sizeof mcibuf);
mcibuf.mci_out = dfp;
mcibuf.mci_mailer = FileMailer;
(*e->e_putbody)(&mcibuf, e, NULL);
- if (fclose(dfp) < 0)
+
+ if (SuperSafe == SAFE_REALLY ||
+ (SuperSafe == SAFE_INTERACTIVE && msync))
+ {
+ if (tTd(40,32))
+ sm_syslog(LOG_INFO, e->e_id,
+ "queueup: fsync(dfp)");
+
+ if (fsync(sm_io_getinfo(dfp, SM_IO_WHAT_FD, NULL)) < 0)
+ {
+ if (newid)
+ syserr("!552 Error writing data file %s",
+ df);
+ else
+ syserr("!452 Error writing data file %s",
+ df);
+ }
+ }
+
+ if (sm_io_close(dfp, SM_TIME_DEFAULT) < 0)
syserr("!queueup: cannot save data temp file %s, uid=%d",
- dfname, geteuid());
+ df, geteuid());
e->e_putbody = putbody;
}
/*
** Output future work requests.
** Priority and creation time should be first, since
- ** they are required by orderq.
+ ** they are required by gatherq.
*/
/* output queue version number (must be first!) */
- fprintf(tfp, "V%d\n", QF_VERSION);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "V%d\n", QF_VERSION);
/* output creation time */
- fprintf(tfp, "T%ld\n", (long) e->e_ctime);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "T%ld\n", (long) e->e_ctime);
/* output last delivery time */
-# if _FFR_QUEUEDELAY
- fprintf(tfp, "K%ld\n", (long) e->e_dtime);
- fprintf(tfp, "G%d\n", e->e_queuealg);
- fprintf(tfp, "Y%ld\n", (long) e->e_queuedelay);
+#if _FFR_QUEUEDELAY
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "K%ld\n", (long) e->e_dtime);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "G%d\n", e->e_queuealg);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "Y%ld\n", (long) e->e_queuedelay);
if (tTd(40, 64))
sm_syslog(LOG_INFO, e->e_id,
"queue alg: %d delay %ld next: %ld (now: %ld)\n",
e->e_queuealg, e->e_queuedelay, e->e_dtime, curtime());
-# else /* _FFR_QUEUEDELAY */
- fprintf(tfp, "K%ld\n", (long) e->e_dtime);
-# endif /* _FFR_QUEUEDELAY */
+#else /* _FFR_QUEUEDELAY */
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "K%ld\n", (long) e->e_dtime);
+#endif /* _FFR_QUEUEDELAY */
/* output number of delivery attempts */
- fprintf(tfp, "N%d\n", e->e_ntries);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "N%d\n", e->e_ntries);
/* output message priority */
- fprintf(tfp, "P%ld\n", e->e_msgpriority);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "P%ld\n", e->e_msgpriority);
+
+ /*
+ ** If data file is in a different directory than the queue file,
+ ** output a "d" record naming the directory of the data file.
+ */
+
+ if (e->e_dfqgrp != e->e_qgrp)
+ {
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "d%s\n",
+ Queue[e->e_dfqgrp]->qg_qpaths[e->e_dfqdir].qp_name);
+ }
/* output inode number of data file */
/* XXX should probably include device major/minor too */
if (e->e_dfino != -1)
{
- /*CONSTCOND*/
- if (sizeof e->e_dfino > sizeof(long))
- fprintf(tfp, "I%ld/%ld/%s\n",
- (long) major(e->e_dfdev),
- (long) minor(e->e_dfdev),
- quad_to_string(e->e_dfino));
- else
- fprintf(tfp, "I%ld/%ld/%lu\n",
- (long) major(e->e_dfdev),
- (long) minor(e->e_dfdev),
- (unsigned long) e->e_dfino);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "I%ld/%ld/%llu\n",
+ (long) major(e->e_dfdev),
+ (long) minor(e->e_dfdev),
+ (ULONGLONG_T) e->e_dfino);
}
/* output body type */
if (e->e_bodytype != NULL)
- fprintf(tfp, "B%s\n", denlstring(e->e_bodytype, TRUE, FALSE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "B%s\n",
+ denlstring(e->e_bodytype, true, false));
-# if _FFR_SAVE_CHARSET
- if (e->e_charset != NULL)
- fprintf(tfp, "X%s\n", denlstring(e->e_charset, TRUE, FALSE));
-# endif /* _FFR_SAVE_CHARSET */
+#if _FFR_QUARANTINE
+ /* quarantine reason */
+ if (e->e_quarmsg != NULL)
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "q%s\n",
+ denlstring(e->e_quarmsg, true, false));
+#endif /* _FFR_QUARANTINE */
/* message from envelope, if it exists */
if (e->e_message != NULL)
- fprintf(tfp, "M%s\n", denlstring(e->e_message, TRUE, FALSE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "M%s\n",
+ denlstring(e->e_message, true, false));
/* send various flag bits through */
p = buf;
@@ -369,28 +613,35 @@ queueup(e, announce)
*p++ = 'd';
if (bitset(EF_NO_BODY_RETN, e->e_flags))
*p++ = 'n';
+ if (bitset(EF_SPLIT, e->e_flags))
+ *p++ = 's';
*p++ = '\0';
if (buf[0] != '\0')
- fprintf(tfp, "F%s\n", buf);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "F%s\n", buf);
/* save $={persistentMacros} macro values */
- queueup_macros(macid("{persistentMacros}", NULL), tfp, e);
+ queueup_macros(macid("{persistentMacros}"), tfp, e);
/* output name of sender */
if (bitnset(M_UDBENVELOPE, e->e_from.q_mailer->m_flags))
p = e->e_sender;
else
p = e->e_from.q_paddr;
- fprintf(tfp, "S%s\n", denlstring(p, TRUE, FALSE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "S%s\n",
+ denlstring(p, true, false));
/* output ESMTP-supplied "original" information */
if (e->e_envid != NULL)
- fprintf(tfp, "Z%s\n", denlstring(e->e_envid, TRUE, FALSE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "Z%s\n",
+ denlstring(e->e_envid, true, false));
/* output AUTH= parameter */
if (e->e_auth_param != NULL)
- fprintf(tfp, "A%s\n", denlstring(e->e_auth_param,
- TRUE, FALSE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "A%s\n",
+ denlstring(e->e_auth_param, true, false));
+ if (e->e_dlvr_flag != 0)
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "!%c %ld\n",
+ (char) e->e_dlvr_flag, e->e_deliver_by);
/* output list of recipient addresses */
printctladdr(NULL, NULL);
@@ -399,40 +650,58 @@ queueup(e, announce)
if (!QS_IS_UNDELIVERED(q->q_state))
continue;
+ /* message for this recipient, if it exists */
+ if (q->q_message != NULL)
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "M%s\n",
+ denlstring(q->q_message, true,
+ false));
+
printctladdr(q, tfp);
if (q->q_orcpt != NULL)
- fprintf(tfp, "Q%s\n",
- denlstring(q->q_orcpt, TRUE, FALSE));
-
- (void) putc('R', tfp);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "Q%s\n",
+ denlstring(q->q_orcpt, true,
+ false));
+ if (q->q_finalrcpt != NULL)
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "r%s\n",
+ denlstring(q->q_finalrcpt, true,
+ false));
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'R');
if (bitset(QPRIMARY, q->q_flags))
- (void) putc('P', tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'P');
if (bitset(QHASNOTIFY, q->q_flags))
- (void) putc('N', tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'N');
if (bitset(QPINGONSUCCESS, q->q_flags))
- (void) putc('S', tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'S');
if (bitset(QPINGONFAILURE, q->q_flags))
- (void) putc('F', tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'F');
if (bitset(QPINGONDELAY, q->q_flags))
- (void) putc('D', tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'D');
if (q->q_alias != NULL &&
bitset(QALIAS, q->q_alias->q_flags))
- (void) putc('A', tfp);
- (void) putc(':', tfp);
- (void) fprintf(tfp, "%s\n", denlstring(q->q_paddr, TRUE, FALSE));
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'A');
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, ':');
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "%s\n",
+ denlstring(q->q_paddr, true, false));
if (announce)
{
+ char *tag = "queued";
+
+#if _FFR_QUARANTINE
+ if (e->e_quarmsg != NULL)
+ tag = "quarantined";
+#endif /* _FFR_QUARANTINE */
+
e->e_to = q->q_paddr;
- message("queued");
+ message(tag);
if (LogLevel > 8)
logdelivery(q->q_mailer, NULL, q->q_status,
- "queued", NULL, (time_t) 0, e);
+ tag, NULL, (time_t) 0, e);
e->e_to = NULL;
}
if (tTd(40, 1))
{
- dprintf("queueing ");
- printaddr(q, FALSE);
+ sm_dprintf("queueing ");
+ printaddr(q, false);
}
}
@@ -454,7 +723,7 @@ queueup(e, announce)
mcibuf.mci_mailer = &nullmailer;
mcibuf.mci_out = tfp;
- define('g', "\201f", e);
+ macdefine(&e->e_macro, A_PERM, 'g', "\201f");
for (h = e->e_header; h != NULL; h = h->h_link)
{
if (h->h_value == NULL)
@@ -474,16 +743,18 @@ queueup(e, announce)
}
/* output this header */
- fprintf(tfp, "H?");
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "H?");
/* output conditional macro if present */
if (h->h_macro != '\0')
{
if (bitset(0200, h->h_macro))
- fprintf(tfp, "${%s}",
- macname(bitidx(h->h_macro)));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT,
+ "${%s}",
+ macname(bitidx(h->h_macro)));
else
- fprintf(tfp, "$%c", h->h_macro);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT,
+ "$%c", h->h_macro);
}
else if (!bitzerop(h->h_mflags) &&
bitset(H_CHECK|H_ACHECK, h->h_flags))
@@ -493,28 +764,29 @@ queueup(e, announce)
/* if conditional, output the set of conditions */
for (j = '\0'; j <= '\177'; j++)
if (bitnset(j, h->h_mflags))
- (void) putc(j, tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT,
+ j);
}
- (void) putc('?', tfp);
+ (void) sm_io_putc(tfp, SM_TIME_DEFAULT, '?');
/* output the header: expand macros, convert addresses */
if (bitset(H_DEFAULT, h->h_flags) &&
!bitset(H_BINDLATE, h->h_flags))
{
- fprintf(tfp, "%s: %s\n",
- h->h_field,
- denlstring(buf, FALSE, TRUE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "%s: %s\n",
+ h->h_field,
+ denlstring(buf, false, true));
}
else if (bitset(H_FROM|H_RCPT, h->h_flags) &&
!bitset(H_BINDLATE, h->h_flags))
{
bool oldstyle = bitset(EF_OLDSTYLE, e->e_flags);
- FILE *savetrace = TrafficLogFile;
+ SM_FILE_T *savetrace = TrafficLogFile;
TrafficLogFile = NULL;
if (bitset(H_FROM, h->h_flags))
- oldstyle = FALSE;
+ oldstyle = false;
commaize(h, h->h_value, oldstyle, &mcibuf, e);
@@ -522,9 +794,10 @@ queueup(e, announce)
}
else
{
- fprintf(tfp, "%s: %s\n",
- h->h_field,
- denlstring(h->h_value, FALSE, TRUE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "%s: %s\n",
+ h->h_field,
+ denlstring(h->h_value, false,
+ true));
}
}
@@ -535,11 +808,13 @@ queueup(e, announce)
** scurrilous crackers from appending any data.
*/
- fprintf(tfp, ".\n");
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, ".\n");
- if (fflush(tfp) != 0 ||
- (SuperSafe && fsync(fileno(tfp)) < 0) ||
- ferror(tfp))
+ if (sm_io_flush(tfp, SM_TIME_DEFAULT) != 0 ||
+ ((SuperSafe == SAFE_REALLY ||
+ (SuperSafe == SAFE_INTERACTIVE && msync)) &&
+ fsync(sm_io_getinfo(tfp, SM_IO_WHAT_FD, NULL)) < 0) ||
+ sm_io_error(tfp))
{
if (newid)
syserr("!552 Error writing control file %s", tf);
@@ -549,44 +824,112 @@ queueup(e, announce)
if (!newid)
{
- /* rename (locked) tf to be (locked) qf */
- qf = queuename(e, 'q');
+#if _FFR_QUARANTINE
+ char new = queue_letter(e, ANYQFL_LETTER);
+#endif /* _FFR_QUARANTINE */
+
+ /* rename (locked) tf to be (locked) [qh]f */
+ (void) sm_strlcpy(qf, queuename(e, ANYQFL_LETTER),
+ sizeof qf);
if (rename(tf, qf) < 0)
syserr("cannot rename(%s, %s), uid=%d",
tf, qf, geteuid());
+# if _FFR_QUARANTINE
+ else
+ {
+ /*
+ ** Check if type has changed and only
+ ** remove the old item if the rename above
+ ** succeeded.
+ */
+
+ if (e->e_qfletter != '\0' &&
+ e->e_qfletter != new)
+ {
+ if (tTd(40, 5))
+ {
+ sm_dprintf("type changed from %c to %c\n",
+ e->e_qfletter, new);
+ }
+
+ if (unlink(queuename(e, e->e_qfletter)) < 0)
+ {
+ /* XXX: something more drastic? */
+ if (LogLevel > 0)
+ sm_syslog(LOG_ERR, e->e_id,
+ "queueup: unlink(%s) failed: %s",
+ queuename(e, e->e_qfletter),
+ sm_errstring(errno));
+ }
+ }
+ }
+ e->e_qfletter = new;
+# endif /* _FFR_QUARANTINE */
+
/*
- ** fsync() after renaming to make sure
- ** metadata is written to disk on
- ** filesystems in which renames are
- ** not guaranteed such as softupdates.
+ ** fsync() after renaming to make sure metadata is
+ ** written to disk on filesystems in which renames are
+ ** not guaranteed.
*/
- if (tfd >= 0 && SuperSafe && fsync(tfd) < 0)
- syserr("!queueup: cannot fsync queue temp file %s", tf);
+ if (SuperSafe != SAFE_NO)
+ {
+ /* for softupdates */
+ if (tfd >= 0 && fsync(tfd) < 0)
+ {
+ syserr("!queueup: cannot fsync queue temp file %s",
+ tf);
+ }
+ SYNC_DIR(qf, true);
+ }
- /* close and unlock old (locked) qf */
+ /* close and unlock old (locked) queue file */
if (e->e_lockfp != NULL)
- (void) fclose(e->e_lockfp);
+ (void) sm_io_close(e->e_lockfp, SM_TIME_DEFAULT);
e->e_lockfp = tfp;
+
+ /* save log info */
+ if (LogLevel > 79)
+ sm_syslog(LOG_DEBUG, e->e_id, "queueup %s", qf);
}
else
- qf = tf;
+ {
+ /* save log info */
+ if (LogLevel > 79)
+ sm_syslog(LOG_DEBUG, e->e_id, "queueup %s", tf);
+
+#if _FFR_QUARANTINE
+ e->e_qfletter = queue_letter(e, ANYQFL_LETTER);
+#endif /* _FFR_QUARANTINE */
+ }
+
errno = 0;
e->e_flags |= EF_INQUEUE;
- /* save log info */
- if (LogLevel > 79)
- sm_syslog(LOG_DEBUG, e->e_id, "queueup, qf=%s", qf);
-
if (tTd(40, 1))
- dprintf("<<<<< done queueing %s <<<<<\n\n", e->e_id);
+ sm_dprintf("<<<<< done queueing %s <<<<<\n\n", e->e_id);
return;
}
+/*
+** PRINTCTLADDR -- print control address to file.
+**
+** Parameters:
+** a -- address.
+** tfp -- file pointer.
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** The control address (if changed) is printed to the file.
+** The last control address and uid are saved.
+*/
+
static void
printctladdr(a, tfp)
register ADDRESS *a;
- FILE *tfp;
+ SM_FILE_T *tfp;
{
char *user;
register ADDRESS *q;
@@ -599,7 +942,7 @@ printctladdr(a, tfp)
if (a == NULL || a->q_alias == NULL || tfp == NULL)
{
if (lastctladdr != NULL && tfp != NULL)
- fprintf(tfp, "C\n");
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "C\n");
lastctladdr = NULL;
lastuid = 0;
return;
@@ -629,77 +972,414 @@ printctladdr(a, tfp)
lastctladdr = a;
if (uid == 0 || user == NULL || user[0] == '\0')
- fprintf(tfp, "C");
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "C");
else
- fprintf(tfp, "C%s:%ld:%ld",
- denlstring(user, TRUE, FALSE), (long) uid, (long) gid);
- fprintf(tfp, ":%s\n", denlstring(a->q_paddr, TRUE, FALSE));
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "C%s:%ld:%ld",
+ denlstring(user, true, false), (long) uid,
+ (long) gid);
+ (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, ":%s\n",
+ denlstring(a->q_paddr, true, false));
}
- /*
+
+/*
+** RUNNERS_SIGTERM -- propagate a SIGTERM to queue runner process
+**
+** This propagates the signal to the child processes that are queue
+** runners. This is for a queue runner "cleanup". After all of the
+** child queue runner processes are signaled (it should be SIGTERM
+** being the sig) then the old signal handler (Oldsh) is called
+** to handle any cleanup set for this process (provided it is not
+** SIG_DFL or SIG_IGN). The signal may not be handled immediately
+** if the BlockOldsh flag is set. If the current process doesn't
+** have a parent then handle the signal immediately, regardless of
+** BlockOldsh.
+**
+** Parameters:
+** sig -- the signal number being sent
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** Sets the NoMoreRunners boolean to true to stop more runners
+** from being started in runqueue().
+**
+** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD
+** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE
+** DOING.
+*/
+
+static bool volatile NoMoreRunners = false;
+static sigfunc_t Oldsh_term = SIG_DFL;
+static sigfunc_t Oldsh_hup = SIG_DFL;
+static sigfunc_t volatile Oldsh = SIG_DFL;
+static bool BlockOldsh = false;
+static int volatile Oldsig = 0;
+static SIGFUNC_DECL runners_sigterm __P((int));
+static SIGFUNC_DECL runners_sighup __P((int));
+
+static SIGFUNC_DECL
+runners_sigterm(sig)
+ int sig;
+{
+ int save_errno = errno;
+
+ FIX_SYSV_SIGNAL(sig, runners_sigterm);
+ errno = save_errno;
+ CHECK_CRITICAL(sig);
+ NoMoreRunners = true;
+ Oldsh = Oldsh_term;
+ Oldsig = sig;
+ proc_list_signal(PROC_QUEUE, sig);
+
+ if (!BlockOldsh || getppid() <= 1)
+ {
+ /* Check that a valid 'old signal handler' is callable */
+ if (Oldsh_term != SIG_DFL && Oldsh_term != SIG_IGN &&
+ Oldsh_term != runners_sigterm)
+ (*Oldsh_term)(sig);
+ }
+ errno = save_errno;
+ return SIGFUNC_RETURN;
+}
+/*
+** RUNNERS_SIGHUP -- propagate a SIGHUP to queue runner process
+**
+** This propagates the signal to the child processes that are queue
+** runners. This is for a queue runner "cleanup". After all of the
+** child queue runner processes are signaled (it should be SIGHUP
+** being the sig) then the old signal handler (Oldsh) is called to
+** handle any cleanup set for this process (provided it is not SIG_DFL
+** or SIG_IGN). The signal may not be handled immediately if the
+** BlockOldsh flag is set. If the current process doesn't have
+** a parent then handle the signal immediately, regardless of
+** BlockOldsh.
+**
+** Parameters:
+** sig -- the signal number being sent
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** Sets the NoMoreRunners boolean to true to stop more runners
+** from being started in runqueue().
+**
+** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD
+** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE
+** DOING.
+*/
+
+static SIGFUNC_DECL
+runners_sighup(sig)
+ int sig;
+{
+ int save_errno = errno;
+
+ FIX_SYSV_SIGNAL(sig, runners_sighup);
+ errno = save_errno;
+ CHECK_CRITICAL(sig);
+ NoMoreRunners = true;
+ Oldsh = Oldsh_hup;
+ Oldsig = sig;
+ proc_list_signal(PROC_QUEUE, sig);
+
+ if (!BlockOldsh || getppid() <= 1)
+ {
+ /* Check that a valid 'old signal handler' is callable */
+ if (Oldsh_hup != SIG_DFL && Oldsh_hup != SIG_IGN &&
+ Oldsh_hup != runners_sighup)
+ (*Oldsh_hup)(sig);
+ }
+ errno = save_errno;
+ return SIGFUNC_RETURN;
+}
+/*
+** MARK_WORK_GROUP_RESTART -- mark a work group as needing a restart
+**
+** Sets a workgroup for restarting.
+**
+** Parameters:
+** wgrp -- the work group id to restart.
+** reason -- why (signal?), -1 to turn off restart
+**
+** Returns:
+** none.
+**
+** Side effects:
+** May set global RestartWorkGroup to true.
+**
+** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD
+** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE
+** DOING.
+*/
+
+void
+mark_work_group_restart(wgrp, reason)
+ int wgrp;
+ int reason;
+{
+ if (wgrp < 0 || wgrp > NumWorkGroups)
+ return;
+
+ WorkGrp[wgrp].wg_restart = reason;
+ if (reason >= 0)
+ RestartWorkGroup = true;
+}
+/*
+** RESTART_MARKED_WORK_GROUPS -- restart work groups marked as needing restart
+**
+** Restart any workgroup marked as needing a restart provided more
+** runners are allowed.
+**
+** Parameters:
+** none.
+**
+** Returns:
+** none.
+**
+** Side effects:
+** Sets global RestartWorkGroup to false.
+*/
+
+void
+restart_marked_work_groups()
+{
+ int i;
+ int wasblocked;
+
+ if (NoMoreRunners)
+ return;
+
+ /* Block SIGCHLD so reapchild() doesn't mess with us */
+ wasblocked = sm_blocksignal(SIGCHLD);
+
+ for (i = 0; i < NumWorkGroups; i++)
+ {
+ if (WorkGrp[i].wg_restart >= 0)
+ {
+ if (LogLevel > 8)
+ sm_syslog(LOG_ERR, NOQID,
+ "restart queue runner=%d due to signal 0x%x",
+ i, WorkGrp[i].wg_restart);
+ restart_work_group(i);
+ }
+ }
+ RestartWorkGroup = false;
+
+ if (wasblocked == 0)
+ (void) sm_releasesignal(SIGCHLD);
+}
+/*
+** RESTART_WORK_GROUP -- restart a specific work group
+**
+** Restart a specific workgroup provided more runners are allowed.
+** If the requested work group has been restarted too many times log
+** this and refuse to restart.
+**
+** Parameters:
+** wgrp -- the work group id to restart
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** starts another process doing the work of wgrp
+*/
+
+#define MAX_PERSIST_RESTART 10 /* max allowed number of restarts */
+
+static void
+restart_work_group(wgrp)
+ int wgrp;
+{
+ if (NoMoreRunners ||
+ wgrp < 0 || wgrp > NumWorkGroups)
+ return;
+
+ WorkGrp[wgrp].wg_restart = -1;
+ if (WorkGrp[wgrp].wg_restartcnt < MAX_PERSIST_RESTART)
+ {
+ /* avoid overflow; increment here */
+ WorkGrp[wgrp].wg_restartcnt++;
+ (void) run_work_group(wgrp, true, false, true, true);
+ }
+ else
+ {
+ sm_syslog(LOG_ERR, NOQID,
+ "ERROR: persistent queue runner=%d restarted too many times, queue runner lost",
+ wgrp);
+ }
+}
+/*
+** SCHEDULE_QUEUE_RUNS -- schedule the next queue run for a work group.
+**
+** Parameters:
+** runall -- schedule even if individual bit is not set.
+** wgrp -- the work group id to schedule.
+**
+** Returns:
+** nothing
+*/
+
+#define INCR_MOD(v, m) if (++v >= m) \
+ v = 0; \
+ else
+
+static void
+schedule_queue_runs(runall, wgrp)
+ bool runall;
+ int wgrp;
+{
+ int qgrp, cgrp, endgrp;
+
+ /*
+ ** This is a bit ugly since we have to duplicate the
+ ** code that "walks" through a work queue group.
+ */
+
+ cgrp = endgrp = WorkGrp[wgrp].wg_curqgrp;
+ do
+ {
+ time_t qintvl;
+
+ qgrp = WorkGrp[wgrp].wg_qgs[cgrp]->qg_index;
+ if (Queue[qgrp]->qg_queueintvl > 0)
+ qintvl = Queue[qgrp]->qg_queueintvl;
+ else if (QueueIntvl > 0)
+ qintvl = QueueIntvl;
+ else
+ qintvl = (time_t) 0;
+ if ((runall || bitnset(qgrp, DoQueueRun)) && qintvl > 0)
+ (void) sm_setevent(qintvl, runqueueevent, qgrp);
+#if _FFR_QUEUE_SCHED_DBG
+ if (tTd(69, 10))
+ sm_syslog(LOG_INFO, NOQID,
+ "sqr: wgrp=%d, cgrp=%d, qgrp=%d, intvl=%ld, QI=%ld, runall=%d, bit=%d, sched=%d",
+ wgrp, cgrp, qgrp, Queue[qgrp]->qg_queueintvl,
+ QueueIntvl, runall, bitnset(qgrp, DoQueueRun),
+ (runall || bitnset(qgrp, DoQueueRun)) &&
+ qintvl > 0);
+#endif /* _FFR_QUEUE_SCHED_DBG */
+ clrbitn(qgrp, DoQueueRun);
+ INCR_MOD(cgrp, WorkGrp[wgrp].wg_numqgrp);
+ } while (endgrp != cgrp);
+}
+/*
** RUNQUEUE -- run the jobs in the queue.
**
** Gets the stuff out of the queue in some presumably logical
** order and processes them.
**
** Parameters:
-** forkflag -- TRUE if the queue scanning should be done in
+** forkflag -- true if the queue scanning should be done in
** a child process. We double-fork so it is not our
** child and we don't have to clean up after it.
-** FALSE can be ignored if we have multiple queues.
-** verbose -- if TRUE, print out status information.
+** false can be ignored if we have multiple queues.
+** verbose -- if true, print out status information.
+** persistent -- persistent queue runner?
+** runall -- run all groups or only a subset (DoQueueRun)?
**
** Returns:
-** TRUE if the queue run successfully began.
+** true if the queue run successfully began.
**
** Side Effects:
-** runs things in the mail queue.
+** runs things in the mail queue using run_work_group().
+** maybe schedules next queue run.
+**
*/
static ENVELOPE QueueEnvelope; /* the queue run envelope */
-int NumQueues = 0; /* number of queues */
static time_t LastQueueTime = 0; /* last time a queue ID assigned */
static pid_t LastQueuePid = -1; /* last PID which had a queue ID */
-struct qpaths_s
-{
- char *qp_name; /* name of queue dir */
- short qp_subdirs; /* use subdirs? */
-};
-
-typedef struct qpaths_s QPATHS;
-
/* values for qp_supdirs */
#define QP_NOSUB 0x0000 /* No subdirectories */
#define QP_SUBDF 0x0001 /* "df" subdirectory */
#define QP_SUBQF 0x0002 /* "qf" subdirectory */
#define QP_SUBXF 0x0004 /* "xf" subdirectory */
-static QPATHS *QPaths = NULL; /* list of queue directories */
-
bool
-runqueue(forkflag, verbose)
+runqueue(forkflag, verbose, persistent, runall)
bool forkflag;
bool verbose;
+ bool persistent;
+ bool runall;
{
int i;
- bool ret = TRUE;
+ bool ret = true;
static int curnum = 0;
+ sigfunc_t cursh;
+#if SM_HEAP_CHECK
+ SM_NONVOLATILE int oldgroup = 0;
+
+ if (sm_debug_active(&DebugLeakQ, 1))
+ {
+ oldgroup = sm_heap_group();
+ sm_heap_newgroup();
+ sm_dprintf("runqueue() heap group #%d\n", sm_heap_group());
+ }
+#endif /* SM_HEAP_CHECK */
+
+ /* queue run has been started, don't do any more this time */
+ clrbitn(NumQueue, DoQueueRun);
- DoQueueRun = FALSE;
+ /* more than one queue or more than one directory per queue */
+ if (!forkflag && !verbose &&
+ (WorkGrp[0].wg_qgs[0]->qg_numqueues > 1 || NumWorkGroups > 1 ||
+ WorkGrp[0].wg_numqgrp > 1))
+ forkflag = true;
+ /*
+ ** For controlling queue runners via signals sent to this process.
+ ** Oldsh* will get called too by runners_sig* (if it is not SIG_IGN
+ ** or SIG_DFL) to preserve cleanup behavior. Now that this process
+ ** will have children (and perhaps grandchildren) this handler will
+ ** be left in place. This is because this process, once it has
+ ** finished spinning off queue runners, may go back to doing something
+ ** else (like being a daemon). And we still want on a SIG{TERM,HUP} to
+ ** clean up the child queue runners. Only install 'runners_sig*' once
+ ** else we'll get stuck looping forever.
+ */
- if (!forkflag && NumQueues > 1 && !verbose)
- forkflag = TRUE;
+ cursh = sm_signal(SIGTERM, runners_sigterm);
+ if (cursh != runners_sigterm)
+ Oldsh_term = cursh;
+ cursh = sm_signal(SIGHUP, runners_sighup);
+ if (cursh != runners_sighup)
+ Oldsh_hup = cursh;
- for (i = 0; i < NumQueues; i++)
+ for (i = 0; i < NumWorkGroups && !NoMoreRunners; i++)
{
/*
- ** Pick up where we left off, in case we
- ** used up all the children last time
- ** without finishing.
+ ** If MaxQueueChildren active then test whether the start
+ ** of the next queue group's additional queue runners (maximum)
+ ** will result in MaxQueueChildren being exceeded.
+ **
+ ** Note: do not use continue; even though another workgroup
+ ** may have fewer queue runners, this would be "unfair",
+ ** i.e., this work group might "starve" then.
+ */
+
+#if _FFR_QUEUE_SCHED_DBG
+ if (tTd(69, 10))
+ sm_syslog(LOG_INFO, NOQID,
+ "rq: curnum=%d, MaxQueueChildren=%d, CurRunners=%d, WorkGrp[curnum].wg_maxact=%d",
+ curnum, MaxQueueChildren, CurRunners,
+ WorkGrp[curnum].wg_maxact);
+#endif /* _FFR_QUEUE_SCHED_DBG */
+ if (MaxQueueChildren > 0 &&
+ CurRunners + WorkGrp[curnum].wg_maxact > MaxQueueChildren)
+ break;
+
+ /*
+ ** Pick up where we left off (curnum), in case we
+ ** used up all the children last time without finishing.
+ ** This give a round-robin fairness to queue runs.
*/
- ret = run_single_queue(curnum, forkflag, verbose);
+ ret = run_work_group(curnum, forkflag, verbose, persistent,
+ runall);
/*
** Failure means a message was printed for ETRN
@@ -709,71 +1389,289 @@ runqueue(forkflag, verbose)
if (!ret)
break;
- if (++curnum >= NumQueues)
- curnum = 0;
+ /* Success means the runner count needs to be updated. */
+ CurRunners += WorkGrp[curnum].wg_maxact;
+ if (!persistent)
+ schedule_queue_runs(runall, curnum);
+ INCR_MOD(curnum, NumWorkGroups);
+ }
+
+ /* schedule left over queue runs */
+ if (i < NumWorkGroups && !NoMoreRunners && !persistent)
+ {
+ int h;
+
+ for (h = curnum; i < NumWorkGroups; i++)
+ {
+ schedule_queue_runs(runall, h);
+ INCR_MOD(h, NumWorkGroups);
+ }
}
- if (QueueIntvl != 0)
- (void) setevent(QueueIntvl, runqueueevent, 0);
+
+
+#if SM_HEAP_CHECK
+ if (sm_debug_active(&DebugLeakQ, 1))
+ sm_heap_setgroup(oldgroup);
+#endif /* SM_HEAP_CHECK */
return ret;
}
- /*
-** RUN_SINGLE_QUEUE -- run the jobs in a single queue.
+/*
+** RUNNER_WORK -- have a queue runner do its work
+**
+** Have a queue runner do its work a list of entries.
+** When work isn't directly being done then this process can take a signal
+** and terminate immediately (in a clean fashion of course).
+** When work is directly being done, it's not to be interrupted
+** immediately: the work should be allowed to finish at a clean point
+** before termination (in a clean fashion of course).
+**
+** Parameters:
+** e -- envelope.
+** sequenceno -- 'th process to run WorkQ.
+** didfork -- did the calling process fork()?
+** skip -- process only each skip'th item.
+** njobs -- number of jobs in WorkQ.
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** runs things in the mail queue.
+*/
+
+/* Get new load average every 30 seconds. */
+#define GET_NEW_LA_TIME 30
+
+static void
+runner_work(e, sequenceno, didfork, skip, njobs)
+ register ENVELOPE *e;
+ int sequenceno;
+ bool didfork;
+ int skip;
+ int njobs;
+{
+ int n;
+ WORK *w;
+ time_t current_la_time, now;
+
+ current_la_time = curtime();
+
+ /*
+ ** Here we temporarily block the second calling of the handlers.
+ ** This allows us to handle the signal without terminating in the
+ ** middle of direct work. If a signal does come, the test for
+ ** NoMoreRunners will find it.
+ */
+
+ BlockOldsh = true;
+
+ /* process them once at a time */
+ while (WorkQ != NULL)
+ {
+#if SM_HEAP_CHECK
+ SM_NONVOLATILE int oldgroup = 0;
+
+ if (sm_debug_active(&DebugLeakQ, 1))
+ {
+ oldgroup = sm_heap_group();
+ sm_heap_newgroup();
+ sm_dprintf("run_queue_group() heap group #%d\n",
+ sm_heap_group());
+ }
+#endif /* SM_HEAP_CHECK */
+
+ /* do no more work */
+ if (NoMoreRunners)
+ {
+ /* Check that a valid signal handler is callable */
+ if (Oldsh != SIG_DFL && Oldsh != SIG_IGN &&
+ Oldsh != runners_sighup &&
+ Oldsh != runners_sigterm)
+ (*Oldsh)(Oldsig);
+ break;
+ }
+
+ w = WorkQ; /* assign current work item */
+
+ /*
+ ** Set the head of the WorkQ to the next work item.
+ ** It is set 'skip' ahead (the number of parallel queue
+ ** runners working on WorkQ together) since each runner
+ ** works on every 'skip'th (N-th) item.
+ */
+
+ for (n = 0; n < skip && WorkQ != NULL; n++)
+ WorkQ = WorkQ->w_next;
+ e->e_to = NULL;
+
+ /*
+ ** Ignore jobs that are too expensive for the moment.
+ **
+ ** Get new load average every GET_NEW_LA_TIME seconds.
+ */
+
+ now = curtime();
+ if (current_la_time < now - GET_NEW_LA_TIME)
+ {
+ sm_getla();
+ current_la_time = now;
+ }
+ if (shouldqueue(WkRecipFact, current_la_time))
+ {
+ char *msg = "Aborting queue run: load average too high";
+
+ if (Verbose)
+ message("%s", msg);
+ if (LogLevel > 8)
+ sm_syslog(LOG_INFO, NOQID, "runqueue: %s", msg);
+ break;
+ }
+ if (shouldqueue(w->w_pri, w->w_ctime))
+ {
+ if (Verbose)
+ message(EmptyString);
+ if (QueueSortOrder == QSO_BYPRIORITY)
+ {
+ if (Verbose)
+ message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue",
+ qid_printqueue(w->w_qgrp,
+ w->w_qdir),
+ w->w_name + 2, sequenceno,
+ njobs);
+ if (LogLevel > 8)
+ sm_syslog(LOG_INFO, NOQID,
+ "runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)",
+ qid_printqueue(w->w_qgrp,
+ w->w_qdir),
+ w->w_name + 2, w->w_pri,
+ CurrentLA, sequenceno,
+ njobs);
+ break;
+ }
+ else if (Verbose)
+ message("Skipping %s/%s (sequence %d of %d)",
+ qid_printqueue(w->w_qgrp, w->w_qdir),
+ w->w_name + 2, sequenceno, njobs);
+ }
+ else
+ {
+ if (Verbose)
+ {
+ message(EmptyString);
+ message("Running %s/%s (sequence %d of %d)",
+ qid_printqueue(w->w_qgrp, w->w_qdir),
+ w->w_name + 2, sequenceno, njobs);
+ }
+ if (didfork && MaxQueueChildren > 0)
+ {
+ sm_blocksignal(SIGCHLD);
+ (void) sm_signal(SIGCHLD, reapchild);
+ }
+ if (tTd(63, 100))
+ sm_syslog(LOG_DEBUG, NOQID,
+ "runqueue %s dowork(%s)",
+ qid_printqueue(w->w_qgrp, w->w_qdir),
+ w->w_name + 2);
+
+ (void) dowork(w->w_qgrp, w->w_qdir, w->w_name + 2,
+ false, false, e);
+ errno = 0;
+ }
+ sm_free(w->w_name); /* XXX */
+ if (w->w_host != NULL)
+ sm_free(w->w_host); /* XXX */
+ sm_free((char *) w); /* XXX */
+ sequenceno += skip; /* next sequence number */
+#if SM_HEAP_CHECK
+ if (sm_debug_active(&DebugLeakQ, 1))
+ sm_heap_setgroup(oldgroup);
+#endif /* SM_HEAP_CHECK */
+ }
+
+ BlockOldsh = false;
+
+ /* check the signals didn't happen during the revert */
+ if (NoMoreRunners)
+ {
+ /* Check that a valid signal handler is callable */
+ if (Oldsh != SIG_DFL && Oldsh != SIG_IGN &&
+ Oldsh != runners_sighup && Oldsh != runners_sigterm)
+ (*Oldsh)(Oldsig);
+ }
+
+ Oldsh = SIG_DFL; /* after the NoMoreRunners check */
+}
+/*
+** RUN_WORK_GROUP -- run the jobs in a queue group from a work group.
**
** Gets the stuff out of the queue in some presumably logical
** order and processes them.
**
** Parameters:
-** queuedir -- queue to process
-** forkflag -- TRUE if the queue scanning should be done in
+** wgrp -- work group to process.
+** forkflag -- true if the queue scanning should be done in
** a child process. We double-fork so it is not our
** child and we don't have to clean up after it.
-** verbose -- if TRUE, print out status information.
+** verbose -- if true, print out status information.
+** persistent -- persistent queue runner?
+** runall -- true: run all of the queue groups in this work group
**
** Returns:
-** TRUE if the queue run successfully began.
+** true if the queue run successfully began.
**
** Side Effects:
** runs things in the mail queue.
*/
-static bool
-run_single_queue(queuedir, forkflag, verbose)
- int queuedir;
+/* Minimum sleep time for persistent queue runners */
+#define MIN_SLEEP_TIME 5
+
+bool
+run_work_group(wgrp, forkflag, verbose, persistent, runall)
+ int wgrp;
bool forkflag;
bool verbose;
+ bool persistent;
+ bool runall;
{
register ENVELOPE *e;
- int njobs;
- int sequenceno = 0;
- time_t current_la_time, now;
+ int njobs, qdir;
+ int sequenceno = 1;
+ int qgrp, endgrp, h, i;
+ time_t current_la_time;
+ bool full, more;
+ SM_RPOOL_T *rpool;
+ extern void rmexpstab __P((void));
extern ENVELOPE BlankEnvelope;
+ extern SIGFUNC_DECL reapchild __P((int));
+
+ if (wgrp < 0)
+ return false;
/*
** If no work will ever be selected, don't even bother reading
** the queue.
*/
- CurrentLA = sm_getla(NULL); /* get load average */
+ sm_getla(); /* get load average */
current_la_time = curtime();
- if (shouldqueue(WkRecipFact, current_la_time))
+ if (!persistent && shouldqueue(WkRecipFact, current_la_time))
{
char *msg = "Skipping queue run -- load average too high";
if (verbose)
message("458 %s\n", msg);
if (LogLevel > 8)
- sm_syslog(LOG_INFO, NOQID,
- "runqueue: %s",
- msg);
- return FALSE;
+ sm_syslog(LOG_INFO, NOQID, "runqueue: %s", msg);
+ return false;
}
/*
** See if we already have too many children.
*/
- if (forkflag && QueueIntvl != 0 &&
+ if (forkflag && WorkGrp[wgrp].wg_lowqintvl > 0 && !persistent &&
MaxChildren > 0 && CurChildren >= MaxChildren)
{
char *msg = "Skipping queue run -- too many children";
@@ -781,10 +1679,9 @@ run_single_queue(queuedir, forkflag, verbose)
if (verbose)
message("458 %s (%d)\n", msg, CurChildren);
if (LogLevel > 8)
- sm_syslog(LOG_INFO, NOQID,
- "runqueue: %s (%d)",
+ sm_syslog(LOG_INFO, NOQID, "runqueue: %s (%d)",
msg, CurChildren);
- return FALSE;
+ return false;
}
/*
@@ -795,81 +1692,84 @@ run_single_queue(queuedir, forkflag, verbose)
{
pid_t pid;
- (void) blocksignal(SIGCHLD);
- (void) setsignal(SIGCHLD, reapchild);
+ (void) sm_blocksignal(SIGCHLD);
+ (void) sm_signal(SIGCHLD, reapchild);
pid = dofork();
if (pid == -1)
{
const char *msg = "Skipping queue run -- fork() failed";
- const char *err = errstring(errno);
+ const char *err = sm_errstring(errno);
if (verbose)
message("458 %s: %s\n", msg, err);
if (LogLevel > 8)
- sm_syslog(LOG_INFO, NOQID,
- "runqueue: %s: %s",
+ sm_syslog(LOG_INFO, NOQID, "runqueue: %s: %s",
msg, err);
- (void) releasesignal(SIGCHLD);
- return FALSE;
+ (void) sm_releasesignal(SIGCHLD);
+ return false;
}
if (pid != 0)
{
/* parent -- pick up intermediate zombie */
- (void) blocksignal(SIGALRM);
- proc_list_add(pid, "Queue runner", PROC_QUEUE);
- (void) releasesignal(SIGALRM);
- (void) releasesignal(SIGCHLD);
- return TRUE;
+ (void) sm_blocksignal(SIGALRM);
+
+ /* wgrp only used when queue runners are persistent */
+ proc_list_add(pid, "Queue runner", PROC_QUEUE,
+ WorkGrp[wgrp].wg_maxact,
+ persistent ? wgrp : -1);
+ (void) sm_releasesignal(SIGALRM);
+ (void) sm_releasesignal(SIGCHLD);
+ return true;
}
+
/* child -- clean up signals */
/* Reset global flags */
RestartRequest = NULL;
+ RestartWorkGroup = false;
ShutdownRequest = NULL;
PendingSignal = 0;
+ CurrentPid = getpid();
+ /*
+ ** Initialize exception stack and default exception
+ ** handler for child process.
+ */
+
+ sm_exc_newthread(fatal_error);
clrcontrol();
proc_list_clear();
/* Add parent process as first child item */
- proc_list_add(getpid(), "Queue runner child process",
- PROC_QUEUE_CHILD);
- (void) releasesignal(SIGCHLD);
- (void) setsignal(SIGCHLD, SIG_DFL);
- (void) setsignal(SIGHUP, SIG_DFL);
- (void) setsignal(SIGTERM, intsig);
+ proc_list_add(CurrentPid, "Queue runner child process",
+ PROC_QUEUE_CHILD, 0, -1);
+ (void) sm_releasesignal(SIGCHLD);
+ (void) sm_signal(SIGCHLD, SIG_DFL);
+ (void) sm_signal(SIGHUP, SIG_DFL);
+ (void) sm_signal(SIGTERM, intsig);
}
- sm_setproctitle(TRUE, CurEnv, "running queue: %s",
- qid_printqueue(queuedir));
-
- if (LogLevel > 69 || tTd(63, 99))
- sm_syslog(LOG_DEBUG, NOQID,
- "runqueue %s, pid=%d, forkflag=%d",
- qid_printqueue(queuedir), (int) getpid(), forkflag);
-
/*
** Release any resources used by the daemon code.
*/
-# if DAEMON
clrdaemon();
-# endif /* DAEMON */
/* force it to run expensive jobs */
- NoConnect = FALSE;
+ NoConnect = false;
/* drop privileges */
if (geteuid() == (uid_t) 0)
- (void) drop_privileges(FALSE);
+ (void) drop_privileges(false);
/*
** Create ourselves an envelope
*/
CurEnv = &QueueEnvelope;
- e = newenvelope(&QueueEnvelope, CurEnv);
+ rpool = sm_rpool_new_x(NULL);
+ e = newenvelope(&QueueEnvelope, CurEnv, rpool);
e->e_flags = BlankEnvelope.e_flags;
e->e_parent = NULL;
@@ -877,7 +1777,7 @@ run_single_queue(queuedir, forkflag, verbose)
if (forkflag)
{
disconnect(1, e);
- QuickAbort = FALSE;
+ QuickAbort = false;
}
/*
@@ -886,200 +1786,494 @@ run_single_queue(queuedir, forkflag, verbose)
*/
if (QueueLimitId != NULL || QueueLimitSender != NULL ||
+#if _FFR_QUARANTINE
+ QueueLimitQuarantine != NULL ||
+#endif /* _FFR_QUARANTINE */
QueueLimitRecipient != NULL)
{
- IgnoreHostStatus = TRUE;
+ IgnoreHostStatus = true;
MinQueueAge = 0;
}
/*
+ ** Here is where we choose the queue group from the work group.
+ ** The caller of the "domorework" label must setup a new envelope.
+ */
+
+ endgrp = WorkGrp[wgrp].wg_curqgrp; /* to not spin endlessly */
+
+ domorework:
+
+ /*
+ ** Run a queue group if:
+ ** runall is set or the bit for this group is set.
+ */
+
+ for (;;)
+ {
+ /*
+ ** Find the next queue group within the work group that
+ ** has been marked as needing a run.
+ */
+
+ qgrp = WorkGrp[wgrp].wg_qgs[WorkGrp[wgrp].wg_curqgrp]->qg_index;
+ WorkGrp[wgrp].wg_curqgrp++; /* advance */
+ WorkGrp[wgrp].wg_curqgrp %= WorkGrp[wgrp].wg_numqgrp; /* wrap */
+ if (runall || bitnset(qgrp, DoQueueRun))
+ break;
+ if (endgrp == WorkGrp[wgrp].wg_curqgrp)
+ {
+ e->e_id = NULL;
+ if (forkflag)
+ finis(true, true, ExitStat);
+ return true; /* we're done */
+ }
+ }
+
+ qdir = Queue[qgrp]->qg_curnum; /* round-robin init of queue position */
+#if _FFR_QUEUE_SCHED_DBG
+ if (tTd(69, 12))
+ sm_syslog(LOG_INFO, NOQID,
+ "rwg: wgrp=%d, qgrp=%d, qdir=%d, name=%s, curqgrp=%d, numgrps=%d",
+ wgrp, qgrp, qdir, qid_printqueue(qgrp, qdir),
+ WorkGrp[wgrp].wg_curqgrp, WorkGrp[wgrp].wg_numqgrp);
+#endif /* _FFR_QUEUE_SCHED_DBG */
+
+#if HASNICE
+ /* tweak niceness of queue runs */
+ if (Queue[qgrp]->qg_nice > 0)
+ (void) nice(Queue[qgrp]->qg_nice);
+#endif /* HASNICE */
+
+ /* XXX running queue group... */
+ sm_setproctitle(true, CurEnv, "running queue: %s",
+ qid_printqueue(qgrp, qdir));
+
+ if (LogLevel > 69 || tTd(63, 99))
+ sm_syslog(LOG_DEBUG, NOQID,
+ "runqueue %s, pid=%d, forkflag=%d",
+ qid_printqueue(qgrp, qdir), (int) CurrentPid,
+ forkflag);
+
+ /*
** Start making passes through the queue.
** First, read and sort the entire queue.
** Then, process the work in that order.
** But if you take too long, start over.
*/
+ for (i = 0; i < Queue[qgrp]->qg_numqueues; i++)
+ {
+ h = gatherq(qgrp, qdir, false, &full, &more);
+#if SM_CONF_SHM
+ if (ShmId != SM_SHM_NO_ID)
+ QSHM_ENTRIES(Queue[qgrp]->qg_qpaths[qdir].qp_idx) = h;
+#endif /* SM_CONF_SHM */
+ /* If there are no more items in this queue advance */
+ if (!more)
+ {
+ /* A round-robin advance */
+ qdir++;
+ qdir %= Queue[qgrp]->qg_numqueues;
+ }
+
+ /* Has the WorkList reached the limit? */
+ if (full)
+ break; /* don't try to gather more */
+ }
+
/* order the existing work requests */
- njobs = orderq(queuedir, FALSE);
+ njobs = sortq(Queue[qgrp]->qg_maxlist);
+ Queue[qgrp]->qg_curnum = qdir; /* update */
- /* process them once at a time */
- while (WorkQ != NULL)
+ if (!Verbose && bitnset(QD_FORK, Queue[qgrp]->qg_flags))
{
- WORK *w = WorkQ;
-
- WorkQ = WorkQ->w_next;
- e->e_to = NULL;
+ int loop, maxrunners;
+ pid_t pid;
/*
- ** Ignore jobs that are too expensive for the moment.
- **
- ** Get new load average every 30 seconds.
+ ** For this WorkQ we want to fork off N children (maxrunners)
+ ** at this point. Each child has a copy of WorkQ. Each child
+ ** will process every N-th item. The parent will wait for all
+ ** of the children to finish before moving on to the next
+ ** queue group within the work group. This saves us forking
+ ** a new runner-child for each work item.
+ ** It's valid for qg_maxqrun == 0 since this may be an
+ ** explicit "don't run this queue" setting.
*/
- now = curtime();
- if (current_la_time < now - 30)
+ maxrunners = Queue[qgrp]->qg_maxqrun;
+
+ /* No need to have more runners then there are jobs */
+ if (maxrunners > njobs)
+ maxrunners = njobs;
+ for (loop = 0; loop < maxrunners; loop++)
{
- CurrentLA = sm_getla(e);
- current_la_time = now;
+ /*
+ ** Since the delivery may happen in a child and the
+ ** parent does not wait, the parent may close the
+ ** maps thereby removing any shared memory used by
+ ** the map. Therefore, close the maps now so the
+ ** child will dynamically open them if necessary.
+ */
+
+ closemaps(false);
+
+ pid = fork();
+ if (pid < 0)
+ {
+ syserr("run_work_group: cannot fork");
+ return 0;
+ }
+ else if (pid > 0)
+ {
+ /* parent -- clean out connection cache */
+ mci_flush(false, NULL);
+ WorkQ = WorkQ->w_next; /* for the skip */
+ sequenceno++;
+ proc_list_add(pid, "Queue child runner process",
+ PROC_QUEUE_CHILD, 0, -1);
+
+ /* No additional work, no additional runners */
+ if (WorkQ == NULL)
+ break;
+ }
+ else
+ {
+ /* child -- Reset global flags */
+ RestartRequest = NULL;
+ RestartWorkGroup = false;
+ ShutdownRequest = NULL;
+ PendingSignal = 0;
+ CurrentPid = getpid();
+
+ /*
+ ** Initialize exception stack and default
+ ** exception handler for child process.
+ ** When fork()'d the child now has a private
+ ** copy of WorkQ at its current position.
+ */
+
+ sm_exc_newthread(fatal_error);
+
+ /*
+ ** SMTP processes (whether -bd or -bs) set
+ ** SIGCHLD to reapchild to collect
+ ** children status. However, at delivery
+ ** time, that status must be collected
+ ** by sm_wait() to be dealt with properly
+ ** (check success of delivery based
+ ** on status code, etc). Therefore, if we
+ ** are an SMTP process, reset SIGCHLD
+ ** back to the default so reapchild
+ ** doesn't collect status before
+ ** sm_wait().
+ */
+
+ if (OpMode == MD_SMTP ||
+ OpMode == MD_DAEMON ||
+ MaxQueueChildren > 0)
+ {
+ proc_list_clear();
+ sm_releasesignal(SIGCHLD);
+ (void) sm_signal(SIGCHLD, SIG_DFL);
+ }
+
+ /* child -- error messages to the transcript */
+ QuickAbort = OnlyOneError = false;
+ runner_work(e, sequenceno, true,
+ maxrunners, njobs);
+
+ /* This child is done */
+ finis(true, true, ExitStat);
+ /* NOTREACHED */
+ }
}
- if (shouldqueue(WkRecipFact, current_la_time))
+
+ sm_releasesignal(SIGCHLD);
+
+ /*
+ ** Wait until all of the runners have completed before
+ ** seeing if there is another queue group in the
+ ** work group to process.
+ ** XXX Future enhancement: don't wait() for all children
+ ** here, just go ahead and make sure that overall the number
+ ** of children is not exceeded.
+ */
+
+ while (CurChildren > 0)
{
- char *msg = "Aborting queue run: load average too high";
+ int status;
+ pid_t ret;
- if (Verbose)
- message("%s", msg);
- if (LogLevel > 8)
- sm_syslog(LOG_INFO, NOQID,
- "runqueue: %s",
- msg);
- break;
+ while ((ret = sm_wait(&status)) <= 0)
+ continue;
+ proc_list_drop(ret, status, NULL);
}
- sequenceno++;
- if (shouldqueue(w->w_pri, w->w_ctime))
+ }
+ else
+ {
+ /*
+ ** When current process will not fork children to do the work,
+ ** it will do the work itself. The 'skip' will be 1 since
+ ** there are no child runners to divide the work across.
+ */
+
+ runner_work(e, sequenceno, false, 1, njobs);
+ }
+
+ /* free memory allocated by newenvelope() above */
+ sm_rpool_free(rpool);
+ QueueEnvelope.e_rpool = NULL;
+
+ /* Are there still more queues in the work group to process? */
+ if (endgrp != WorkGrp[wgrp].wg_curqgrp)
+ {
+ rpool = sm_rpool_new_x(NULL);
+ e = newenvelope(&QueueEnvelope, CurEnv, rpool);
+ e->e_flags = BlankEnvelope.e_flags;
+ goto domorework;
+ }
+
+ /* No more queues in work group to process. Now check persistent. */
+ if (persistent)
+ {
+ time_t now;
+
+ sequenceno = 1;
+ sm_setproctitle(true, CurEnv, "running queue: %s",
+ qid_printqueue(qgrp, qdir));
+
+ /*
+ ** close bogus maps, i.e., maps which caused a tempfail,
+ ** so we get fresh map connections on the next lookup.
+ ** closemaps() is also called when children are started.
+ */
+
+ closemaps(true);
+
+ /* Close any cached connections. */
+ mci_flush(true, NULL);
+
+ /* Clean out expired related entries. */
+ rmexpstab();
+
+#if NAMED_BIND
+ /* Update MX records for FallBackMX. */
+ if (FallBackMX != NULL)
+ (void) getfallbackmxrr(FallBackMX);
+#endif /* NAMED_BIND */
+
+#if USERDB
+ /* close UserDatabase */
+ _udbx_close();
+#endif /* USERDB */
+
+#if SM_HEAP_CHECK
+ if (sm_debug_active(&SmHeapCheck, 2)
+ && access("memdump", F_OK) == 0
+ )
{
- if (Verbose)
- message("");
- if (QueueSortOrder == QSO_BYPRIORITY)
+ SM_FILE_T *out;
+
+ remove("memdump");
+ out = sm_io_open(SmFtStdio, SM_TIME_DEFAULT,
+ "memdump.out", SM_IO_APPEND, NULL);
+ if (out != NULL)
{
- if (Verbose)
- message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue",
- qid_printqueue(queuedir),
- w->w_name + 2,
- sequenceno,
- njobs);
- if (LogLevel > 8)
- sm_syslog(LOG_INFO, NOQID,
- "runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)",
- qid_printqueue(queuedir),
- w->w_name + 2,
- w->w_pri,
- CurrentLA,
- sequenceno,
- njobs);
- break;
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT, "----------------------\n");
+ sm_heap_report(out,
+ sm_debug_level(&SmHeapCheck) - 1);
+ (void) sm_io_close(out, SM_TIME_DEFAULT);
}
- else if (Verbose)
- message("Skipping %s/%s (sequence %d of %d)",
- qid_printqueue(queuedir),
- w->w_name + 2,
- sequenceno, njobs);
}
+#endif /* SM_HEAP_CHECK */
+
+ /* let me rest for a second to catch my breath */
+ if (njobs == 0 && WorkGrp[wgrp].wg_lowqintvl < MIN_SLEEP_TIME)
+ sleep(MIN_SLEEP_TIME);
+ else if (WorkGrp[wgrp].wg_lowqintvl <= 0)
+ sleep(QueueIntvl > 0 ? QueueIntvl : MIN_SLEEP_TIME);
else
- {
- pid_t pid;
+ sleep(WorkGrp[wgrp].wg_lowqintvl);
- if (Verbose)
- {
- message("");
- message("Running %s/%s (sequence %d of %d)",
- qid_printqueue(queuedir),
- w->w_name + 2,
- sequenceno, njobs);
- }
- if (tTd(63, 100))
- sm_syslog(LOG_DEBUG, NOQID,
- "runqueue %s dowork(%s)",
- qid_printqueue(queuedir),
- w->w_name + 2);
+ /*
+ ** Get the LA outside the WorkQ loop if necessary.
+ ** In a persistent queue runner the code is repeated over
+ ** and over but gatherq() may ignore entries due to
+ ** shouldqueue() (do we really have to do this twice?).
+ ** Hence the queue runners would just idle around when once
+ ** CurrentLA caused all entries in a queue to be ignored.
+ */
- pid = dowork(queuedir, w->w_name + 2,
- ForkQueueRuns, FALSE, e);
- errno = 0;
- if (pid != 0)
- (void) waitfor(pid);
+ now = curtime();
+ if (njobs == 0 && current_la_time < now - GET_NEW_LA_TIME)
+ {
+ sm_getla();
+ current_la_time = now;
}
- sm_free(w->w_name);
- if (w->w_host)
- sm_free(w->w_host);
- sm_free((char *) w);
+ rpool = sm_rpool_new_x(NULL);
+ e = newenvelope(&QueueEnvelope, CurEnv, rpool);
+ e->e_flags = BlankEnvelope.e_flags;
+ goto domorework;
}
/* exit without the usual cleanup */
e->e_id = NULL;
if (forkflag)
- finis(TRUE, ExitStat);
+ finis(true, true, ExitStat);
/* NOTREACHED */
- return TRUE;
+ return true;
}
/*
-** RUNQUEUEEVENT -- stub for use in setevent
+** DOQUEUERUN -- do a queue run?
+*/
+
+bool
+doqueuerun()
+{
+ return bitnset(NumQueue, DoQueueRun);
+}
+
+/*
+** RUNQUEUEEVENT -- stub for use in sm_setevent
+**
+** Sets the bit to indicate that on the next run this queue should be
+** processed. The work group that the queue group is a member of has its
+** count of queue's to process updated.
**
** Parameters:
-** none.
+** qgrp -- the index of the queue group.
**
** Returns:
** none.
**
+** Side Effects:
+** The work group that the queue group is a member of has its
+** count of queues to process updated.
+** The invocation of this function via an alarm may interrupt
+** a set of actions. Thus errno may be set in that context.
+** We need to restore errno at the end of this function to ensure
+** that any work done here that sets errno doesn't return a
+** misleading/false errno value. Errno may be EINTR upon entry to
+** this function because of non-restartable/continuable system
+** API was active. Iff this is true we will override errno as
+** a timeout (as a more accurate error message).
+**
** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD
** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE
** DOING.
*/
-static void
-runqueueevent()
+void
+runqueueevent(qgrp)
+ int qgrp;
{
- DoQueueRun = TRUE;
+ int i;
+ int save_errno = errno;
+
+ /*
+ ** Set the general bit that we want a queue run,
+ ** tested in doqueuerun()
+ */
+
+ setbitn(NumQueue, DoQueueRun);
+
+ /* if it is a specific group: set that bit */
+ if (qgrp != NOQGRP)
+ {
+ setbitn(qgrp, DoQueueRun);
+ goto ret;
+ }
+
+ /* for all others: set the bit if it doesn't have a queue interval */
+ for (i = 0; i < NumQueue; i++)
+ {
+ if (Queue[i]->qg_queueintvl <= 0)
+ setbitn(i, DoQueueRun);
+ }
+
+ ret:
+ errno = save_errno;
+ if (errno == EINTR)
+ errno = ETIMEDOUT;
}
- /*
-** ORDERQ -- order the work queue.
+/*
+** GATHERQ -- gather messages from the message queue(s) the work queue.
**
** Parameters:
-** queuedir -- the index of the queue directory.
+** qgrp -- the index of the queue group.
+** qdir -- the index of the queue directory.
** doall -- if set, include everything in the queue (even
** the jobs that cannot be run because the load
-** average is too high). Otherwise, exclude those
-** jobs.
+** average is too high, or MaxQueueRun is reached).
+** Otherwise, exclude those jobs.
+** full -- (optional) to be set 'true' if WorkList is full
+** more -- (optional) to be set 'true' if there are still more
+** messages in this queue not added to WorkList
**
** Returns:
** The number of request in the queue (not necessarily
-** the number of requests in WorkQ however).
+** the number of requests in WorkList however).
**
** Side Effects:
-** Sets WorkQ to the queue of available work, in order.
+** prepares available work into WorkList
*/
-# define NEED_P 001
-# define NEED_T 002
-# define NEED_R 004
-# define NEED_S 010
-# define NEED_H 020
+#define NEED_P 0001 /* 'P': priority */
+#define NEED_T 0002 /* 'T': time */
+#define NEED_R 0004 /* 'R': recipient */
+#define NEED_S 0010 /* 'S': sender */
+#define NEED_H 0020 /* host */
+#if _FFR_QUARANTINE
+# define HAS_QUARANTINE 0040 /* has an unexpected 'q' line */
+# define NEED_QUARANTINE 0100 /* 'q': reason */
+#endif /* _FFR_QUARANTINE */
-static WORK *WorkList = NULL;
-static int WorkListSize = 0;
+static WORK *WorkList = NULL; /* list of unsort work */
+static int WorkListSize = 0; /* current max size of WorkList */
+static int WorkListCount = 0; /* # of work items in WorkList */
static int
-orderq(queuedir, doall)
- int queuedir;
+gatherq(qgrp, qdir, doall, full, more)
+ int qgrp;
+ int qdir;
bool doall;
+ bool *full;
+ bool *more;
{
register struct dirent *d;
register WORK *w;
register char *p;
DIR *f;
- register int i;
- int wn = -1;
- int wc;
+ int i, num_ent;
+ int wn;
QUEUE_CHAR *check;
char qd[MAXPATHLEN];
char qf[MAXPATHLEN];
- if (queuedir == NOQDIR)
- (void) strlcpy(qd, ".", sizeof qd);
+ wn = WorkListCount - 1;
+ num_ent = 0;
+ if (qdir == NOQDIR)
+ (void) sm_strlcpy(qd, ".", sizeof qd);
else
- (void) snprintf(qd, sizeof qd, "%s%s",
- QPaths[queuedir].qp_name,
- (bitset(QP_SUBQF, QPaths[queuedir].qp_subdirs) ? "/qf" : ""));
+ (void) sm_strlcpyn(qd, sizeof qd, 2,
+ Queue[qgrp]->qg_qpaths[qdir].qp_name,
+ (bitset(QP_SUBQF,
+ Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
+ ? "/qf" : ""));
if (tTd(41, 1))
{
- dprintf("orderq:\n");
+ sm_dprintf("gatherq:\n");
check = QueueLimitId;
while (check != NULL)
{
- dprintf("\tQueueLimitId = %s\n",
+ sm_dprintf("\tQueueLimitId = %s%s\n",
+ check->queue_negate ? "!" : "",
check->queue_match);
check = check->queue_next;
}
@@ -1087,7 +2281,8 @@ orderq(queuedir, doall)
check = QueueLimitSender;
while (check != NULL)
{
- dprintf("\tQueueLimitSender = %s\n",
+ sm_dprintf("\tQueueLimitSender = %s%s\n",
+ check->queue_negate ? "!" : "",
check->queue_match);
check = check->queue_next;
}
@@ -1095,30 +2290,37 @@ orderq(queuedir, doall)
check = QueueLimitRecipient;
while (check != NULL)
{
- dprintf("\tQueueLimitRecipient = %s\n",
+ sm_dprintf("\tQueueLimitRecipient = %s%s\n",
+ check->queue_negate ? "!" : "",
check->queue_match);
check = check->queue_next;
}
- }
- /* clear out old WorkQ */
- for (w = WorkQ; w != NULL; )
- {
- register WORK *nw = w->w_next;
-
- WorkQ = nw;
- sm_free(w->w_name);
- if (w->w_host != NULL)
- sm_free(w->w_host);
- sm_free((char *) w);
- w = nw;
+#if _FFR_QUARANTINE
+ if (QueueMode == QM_QUARANTINE)
+ {
+ check = QueueLimitQuarantine;
+ while (check != NULL)
+ {
+ sm_dprintf("\tQueueLimitQuarantine = %s%s\n",
+ check->queue_negate ? "!" : "",
+ check->queue_match);
+ check = check->queue_next;
+ }
+ }
+#endif /* _FFR_QUARANTINE */
}
/* open the queue directory */
f = opendir(qd);
if (f == NULL)
{
- syserr("orderq: cannot open \"%s\"", qid_printqueue(queuedir));
+ syserr("gatherq: cannot open \"%s\"",
+ qid_printqueue(qgrp, qdir));
+ if (full != NULL)
+ *full = WorkListCount >= MaxQueueRun && MaxQueueRun > 0;
+ if (more != NULL)
+ *more = false;
return 0;
}
@@ -1128,26 +2330,43 @@ orderq(queuedir, doall)
while ((d = readdir(f)) != NULL)
{
- FILE *cf;
+ SM_FILE_T *cf;
int qfver = 0;
char lbuf[MAXNAME + 1];
struct stat sbuf;
if (tTd(41, 50))
- dprintf("orderq: checking %s\n", d->d_name);
+ sm_dprintf("gatherq: checking %s..", d->d_name);
/* is this an interesting entry? */
- if (d->d_name[0] != 'q' || d->d_name[1] != 'f')
+#if _FFR_QUARANTINE
+ if (!(((QueueMode == QM_NORMAL &&
+ d->d_name[0] == NORMQF_LETTER) ||
+ (QueueMode == QM_QUARANTINE &&
+ d->d_name[0] == QUARQF_LETTER) ||
+ (QueueMode == QM_LOST &&
+ d->d_name[0] == LOSEQF_LETTER)) &&
+ d->d_name[1] == 'f'))
+#else /* _FFR_QUARANTINE */
+ if (d->d_name[0] != NORMQF_LETTER || d->d_name[1] != 'f')
+#endif /* _FFR_QUARANTINE */
+ {
+ if (tTd(41, 50))
+ sm_dprintf(" skipping\n");
continue;
+ }
+ if (tTd(41, 50))
+ sm_dprintf("\n");
if (strlen(d->d_name) >= MAXQFNAME)
{
if (Verbose)
- printf("orderq: %s too long, %d max characters\n",
- d->d_name, MAXQFNAME);
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "gatherq: %s too long, %d max characters\n",
+ d->d_name, MAXQFNAME);
if (LogLevel > 0)
sm_syslog(LOG_ALERT, NOQID,
- "orderq: %s too long, %d max characters",
+ "gatherq: %s too long, %d max characters",
d->d_name, MAXQFNAME);
continue;
}
@@ -1155,7 +2374,8 @@ orderq(queuedir, doall)
check = QueueLimitId;
while (check != NULL)
{
- if (strcontainedin(check->queue_match, d->d_name))
+ if (strcontainedin(true, check->queue_match,
+ d->d_name) != check->queue_negate)
break;
else
check = check->queue_next;
@@ -1169,76 +2389,109 @@ orderq(queuedir, doall)
if (wn == MaxQueueRun && LogLevel > 0)
sm_syslog(LOG_WARNING, NOQID,
"WorkList for %s maxed out at %d",
- qid_printqueue(queuedir),
+ qid_printqueue(qgrp, qdir),
MaxQueueRun);
- continue;
+ if (doall)
+ continue; /* just count entries */
+ break;
}
if (wn >= WorkListSize)
{
- grow_wlist(queuedir);
+ grow_wlist(qgrp, qdir);
if (wn >= WorkListSize)
continue;
}
+ SM_ASSERT(wn >= 0);
w = &WorkList[wn];
- (void) snprintf(qf, sizeof qf, "%s/%s", qd, d->d_name);
+ (void) sm_strlcpyn(qf, sizeof qf, 3, qd, "/", d->d_name);
if (stat(qf, &sbuf) < 0)
{
if (errno != ENOENT)
sm_syslog(LOG_INFO, NOQID,
- "orderq: can't stat %s/%s",
- qid_printqueue(queuedir), d->d_name);
+ "gatherq: can't stat %s/%s",
+ qid_printqueue(qgrp, qdir),
+ d->d_name);
wn--;
continue;
}
if (!bitset(S_IFREG, sbuf.st_mode))
{
/* Yikes! Skip it or we will hang on open! */
- syserr("orderq: %s/%s is not a regular file",
- qid_printqueue(queuedir), d->d_name);
+ if (!((d->d_name[0] == DATAFL_LETTER ||
+ d->d_name[0] == NORMQF_LETTER ||
+#if _FFR_QUARANTINE
+ d->d_name[0] == QUARQF_LETTER ||
+ d->d_name[0] == LOSEQF_LETTER ||
+#endif /* _FFR_QUARANTINE */
+ d->d_name[0] == XSCRPT_LETTER) &&
+ d->d_name[1] == 'f' && d->d_name[2] == '\0'))
+ syserr("gatherq: %s/%s is not a regular file",
+ qid_printqueue(qgrp, qdir), d->d_name);
wn--;
continue;
}
/* avoid work if possible */
- if (QueueSortOrder == QSO_BYFILENAME &&
+ if ((QueueSortOrder == QSO_BYFILENAME ||
+ QueueSortOrder == QSO_BYMODTIME ||
+ QueueSortOrder == QSO_RANDOM) &&
+#if _FFR_QUARANTINE
+ QueueLimitQuarantine == NULL &&
+#endif /* _FFR_QUARANTINE */
QueueLimitSender == NULL &&
QueueLimitRecipient == NULL)
{
+ w->w_qgrp = qgrp;
+ w->w_qdir = qdir;
w->w_name = newstr(d->d_name);
w->w_host = NULL;
- w->w_lock = w->w_tooyoung = FALSE;
+ w->w_lock = w->w_tooyoung = false;
w->w_pri = 0;
w->w_ctime = 0;
+ w->w_mtime = sbuf.st_mtime;
+ ++num_ent;
continue;
}
/* open control file */
- cf = fopen(qf, "r");
-
- if (cf == NULL)
+ cf = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, qf, SM_IO_RDONLY,
+ NULL);
+ if (cf == NULL && OpMode != MD_PRINT)
{
/* this may be some random person sending hir msgs */
- /* syserr("orderq: cannot open %s", cbuf); */
if (tTd(41, 2))
- dprintf("orderq: cannot open %s: %s\n",
- d->d_name, errstring(errno));
+ sm_dprintf("gatherq: cannot open %s: %s\n",
+ d->d_name, sm_errstring(errno));
errno = 0;
wn--;
continue;
}
+ w->w_qgrp = qgrp;
+ w->w_qdir = qdir;
w->w_name = newstr(d->d_name);
w->w_host = NULL;
- w->w_lock = !lockfile(fileno(cf), w->w_name, NULL, LOCK_SH|LOCK_NB);
- w->w_tooyoung = FALSE;
+ if (cf != NULL)
+ {
+ w->w_lock = !lockfile(sm_io_getinfo(cf, SM_IO_WHAT_FD,
+ NULL),
+ w->w_name, NULL,
+ LOCK_SH|LOCK_NB);
+ }
+ w->w_tooyoung = false;
/* make sure jobs in creation don't clog queue */
w->w_pri = 0x7fffffff;
w->w_ctime = 0;
+ w->w_mtime = sbuf.st_mtime;
/* extract useful information */
- i = NEED_P | NEED_T;
- if (QueueSortOrder == QSO_BYHOST)
+ i = NEED_P|NEED_T;
+ if (QueueSortOrder == QSO_BYHOST
+#if _FFR_RHS
+ || QueueSortOrder == QSO_BYSHUFFLE
+#endif /* _FFR_RHS */
+ )
{
/* need w_host set for host sort order */
i |= NEED_H;
@@ -1247,7 +2500,13 @@ orderq(queuedir, doall)
i |= NEED_S;
if (QueueLimitRecipient != NULL)
i |= NEED_R;
- while (i != 0 && fgets(lbuf, sizeof lbuf, cf) != NULL)
+#if _FFR_QUARANTINE
+ if (QueueLimitQuarantine != NULL)
+ i |= NEED_QUARANTINE;
+#endif /* _FFR_QUARANTINE */
+ while (cf != NULL && i != 0 &&
+ sm_io_fgets(cf, SM_TIME_DEFAULT, lbuf,
+ sizeof lbuf) != NULL)
{
int c;
time_t age;
@@ -1258,7 +2517,8 @@ orderq(queuedir, doall)
else
{
/* flush rest of overly long line */
- while ((c = getc(cf)) != EOF && c != '\n')
+ while ((c = sm_io_getc(cf, SM_TIME_DEFAULT))
+ != SM_IO_EOF && c != '\n')
continue;
}
@@ -1278,11 +2538,51 @@ orderq(queuedir, doall)
i &= ~NEED_T;
break;
+#if _FFR_QUARANTINE
+ case 'q':
+ if (QueueMode != QM_QUARANTINE &&
+ QueueMode != QM_LOST)
+ {
+ if (tTd(41, 49))
+ sm_dprintf("%s not marked as quarantined but has a 'q' line\n",
+ w->w_name);
+ i |= HAS_QUARANTINE;
+ }
+ else if (QueueMode == QM_QUARANTINE)
+ {
+ if (QueueLimitQuarantine == NULL)
+ {
+ i &= ~NEED_QUARANTINE;
+ break;
+ }
+ p = &lbuf[1];
+ check = QueueLimitQuarantine;
+ while (check != NULL)
+ {
+ if (strcontainedin(false,
+ check->queue_match,
+ p) !=
+ check->queue_negate)
+ break;
+ else
+ check = check->queue_next;
+ }
+ if (check != NULL)
+ i &= ~NEED_QUARANTINE;
+ }
+ break;
+#endif /* _FFR_QUARANTINE */
+
case 'R':
if (w->w_host == NULL &&
(p = strrchr(&lbuf[1], '@')) != NULL)
{
- w->w_host = strrev(&p[1]);
+#if _FFR_RHS
+ if (QueueSortOrder == QSO_BYSHUFFLE)
+ w->w_host = newstr(&p[1]);
+ else
+#endif /* _FFR_RHS */
+ w->w_host = strrev(&p[1]);
makelower(w->w_host);
i &= ~NEED_H;
}
@@ -1302,8 +2602,10 @@ orderq(queuedir, doall)
check = QueueLimitRecipient;
while (check != NULL)
{
- if (strcontainedin(check->queue_match,
- p))
+ if (strcontainedin(true,
+ check->queue_match,
+ p) !=
+ check->queue_negate)
break;
else
check = check->queue_next;
@@ -1316,8 +2618,10 @@ orderq(queuedir, doall)
check = QueueLimitSender;
while (check != NULL)
{
- if (strcontainedin(check->queue_match,
- &lbuf[1]))
+ if (strcontainedin(true,
+ check->queue_match,
+ &lbuf[1]) !=
+ check->queue_negate)
break;
else
check = check->queue_next;
@@ -1330,15 +2634,15 @@ orderq(queuedir, doall)
age = curtime() - (time_t) atol(&lbuf[1]);
if (age >= 0 && MinQueueAge > 0 &&
age < MinQueueAge)
- w->w_tooyoung = TRUE;
+ w->w_tooyoung = true;
break;
case 'N':
if (atol(&lbuf[1]) == 0)
- w->w_tooyoung = FALSE;
+ w->w_tooyoung = false;
break;
-# if _FFR_QUEUEDELAY
+#if _FFR_QUEUEDELAY
/*
case 'G':
queuealg = atoi(lbuf[1]);
@@ -1347,32 +2651,104 @@ orderq(queuedir, doall)
queuedelay = (time_t) atol(&lbuf[1]);
break;
*/
-# endif /* _FFR_QUEUEDELAY */
+#endif /* _FFR_QUEUEDELAY */
}
}
- (void) fclose(cf);
+ if (cf != NULL)
+ (void) sm_io_close(cf, SM_TIME_DEFAULT);
if ((!doall && shouldqueue(w->w_pri, w->w_ctime)) ||
+#if _FFR_QUARANTINE
+ bitset(HAS_QUARANTINE, i) ||
+ bitset(NEED_QUARANTINE, i) ||
+#endif /* _FFR_QUARANTINE */
bitset(NEED_R|NEED_S, i))
{
/* don't even bother sorting this job in */
if (tTd(41, 49))
- dprintf("skipping %s (%x)\n", w->w_name, i);
- sm_free(w->w_name);
- if (w->w_host)
- sm_free(w->w_host);
+ sm_dprintf("skipping %s (%x)\n", w->w_name, i);
+ sm_free(w->w_name); /* XXX */
+ if (w->w_host != NULL)
+ sm_free(w->w_host); /* XXX */
wn--;
}
+ else
+ ++num_ent;
}
(void) closedir(f);
wn++;
- WorkQ = NULL;
- if (WorkList == NULL)
+ i = wn - WorkListCount;
+ WorkListCount += SM_MIN(num_ent, WorkListSize);
+
+ if (more != NULL)
+ *more = WorkListCount < wn;
+
+ if (full != NULL)
+ *full = (wn >= MaxQueueRun && MaxQueueRun > 0) ||
+ (WorkList == NULL && wn > 0);
+
+ return i;
+}
+/*
+** SORTQ -- sort the work list
+**
+** First the old WorkQ is cleared away. Then the WorkList is sorted
+** for all items so that important (higher sorting value) items are not
+** trunctated off. Then the most important items are moved from
+** WorkList to WorkQ. The lower count of 'max' or MaxListCount items
+** are moved.
+**
+** Parameters:
+** max -- maximum number of items to be placed in WorkQ
+**
+** Returns:
+** the number of items in WorkQ
+**
+** Side Effects:
+** WorkQ gets released and filled with new work. WorkList
+** gets released. Work items get sorted in order.
+*/
+
+static int
+sortq(max)
+ int max;
+{
+ register int i; /* local counter */
+ register WORK *w; /* tmp item pointer */
+ int wc = WorkListCount; /* trim size for WorkQ */
+
+ if (WorkQ != NULL)
+ {
+ /* Clear out old WorkQ. */
+ for (w = WorkQ; w != NULL; )
+ {
+ register WORK *nw = w->w_next;
+
+ WorkQ = nw;
+ sm_free(w->w_name); /* XXX */
+ if (w->w_host != NULL)
+ sm_free(w->w_host); /* XXX */
+ sm_free((char *) w); /* XXX */
+ w = nw;
+ }
+ sm_free((char *) WorkQ);
+ WorkQ = NULL;
+ }
+
+ if (WorkList == NULL || wc <= 0)
return 0;
- wc = min(wn, WorkListSize);
- if (wc > MaxQueueRun && MaxQueueRun > 0)
- wc = MaxQueueRun;
+
+ /* Check if the per queue group item limit will be exceeded */
+ if (wc > max && max > 0)
+ wc = max;
+
+ /*
+ ** The sort now takes place using all of the items in WorkList.
+ ** The list gets trimmed to the most important items after the sort.
+ ** If the trim were to happen before the sort then one or more
+ ** important items might get truncated off -- not what we want.
+ */
if (QueueSortOrder == QSO_BYHOST)
{
@@ -1401,11 +2777,12 @@ orderq(queuedir, doall)
{
if (WorkList[i].w_host == NULL &&
w->w_host == NULL)
- WorkList[i].w_lock = TRUE;
+ WorkList[i].w_lock = true;
else if (WorkList[i].w_host != NULL &&
w->w_host != NULL &&
- sm_strcasecmp(WorkList[i].w_host, w->w_host) == 0)
- WorkList[i].w_lock = TRUE;
+ sm_strcasecmp(WorkList[i].w_host,
+ w->w_host) == 0)
+ WorkList[i].w_lock = true;
else
break;
}
@@ -1429,11 +2806,42 @@ orderq(queuedir, doall)
else if (QueueSortOrder == QSO_BYFILENAME)
{
/*
- ** Sort based on qf filename.
+ ** Sort based on queue filename.
*/
qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf4);
}
+ else if (QueueSortOrder == QSO_RANDOM)
+ {
+ /*
+ ** Sort randomly.
+ ** workcmpf5() returns a random 1 or -1.
+ ** As long as nobody does a verification pass over the
+ ** sorted list, we should be golden.
+ */
+
+ qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf5);
+ }
+ else if (QueueSortOrder == QSO_BYMODTIME)
+ {
+ /*
+ ** Simple sort based on modification time of queue file.
+ ** This puts the oldest items first.
+ */
+
+ qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf6);
+ }
+#if _FFR_RHS
+ else if (QueueSortOrder == QSO_BYSHUFFLE)
+ {
+ /*
+ ** Simple sort based on shuffled host name.
+ */
+
+ init_shuffle_alphabet();
+ qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf7);
+ }
+#endif /* _FFR_RHS */
else
{
/*
@@ -1446,45 +2854,52 @@ orderq(queuedir, doall)
/*
** Convert the work list into canonical form.
** Should be turning it into a list of envelopes here perhaps.
+ ** Only take the most important items up to the per queue group
+ ** maximum.
*/
for (i = wc; --i >= 0; )
{
w = (WORK *) xalloc(sizeof *w);
+ w->w_qgrp = WorkList[i].w_qgrp;
+ w->w_qdir = WorkList[i].w_qdir;
w->w_name = WorkList[i].w_name;
w->w_host = WorkList[i].w_host;
w->w_lock = WorkList[i].w_lock;
w->w_tooyoung = WorkList[i].w_tooyoung;
w->w_pri = WorkList[i].w_pri;
w->w_ctime = WorkList[i].w_ctime;
+ w->w_mtime = WorkList[i].w_mtime;
w->w_next = WorkQ;
WorkQ = w;
}
if (WorkList != NULL)
- sm_free(WorkList);
+ sm_free(WorkList); /* XXX */
WorkList = NULL;
WorkListSize = 0;
+ WorkListCount = 0;
if (tTd(40, 1))
{
for (w = WorkQ; w != NULL; w = w->w_next)
{
if (w->w_host != NULL)
- dprintf("%22s: pri=%ld %s\n",
+ sm_dprintf("%22s: pri=%ld %s\n",
w->w_name, w->w_pri, w->w_host);
else
- dprintf("%32s: pri=%ld\n",
+ sm_dprintf("%32s: pri=%ld\n",
w->w_name, w->w_pri);
}
}
- return wn;
+ return wc; /* return number of WorkQ items */
}
- /*
+/*
** GROW_WLIST -- make the work list larger
**
** Parameters:
-** queuedir -- the index for the queue directory.
+** qgrp -- the index for the queue group.
+** qdir -- the index for the queue directory.
**
** Returns:
** none.
@@ -1496,11 +2911,12 @@ orderq(queuedir, doall)
*/
static void
-grow_wlist(queuedir)
- int queuedir;
+grow_wlist(qgrp, qdir)
+ int qgrp;
+ int qdir;
{
if (tTd(41, 1))
- dprintf("grow_wlist: WorkListSize=%d\n", WorkListSize);
+ sm_dprintf("grow_wlist: WorkListSize=%d\n", WorkListSize);
if (WorkList == NULL)
{
WorkList = (WORK *) xalloc((sizeof *WorkList) *
@@ -1510,8 +2926,8 @@ grow_wlist(queuedir)
else
{
int newsize = WorkListSize + QUEUESEGSIZE;
- WORK *newlist = (WORK *) xrealloc((char *)WorkList,
- (unsigned)sizeof(WORK) * (newsize + 1));
+ WORK *newlist = (WORK *) sm_realloc((char *) WorkList,
+ (unsigned) sizeof(WORK) * (newsize + 1));
if (newlist != NULL)
{
@@ -1521,7 +2937,7 @@ grow_wlist(queuedir)
{
sm_syslog(LOG_INFO, NOQID,
"grew WorkList for %s to %d",
- qid_printqueue(queuedir),
+ qid_printqueue(qgrp, qdir),
WorkListSize);
}
}
@@ -1529,13 +2945,13 @@ grow_wlist(queuedir)
{
sm_syslog(LOG_ALERT, NOQID,
"FAILED to grow WorkList for %s to %d",
- qid_printqueue(queuedir), newsize);
+ qid_printqueue(qgrp, qdir), newsize);
}
}
if (tTd(41, 1))
- dprintf("grow_wlist: WorkListSize now %d\n", WorkListSize);
+ sm_dprintf("grow_wlist: WorkListSize now %d\n", WorkListSize);
}
- /*
+/*
** WORKCMPF0 -- simple priority-only compare function.
**
** Parameters:
@@ -1547,8 +2963,6 @@ grow_wlist(queuedir)
** 0 if a == b
** +1 if a > b
**
-** Side Effects:
-** none.
*/
static int
@@ -1566,7 +2980,7 @@ workcmpf0(a, b)
else
return -1;
}
- /*
+/*
** WORKCMPF1 -- first compare function for ordering work based on host name.
**
** Sorts on host name, lock status, and priority in that order.
@@ -1580,8 +2994,6 @@ workcmpf0(a, b)
** 0 if a == b
** >0 if a > b
**
-** Side Effects:
-** none.
*/
static int
@@ -1607,7 +3019,7 @@ workcmpf1(a, b)
/* job priority */
return workcmpf0(a, b);
}
- /*
+/*
** WORKCMPF2 -- second compare function for ordering work based on host name.
**
** Sorts on lock status, host name, and priority in that order.
@@ -1621,8 +3033,6 @@ workcmpf1(a, b)
** 0 if a == b
** >0 if a > b
**
-** Side Effects:
-** none.
*/
static int
@@ -1648,7 +3058,7 @@ workcmpf2(a, b)
/* job priority */
return workcmpf0(a, b);
}
- /*
+/*
** WORKCMPF3 -- simple submission-time-only compare function.
**
** Parameters:
@@ -1660,8 +3070,6 @@ workcmpf2(a, b)
** 0 if a == b
** +1 if a > b
**
-** Side Effects:
-** none.
*/
static int
@@ -1676,7 +3084,7 @@ workcmpf3(a, b)
else
return 0;
}
- /*
+/*
** WORKCMPF4 -- compare based on file name
**
** Parameters:
@@ -1688,8 +3096,6 @@ workcmpf3(a, b)
** 0 if a == b
** +1 if a > b
**
-** Side Effects:
-** none.
*/
static int
@@ -1699,7 +3105,93 @@ workcmpf4(a, b)
{
return strcmp(a->w_name, b->w_name);
}
- /*
+/*
+** WORKCMPF5 -- compare based on assigned random number
+**
+** Parameters:
+** a -- the first argument (ignored).
+** b -- the second argument (ignored).
+**
+** Returns:
+** randomly 1/-1
+*/
+
+/* ARGSUSED0 */
+static int
+workcmpf5(a, b)
+ register WORK *a;
+ register WORK *b;
+{
+ return (get_rand_mod(2)) ? 1 : -1;
+}
+/*
+** WORKCMPF6 -- simple modification-time-only compare function.
+**
+** Parameters:
+** a -- the first argument.
+** b -- the second argument.
+**
+** Returns:
+** -1 if a < b
+** 0 if a == b
+** +1 if a > b
+**
+*/
+
+static int
+workcmpf6(a, b)
+ register WORK *a;
+ register WORK *b;
+{
+ if (a->w_mtime > b->w_mtime)
+ return 1;
+ else if (a->w_mtime < b->w_mtime)
+ return -1;
+ else
+ return 0;
+}
+#if _FFR_RHS
+/*
+** WORKCMPF7 -- compare function for ordering work based on shuffled host name.
+**
+** Sorts on lock status, host name, and priority in that order.
+**
+** Parameters:
+** a -- the first argument.
+** b -- the second argument.
+**
+** Returns:
+** <0 if a < b
+** 0 if a == b
+** >0 if a > b
+**
+*/
+
+static int
+workcmpf7(a, b)
+ register WORK *a;
+ register WORK *b;
+{
+ int i;
+
+ /* lock status */
+ if (a->w_lock != b->w_lock)
+ return a->w_lock - b->w_lock;
+
+ /* host name */
+ if (a->w_host != NULL && b->w_host == NULL)
+ return 1;
+ else if (a->w_host == NULL && b->w_host != NULL)
+ return -1;
+ if (a->w_host != NULL && b->w_host != NULL &&
+ (i = sm_strshufflecmp(a->w_host, b->w_host)) != 0)
+ return i;
+
+ /* job priority */
+ return workcmpf0(a, b);
+}
+#endif /* _FFR_RHS */
+/*
** STRREV -- reverse string
**
** Returns a pointer to a new string that is the reverse of
@@ -1727,11 +3219,71 @@ strrev(fwd)
rev[len] = '\0';
return rev;
}
- /*
+
+#if _FFR_RHS
+
+#define NASCII 128
+#define NCHAR 256
+
+static unsigned char ShuffledAlphabet[NCHAR];
+
+void
+init_shuffle_alphabet()
+{
+ static bool init = false;
+ int i;
+
+ if (init)
+ return;
+
+ /* fill the ShuffledAlphabet */
+ for (i = 0; i < NCHAR; i++)
+ ShuffledAlphabet[i] = i;
+
+ /* mix it */
+ for (i = 1; i < NCHAR; i++)
+ {
+ register int j = get_random() % NCHAR;
+ register int tmp;
+
+ tmp = ShuffledAlphabet[j];
+ ShuffledAlphabet[j] = ShuffledAlphabet[i];
+ ShuffledAlphabet[i] = tmp;
+ }
+
+ /* make it case insensitive */
+ for (i = 'A'; i <= 'Z'; i++)
+ ShuffledAlphabet[i] = ShuffledAlphabet[i + 'a' - 'A'];
+
+ /* fill the upper part */
+ for (i = 0; i < NCHAR; i++)
+ ShuffledAlphabet[i + NCHAR] = ShuffledAlphabet[i];
+ init = true;
+}
+
+static int
+sm_strshufflecmp(a, b)
+ char *a;
+ char *b;
+{
+ const unsigned char *us1 = (const unsigned char *) a;
+ const unsigned char *us2 = (const unsigned char *) b;
+
+ while (ShuffledAlphabet[*us1] == ShuffledAlphabet[*us2++])
+ {
+ if (*us1++ == '\0')
+ return 0;
+ }
+ return (ShuffledAlphabet[*us1] - ShuffledAlphabet[*--us2]);
+}
+#endif /* _FFR_RHS */
+
+/*
** DOWORK -- do a work request.
**
** Parameters:
-** queuedir -- the index of the queue directory for the job.
+** qgrp -- the index of the queue group for the job.
+** qdir -- the index of the queue directory for the job.
** id -- the ID of the job to run.
** forkflag -- if set, run this in background.
** requeueflag -- if set, reinstantiate the queue quickly.
@@ -1748,17 +3300,19 @@ strrev(fwd)
*/
pid_t
-dowork(queuedir, id, forkflag, requeueflag, e)
- int queuedir;
+dowork(qgrp, qdir, id, forkflag, requeueflag, e)
+ int qgrp;
+ int qdir;
char *id;
bool forkflag;
bool requeueflag;
register ENVELOPE *e;
{
register pid_t pid;
+ SM_RPOOL_T *rpool;
if (tTd(40, 1))
- dprintf("dowork(%s/%s)\n", qid_printqueue(queuedir), id);
+ sm_dprintf("dowork(%s/%s)\n", qid_printqueue(qgrp, qdir), id);
/*
** Fork for work.
@@ -1774,7 +3328,7 @@ dowork(queuedir, id, forkflag, requeueflag, e)
** child will dynamically open them if necessary.
*/
- closemaps();
+ closemaps(false);
pid = fork();
if (pid < 0)
@@ -1785,12 +3339,38 @@ dowork(queuedir, id, forkflag, requeueflag, e)
else if (pid > 0)
{
/* parent -- clean out connection cache */
- mci_flush(FALSE, NULL);
+ mci_flush(false, NULL);
}
else
{
+ /*
+ ** Initialize exception stack and default exception
+ ** handler for child process.
+ */
+
+ /* Reset global flags */
+ RestartRequest = NULL;
+ RestartWorkGroup = false;
+ ShutdownRequest = NULL;
+ PendingSignal = 0;
+ CurrentPid = getpid();
+ sm_exc_newthread(fatal_error);
+
+ /*
+ ** See note above about SMTP processes and SIGCHLD.
+ */
+
+ if (OpMode == MD_SMTP ||
+ OpMode == MD_DAEMON ||
+ MaxQueueChildren > 0)
+ {
+ proc_list_clear();
+ sm_releasesignal(SIGCHLD);
+ (void) sm_signal(SIGCHLD, SIG_DFL);
+ }
+
/* child -- error messages to the transcript */
- QuickAbort = OnlyOneError = FALSE;
+ QuickAbort = OnlyOneError = false;
}
}
else
@@ -1808,94 +3388,306 @@ dowork(queuedir, id, forkflag, requeueflag, e)
** can recover on interrupt.
*/
- /* Reset global flags */
- RestartRequest = NULL;
- ShutdownRequest = NULL;
- PendingSignal = 0;
+ if (forkflag)
+ {
+ /* Reset global flags */
+ RestartRequest = NULL;
+ RestartWorkGroup = false;
+ ShutdownRequest = NULL;
+ PendingSignal = 0;
+ }
/* set basic modes, etc. */
- (void) alarm(0);
+ sm_clear_events();
clearstats();
- clearenvelope(e, FALSE);
+ rpool = sm_rpool_new_x(NULL);
+ clearenvelope(e, false, rpool);
e->e_flags |= EF_QUEUERUN|EF_GLOBALERRS;
set_delivery_mode(SM_DELIVER, e);
e->e_errormode = EM_MAIL;
e->e_id = id;
- e->e_queuedir = queuedir;
- GrabTo = UseErrorsTo = FALSE;
+ e->e_qgrp = qgrp;
+ e->e_qdir = qdir;
+ GrabTo = UseErrorsTo = false;
ExitStat = EX_OK;
if (forkflag)
{
disconnect(1, e);
- OpMode = MD_QUEUERUN;
+ set_op_mode(MD_QUEUERUN);
}
- sm_setproctitle(TRUE, e, "%s: from queue", qid_printname(e));
+ sm_setproctitle(true, e, "%s from queue", qid_printname(e));
if (LogLevel > 76)
- sm_syslog(LOG_DEBUG, e->e_id,
- "dowork, pid=%d",
- (int) getpid());
+ sm_syslog(LOG_DEBUG, e->e_id, "dowork, pid=%d",
+ (int) CurrentPid);
/* don't use the headers from sendmail.cf... */
e->e_header = NULL;
/* read the queue control file -- return if locked */
- if (!readqf(e))
+ if (!readqf(e, false))
{
if (tTd(40, 4) && e->e_id != NULL)
- dprintf("readqf(%s) failed\n",
+ sm_dprintf("readqf(%s) failed\n",
qid_printname(e));
e->e_id = NULL;
if (forkflag)
- finis(FALSE, EX_OK);
+ finis(false, true, EX_OK);
else
+ {
+ /* adding this frees 8 bytes */
+ clearenvelope(e, false, rpool);
+
+ /* adding this frees 12 bytes */
+ sm_rpool_free(rpool);
+ e->e_rpool = NULL;
return 0;
+ }
}
e->e_flags |= EF_INQUEUE;
- eatheader(e, requeueflag);
+ eatheader(e, requeueflag, true);
if (requeueflag)
- queueup(e, FALSE);
+ queueup(e, false, false);
/* do the delivery */
sendall(e, SM_DELIVER);
/* finish up and exit */
if (forkflag)
- finis(TRUE, ExitStat);
+ finis(true, true, ExitStat);
else
- dropenvelope(e, TRUE);
+ {
+ dropenvelope(e, true, false);
+ sm_rpool_free(rpool);
+ e->e_rpool = NULL;
+ }
}
e->e_id = NULL;
return pid;
}
- /*
+
+/*
+** DOWORKLIST -- process a list of envelopes as work requests
+**
+** Similar to dowork(), except that after forking, it processes an
+** envelope and its siblings, treating each envelope as a work request.
+**
+** Parameters:
+** el -- envelope to be processed including its siblings.
+** forkflag -- if set, run this in background.
+** requeueflag -- if set, reinstantiate the queue quickly.
+** This is used when expanding aliases in the queue.
+** If forkflag is also set, it doesn't wait for the
+** child.
+**
+** Returns:
+** process id of process that is running the queue job.
+**
+** Side Effects:
+** The work request is satisfied if possible.
+*/
+
+pid_t
+doworklist(el, forkflag, requeueflag)
+ ENVELOPE *el;
+ bool forkflag;
+ bool requeueflag;
+{
+ register pid_t pid;
+ ENVELOPE *ei;
+
+ if (tTd(40, 1))
+ sm_dprintf("doworklist()\n");
+
+ /*
+ ** Fork for work.
+ */
+
+ if (forkflag)
+ {
+ /*
+ ** Since the delivery may happen in a child and the
+ ** parent does not wait, the parent may close the
+ ** maps thereby removing any shared memory used by
+ ** the map. Therefore, close the maps now so the
+ ** child will dynamically open them if necessary.
+ */
+
+ closemaps(false);
+
+ pid = fork();
+ if (pid < 0)
+ {
+ syserr("doworklist: cannot fork");
+ return 0;
+ }
+ else if (pid > 0)
+ {
+ /* parent -- clean out connection cache */
+ mci_flush(false, NULL);
+ }
+ else
+ {
+ /*
+ ** Initialize exception stack and default exception
+ ** handler for child process.
+ */
+
+ /* Reset global flags */
+ RestartRequest = NULL;
+ RestartWorkGroup = false;
+ ShutdownRequest = NULL;
+ PendingSignal = 0;
+ CurrentPid = getpid();
+ sm_exc_newthread(fatal_error);
+
+ /*
+ ** See note above about SMTP processes and SIGCHLD.
+ */
+
+ if (OpMode == MD_SMTP ||
+ OpMode == MD_DAEMON ||
+ MaxQueueChildren > 0)
+ {
+ proc_list_clear();
+ sm_releasesignal(SIGCHLD);
+ (void) sm_signal(SIGCHLD, SIG_DFL);
+ }
+
+ /* child -- error messages to the transcript */
+ QuickAbort = OnlyOneError = false;
+ }
+ }
+ else
+ {
+ pid = 0;
+ }
+
+ if (pid != 0)
+ return pid;
+
+ /*
+ ** IN CHILD
+ ** Lock the control file to avoid duplicate deliveries.
+ ** Then run the file as though we had just read it.
+ ** We save an idea of the temporary name so we
+ ** can recover on interrupt.
+ */
+
+ if (forkflag)
+ {
+ /* Reset global flags */
+ RestartRequest = NULL;
+ RestartWorkGroup = false;
+ ShutdownRequest = NULL;
+ PendingSignal = 0;
+ }
+
+ /* set basic modes, etc. */
+ sm_clear_events();
+ clearstats();
+ GrabTo = UseErrorsTo = false;
+ ExitStat = EX_OK;
+ if (forkflag)
+ {
+ disconnect(1, el);
+ set_op_mode(MD_QUEUERUN);
+ }
+ if (LogLevel > 76)
+ sm_syslog(LOG_DEBUG, el->e_id, "doworklist, pid=%d",
+ (int) CurrentPid);
+
+ for (ei = el; ei != NULL; ei = ei->e_sibling)
+ {
+ ENVELOPE e;
+ SM_RPOOL_T *rpool;
+
+ if (WILL_BE_QUEUED(ei->e_sendmode))
+ continue;
+#if _FFR_QUARANTINE
+ else if (QueueMode != QM_QUARANTINE &&
+ ei->e_quarmsg != NULL)
+ continue;
+#endif /* _FFR_QUARANTINE */
+
+ rpool = sm_rpool_new_x(NULL);
+ clearenvelope(&e, true, rpool);
+ e.e_flags |= EF_QUEUERUN|EF_GLOBALERRS;
+ set_delivery_mode(SM_DELIVER, &e);
+ e.e_errormode = EM_MAIL;
+ e.e_id = ei->e_id;
+ e.e_qgrp = ei->e_qgrp;
+ e.e_qdir = ei->e_qdir;
+ openxscript(&e);
+ sm_setproctitle(true, &e, "%s from queue", qid_printname(&e));
+
+ /* don't use the headers from sendmail.cf... */
+ e.e_header = NULL;
+ CurEnv = &e;
+
+ /* read the queue control file -- return if locked */
+ if (readqf(&e, false))
+ {
+ e.e_flags |= EF_INQUEUE;
+ eatheader(&e, requeueflag, true);
+
+ if (requeueflag)
+ queueup(&e, false, false);
+
+ /* do the delivery */
+ sendall(&e, SM_DELIVER);
+ dropenvelope(&e, true, false);
+ }
+ else
+ {
+ if (tTd(40, 4) && e.e_id != NULL)
+ sm_dprintf("readqf(%s) failed\n",
+ qid_printname(&e));
+ }
+ sm_rpool_free(rpool);
+ ei->e_id = NULL;
+ }
+
+ /* restore CurEnv */
+ CurEnv = el;
+
+ /* finish up and exit */
+ if (forkflag)
+ finis(true, true, ExitStat);
+ return 0;
+}
+/*
** READQF -- read queue file and set up environment.
**
** Parameters:
** e -- the envelope of the job to run.
+** openonly -- only open the qf (returned as e_lockfp)
**
** Returns:
-** TRUE if it successfully read the queue file.
-** FALSE otherwise.
+** true if it successfully read the queue file.
+** false otherwise.
**
** Side Effects:
** The queue file is returned locked.
*/
static bool
-readqf(e)
+readqf(e, openonly)
register ENVELOPE *e;
+ bool openonly;
{
- register FILE *qfp;
+ register SM_FILE_T *qfp;
ADDRESS *ctladdr;
struct stat st, stf;
char *bp;
int qfver = 0;
long hdrsize = 0;
register char *p;
+ char *frcpt = NULL;
char *orcpt = NULL;
- bool nomore = FALSE;
+ bool nomore = false;
+ bool bogus = false;
MODE_T qsafe;
char qf[MAXPATHLEN];
char buf[MAXLINE];
@@ -1904,33 +3696,37 @@ readqf(e)
** Read and process the file.
*/
- (void) strlcpy(qf, queuename(e, 'q'), sizeof qf);
- qfp = fopen(qf, "r+");
+ (void) sm_strlcpy(qf, queuename(e, ANYQFL_LETTER), sizeof qf);
+ qfp = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, qf, SM_IO_RDWR, NULL);
if (qfp == NULL)
{
int save_errno = errno;
if (tTd(40, 8))
- dprintf("readqf(%s): fopen failure (%s)\n",
- qf, errstring(errno));
+ sm_dprintf("readqf(%s): sm_io_open failure (%s)\n",
+ qf, sm_errstring(errno));
errno = save_errno;
if (errno != ENOENT
)
syserr("readqf: no control file %s", qf);
- return FALSE;
+ RELEASE_QUEUE;
+ return false;
}
- if (!lockfile(fileno(qfp), qf, NULL, LOCK_EX|LOCK_NB))
+ if (!lockfile(sm_io_getinfo(qfp, SM_IO_WHAT_FD, NULL), qf, NULL,
+ LOCK_EX|LOCK_NB))
{
/* being processed by another queuer */
if (Verbose)
- printf("%s: locked\n", e->e_id);
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "%s: locked\n", e->e_id);
if (tTd(40, 8))
- dprintf("%s: locked\n", e->e_id);
+ sm_dprintf("%s: locked\n", e->e_id);
if (LogLevel > 19)
sm_syslog(LOG_DEBUG, e->e_id, "locked");
- (void) fclose(qfp);
- return FALSE;
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
/*
@@ -1952,35 +3748,38 @@ readqf(e)
*/
if (stat(qf, &stf) < 0 ||
- fstat(fileno(qfp), &st) < 0)
+ fstat(sm_io_getinfo(qfp, SM_IO_WHAT_FD, NULL), &st) < 0)
{
/* must have been being processed by someone else */
if (tTd(40, 8))
- dprintf("readqf(%s): [f]stat failure (%s)\n",
- qf, errstring(errno));
- (void) fclose(qfp);
- return FALSE;
+ sm_dprintf("readqf(%s): [f]stat failure (%s)\n",
+ qf, sm_errstring(errno));
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
if (st.st_nlink != stf.st_nlink ||
st.st_dev != stf.st_dev ||
- st.st_ino != stf.st_ino ||
-# if HAS_ST_GEN && 0 /* AFS returns garbage in st_gen */
+ ST_INODE(st) != ST_INODE(stf) ||
+#if HAS_ST_GEN && 0 /* AFS returns garbage in st_gen */
st.st_gen != stf.st_gen ||
-# endif /* HAS_ST_GEN && 0 */
+#endif /* HAS_ST_GEN && 0 */
st.st_uid != stf.st_uid ||
st.st_gid != stf.st_gid ||
st.st_size != stf.st_size)
{
/* changed after opened */
if (Verbose)
- printf("%s: changed\n", e->e_id);
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "%s: changed\n", e->e_id);
if (tTd(40, 8))
- dprintf("%s: changed\n", e->e_id);
+ sm_dprintf("%s: changed\n", e->e_id);
if (LogLevel > 19)
sm_syslog(LOG_DEBUG, e->e_id, "changed");
- (void) fclose(qfp);
- return FALSE;
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
/*
@@ -1988,39 +3787,78 @@ readqf(e)
*/
qsafe = S_IWOTH|S_IWGRP;
-#if _FFR_QUEUE_FILE_MODE
if (bitset(S_IWGRP, QueueFileMode))
qsafe &= ~S_IWGRP;
-#endif /* _FFR_QUEUE_FILE_MODE */
- if ((st.st_uid != geteuid() &&
- st.st_uid != TrustedUid &&
- geteuid() != RealUid) ||
- bitset(qsafe, st.st_mode))
+ bogus = st.st_uid != geteuid() &&
+ st.st_uid != TrustedUid &&
+ geteuid() != RealUid;
+
+ /*
+ ** If this qf file results from a set-group-ID binary, then
+ ** we check whether the directory is group-writable,
+ ** the queue file mode contains the group-writable bit, and
+ ** the groups are the same.
+ ** Notice: this requires that the set-group-ID binary is used to
+ ** run the queue!
+ */
+
+ if (bogus && st.st_gid == getegid() && UseMSP)
+ {
+ char delim;
+ struct stat dst;
+
+ bp = SM_LAST_DIR_DELIM(qf);
+ if (bp == NULL)
+ delim = '\0';
+ else
+ {
+ delim = *bp;
+ *bp = '\0';
+ }
+ if (stat(delim == '\0' ? "." : qf, &dst) < 0)
+ syserr("readqf: cannot stat directory %s",
+ delim == '\0' ? "." : qf);
+ else
+ {
+ bogus = !(bitset(S_IWGRP, QueueFileMode) &&
+ bitset(S_IWGRP, dst.st_mode) &&
+ dst.st_gid == st.st_gid);
+ }
+ if (delim != '\0')
+ *bp = delim;
+ }
+ if (!bogus)
+ bogus = bitset(qsafe, st.st_mode);
+ if (bogus)
{
if (LogLevel > 0)
{
sm_syslog(LOG_ALERT, e->e_id,
- "bogus queue file, uid=%d, mode=%o",
- st.st_uid, st.st_mode);
+ "bogus queue file, uid=%d, gid=%d, mode=%o",
+ st.st_uid, st.st_gid, st.st_mode);
}
if (tTd(40, 8))
- dprintf("readqf(%s): bogus file\n", qf);
- loseqfile(e, "bogus file uid in mqueue");
- (void) fclose(qfp);
- return FALSE;
+ sm_dprintf("readqf(%s): bogus file\n", qf);
+ e->e_flags |= EF_INQUEUE;
+ if (!openonly)
+ loseqfile(e, "bogus file uid/gid in mqueue");
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
if (st.st_size == 0)
{
/* must be a bogus file -- if also old, just remove it */
- if (st.st_ctime + 10 * 60 < curtime())
+ if (!openonly && st.st_ctime + 10 * 60 < curtime())
{
- (void) xunlink(queuename(e, 'd'));
- (void) xunlink(queuename(e, 'q'));
+ (void) xunlink(queuename(e, DATAFL_LETTER));
+ (void) xunlink(queuename(e, ANYQFL_LETTER));
}
- (void) fclose(qfp);
- return FALSE;
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
if (st.st_nlink == 0)
@@ -2030,151 +3868,203 @@ readqf(e)
** unlinked. Just assume it is zero length.
*/
- (void) fclose(qfp);
- return FALSE;
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
+#if _FFR_TRUSTED_QF
+ /*
+ ** If we don't own the file mark it as unsafe.
+ ** However, allow TrustedUser to own it as well
+ ** in case TrustedUser manipulates the queue.
+ */
+
+ if (st.st_uid != geteuid() && st.st_uid != TrustedUid)
+ e->e_flags |= EF_UNSAFE;
+#else /* _FFR_TRUSTED_QF */
+ /* If we don't own the file mark it as unsafe */
+ if (st.st_uid != geteuid())
+ e->e_flags |= EF_UNSAFE;
+#endif /* _FFR_TRUSTED_QF */
+
/* good file -- save this lock */
e->e_lockfp = qfp;
+ /* Just wanted the open file */
+ if (openonly)
+ return true;
+
/* do basic system initialization */
initsys(e);
- define('i', e->e_id, e);
+ macdefine(&e->e_macro, A_PERM, 'i', e->e_id);
LineNumber = 0;
e->e_flags |= EF_GLOBALERRS;
- OpMode = MD_QUEUERUN;
+ set_op_mode(MD_QUEUERUN);
ctladdr = NULL;
+#if _FFR_QUARANTINE
+ e->e_qfletter = queue_letter(e, ANYQFL_LETTER);
+#endif /* _FFR_QUARANTINE */
+ e->e_dfqgrp = e->e_qgrp;
+ e->e_dfqdir = e->e_qdir;
+#if _FFR_QUEUE_MACRO
+ macdefine(&e->e_macro, A_TEMP, macid("{queue}"),
+ qid_printqueue(e->e_qgrp, e->e_qdir));
+#endif /* _FFR_QUEUE_MACRO */
e->e_dfino = -1;
e->e_msgsize = -1;
-# if _FFR_QUEUEDELAY
+#if _FFR_QUEUEDELAY
e->e_queuealg = QD_LINEAR;
e->e_queuedelay = (time_t) 0;
-# endif /* _FFR_QUEUEDELAY */
+#endif /* _FFR_QUEUEDELAY */
while ((bp = fgetfolded(buf, sizeof buf, qfp)) != NULL)
{
- u_long qflags;
+ unsigned long qflags;
ADDRESS *q;
- int mid;
+ int r;
time_t now;
auto char *ep;
if (tTd(40, 4))
- dprintf("+++++ %s\n", bp);
+ sm_dprintf("+++++ %s\n", bp);
if (nomore)
{
/* hack attack */
- syserr("SECURITY ALERT: extra data in qf: %s", bp);
- (void) fclose(qfp);
+ hackattack:
+ syserr("SECURITY ALERT: extra or bogus data in queue file: %s",
+ bp);
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+
+ /* the file is already on disk */
+ e->e_flags |= EF_INQUEUE;
loseqfile(e, "bogus queue line");
- return FALSE;
+ RELEASE_QUEUE;
+ return false;
}
switch (bp[0])
{
- case 'V': /* queue file version number */
- qfver = atoi(&bp[1]);
- if (qfver <= QF_VERSION)
- break;
- syserr("Version number in qf (%d) greater than max (%d)",
- qfver, QF_VERSION);
- (void) fclose(qfp);
- loseqfile(e, "unsupported qf file version");
- return FALSE;
+ case 'A': /* AUTH= parameter */
+ if (!xtextok(&bp[1]))
+ goto hackattack;
+ e->e_auth_param = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
+ break;
+
+ case 'B': /* body type */
+ r = check_bodytype(&bp[1]);
+ if (!BODYTYPE_VALID(r))
+ goto hackattack;
+ e->e_bodytype = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
+ break;
case 'C': /* specify controlling user */
- ctladdr = setctluser(&bp[1], qfver);
+ ctladdr = setctluser(&bp[1], qfver, e);
break;
- case 'Q': /* original recipient */
- orcpt = newstr(&bp[1]);
+ case 'D': /* data file name */
+ /* obsolete -- ignore */
break;
- case 'R': /* specify recipient */
- p = bp;
- qflags = 0;
- if (qfver >= 1)
+ case 'd': /* data file directory name */
{
- /* get flag bits */
- while (*++p != '\0' && *p != ':')
+ int qgrp, qdir;
+
+#if _FFR_MSP_PARANOIA
+ /* forbid queue groups in MSP? */
+ if (UseMSP)
+ goto hackattack;
+#endif /* _FFR_MSP_PARANOIA */
+ for (qgrp = 0;
+ qgrp < NumQueue && Queue[qgrp] != NULL;
+ ++qgrp)
{
- switch (*p)
+ for (qdir = 0;
+ qdir < Queue[qgrp]->qg_numqueues;
+ ++qdir)
{
- case 'N':
- qflags |= QHASNOTIFY;
- break;
-
- case 'S':
- qflags |= QPINGONSUCCESS;
- break;
-
- case 'F':
- qflags |= QPINGONFAILURE;
- break;
-
- case 'D':
- qflags |= QPINGONDELAY;
- break;
-
- case 'P':
- qflags |= QPRIMARY;
- break;
-
- case 'A':
- if (ctladdr != NULL)
- ctladdr->q_flags |= QALIAS;
- break;
+ if (strcmp(&bp[1],
+ Queue[qgrp]->qg_qpaths[qdir].qp_name)
+ == 0)
+ {
+ e->e_dfqgrp = qgrp;
+ e->e_dfqdir = qdir;
+ goto done;
+ }
}
}
+ loseqfile(e, "bogus queue file directory");
+ RELEASE_QUEUE;
+ return false;
+ done:
+ break;
}
- else
- qflags |= QPRIMARY;
- q = parseaddr(++p, NULLADDR, RF_COPYALL, '\0', NULL, e);
- if (q != NULL)
- {
- q->q_alias = ctladdr;
- if (qfver >= 1)
- q->q_flags &= ~Q_PINGFLAGS;
- q->q_flags |= qflags;
- q->q_orcpt = orcpt;
- (void) recipient(q, &e->e_sendqueue, 0, e);
- }
- orcpt = NULL;
- break;
case 'E': /* specify error recipient */
/* no longer used */
break;
- case 'H': /* header */
- (void) chompheader(&bp[1], CHHDR_QUEUE, NULL, e);
- hdrsize += strlen(&bp[1]);
- break;
+ case 'F': /* flag bits */
+ if (strncmp(bp, "From ", 5) == 0)
+ {
+ /* we are being spoofed! */
+ syserr("SECURITY ALERT: bogus qf line %s", bp);
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ loseqfile(e, "bogus queue line");
+ RELEASE_QUEUE;
+ return false;
+ }
+ for (p = &bp[1]; *p != '\0'; p++)
+ {
+ switch (*p)
+ {
+ case '8': /* has 8 bit data */
+ e->e_flags |= EF_HAS8BIT;
+ break;
- case 'L': /* Solaris Content-Length: */
- case 'M': /* message */
- /* ignore this; we want a new message next time */
- break;
+ case 'b': /* delete Bcc: header */
+ e->e_flags |= EF_DELETE_BCC;
+ break;
- case 'S': /* sender */
- setsender(newstr(&bp[1]), e, NULL, '\0', TRUE);
- break;
+ case 'd': /* envelope has DSN RET= */
+ e->e_flags |= EF_RET_PARAM;
+ break;
- case 'B': /* body type */
- e->e_bodytype = newstr(&bp[1]);
+ case 'n': /* don't return body */
+ e->e_flags |= EF_NO_BODY_RETN;
+ break;
+
+ case 'r': /* response */
+ e->e_flags |= EF_RESPONSE;
+ break;
+
+ case 's': /* split */
+ e->e_flags |= EF_SPLIT;
+ break;
+
+ case 'w': /* warning sent */
+ e->e_flags |= EF_WARNING;
+ break;
+ }
+ }
break;
-# if _FFR_SAVE_CHARSET
- case 'X': /* character set */
- e->e_charset = newstr(&bp[1]);
+#if _FFR_QUEUEDELAY
+ case 'G': /* queue delay algorithm */
+ e->e_queuealg = atoi(&buf[1]);
break;
-# endif /* _FFR_SAVE_CHARSET */
+#endif /* _FFR_QUEUEDELAY */
- case 'D': /* data file name */
- /* obsolete -- ignore */
+#if _FFR_QUARANTINE
+ case 'q': /* quarantine reason */
+ e->e_quarmsg = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
+ macdefine(&e->e_macro, A_PERM,
+ macid("{quarantine}"), e->e_quarmsg);
break;
+#endif /* _FFR_QUARANTINE */
- case 'T': /* init time */
- e->e_ctime = atol(&bp[1]);
+ case 'H': /* header */
+ (void) chompheader(&bp[1], CHHDR_QUEUE, NULL, e);
+ hdrsize += strlen(&bp[1]);
break;
case 'I': /* data file's inode number */
@@ -2185,14 +4075,10 @@ readqf(e)
e->e_dtime = atol(&buf[1]);
break;
-# if _FFR_QUEUEDELAY
- case 'G': /* queue delay algorithm */
- e->e_queuealg = atoi(&buf[1]);
- break;
- case 'Y': /* current delay */
- e->e_queuedelay = (time_t) atol(&buf[1]);
+ case 'L': /* Solaris Content-Length: */
+ case 'M': /* message */
+ /* ignore this; we want a new message next time */
break;
-# endif /* _FFR_QUEUEDELAY */
case 'N': /* number of delivery attempts */
e->e_ntries = atoi(&buf[1]);
@@ -2204,12 +4090,14 @@ readqf(e)
{
char *howlong;
- howlong = pintvl(now - e->e_dtime, TRUE);
+ howlong = pintvl(now - e->e_dtime, true);
if (Verbose)
- printf("%s: too young (%s)\n",
- e->e_id, howlong);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%s: too young (%s)\n",
+ e->e_id, howlong);
if (tTd(40, 8))
- dprintf("%s: too young (%s)\n",
+ sm_dprintf("%s: too young (%s)\n",
e->e_id, howlong);
if (LogLevel > 19)
sm_syslog(LOG_DEBUG, e->e_id,
@@ -2217,11 +4105,13 @@ readqf(e)
howlong);
e->e_id = NULL;
unlockqueue(e);
- return FALSE;
+ RELEASE_QUEUE;
+ return false;
}
- define(macid("{ntries}", NULL), newstr(&buf[1]), e);
+ macdefine(&e->e_macro, A_TEMP,
+ macid("{ntries}"), &buf[1]);
-# if NAMED_BIND
+#if NAMED_BIND
/* adjust BIND parameters immediately */
if (e->e_ntries == 0)
{
@@ -2233,108 +4123,151 @@ readqf(e)
_res.retry = TimeOuts.res_retry[RES_TO_NORMAL];
_res.retrans = TimeOuts.res_retrans[RES_TO_NORMAL];
}
-# endif /* NAMED_BIND */
+#endif /* NAMED_BIND */
break;
case 'P': /* message priority */
e->e_msgpriority = atol(&bp[1]) + WkTimeFact;
break;
- case 'F': /* flag bits */
- if (strncmp(bp, "From ", 5) == 0)
- {
- /* we are being spoofed! */
- syserr("SECURITY ALERT: bogus qf line %s", bp);
- (void) fclose(qfp);
- loseqfile(e, "bogus queue line");
- return FALSE;
- }
- for (p = &bp[1]; *p != '\0'; p++)
+ case 'Q': /* original recipient */
+ orcpt = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
+ break;
+
+ case 'r': /* original recipient */
+ frcpt = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
+ break;
+
+ case 'R': /* specify recipient */
+ p = bp;
+ qflags = 0;
+ if (qfver >= 1)
{
- switch (*p)
+ /* get flag bits */
+ while (*++p != '\0' && *p != ':')
{
- case 'w': /* warning sent */
- e->e_flags |= EF_WARNING;
- break;
+ switch (*p)
+ {
+ case 'N':
+ qflags |= QHASNOTIFY;
+ break;
- case 'r': /* response */
- e->e_flags |= EF_RESPONSE;
- break;
+ case 'S':
+ qflags |= QPINGONSUCCESS;
+ break;
- case '8': /* has 8 bit data */
- e->e_flags |= EF_HAS8BIT;
- break;
+ case 'F':
+ qflags |= QPINGONFAILURE;
+ break;
- case 'b': /* delete Bcc: header */
- e->e_flags |= EF_DELETE_BCC;
- break;
+ case 'D':
+ qflags |= QPINGONDELAY;
+ break;
- case 'd': /* envelope has DSN RET= */
- e->e_flags |= EF_RET_PARAM;
- break;
+ case 'P':
+ qflags |= QPRIMARY;
+ break;
- case 'n': /* don't return body */
- e->e_flags |= EF_NO_BODY_RETN;
- break;
+ case 'A':
+ if (ctladdr != NULL)
+ ctladdr->q_flags |= QALIAS;
+ break;
+
+ default: /* ignore or complain? */
+ break;
+ }
}
}
+ else
+ qflags |= QPRIMARY;
+ q = parseaddr(++p, NULLADDR, RF_COPYALL, '\0', NULL, e,
+ true);
+ if (q != NULL)
+ {
+ q->q_alias = ctladdr;
+ if (qfver >= 1)
+ q->q_flags &= ~Q_PINGFLAGS;
+ q->q_flags |= qflags;
+ q->q_finalrcpt = frcpt;
+ q->q_orcpt = orcpt;
+ (void) recipient(q, &e->e_sendqueue, 0, e);
+ }
+ frcpt = NULL;
+ orcpt = NULL;
break;
- case 'Z': /* original envelope id from ESMTP */
- e->e_envid = newstr(&bp[1]);
- define(macid("{dsn_envid}", NULL), newstr(&bp[1]), e);
+ case 'S': /* sender */
+ setsender(sm_rpool_strdup_x(e->e_rpool, &bp[1]),
+ e, NULL, '\0', true);
break;
- case 'A': /* AUTH= parameter */
- e->e_auth_param = newstr(&bp[1]);
+ case 'T': /* init time */
+ e->e_ctime = atol(&bp[1]);
+ break;
+
+ case 'V': /* queue file version number */
+ qfver = atoi(&bp[1]);
+ if (queuedelay_qfver_unsupported(qfver))
+ syserr("queue file version %d not supported: %s",
+ qfver,
+ "sendmail not compiled with _FFR_QUEUEDELAY");
+ if (qfver <= QF_VERSION)
+ break;
+ syserr("Version number in queue file (%d) greater than max (%d)",
+ qfver, QF_VERSION);
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ loseqfile(e, "unsupported queue file version");
+ RELEASE_QUEUE;
+ return false;
+ /* NOTREACHED */
+ break;
+
+#if _FFR_QUEUEDELAY
+ case 'Y': /* current delay */
+ e->e_queuedelay = (time_t) atol(&buf[1]);
+ break;
+#endif /* _FFR_QUEUEDELAY */
+
+ case 'Z': /* original envelope id from ESMTP */
+ e->e_envid = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
+ macdefine(&e->e_macro, A_PERM,
+ macid("{dsn_envid}"), e->e_envid);
break;
+ case '!': /* deliver by */
+
+ /* format: flag (1 char) space long-integer */
+ e->e_dlvr_flag = buf[1];
+ e->e_deliver_by = strtol(&buf[3], NULL, 10);
+
case '$': /* define macro */
{
char *p;
- mid = macid(&bp[1], &ep);
- if (mid == 0)
+ /* XXX elimate p? */
+ r = macid_parse(&bp[1], &ep);
+ if (r == 0)
break;
-
- p = newstr(ep);
- define(mid, p, e);
-
- /*
- ** HACK ALERT: Unfortunately, 8.10 and
- ** 8.11 reused the ${if_addr} and
- ** ${if_family} macros for both the incoming
- ** interface address/family (getrequests())
- ** and the outgoing interface address/family
- ** (makeconnection()). In order for D_BINDIF
- ** to work properly, have to preserve the
- ** incoming information in the queue file for
- ** later delivery attempts. The original
- ** information is stored in the envelope
- ** in readqf() so it can be stored in
- ** queueup_macros(). This should be fixed
- ** in 8.12.
- */
-
- if (strcmp(macname(mid), "if_addr") == 0)
- e->e_if_macros[EIF_ADDR] = p;
+ p = sm_rpool_strdup_x(e->e_rpool, ep);
+ macdefine(&e->e_macro, A_PERM, r, p);
}
break;
case '.': /* terminate file */
- nomore = TRUE;
+ nomore = true;
break;
default:
syserr("readqf: %s: line %d: bad line \"%s\"",
qf, LineNumber, shortenstring(bp, MAXSHORTSTR));
- (void) fclose(qfp);
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
loseqfile(e, "unrecognized line");
- return FALSE;
+ RELEASE_QUEUE;
+ return false;
}
if (bp != buf)
- sm_free(bp);
+ sm_free(bp); /* XXX */
}
/*
@@ -2345,25 +4278,38 @@ readqf(e)
if (LineNumber == 0)
{
errno = 0;
- e->e_flags |= EF_CLRQUEUE | EF_FATALERRS | EF_RESPONSE;
- return TRUE;
+ e->e_flags |= EF_CLRQUEUE|EF_FATALERRS|EF_RESPONSE;
+ RELEASE_QUEUE;
+ return true;
+ }
+
+ /* Check to make sure we have a complete queue file read */
+ if (!nomore)
+ {
+ syserr("readqf: %s: incomplete queue file read", qf);
+ (void) sm_io_close(qfp, SM_TIME_DEFAULT);
+ RELEASE_QUEUE;
+ return false;
}
/* possibly set ${dsn_ret} macro */
if (bitset(EF_RET_PARAM, e->e_flags))
{
if (bitset(EF_NO_BODY_RETN, e->e_flags))
- define(macid("{dsn_ret}", NULL), "hdrs", e);
+ macdefine(&e->e_macro, A_PERM,
+ macid("{dsn_ret}"), "hdrs");
else
- define(macid("{dsn_ret}", NULL), "full", e);
+ macdefine(&e->e_macro, A_PERM,
+ macid("{dsn_ret}"), "full");
}
/*
** Arrange to read the data file.
*/
- p = queuename(e, 'd');
- e->e_dfp = fopen(p, "r");
+ p = queuename(e, DATAFL_LETTER);
+ e->e_dfp = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, p, SM_IO_RDONLY,
+ NULL);
if (e->e_dfp == NULL)
{
syserr("readqf: cannot open %s", p);
@@ -2371,17 +4317,19 @@ readqf(e)
else
{
e->e_flags |= EF_HAS_DF;
- if (fstat(fileno(e->e_dfp), &st) >= 0)
+ if (fstat(sm_io_getinfo(e->e_dfp, SM_IO_WHAT_FD, NULL), &st)
+ >= 0)
{
e->e_msgsize = st.st_size + hdrsize;
e->e_dfdev = st.st_dev;
- e->e_dfino = st.st_ino;
+ e->e_dfino = ST_INODE(st);
}
}
- return TRUE;
+ RELEASE_QUEUE;
+ return true;
}
- /*
+/*
** PRTSTR -- print a string, "unprintable" characters are shown as \oct
**
** Parameters:
@@ -2389,7 +4337,7 @@ readqf(e)
** ml -- maximum length of output
**
** Returns:
-** none.
+** number of entries
**
** Side Effects:
** Prints a string on stdout.
@@ -2400,7 +4348,7 @@ prtstr(s, ml)
char *s;
int ml;
{
- char c;
+ int c;
if (s == NULL)
return;
@@ -2410,20 +4358,104 @@ prtstr(s, ml)
{
if (ml-- > 0)
{
- putchar(c);
- putchar(c);
+ (void) sm_io_putc(smioout, SM_TIME_DEFAULT, c);
+ (void) sm_io_putc(smioout, SM_TIME_DEFAULT, c);
}
}
else if (isascii(c) && isprint(c))
- putchar(c);
+ (void) sm_io_putc(smioout, SM_TIME_DEFAULT, c);
else
{
if ((ml -= 3) > 0)
- printf("\\%03o", c);
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "\\%03o", c & 0xFF);
}
}
}
- /*
+/*
+** PRINTNQE -- print out number of entries in the mail queue
+**
+** Parameters:
+** out -- output file pointer.
+** prefix -- string to output in front of each line.
+**
+** Returns:
+** none.
+*/
+
+void
+printnqe(out, prefix)
+ SM_FILE_T *out;
+ char *prefix;
+{
+#if SM_CONF_SHM
+ int i, k = 0, nrequests = 0;
+ bool unknown = false;
+
+ if (ShmId == SM_SHM_NO_ID)
+ {
+ if (prefix == NULL)
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "Data unavailable: shared memory not updated\n");
+ else
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%sNOTCONFIGURED:-1\r\n", prefix);
+ return;
+ }
+ for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
+ {
+ int j;
+
+ k++;
+ for (j = 0; j < Queue[i]->qg_numqueues; j++)
+ {
+ int n;
+
+ if (StopRequest)
+ stop_sendmail();
+
+ n = QSHM_ENTRIES(Queue[i]->qg_qpaths[j].qp_idx);
+ if (prefix != NULL)
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%s%s:%d\r\n",
+ prefix, qid_printqueue(i, j), n);
+ else if (n < 0)
+ {
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%s: unknown number of entries\n",
+ qid_printqueue(i, j));
+ unknown = true;
+ }
+ else if (n == 0)
+ {
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%s is empty\n",
+ qid_printqueue(i, j));
+ }
+ else if (n > 0)
+ {
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%s: entries=%d\n",
+ qid_printqueue(i, j), n);
+ nrequests += n;
+ k++;
+ }
+ }
+ }
+ if (prefix == NULL && k > 1)
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "\t\tTotal requests: %d%s\n",
+ nrequests, unknown ? " (about)" : "");
+#else /* SM_CONF_SHM */
+ if (prefix == NULL)
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "Data unavailable without shared memory support\n");
+ else
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%sNOTAVAILABLE:-1\r\n", prefix);
+#endif /* SM_CONF_SHM */
+}
+/*
** PRINTQUEUE -- print out a representation of the mail queue
**
** Parameters:
@@ -2439,54 +4471,69 @@ prtstr(s, ml)
void
printqueue()
{
- int i, nrequests = 0;
+ int i, k = 0, nrequests = 0;
- for (i = 0; i < NumQueues; i++)
+ for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
{
- if (StopRequest)
- stop_sendmail();
- nrequests += print_single_queue(i);
+ int j;
+
+ k++;
+ for (j = 0; j < Queue[i]->qg_numqueues; j++)
+ {
+ if (StopRequest)
+ stop_sendmail();
+ nrequests += print_single_queue(i, j);
+ k++;
+ }
}
- if (NumQueues > 1)
- printf("\t\tTotal Requests: %d\n", nrequests);
+ if (k > 1)
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "\t\tTotal requests: %d\n",
+ nrequests);
}
- /*
+/*
** PRINT_SINGLE_QUEUE -- print out a representation of a single mail queue
**
** Parameters:
-** queuedir -- queue directory
+** qgrp -- the index of the queue group.
+** qdir -- the queue directory.
**
** Returns:
-** number of entries
+** number of requests in mail queue.
**
** Side Effects:
** Prints a listing of the mail queue on the standard output.
*/
-static int
-print_single_queue(queuedir)
- int queuedir;
+int
+print_single_queue(qgrp, qdir)
+ int qgrp;
+ int qdir;
{
register WORK *w;
- FILE *f;
+ SM_FILE_T *f;
int nrequests;
char qd[MAXPATHLEN];
char qddf[MAXPATHLEN];
char buf[MAXLINE];
- if (queuedir == NOQDIR)
+ if (qdir == NOQDIR)
{
- (void) strlcpy(qd, ".", sizeof qd);
- (void) strlcpy(qddf, ".", sizeof qddf);
+ (void) sm_strlcpy(qd, ".", sizeof qd);
+ (void) sm_strlcpy(qddf, ".", sizeof qddf);
}
else
{
- (void) snprintf(qd, sizeof qd, "%s%s",
- QPaths[queuedir].qp_name,
- (bitset(QP_SUBQF, QPaths[queuedir].qp_subdirs) ? "/qf" : ""));
- (void) snprintf(qddf, sizeof qddf, "%s%s",
- QPaths[queuedir].qp_name,
- (bitset(QP_SUBDF, QPaths[queuedir].qp_subdirs) ? "/df" : ""));
+ (void) sm_strlcpyn(qd, sizeof qd, 2,
+ Queue[qgrp]->qg_qpaths[qdir].qp_name,
+ (bitset(QP_SUBQF,
+ Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
+ ? "/qf" : ""));
+ (void) sm_strlcpyn(qddf, sizeof qddf, 2,
+ Queue[qgrp]->qg_qpaths[qdir].qp_name,
+ (bitset(QP_SUBDF,
+ Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
+ ? "/df" : ""));
}
/*
@@ -2496,17 +4543,18 @@ print_single_queue(queuedir)
if (bitset(PRIV_RESTRICTMAILQ, PrivacyFlags) && RealUid != 0)
{
struct stat st;
-# ifdef NGROUPS_MAX
+#ifdef NGROUPS_MAX
int n;
extern GIDSET_T InitialGidSet[NGROUPS_MAX];
-# endif /* NGROUPS_MAX */
+#endif /* NGROUPS_MAX */
if (stat(qd, &st) < 0)
{
- syserr("Cannot stat %s", qid_printqueue(queuedir));
+ syserr("Cannot stat %s",
+ qid_printqueue(qgrp, qdir));
return 0;
}
-# ifdef NGROUPS_MAX
+#ifdef NGROUPS_MAX
n = NGROUPS_MAX;
while (--n >= 0)
{
@@ -2514,9 +4562,9 @@ print_single_queue(queuedir)
break;
}
if (n < 0 && RealGid != st.st_gid)
-# else /* NGROUPS_MAX */
+#else /* NGROUPS_MAX */
if (RealGid != st.st_gid)
-# endif /* NGROUPS_MAX */
+#endif /* NGROUPS_MAX */
{
usrerr("510 You are not permitted to see the queue");
setstat(EX_NOPERM);
@@ -2528,7 +4576,8 @@ print_single_queue(queuedir)
** Read and order the queue.
*/
- nrequests = orderq(queuedir, TRUE);
+ nrequests = gatherq(qgrp, qdir, true, NULL, NULL);
+ (void) sortq(Queue[qgrp]->qg_maxlist);
/*
** Print the work list that we have read.
@@ -2537,20 +4586,25 @@ print_single_queue(queuedir)
/* first see if there is anything */
if (nrequests <= 0)
{
- printf("%s is empty\n", qid_printqueue(queuedir));
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "%s is empty\n",
+ qid_printqueue(qgrp, qdir));
return 0;
}
- CurrentLA = sm_getla(NULL); /* get load average */
+ sm_getla(); /* get load average */
- printf("\t\t%s (%d request%s", qid_printqueue(queuedir), nrequests,
- nrequests == 1 ? "" : "s");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "\t\t%s (%d request%s",
+ qid_printqueue(qgrp, qdir),
+ nrequests, nrequests == 1 ? "" : "s");
if (MaxQueueRun > 0 && nrequests > MaxQueueRun)
- printf(", only %d printed", MaxQueueRun);
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ ", only %d printed", MaxQueueRun);
if (Verbose)
- printf(")\n----Q-ID---- --Size-- -Priority- ---Q-Time--- ---------Sender/Recipient--------\n");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ ")\n-----Q-ID----- --Size-- -Priority- ---Q-Time--- --------Sender/Recipient--------\n");
else
- printf(")\n----Q-ID---- --Size-- -----Q-Time----- ------------Sender/Recipient------------\n");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ ")\n-----Q-ID----- --Size-- -----Q-Time----- ------------Sender/Recipient-----------\n");
for (w = WorkQ; w != NULL; w = w->w_next)
{
struct stat st;
@@ -2558,6 +4612,9 @@ print_single_queue(queuedir)
long dfsize;
int flags = 0;
int qfver;
+#if _FFR_QUARANTINE
+ char quarmsg[MAXLINE];
+#endif /* _FFR_QUARANTINE */
char statmsg[MAXLINE];
char bodytype[MAXNAME + 1];
char qf[MAXPATHLEN];
@@ -2565,34 +4622,80 @@ print_single_queue(queuedir)
if (StopRequest)
stop_sendmail();
- printf("%12s", w->w_name + 2);
- (void) snprintf(qf, sizeof qf, "%s/%s", qd, w->w_name);
- f = fopen(qf, "r");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "%13s",
+ w->w_name + 2);
+ (void) sm_strlcpyn(qf, sizeof qf, 3, qd, "/", w->w_name);
+ f = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, qf, SM_IO_RDONLY,
+ NULL);
if (f == NULL)
{
- printf(" (job completed)\n");
+ if (errno == EPERM)
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ " (permission denied)\n");
+ else if (errno == ENOENT)
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ " (job completed)\n");
+ else
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ " (%s)\n",
+ sm_errstring(errno));
errno = 0;
continue;
}
- w->w_name[0] = 'd';
- (void) snprintf(qf, sizeof qf, "%s/%s", qddf, w->w_name);
+ w->w_name[0] = DATAFL_LETTER;
+ (void) sm_strlcpyn(qf, sizeof qf, 3, qddf, "/", w->w_name);
if (stat(qf, &st) >= 0)
dfsize = st.st_size;
else
+ {
+ ENVELOPE e;
+
+ /*
+ ** Maybe the df file can't be statted because
+ ** it is in a different directory than the qf file.
+ ** In order to find out, we must read the qf file.
+ */
+
+ newenvelope(&e, &BlankEnvelope, sm_rpool_new_x(NULL));
+ e.e_id = w->w_name + 2;
+ e.e_qgrp = qgrp;
+ e.e_qdir = qdir;
dfsize = -1;
+ if (readqf(&e, false))
+ {
+ char *df = queuename(&e, DATAFL_LETTER);
+ if (stat(df, &st) >= 0)
+ dfsize = st.st_size;
+ }
+ if (e.e_lockfp != NULL)
+ {
+ (void) sm_io_close(e.e_lockfp, SM_TIME_DEFAULT);
+ e.e_lockfp = NULL;
+ }
+ clearenvelope(&e, false, e.e_rpool);
+ sm_rpool_free(e.e_rpool);
+ }
if (w->w_lock)
- printf("*");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "*");
+#if _FFR_QUARANTINE
+ else if (QueueMode == QM_LOST)
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "?");
+#endif /* _FFR_QUARANTINE */
else if (w->w_tooyoung)
- printf("-");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "-");
else if (shouldqueue(w->w_pri, w->w_ctime))
- printf("X");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "X");
else
- printf(" ");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, " ");
+
errno = 0;
+#if _FFR_QUARANTINE
+ quarmsg[0] = '\0';
+#endif /* _FFR_QUARANTINE */
statmsg[0] = bodytype[0] = '\0';
qfver = 0;
- while (fgets(buf, sizeof buf, f) != NULL)
+ while (sm_io_fgets(f, SM_TIME_DEFAULT, buf, sizeof buf) != NULL)
{
register int i;
register char *p;
@@ -2600,7 +4703,7 @@ print_single_queue(queuedir)
if (StopRequest)
stop_sendmail();
- fixcrlf(buf, TRUE);
+ fixcrlf(buf, true);
switch (buf[0])
{
case 'V': /* queue file version */
@@ -2614,6 +4717,15 @@ print_single_queue(queuedir)
statmsg[i] = '\0';
break;
+#if _FFR_QUARANTINE
+ case 'q': /* quarantine reason */
+ if ((i = strlen(&buf[1])) >= sizeof quarmsg)
+ i = sizeof quarmsg - 1;
+ memmove(quarmsg, &buf[1], i);
+ quarmsg[i] = '\0';
+ break;
+#endif /* _FFR_QUARANTINE */
+
case 'B': /* body type */
if ((i = strlen(&buf[1])) >= sizeof bodytype)
i = sizeof bodytype - 1;
@@ -2624,33 +4736,58 @@ print_single_queue(queuedir)
case 'S': /* sender name */
if (Verbose)
{
- printf("%8ld %10ld%c%.12s ",
- dfsize,
- w->w_pri,
- bitset(EF_WARNING, flags) ? '+' : ' ',
- ctime(&submittime) + 4);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%8ld %10ld%c%.12s ",
+ dfsize,
+ w->w_pri,
+ bitset(EF_WARNING, flags)
+ ? '+' : ' ',
+ ctime(&submittime) + 4);
prtstr(&buf[1], 78);
}
else
{
- printf("%8ld %.16s ", dfsize,
- ctime(&submittime));
- prtstr(&buf[1], 40);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%8ld %.16s ",
+ dfsize,
+ ctime(&submittime));
+ prtstr(&buf[1], 39);
}
+#if _FFR_QUARANTINE
+ if (quarmsg[0] != '\0')
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "\n QUARANTINE: %.*s",
+ Verbose ? 100 : 60,
+ quarmsg);
+ quarmsg[0] = '\0';
+ }
+#endif /* _FFR_QUARANTINE */
if (statmsg[0] != '\0' || bodytype[0] != '\0')
{
- printf("\n %10.10s", bodytype);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "\n %10.10s",
+ bodytype);
if (statmsg[0] != '\0')
- printf(" (%.*s)",
- Verbose ? 100 : 60,
- statmsg);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ " (%.*s)",
+ Verbose ? 100 : 60,
+ statmsg);
+ statmsg[0] = '\0';
}
break;
case 'C': /* controlling user */
if (Verbose)
- printf("\n\t\t\t\t (---%.74s---)",
- &buf[1]);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "\n\t\t\t\t\t\t(---%.64s---)",
+ &buf[1]);
break;
case 'R': /* recipient name */
@@ -2664,13 +4801,25 @@ print_single_queue(queuedir)
}
if (Verbose)
{
- printf("\n\t\t\t\t\t ");
- prtstr(p, 73);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "\n\t\t\t\t\t\t");
+ prtstr(p, 71);
}
else
{
- printf("\n\t\t\t\t ");
- prtstr(p, 40);
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "\n\t\t\t\t\t ");
+ prtstr(p, 38);
+ }
+ if (Verbose && statmsg[0] != '\0')
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "\n\t\t (%.100s)",
+ statmsg);
+ statmsg[0] = '\0';
}
break;
@@ -2691,13 +4840,65 @@ print_single_queue(queuedir)
}
}
if (submittime == (time_t) 0)
- printf(" (no control file)");
- printf("\n");
- (void) fclose(f);
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ " (no control file)");
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "\n");
+ (void) sm_io_close(f, SM_TIME_DEFAULT);
}
return nrequests;
}
- /*
+
+#if _FFR_QUARANTINE
+/*
+** QUEUE_LETTER -- get the proper queue letter for the current QueueMode.
+**
+** Parameters:
+** e -- envelope to build it in/from.
+** type -- the file type, used as the first character
+** of the file name.
+**
+** Returns:
+** the letter to use
+*/
+
+static char
+queue_letter(e, type)
+ ENVELOPE *e;
+ int type;
+{
+ /* Change type according to QueueMode */
+ if (type == ANYQFL_LETTER)
+ {
+ if (e->e_quarmsg != NULL)
+ type = QUARQF_LETTER;
+ else
+ {
+ switch (QueueMode)
+ {
+ case QM_NORMAL:
+ type = NORMQF_LETTER;
+ break;
+
+ case QM_QUARANTINE:
+ type = QUARQF_LETTER;
+ break;
+
+ case QM_LOST:
+ type = LOSEQF_LETTER;
+ break;
+
+ default:
+ /* should never happen */
+ abort();
+ /* NOTREACHED */
+ }
+ }
+ }
+ return type;
+}
+#endif /* _FFR_QUARANTINE */
+
+/*
** QUEUENAME -- build a file name in the queue directory for this envelope.
**
** Parameters:
@@ -2719,60 +4920,121 @@ queuename(e, type)
register ENVELOPE *e;
int type;
{
- char *sub = "";
+ int qd, qg;
+ char *sub = "/";
+ char pref[3];
static char buf[MAXPATHLEN];
/* Assign an ID if needed */
if (e->e_id == NULL)
assign_queueid(e);
- /* Assign a queue directory if needed */
- if (e->e_queuedir == NOQDIR)
- setnewqueue(e);
+#if _FFR_QUARANTINE
+ type = queue_letter(e, type);
+#endif /* _FFR_QUARANTINE */
+
+ /* begin of filename */
+ pref[0] = (char) type;
+ pref[1] = 'f';
+ pref[2] = '\0';
+
+ /* Assign a queue group/directory if needed */
+ if (type == XSCRPT_LETTER)
+ {
+ /*
+ ** We don't want to call setnewqueue() if we are fetching
+ ** the pathname of the transcript file, because setnewqueue
+ ** chooses a queue, and sometimes we need to write to the
+ ** transcript file before we have gathered enough information
+ ** to choose a queue.
+ */
+
+ if (e->e_xfqgrp == NOQGRP || e->e_xfqdir == NOQDIR)
+ {
+ if (e->e_qgrp != NOQGRP && e->e_qdir != NOQDIR)
+ {
+ e->e_xfqgrp = e->e_qgrp;
+ e->e_xfqdir = e->e_qdir;
+ }
+ else
+ {
+ e->e_xfqgrp = 0;
+ if (Queue[e->e_xfqgrp]->qg_numqueues <= 1)
+ e->e_xfqdir = 0;
+ else
+ {
+ e->e_xfqdir = get_rand_mod(
+ Queue[e->e_xfqgrp]->qg_numqueues);
+ }
+ }
+ }
+ qd = e->e_xfqdir;
+ qg = e->e_xfqgrp;
+ }
+ else
+ {
+ if (e->e_qgrp == NOQGRP || e->e_qdir == NOQDIR)
+ setnewqueue(e);
+ if (type == DATAFL_LETTER)
+ {
+ qd = e->e_dfqdir;
+ qg = e->e_dfqgrp;
+ }
+ else
+ {
+ qd = e->e_qdir;
+ qg = e->e_qgrp;
+ }
+ }
- if (e->e_queuedir == NOQDIR)
- (void) snprintf(buf, sizeof buf, "%cf%s",
- type, e->e_id);
+ if (e->e_qdir == NOQDIR)
+ (void) sm_strlcpyn(buf, sizeof buf, 2, pref, e->e_id);
else
{
switch (type)
{
- case 'd':
- if (bitset(QP_SUBDF, QPaths[e->e_queuedir].qp_subdirs))
- sub = "/df";
+ case DATAFL_LETTER:
+ if (bitset(QP_SUBDF, Queue[qg]->qg_qpaths[qd].qp_subdirs))
+ sub = "/df/";
break;
+#if _FFR_QUARANTINE
+ case QUARQF_LETTER:
+#endif /* _FFR_QUARANTINE */
case TEMPQF_LETTER:
- case 't':
+ case NEWQFL_LETTER:
case LOSEQF_LETTER:
- case 'q':
- if (bitset(QP_SUBQF, QPaths[e->e_queuedir].qp_subdirs))
- sub = "/qf";
+ case NORMQF_LETTER:
+ if (bitset(QP_SUBQF, Queue[qg]->qg_qpaths[qd].qp_subdirs))
+ sub = "/qf/";
break;
- case 'x':
- if (bitset(QP_SUBXF, QPaths[e->e_queuedir].qp_subdirs))
- sub = "/xf";
+ case XSCRPT_LETTER:
+ if (bitset(QP_SUBXF, Queue[qg]->qg_qpaths[qd].qp_subdirs))
+ sub = "/xf/";
break;
+
+ default:
+ sm_abort("queuename: bad queue file type %d", type);
}
- (void) snprintf(buf, sizeof buf, "%s%s/%cf%s",
- QPaths[e->e_queuedir].qp_name,
- sub, type, e->e_id);
+ (void) sm_strlcpyn(buf, sizeof buf, 4,
+ Queue[qg]->qg_qpaths[qd].qp_name,
+ sub, pref, e->e_id);
}
if (tTd(7, 2))
- dprintf("queuename: %s\n", buf);
+ sm_dprintf("queuename: %s\n", buf);
return buf;
}
- /*
+/*
** ASSIGN_QUEUEID -- assign a queue ID for this envelope.
**
** Assigns an id code if one does not already exist.
** This code assumes that nothing will remain in the queue for
** longer than 60 years. It is critical that files with the given
-** name not already exist in the queue.
-** Also initializes e_queuedir to NOQDIR.
+** name do not already exist in the queue.
+** [No longer initializes e_qdir to NOQDIR.]
**
** Parameters:
** e -- envelope to set it in.
@@ -2783,22 +5045,26 @@ queuename(e, type)
static const char QueueIdChars[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx";
# define QIC_LEN 60
+# define queuenextid() CurrentPid
+
void
assign_queueid(e)
register ENVELOPE *e;
{
- pid_t pid = getpid();
- static char cX = 0;
+ pid_t pid = queuenextid();
+ static int cX = 0;
static long random_offset;
struct tm *tm;
char idbuf[MAXQFNAME - 2];
+ int seq;
if (e->e_id != NULL)
return;
/* see if we need to get a new base time/pid */
- if (cX >= QIC_LEN || LastQueueTime == 0 || LastQueuePid != pid)
+ if (cX >= QIC_LEN * QIC_LEN || LastQueueTime == 0 ||
+ LastQueuePid != pid)
{
time_t then = LastQueueTime;
@@ -2811,12 +5077,22 @@ assign_queueid(e)
{
(void) sleep(1);
}
- LastQueuePid = getpid();
+ LastQueuePid = queuenextid();
cX = 0;
}
+
+ /*
+ ** Generate a new sequence number between 0 and QIC_LEN*QIC_LEN-1.
+ ** This lets us generate up to QIC_LEN*QIC_LEN unique queue ids
+ ** per second, per process. With envelope splitting,
+ ** a single message can consume many queue ids.
+ */
+
+ seq = (int)((cX + random_offset) % (QIC_LEN * QIC_LEN));
+ ++cX;
if (tTd(7, 50))
- dprintf("assign_queueid: random_offset = %ld (%d)\n",
- random_offset, (int)(cX + random_offset) % QIC_LEN);
+ sm_dprintf("assign_queueid: random_offset = %ld (%d)\n",
+ random_offset, seq);
tm = gmtime(&LastQueueTime);
idbuf[0] = QueueIdChars[tm->tm_year % QIC_LEN];
@@ -2825,19 +5101,29 @@ assign_queueid(e)
idbuf[3] = QueueIdChars[tm->tm_hour];
idbuf[4] = QueueIdChars[tm->tm_min];
idbuf[5] = QueueIdChars[tm->tm_sec];
- idbuf[6] = QueueIdChars[((int)cX++ + random_offset) % QIC_LEN];
- (void) snprintf(&idbuf[7], sizeof idbuf - 7, "%05d",
- (int) LastQueuePid);
- e->e_id = newstr(idbuf);
- define('i', e->e_id, e);
- e->e_queuedir = NOQDIR;
+ idbuf[6] = QueueIdChars[seq / QIC_LEN];
+ idbuf[7] = QueueIdChars[seq % QIC_LEN];
+ (void) sm_snprintf(&idbuf[8], sizeof idbuf - 8, "%06d",
+ (int) LastQueuePid);
+ e->e_id = sm_rpool_strdup_x(e->e_rpool, idbuf);
+ macdefine(&e->e_macro, A_PERM, 'i', e->e_id);
+#if 0
+ /* XXX: inherited from MainEnvelope */
+ e->e_qgrp = NOQGRP; /* too early to do anything else */
+ e->e_qdir = NOQDIR;
+ e->e_xfqgrp = NOQGRP;
+#endif /* 0 */
+#if _FFR_QUARANTINE
+ /* New ID means it's not on disk yet */
+ e->e_qfletter = '\0';
+#endif /* _FFR_QUARANTINE */
if (tTd(7, 1))
- dprintf("assign_queueid: assigned id %s, e=%lx\n",
- e->e_id, (u_long) e);
+ sm_dprintf("assign_queueid: assigned id %s, e=%p\n",
+ e->e_id, e);
if (LogLevel > 93)
sm_syslog(LOG_DEBUG, e->e_id, "assigned id");
}
- /*
+/*
** SYNC_QUEUE_TIME -- Assure exclusive PID in any given second
**
** Make sure one PID can't be used by two processes in any one second.
@@ -2852,19 +5138,20 @@ assign_queueid(e)
** Returns:
** none
*/
+
void
sync_queue_time()
{
-# if FAST_PID_RECYCLE
+#if FAST_PID_RECYCLE
if (OpMode != MD_TEST &&
OpMode != MD_VERIFY &&
LastQueueTime > 0 &&
- LastQueuePid == getpid() &&
+ LastQueuePid == CurrentPid &&
curtime() == LastQueueTime)
(void) sleep(1);
-# endif /* FAST_PID_RECYCLE */
+#endif /* FAST_PID_RECYCLE */
}
- /*
+/*
** UNLOCKQUEUE -- unlock the queue entry for a specified envelope
**
** Parameters:
@@ -2882,13 +5169,13 @@ unlockqueue(e)
ENVELOPE *e;
{
if (tTd(51, 4))
- dprintf("unlockqueue(%s)\n",
+ sm_dprintf("unlockqueue(%s)\n",
e->e_id == NULL ? "NOQUEUE" : e->e_id);
/* if there is a lock file in the envelope, close it */
if (e->e_lockfp != NULL)
- (void) fclose(e->e_lockfp);
+ (void) sm_io_close(e->e_lockfp, SM_TIME_DEFAULT);
e->e_lockfp = NULL;
/* don't create a queue id if we don't already have one */
@@ -2899,10 +5186,9 @@ unlockqueue(e)
if (LogLevel > 87)
sm_syslog(LOG_DEBUG, e->e_id, "unlock");
if (!tTd(51, 104))
- xunlink(queuename(e, 'x'));
-
+ (void) xunlink(queuename(e, XSCRPT_LETTER));
}
- /*
+/*
** SETCTLUSER -- create a controlling address
**
** Create a fake "address" given only a local login name; this is
@@ -2910,19 +5196,20 @@ unlockqueue(e)
**
** Parameters:
** user -- the user name of the controlling user.
-** qfver -- the version stamp of this qf file.
+** qfver -- the version stamp of this queue file.
+** e -- envelope
**
** Returns:
-** An address descriptor for the controlling user.
+** An address descriptor for the controlling user,
+** using storage allocated from e->e_rpool.
**
-** Side Effects:
-** none.
*/
static ADDRESS *
-setctluser(user, qfver)
+setctluser(user, qfver, e)
char *user;
int qfver;
+ ENVELOPE *e;
{
register ADDRESS *a;
struct passwd *pw;
@@ -2939,23 +5226,18 @@ setctluser(user, qfver)
** Set up addr fields for controlling user.
*/
- a = (ADDRESS *) xalloc(sizeof *a);
+ a = (ADDRESS *) sm_rpool_malloc_x(e->e_rpool, sizeof *a);
memset((char *) a, '\0', sizeof *a);
- if (*user == '\0')
- {
- p = NULL;
- a->q_user = newstr(DefUser);
- }
- else if (*user == ':')
+ if (*user == ':')
{
p = &user[1];
- a->q_user = newstr(p);
+ a->q_user = sm_rpool_strdup_x(e->e_rpool, p);
}
else
{
p = strtok(user, ":");
- a->q_user = newstr(user);
+ a->q_user = sm_rpool_strdup_x(e->e_rpool, user);
if (qfver >= 2)
{
if ((p = strtok(NULL, ":")) != NULL)
@@ -2980,7 +5262,7 @@ setctluser(user, qfver)
else if (strcmp(pw->pw_dir, "/") == 0)
a->q_home = "";
else
- a->q_home = newstr(pw->pw_dir);
+ a->q_home = sm_rpool_strdup_x(e->e_rpool, pw->pw_dir);
a->q_uid = pw->pw_uid;
a->q_gid = pw->pw_gid;
a->q_flags |= QGOODUID;
@@ -2990,13 +5272,13 @@ setctluser(user, qfver)
a->q_flags |= QPRIMARY; /* flag as a "ctladdr" */
a->q_mailer = LocalMailer;
if (p == NULL)
- a->q_paddr = newstr(a->q_user);
+ a->q_paddr = sm_rpool_strdup_x(e->e_rpool, a->q_user);
else
- a->q_paddr = newstr(p);
+ a->q_paddr = sm_rpool_strdup_x(e->e_rpool, p);
return a;
}
- /*
-** LOSEQFILE -- save the qf as Qf and try to let someone know
+/*
+** LOSEQFILE -- rename queue file with LOSEQF_LETTER & try to let someone know
**
** Parameters:
** e -- the envelope (e->e_id will be used).
@@ -3011,23 +5293,63 @@ loseqfile(e, why)
register ENVELOPE *e;
char *why;
{
+ bool loseit = true;
char *p;
char buf[MAXPATHLEN];
if (e == NULL || e->e_id == NULL)
return;
- p = queuename(e, 'q');
- if (strlen(p) >= (SIZE_T) sizeof buf)
+ p = queuename(e, ANYQFL_LETTER);
+ if (sm_strlcpy(buf, p, sizeof buf) >= sizeof buf)
return;
- (void) strlcpy(buf, p, sizeof buf);
- p = queuename(e, LOSEQF_LETTER);
- if (rename(buf, p) < 0)
- syserr("cannot rename(%s, %s), uid=%d", buf, p, geteuid());
- else if (LogLevel > 0)
- sm_syslog(LOG_ALERT, e->e_id,
- "Losing %s: %s", buf, why);
+ if (!bitset(EF_INQUEUE, e->e_flags))
+ queueup(e, false, true);
+#if _FFR_QUARANTINE
+ else if (QueueMode == QM_LOST)
+ loseit = false;
+#endif /* _FFR_QUARANTINE */
+
+ /* if already lost, no need to re-lose */
+ if (loseit)
+ {
+ p = queuename(e, LOSEQF_LETTER);
+ if (rename(buf, p) < 0)
+ syserr("cannot rename(%s, %s), uid=%d",
+ buf, p, geteuid());
+ else if (LogLevel > 0)
+ sm_syslog(LOG_ALERT, e->e_id,
+ "Losing %s: %s", buf, why);
+ }
+ if (e->e_dfp != NULL)
+ {
+ (void) sm_io_close(e->e_dfp, SM_TIME_DEFAULT);
+ e->e_dfp = NULL;
+ }
+ e->e_flags &= ~EF_HAS_DF;
+}
+/*
+** NAME2QID -- translate a queue group name to a queue group id
+**
+** Parameters:
+** queuename -- name of queue group.
+**
+** Returns:
+** queue group id if found.
+** NOQGRP otherwise.
+*/
+
+int
+name2qid(queuename)
+ char *queuename;
+{
+ register STAB *s;
+
+ s = stab(queuename, ST_QUEUE, ST_FIND);
+ if (s == NULL)
+ return NOQGRP;
+ return s->s_quegrp->qg_index;
}
- /*
+/*
** QID_PRINTNAME -- create externally printable version of queue id
**
** Parameters:
@@ -3052,100 +5374,225 @@ qid_printname(e)
else
id = e->e_id;
- if (e->e_queuedir == NOQDIR)
+ if (e->e_qdir == NOQDIR)
return id;
- (void) snprintf(idbuf, sizeof idbuf, "%.32s/%s",
- QPaths[e->e_queuedir].qp_name, id);
+ (void) sm_snprintf(idbuf, sizeof idbuf, "%.32s/%s",
+ Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_name,
+ id);
return idbuf;
}
- /*
-** QID_PRINTQUEUE -- create full version of queue directory for df files
+/*
+** QID_PRINTQUEUE -- create full version of queue directory for data files
**
** Parameters:
-** queuedir -- the short version of the queue directory
+** qgrp -- index in queue group.
+** qdir -- the short version of the queue directory
**
** Returns:
-** the full pathname to the queue (static)
+** the full pathname to the queue (might point to a static var)
*/
char *
-qid_printqueue(queuedir)
- int queuedir;
+qid_printqueue(qgrp, qdir)
+ int qgrp;
+ int qdir;
{
char *subdir;
static char dir[MAXPATHLEN];
- if (queuedir == NOQDIR)
- return QueueDir;
+ if (qdir == NOQDIR)
+ return Queue[qgrp]->qg_qdir;
- if (strcmp(QPaths[queuedir].qp_name, ".") == 0)
+ if (strcmp(Queue[qgrp]->qg_qpaths[qdir].qp_name, ".") == 0)
subdir = NULL;
else
- subdir = QPaths[queuedir].qp_name;
+ subdir = Queue[qgrp]->qg_qpaths[qdir].qp_name;
- (void) snprintf(dir, sizeof dir, "%s%s%s%s", QueueDir,
+ (void) sm_strlcpyn(dir, sizeof dir, 4,
+ Queue[qgrp]->qg_qdir,
subdir == NULL ? "" : "/",
subdir == NULL ? "" : subdir,
- (bitset(QP_SUBDF, QPaths[queuedir].qp_subdirs) ? "/df" : ""));
+ (bitset(QP_SUBDF,
+ Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
+ ? "/df" : ""));
return dir;
}
- /*
-** SETNEWQUEUE -- Sets a new queue directory
+
+/*
+** PICKQDIR -- Pick a queue directory from a queue group
+**
+** Parameters:
+** qg -- queue group
+** fsize -- file size in bytes
+** e -- envelope, or NULL
**
-** Assign a queue directory to an envelope and store the directory
-** in e->e_queuedir. The queue is chosen at random.
+** Result:
+** NOQDIR if no queue directory in qg has enough free space to
+** hold a file of size 'fsize', otherwise the index of
+** a randomly selected queue directory which resides on a
+** file system with enough disk space.
+** XXX This could be extended to select a queuedir with
+** a few (the fewest?) number of entries. That data
+** is available if shared memory is used.
+**
+** Side Effects:
+** If the request fails and e != NULL then sm_syslog is called.
+*/
+
+int
+pickqdir(qg, fsize, e)
+ QUEUEGRP *qg;
+ long fsize;
+ ENVELOPE *e;
+{
+ int qdir;
+ int i;
+ long avail = 0;
+
+ /* Pick a random directory, as a starting point. */
+ if (qg->qg_numqueues <= 1)
+ qdir = 0;
+ else
+ qdir = get_rand_mod(qg->qg_numqueues);
+
+ if (MinBlocksFree <= 0 && fsize <= 0)
+ return qdir;
+
+ /*
+ ** Now iterate over the queue directories,
+ ** looking for a directory with enough space for this message.
+ */
+
+ i = qdir;
+ do
+ {
+ QPATHS *qp = &qg->qg_qpaths[i];
+ long needed = 0;
+ long fsavail = 0;
+
+ if (fsize > 0)
+ needed += fsize / FILE_SYS_BLKSIZE(qp->qp_fsysidx)
+ + ((fsize % FILE_SYS_BLKSIZE(qp->qp_fsysidx)
+ > 0) ? 1 : 0);
+ if (MinBlocksFree > 0)
+ needed += MinBlocksFree;
+ fsavail = FILE_SYS_AVAIL(qp->qp_fsysidx);
+#if SM_CONF_SHM
+ if (fsavail <= 0)
+ {
+ long blksize;
+
+ /*
+ ** might be not correctly updated,
+ ** let's try to get the info directly.
+ */
+
+ fsavail = freediskspace(FILE_SYS_NAME(qp->qp_fsysidx),
+ &blksize);
+ if (fsavail < 0)
+ fsavail = 0;
+ }
+#endif /* SM_CONF_SHM */
+ if (needed <= fsavail)
+ return i;
+ if (avail < fsavail)
+ avail = fsavail;
+
+ if (qg->qg_numqueues > 0)
+ i = (i + 1) % qg->qg_numqueues;
+ } while (i != qdir);
+
+ if (e != NULL && LogLevel > 0)
+ sm_syslog(LOG_ALERT, e->e_id,
+ "low on space (%s needs %ld bytes + %ld blocks in %s), max avail: %ld",
+ CurHostName == NULL ? "SMTP-DAEMON" : CurHostName,
+ fsize, MinBlocksFree,
+ qg->qg_qdir, avail);
+ return NOQDIR;
+}
+/*
+** SETNEWQUEUE -- Sets a new queue group and directory
**
-** This routine may be improved in the future to allow for more
-** elaborate queueing schemes. Suggestions and code contributions
-** are welcome.
+** Assign a queue group and directory to an envelope and store the
+** directory in e->e_qdir.
**
** Parameters:
** e -- envelope to assign a queue for.
**
** Returns:
-** none.
+** true if successful
+** false otherwise
+**
+** Side Effects:
+** On success, e->e_qgrp and e->e_qdir are non-negative.
+** On failure (not enough disk space),
+** e->qgrp = NOQGRP, e->e_qdir = NOQDIR
+** and usrerr() is invoked (which could raise an exception).
*/
-void
+bool
setnewqueue(e)
ENVELOPE *e;
{
- int idx;
-
if (tTd(41, 20))
- dprintf("setnewqueue: called\n");
+ sm_dprintf("setnewqueue: called\n");
+
+ /* not set somewhere else */
+ if (e->e_qgrp == NOQGRP)
+ {
+ /*
+ ** Use the queue group of the first recipient, as set by
+ ** the "queuegroup" rule set. If that is not defined, then
+ ** use the queue group of the mailer of the first recipient.
+ ** If that is not defined either, then use the default
+ ** queue group.
+ */
+
+ if (e->e_sendqueue == NULL)
+ e->e_qgrp = 0;
+ else if (e->e_sendqueue->q_qgrp >= 0)
+ e->e_qgrp = e->e_sendqueue->q_qgrp;
+ else if (e->e_sendqueue->q_mailer != NULL &&
+ ISVALIDQGRP(e->e_sendqueue->q_mailer->m_qgrp))
+ e->e_qgrp = e->e_sendqueue->q_mailer->m_qgrp;
+ else
+ e->e_qgrp = 0;
+ e->e_dfqgrp = e->e_qgrp;
+ }
- if (e->e_queuedir != NOQDIR)
+ if (ISVALIDQDIR(e->e_qdir) && ISVALIDQDIR(e->e_dfqdir))
{
if (tTd(41, 20))
- dprintf("setnewqueue: e_queuedir already assigned (%s)\n",
- qid_printqueue(e->e_queuedir));
- return;
+ sm_dprintf("setnewqueue: e_qdir already assigned (%s)\n",
+ qid_printqueue(e->e_qgrp, e->e_qdir));
+ return true;
}
- if (NumQueues <= 1)
- idx = 0;
- else
+ filesys_update();
+ e->e_qdir = pickqdir(Queue[e->e_qgrp], e->e_msgsize, e);
+ if (e->e_qdir == NOQDIR)
{
-#if RANDOMSHIFT
- /* lower bits are not random "enough", select others */
- idx = (get_random() >> RANDOMSHIFT) % NumQueues;
-#else /* RANDOMSHIFT */
- idx = get_random() % NumQueues;
-#endif /* RANDOMSHIFT */
- if (tTd(41, 15))
- dprintf("setnewqueue: get_random() %% %d = %d\n",
- NumQueues, idx);
+ e->e_qgrp = NOQGRP;
+ if (!bitset(EF_FATALERRS, e->e_flags))
+ usrerr("452 4.4.5 Insufficient disk space; try again later");
+ e->e_flags |= EF_FATALERRS;
+ return false;
}
- e->e_queuedir = idx;
if (tTd(41, 3))
- dprintf("setnewqueue: Assigned queue directory %s\n",
- qid_printqueue(e->e_queuedir));
-}
+ sm_dprintf("setnewqueue: Assigned queue directory %s\n",
+ qid_printqueue(e->e_qgrp, e->e_qdir));
- /*
+ if (e->e_xfqgrp == NOQGRP || e->e_xfqdir == NOQDIR)
+ {
+ e->e_xfqgrp = e->e_qgrp;
+ e->e_xfqdir = e->e_qdir;
+ }
+ e->e_dfqdir = e->e_qdir;
+ return true;
+}
+/*
** CHKQDIR -- check a queue directory
**
** Parameters:
@@ -3167,52 +5614,53 @@ chkqdir(name, sff)
/* skip over . and .. directories */
if (name[0] == '.' &&
(name[1] == '\0' || (name[1] == '.' && name[2] == '\0')))
- return FALSE;
-# if HASLSTAT
+ return false;
+#if HASLSTAT
if (lstat(name, &statb) < 0)
-# else /* HASLSTAT */
+#else /* HASLSTAT */
if (stat(name, &statb) < 0)
-# endif /* HASLSTAT */
+#endif /* HASLSTAT */
{
if (tTd(41, 2))
- dprintf("multiqueue_cache: stat(\"%s\"): %s\n",
- name, errstring(errno));
- return FALSE;
+ sm_dprintf("chkqdir: stat(\"%s\"): %s\n",
+ name, sm_errstring(errno));
+ return false;
}
-# if HASLSTAT
+#if HASLSTAT
if (S_ISLNK(statb.st_mode))
{
/*
** For a symlink we need to make sure the
** target is a directory
*/
+
if (stat(name, &statb) < 0)
{
if (tTd(41, 2))
- dprintf("multiqueue_cache: stat(\"%s\"): %s\n",
- name, errstring(errno));
- return FALSE;
+ sm_dprintf("chkqdir: stat(\"%s\"): %s\n",
+ name, sm_errstring(errno));
+ return false;
}
}
-# endif /* HASLSTAT */
+#endif /* HASLSTAT */
if (!S_ISDIR(statb.st_mode))
{
if (tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": Not a directory\n",
+ sm_dprintf("chkqdir: \"%s\": Not a directory\n",
name);
- return FALSE;
+ return false;
}
/* Print a warning if unsafe (but still use it) */
+ /* XXX do this only if we want the warning? */
i = safedirpath(name, RunAsUid, RunAsGid, NULL, sff, 0, 0);
if (i != 0 && tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": Not safe: %s\n",
- name, errstring(i));
- return TRUE;
+ sm_dprintf("chkqdir: \"%s\": Not safe: %s\n",
+ name, sm_errstring(i));
+ return true;
}
-
- /*
+/*
** MULTIQUEUE_CACHE -- cache a list of paths to queues.
**
** Each potential queue is checked as the cache is built.
@@ -3221,206 +5669,1234 @@ chkqdir(name, sff)
** (although code for that is not ready yet).
**
** Parameters:
-** none
+** basedir -- base of all queue directories.
+** blen -- strlen(basedir).
+** qg -- queue group.
+** qn -- number of queue directories already cached.
+** phash -- pointer to hash value over queue dirs.
+#if SM_CONF_SHM
+** only used if shared memory is active.
+#endif * SM_CONF_SHM *
**
** Returns:
-** none
+** new number of queue directories.
*/
-void
-multiqueue_cache()
+#define INITIAL_SLOTS 20
+#define ADD_SLOTS 10
+
+static int
+multiqueue_cache(basedir, blen, qg, qn, phash)
+ char *basedir;
+ int blen;
+ QUEUEGRP *qg;
+ int qn;
+ unsigned int *phash;
{
- register DIR *dp;
- register struct dirent *d;
char *cp;
int i, len;
int slotsleft = 0;
long sff = SFF_ANYFILE;
char qpath[MAXPATHLEN];
char subdir[MAXPATHLEN];
+ char prefix[MAXPATHLEN]; /* dir relative to basedir */
if (tTd(41, 20))
- dprintf("multiqueue_cache: called\n");
+ sm_dprintf("multiqueue_cache: called\n");
- if (NumQueues != 0 && QPaths != NULL)
+ /* Initialize to current directory */
+ prefix[0] = '.';
+ prefix[1] = '\0';
+ if (qg->qg_numqueues != 0 && qg->qg_qpaths != NULL)
{
- for (i = 0; i < NumQueues; i++)
+ for (i = 0; i < qg->qg_numqueues; i++)
{
- if (QPaths[i].qp_name != NULL)
- sm_free(QPaths[i].qp_name);
+ if (qg->qg_qpaths[i].qp_name != NULL)
+ (void) sm_free(qg->qg_qpaths[i].qp_name); /* XXX */
}
- sm_free((char *)QPaths);
- QPaths = NULL;
- NumQueues = 0;
+ (void) sm_free((char *) qg->qg_qpaths); /* XXX */
+ qg->qg_qpaths = NULL;
+ qg->qg_numqueues = 0;
}
/* If running as root, allow safedirpath() checks to use privs */
if (RunAsUid == 0)
sff |= SFF_ROOTOK;
- (void) snprintf(qpath, sizeof qpath, "%s", QueueDir);
- len = strlen(qpath) - 1;
- cp = &qpath[len];
+ if (!SM_IS_DIR_START(qg->qg_qdir))
+ {
+ /*
+ ** XXX we could add basedir, but then we have to realloc()
+ ** the string... Maybe another time.
+ */
+
+ syserr("QueuePath %s not absolute", qg->qg_qdir);
+ ExitStat = EX_CONFIG;
+ return qn;
+ }
+
+ /* qpath: directory of current workgroup */
+ len = sm_strlcpy(qpath, qg->qg_qdir, sizeof qpath);
+ if (len >= sizeof qpath)
+ {
+ syserr("QueuePath %.256s too long (%d max)",
+ qg->qg_qdir, (int) sizeof qpath);
+ ExitStat = EX_CONFIG;
+ return qn;
+ }
+
+ /* begin of qpath must be same as basedir */
+ if (strncmp(basedir, qpath, blen) != 0 &&
+ (strncmp(basedir, qpath, blen - 1) != 0 || len != blen - 1))
+ {
+ syserr("QueuePath %s not subpath of QueueDirectory %s",
+ qpath, basedir);
+ ExitStat = EX_CONFIG;
+ return qn;
+ }
+
+ /* Do we have a nested subdirectory? */
+ if (blen < len && SM_FIRST_DIR_DELIM(qg->qg_qdir + blen) != NULL)
+ {
+
+ /* Copy subdirectory into prefix for later use */
+ if (sm_strlcpy(prefix, qg->qg_qdir + blen, sizeof prefix) >=
+ sizeof prefix)
+ {
+ syserr("QueuePath %.256s too long (%d max)",
+ qg->qg_qdir, (int) sizeof qpath);
+ ExitStat = EX_CONFIG;
+ return qn;
+ }
+ cp = SM_LAST_DIR_DELIM(prefix);
+ SM_ASSERT(cp != NULL);
+ *cp = '\0'; /* cut off trailing / */
+ }
+
+ /* This is guaranteed by the basedir check above */
+ SM_ASSERT(len >= blen - 1);
+ cp = &qpath[len - 1];
if (*cp == '*')
{
- *cp = '\0';
- if ((cp = strrchr(qpath, '/')) == NULL)
+ register DIR *dp;
+ register struct dirent *d;
+ int off;
+ char *delim;
+ char relpath[MAXPATHLEN];
+
+ *cp = '\0'; /* Overwrite wildcard */
+ if ((cp = SM_LAST_DIR_DELIM(qpath)) == NULL)
{
syserr("QueueDirectory: can not wildcard relative path");
if (tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": Can not wildcard relative path.\n",
+ sm_dprintf("multiqueue_cache: \"%s*\": Can not wildcard relative path.\n",
qpath);
ExitStat = EX_CONFIG;
- return;
+ return qn;
}
if (cp == qpath)
{
/*
** Special case of top level wildcard, like /foo*
+ ** Change to //foo*
*/
- (void) snprintf(qpath + 1, sizeof qpath - 1,
- "%s", qpath);
+ (void) sm_strlcpy(qpath + 1, qpath, sizeof qpath - 1);
++cp;
}
- *(cp++) = '\0';
- len = strlen(cp);
+ delim = cp;
+ *(cp++) = '\0'; /* Replace / with \0 */
+ len = strlen(cp); /* Last component of queue directory */
+
+ /*
+ ** Path relative to basedir, with trailing /
+ ** It will be modified below to specify the subdirectories
+ ** so they can be opened without chdir().
+ */
+
+ off = sm_strlcpyn(relpath, sizeof relpath, 2, prefix, "/");
+ SM_ASSERT(off < sizeof relpath);
if (tTd(41, 2))
- dprintf("multiqueue_cache: prefix=\"%s\"\n", cp);
+ sm_dprintf("multiqueue_cache: prefix=\"%s%s\"\n",
+ relpath, cp);
- QueueDir = newstr(qpath);
+ /* It is always basedir: we don't need to store it per group */
+ /* XXX: optimize this! -> one more global? */
+ qg->qg_qdir = newstr(basedir);
+ qg->qg_qdir[blen - 1] = '\0'; /* cut off trailing / */
/*
** XXX Should probably wrap this whole loop in a timeout
** in case some wag decides to NFS mount the queues.
*/
- /* test path to get warning messages */
- i= safedirpath(QueueDir, RunAsUid, RunAsGid, NULL, sff, 0, 0);
- if (i != 0 && tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": Not safe: %s\n",
- QueueDir, errstring(i));
-
- if (chdir(QueueDir) < 0)
+ /* Test path to get warning messages. */
+ if (qn == 0)
{
- syserr("can not chdir(%s)", QueueDir);
- if (tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": %s\n",
- qpath, errstring(errno));
- ExitStat = EX_CONFIG;
- return;
+ /* XXX qg_runasuid and qg_runasgid for specials? */
+ i = safedirpath(basedir, RunAsUid, RunAsGid, NULL,
+ sff, 0, 0);
+ if (i != 0 && tTd(41, 2))
+ sm_dprintf("multiqueue_cache: \"%s\": Not safe: %s\n",
+ basedir, sm_errstring(i));
}
- if ((dp = opendir(".")) == NULL)
+ if ((dp = opendir(prefix)) == NULL)
{
- syserr("can not opendir(%s)", QueueDir);
+ syserr("can not opendir(%s/%s)", qg->qg_qdir, prefix);
if (tTd(41, 2))
- dprintf("multiqueue_cache: opendir(\"%s\"): %s\n",
- QueueDir, errstring(errno));
+ sm_dprintf("multiqueue_cache: opendir(\"%s/%s\"): %s\n",
+ qg->qg_qdir, prefix,
+ sm_errstring(errno));
ExitStat = EX_CONFIG;
- return;
+ return qn;
}
while ((d = readdir(dp)) != NULL)
{
- if (strncmp(d->d_name, cp, len) != 0)
+ i = strlen(d->d_name);
+ if (i < len || strncmp(d->d_name, cp, len) != 0)
{
if (tTd(41, 5))
- dprintf("multiqueue_cache: \"%s\", skipped\n",
+ sm_dprintf("multiqueue_cache: \"%s\", skipped\n",
d->d_name);
continue;
}
- if (!chkqdir(d->d_name, sff))
+
+ /* Create relative pathname: prefix + local directory */
+ i = sizeof(relpath) - off;
+ if (sm_strlcpy(relpath + off, d->d_name, i) >= i)
+ continue; /* way too long */
+
+ if (!chkqdir(relpath, sff))
continue;
- if (QPaths == NULL)
+ if (qg->qg_qpaths == NULL)
{
- slotsleft = 20;
- QPaths = (QPATHS *)xalloc((sizeof *QPaths) *
- slotsleft);
- NumQueues = 0;
+ slotsleft = INITIAL_SLOTS;
+ qg->qg_qpaths = (QPATHS *)xalloc((sizeof *qg->qg_qpaths) *
+ slotsleft);
+ qg->qg_numqueues = 0;
}
else if (slotsleft < 1)
{
- QPaths = (QPATHS *)xrealloc((char *)QPaths,
- (sizeof *QPaths) *
- (NumQueues + 10));
- if (QPaths == NULL)
+ qg->qg_qpaths = (QPATHS *)sm_realloc((char *)qg->qg_qpaths,
+ (sizeof *qg->qg_qpaths) *
+ (qg->qg_numqueues +
+ ADD_SLOTS));
+ if (qg->qg_qpaths == NULL)
{
(void) closedir(dp);
- return;
+ return qn;
}
- slotsleft += 10;
+ slotsleft += ADD_SLOTS;
}
/* check subdirs */
- QPaths[NumQueues].qp_subdirs = QP_NOSUB;
- (void) snprintf(subdir, sizeof subdir, "%s/%s/%s",
- qpath, d->d_name, "qf");
- if (chkqdir(subdir, sff))
- QPaths[NumQueues].qp_subdirs |= QP_SUBQF;
-
- (void) snprintf(subdir, sizeof subdir, "%s/%s/%s",
- qpath, d->d_name, "df");
- if (chkqdir(subdir, sff))
- QPaths[NumQueues].qp_subdirs |= QP_SUBDF;
-
- (void) snprintf(subdir, sizeof subdir, "%s/%s/%s",
- qpath, d->d_name, "xf");
- if (chkqdir(subdir, sff))
- QPaths[NumQueues].qp_subdirs |= QP_SUBXF;
+ qg->qg_qpaths[qg->qg_numqueues].qp_subdirs = QP_NOSUB;
+
+#define CHKRSUBDIR(name, flag) \
+ (void) sm_strlcpyn(subdir, sizeof subdir, 3, relpath, "/", name); \
+ if (chkqdir(subdir, sff)) \
+ qg->qg_qpaths[qg->qg_numqueues].qp_subdirs |= flag; \
+ else
+
+
+ CHKRSUBDIR("qf", QP_SUBQF);
+ CHKRSUBDIR("df", QP_SUBDF);
+ CHKRSUBDIR("xf", QP_SUBXF);
/* assert(strlen(d->d_name) < MAXPATHLEN - 14) */
/* maybe even - 17 (subdirs) */
- QPaths[NumQueues].qp_name = newstr(d->d_name);
+
+ if (prefix[0] != '.')
+ qg->qg_qpaths[qg->qg_numqueues].qp_name =
+ newstr(relpath);
+ else
+ qg->qg_qpaths[qg->qg_numqueues].qp_name =
+ newstr(d->d_name);
+
if (tTd(41, 2))
- dprintf("multiqueue_cache: %d: \"%s\" cached (%x).\n",
- NumQueues, d->d_name,
- QPaths[NumQueues].qp_subdirs);
- NumQueues++;
+ sm_dprintf("multiqueue_cache: %d: \"%s\" cached (%x).\n",
+ qg->qg_numqueues, relpath,
+ qg->qg_qpaths[qg->qg_numqueues].qp_subdirs);
+#if SM_CONF_SHM
+ qg->qg_qpaths[qg->qg_numqueues].qp_idx = qn;
+ *phash = hash_q(relpath, *phash);
+#endif /* SM_CONF_SHM */
+ qg->qg_numqueues++;
+ ++qn;
slotsleft--;
}
(void) closedir(dp);
+
+ /* undo damage */
+ *delim = '/';
}
- if (NumQueues == 0)
+ if (qg->qg_numqueues == 0)
{
- if (*cp != '*' && tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": No wildcard suffix character\n",
- QueueDir);
- QPaths = (QPATHS *)xalloc(sizeof *QPaths);
- QPaths[0].qp_name = newstr(".");
- QPaths[0].qp_subdirs = QP_NOSUB;
- NumQueues = 1;
+ qg->qg_qpaths = (QPATHS *) xalloc(sizeof *qg->qg_qpaths);
/* test path to get warning messages */
- (void) safedirpath(QueueDir, RunAsUid, RunAsGid,
- NULL, sff, 0, 0);
- if (chdir(QueueDir) < 0)
+ i = safedirpath(qpath, RunAsUid, RunAsGid, NULL, sff, 0, 0);
+ if (i == ENOENT)
{
- syserr("can not chdir(%s)", QueueDir);
+ syserr("can not opendir(%s)", qpath);
if (tTd(41, 2))
- dprintf("multiqueue_cache: \"%s\": %s\n",
- QueueDir, errstring(errno));
+ sm_dprintf("multiqueue_cache: opendir(\"%s\"): %s\n",
+ qpath, sm_errstring(i));
ExitStat = EX_CONFIG;
+ return qn;
}
+ qg->qg_qpaths[0].qp_subdirs = QP_NOSUB;
+ qg->qg_numqueues = 1;
+
/* check subdirs */
- (void) snprintf(subdir, sizeof subdir, "%s/qf", QueueDir);
- if (chkqdir(subdir, sff))
- QPaths[0].qp_subdirs |= QP_SUBQF;
+#define CHKSUBDIR(name, flag) \
+ (void) sm_strlcpyn(subdir, sizeof subdir, 3, qg->qg_qdir, "/", name); \
+ if (chkqdir(subdir, sff)) \
+ qg->qg_qpaths[0].qp_subdirs |= flag; \
+ else
- (void) snprintf(subdir, sizeof subdir, "%s/df", QueueDir);
- if (chkqdir(subdir, sff))
- QPaths[0].qp_subdirs |= QP_SUBDF;
+ CHKSUBDIR("qf", QP_SUBQF);
+ CHKSUBDIR("df", QP_SUBDF);
+ CHKSUBDIR("xf", QP_SUBXF);
- (void) snprintf(subdir, sizeof subdir, "%s/xf", QueueDir);
- if (chkqdir(subdir, sff))
- QPaths[0].qp_subdirs |= QP_SUBXF;
+ if (qg->qg_qdir[blen - 1] != '\0' &&
+ qg->qg_qdir[blen] != '\0')
+ {
+ /*
+ ** Copy the last component into qpaths and
+ ** cut off qdir
+ */
+
+ qg->qg_qpaths[0].qp_name = newstr(qg->qg_qdir + blen);
+ qg->qg_qdir[blen - 1] = '\0';
+ }
+ else
+ qg->qg_qpaths[0].qp_name = newstr(".");
+
+#if SM_CONF_SHM
+ qg->qg_qpaths[0].qp_idx = qn;
+ *phash = hash_q(qg->qg_qpaths[0].qp_name, *phash);
+#endif /* SM_CONF_SHM */
+ ++qn;
+ }
+ return qn;
+}
+
+/*
+** FILESYS_FIND -- find entry in FileSys table, or add new one
+**
+** Given the pathname of a directory, determine the file system
+** in which that directory resides, and return a pointer to the
+** entry in the FileSys table that describes the file system.
+** A new entry is added if necessary (and requested).
+** If the directory does not exist, -1 is returned.
+**
+** Parameters:
+** path -- pathname of directory
+** add -- add to structure if not found.
+**
+** Returns:
+** >=0: found: index in file system table
+** <0: some error, i.e.,
+** FSF_TOO_MANY: too many filesystems (-> syserr())
+** FSF_STAT_FAIL: can't stat() filesystem (-> syserr())
+** FSF_NOT_FOUND: not in list
+*/
+
+static short filesys_find __P((char *, bool));
+
+#define FSF_NOT_FOUND (-1)
+#define FSF_STAT_FAIL (-2)
+#define FSF_TOO_MANY (-3)
+
+static short
+filesys_find(path, add)
+ char *path;
+ bool add;
+{
+ struct stat st;
+ short i;
+
+ if (stat(path, &st) < 0)
+ {
+ syserr("cannot stat queue directory %s", path);
+ return FSF_STAT_FAIL;
+ }
+ for (i = 0; i < NumFileSys; ++i)
+ {
+ if (FILE_SYS_DEV(i) == st.st_dev)
+ return i;
+ }
+ if (i >= MAXFILESYS)
+ {
+ syserr("too many queue file systems (%d max)", MAXFILESYS);
+ return FSF_TOO_MANY;
}
+ if (!add)
+ return FSF_NOT_FOUND;
+
+ ++NumFileSys;
+ FILE_SYS_NAME(i) = path;
+ FILE_SYS_DEV(i) = st.st_dev;
+ FILE_SYS_AVAIL(i) = 0;
+ FILE_SYS_BLKSIZE(i) = 1024; /* avoid divide by zero */
+ return i;
}
-# if 0
- /*
+/*
+** FILESYS_SETUP -- set up mapping from queue directories to file systems
+**
+** This data structure is used to efficiently check the amount of
+** free space available in a set of queue directories.
+**
+** Parameters:
+** add -- initialize structure if necessary.
+**
+** Returns:
+** 0: success
+** <0: some error, i.e.,
+** FSF_NOT_FOUND: not in list
+** FSF_STAT_FAIL: can't stat() filesystem (-> syserr())
+** FSF_TOO_MANY: too many filesystems (-> syserr())
+*/
+
+static int filesys_setup __P((bool));
+
+static int
+filesys_setup(add)
+ bool add;
+{
+ int i, j;
+ short fs;
+ int ret;
+
+ ret = 0;
+ for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
+ {
+ for (j = 0; j < Queue[i]->qg_numqueues; ++j)
+ {
+ QPATHS *qp = &Queue[i]->qg_qpaths[j];
+
+ fs = filesys_find(qp->qp_name, add);
+ if (fs >= 0)
+ qp->qp_fsysidx = fs;
+ else
+ qp->qp_fsysidx = 0;
+ if (fs < ret)
+ ret = fs;
+ }
+ }
+ return ret;
+}
+
+/*
+** FILESYS_UPDATE -- update amount of free space on all file systems
+**
+** The FileSys table is used to cache the amount of free space
+** available on all queue directory file systems.
+** This function updates the cached information if it has expired.
+**
+** Parameters:
+** none.
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** Updates FileSys table.
+*/
+
+void
+filesys_update()
+{
+ int i;
+ long avail, blksize;
+ time_t now;
+ static time_t nextupdate = 0;
+
+#if SM_CONF_SHM
+ /* only the daemon updates this structure */
+ if (ShmId != SM_SHM_NO_ID && DaemonPid != CurrentPid)
+ return;
+#endif /* SM_CONF_SHM */
+ now = curtime();
+ if (now < nextupdate)
+ return;
+ nextupdate = now + FILESYS_UPDATE_INTERVAL;
+ for (i = 0; i < NumFileSys; ++i)
+ {
+ FILESYS *fs = &FILE_SYS(i);
+
+ avail = freediskspace(FILE_SYS_NAME(i), &blksize);
+ if (avail < 0 || blksize <= 0)
+ {
+ if (LogLevel > 5)
+ sm_syslog(LOG_ERR, NOQID,
+ "filesys_update failed: %s, fs=%s, avail=%ld, blocksize=%ld",
+ sm_errstring(errno),
+ FILE_SYS_NAME(i), avail, blksize);
+ fs->fs_avail = 0;
+ fs->fs_blksize = 1024; /* avoid divide by zero */
+ nextupdate = now + 2; /* let's do this soon again */
+ }
+ else
+ {
+ fs->fs_avail = avail;
+ fs->fs_blksize = blksize;
+ }
+ }
+}
+
+#if _FFR_ANY_FREE_FS
+/*
+** FILESYS_FREE -- check whether there is at least one fs with enough space.
+**
+** Parameters:
+** fsize -- file size in bytes
+**
+** Returns:
+** true iff there is one fs with more than fsize bytes free.
+*/
+
+bool
+filesys_free(fsize)
+ long fsize;
+{
+ int i;
+
+ if (fsize <= 0)
+ return true;
+ for (i = 0; i < NumFileSys; ++i)
+ {
+ long needed = 0;
+
+ if (FILE_SYS_AVAIL(i) < 0 || FILE_SYS_BLKSIZE(i) <= 0)
+ continue;
+ needed += fsize / FILE_SYS_BLKSIZE(i)
+ + ((fsize % FILE_SYS_BLKSIZE(i)
+ > 0) ? 1 : 0)
+ + MinBlocksFree;
+ if (needed <= FILE_SYS_AVAIL(i))
+ return true;
+ }
+ return false;
+}
+#endif /* _FFR_ANY_FREE_FS */
+
+#if _FFR_CONTROL_MSTAT
+/*
+** DISK_STATUS -- show amount of free space in queue directories
+**
+** Parameters:
+** out -- output file pointer.
+** prefix -- string to output in front of each line.
+**
+** Returns:
+** none.
+*/
+
+void
+disk_status(out, prefix)
+ SM_FILE_T *out;
+ char *prefix;
+{
+ int i;
+ long avail, blksize;
+ long free;
+
+ for (i = 0; i < NumFileSys; ++i)
+ {
+ avail = freediskspace(FILE_SYS_NAME(i), &blksize);
+ if (avail >= 0 && blksize > 0)
+ {
+ free = (long)((double) avail *
+ ((double) blksize / 1024));
+ }
+ else
+ free = -1;
+ (void) sm_io_fprintf(out, SM_TIME_DEFAULT,
+ "%s%d/%s/%ld\r\n",
+ prefix, i,
+ FILE_SYS_NAME(i),
+ free);
+ }
+}
+#endif /* _FFR_CONTROL_MSTAT */
+
+#if SM_CONF_SHM
+/*
+** UPD_QS -- update information about queue when adding/deleting an entry
+**
+** Parameters:
+** e -- envelope.
+** delete -- delete/add entry.
+** avail -- update the space available as well.
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** Modifies available space in filesystem.
+** Changes number of entries in queue directory.
+*/
+
+void
+upd_qs(e, delete, avail)
+ ENVELOPE *e;
+ bool delete;
+ bool avail;
+{
+ short fidx;
+ int idx;
+ long s;
+
+ if (ShmId == SM_SHM_NO_ID || e == NULL)
+ return;
+ if (e->e_qgrp == NOQGRP || e->e_qdir == NOQDIR)
+ return;
+ idx = Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_idx;
+
+ /* XXX in theory this needs to be protected with a mutex */
+ if (QSHM_ENTRIES(idx) >= 0)
+ {
+ if (delete)
+ --QSHM_ENTRIES(idx);
+ else
+ ++QSHM_ENTRIES(idx);
+ }
+
+ fidx = Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_fsysidx;
+ if (fidx < 0)
+ return;
+
+ /* update available space also? (might be loseqfile) */
+ if (!avail)
+ return;
+
+ /* convert size to blocks; this causes rounding errors */
+ s = e->e_msgsize / FILE_SYS_BLKSIZE(fidx);
+ if (s == 0)
+ return;
+
+ /* XXX in theory this needs to be protected with a mutex */
+ if (delete)
+ FILE_SYS_AVAIL(fidx) += s;
+ else
+ FILE_SYS_AVAIL(fidx) -= s;
+
+}
+/*
+** INIT_SHM -- initialize shared memory structure
+**
+** Initialize or attach to shared memory segment.
+** Currently it is not a fatal error if this doesn't work.
+** However, it causes us to have a "fallback" storage location
+** for everything that is supposed to be in the shared memory,
+** which makes the code slightly ugly.
+**
+** Parameters:
+** qn -- number of queue directories.
+** owner -- owner of shared memory.
+** hash -- identifies data that is stored in shared memory.
+**
+** Returns:
+** none.
+*/
+
+static void init_shm __P((int, bool, unsigned int));
+
+static void
+init_shm(qn, owner, hash)
+ int qn;
+ bool owner;
+ unsigned int hash;
+{
+ int i;
+
+ PtrFileSys = &FileSys[0];
+ PNumFileSys = &Numfilesys;
+
+ /* This allows us to disable shared memory at runtime. */
+ if (ShmKey != 0)
+ {
+ int count;
+ int save_errno;
+ size_t shms;
+
+ count = 0;
+ shms = SM_T_SIZE + qn * sizeof(QUEUE_SHM_T);
+ for (;;)
+ {
+ /* XXX: maybe allow read access for group? */
+ Pshm = sm_shmstart(ShmKey, shms, SHM_R|SHM_W, &ShmId,
+ owner);
+ save_errno = errno;
+ if (Pshm != NULL || save_errno != EEXIST)
+ break;
+ if (++count >= 3)
+ break;
+ sleep(count);
+ }
+ if (Pshm != NULL)
+ {
+ int *p;
+
+ p = (int *) Pshm;
+ if (owner)
+ {
+ *p = (int) shms;
+ *((pid_t *) SHM_OFF_PID(Pshm)) = CurrentPid;
+ p = (int *) SHM_OFF_TAG(Pshm);
+ *p = hash;
+ }
+ else
+ {
+ if (*p != (int) shms)
+ {
+ save_errno = EINVAL;
+ cleanup_shm(false);
+ goto error;
+ }
+ p = (int *) SHM_OFF_TAG(Pshm);
+ if (*p != (int) hash)
+ {
+ save_errno = EINVAL;
+ cleanup_shm(false);
+ goto error;
+ }
+
+ /*
+ ** XXX how to check the pid?
+ ** Read it from the pid-file? That does
+ ** not need to exist.
+ ** We could disable shm if we can't confirm
+ ** that it is the right one.
+ */
+ }
+
+ PtrFileSys = (FILESYS *) OFF_FILE_SYS(Pshm);
+ PNumFileSys = (int *) OFF_NUM_FILE_SYS(Pshm);
+ QShm = (QUEUE_SHM_T *) OFF_QUEUE_SHM(Pshm);
+ PRSATmpCnt = (int *) OFF_RSA_TMP_CNT(Pshm);
+ *PRSATmpCnt = 0;
+ if (owner)
+ {
+ /* initialize values in shared memory */
+ NumFileSys = 0;
+ for (i = 0; i < qn; i++)
+ QShm[i].qs_entries = -1;
+ }
+ return;
+ }
+ error:
+ if (LogLevel > (owner ? 8 : 11))
+ {
+ sm_syslog(owner ? LOG_ERR : LOG_NOTICE, NOQID,
+ "can't %s shared memory, key=%ld: %s",
+ owner ? "initialize" : "attach to",
+ (long) ShmKey, sm_errstring(save_errno));
+ }
+ }
+}
+#endif /* SM_CONF_SHM */
+
+/*
+** SETUP_QUEUES -- setup all queue groups
+**
+** Parameters:
+** owner -- owner of shared memory.
+**
+** Returns:
+** none.
+**
+#if SM_CONF_SHM
+** Side Effects:
+** attaches shared memory.
+#endif * SM_CONF_SHM *
+*/
+
+void
+setup_queues(owner)
+ bool owner;
+{
+ int i, qn, len;
+ unsigned int hashval;
+ char basedir[MAXPATHLEN];
+ struct stat st;
+
+ /*
+ ** Determine basedir for all queue directories.
+ ** All queue directories must be (first level) subdirectories
+ ** of the basedir. The basedir is the QueueDir
+ ** without wildcards, but with trailing /
+ */
+
+ hashval = 0;
+ errno = 0;
+ len = sm_strlcpy(basedir, QueueDir, sizeof basedir);
+ if (len >= sizeof basedir)
+ {
+ syserr("QueueDirectory: path too long: %d, max %d",
+ len, (int) sizeof basedir);
+ ExitStat = EX_CONFIG;
+ return;
+ }
+ SM_ASSERT(len > 0);
+ if (basedir[len - 1] == '*')
+ {
+ char *cp;
+
+ cp = SM_LAST_DIR_DELIM(basedir);
+ if (cp == NULL)
+ {
+ syserr("QueueDirectory: can not wildcard relative path \"%s\"",
+ QueueDir);
+ if (tTd(41, 2))
+ sm_dprintf("setup_queues: \"%s\": Can not wildcard relative path.\n",
+ QueueDir);
+ ExitStat = EX_CONFIG;
+ return;
+ }
+
+ /* cut off wildcard pattern */
+ *++cp = '\0';
+ len = cp - basedir;
+ }
+ else if (!SM_IS_DIR_DELIM(basedir[len - 1]))
+ {
+ /* append trailing slash since it is a directory */
+ basedir[len] = '/';
+ basedir[++len] = '\0';
+ }
+
+ /* len counts up to the last directory delimiter */
+ SM_ASSERT(basedir[len - 1] == '/');
+
+ if (chdir(basedir) < 0)
+ {
+ int save_errno = errno;
+
+ syserr("can not chdir(%s)", basedir);
+ if (save_errno == EACCES)
+ (void) sm_io_fprintf(smioerr, SM_TIME_DEFAULT,
+ "Program mode requires special privileges, e.g., root or TrustedUser.\n");
+ if (tTd(41, 2))
+ sm_dprintf("setup_queues: \"%s\": %s\n",
+ basedir, sm_errstring(errno));
+ ExitStat = EX_CONFIG;
+ return;
+ }
+#if SM_CONF_SHM
+ hashval = hash_q(basedir, hashval);
+#endif /* SM_CONF_SHM */
+
+ /* initialize map for queue runs */
+ clrbitmap(DoQueueRun);
+
+
+ if (UseMSP && OpMode != MD_TEST)
+ {
+ long sff = SFF_CREAT;
+
+ if (stat(".", &st) < 0)
+ {
+ syserr("can not stat(%s)", basedir);
+ if (tTd(41, 2))
+ sm_dprintf("setup_queues: \"%s\": %s\n",
+ basedir, sm_errstring(errno));
+ ExitStat = EX_CONFIG;
+ return;
+ }
+ if (RunAsUid == 0)
+ sff |= SFF_ROOTOK;
+
+ /*
+ ** Check queue directory permissions.
+ ** Can we write to a group writable queue directory?
+ */
+
+ if (bitset(S_IWGRP, QueueFileMode) &&
+ bitset(S_IWGRP, st.st_mode) &&
+ safefile(" ", RunAsUid, RunAsGid, RunAsUserName, sff,
+ QueueFileMode, NULL) != 0)
+ {
+ syserr("can not write to queue directory %s (RunAsGid=%d, required=%d)",
+ basedir, (int) RunAsGid, (int) st.st_gid);
+ }
+ if (bitset(S_IWOTH|S_IXOTH, st.st_mode))
+ {
+#if _FFR_MSP_PARANOIA
+ syserr("dangerous permissions=%o on queue directory %s",
+ (int) st.st_mode, basedir);
+#else /* _FFR_MSP_PARANOIA */
+ if (LogLevel > 0)
+ sm_syslog(LOG_ERR, NOQID,
+ "dangerous permissions=%o on queue directory %s",
+ (int) st.st_mode, basedir);
+#endif /* _FFR_MSP_PARANOIA */
+ }
+#if _FFR_MSP_PARANOIA
+ if (NumQueue > 1)
+ syserr("can not use multiple queues for MSP");
+#endif /* _FFR_MSP_PARANOIA */
+ }
+
+ /* initial number of queue directories */
+ qn = 0;
+ for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
+ qn = multiqueue_cache(basedir, len, Queue[i], qn, &hashval);
+
+#if SM_CONF_SHM
+ init_shm(qn, owner, hashval);
+ i = filesys_setup(owner || ShmId == SM_SHM_NO_ID);
+ if (i == FSF_NOT_FOUND)
+ {
+ /*
+ ** We didn't get the right filesystem data
+ ** This may happen if we don't have the right shared memory.
+ ** So let's do this without shared memory.
+ */
+
+ SM_ASSERT(!owner);
+ cleanup_shm(false); /* release shared memory */
+ i = filesys_setup(false);
+ if (i < 0)
+ syserr("filesys_setup failed twice, result=%d", i);
+ else if (LogLevel > 8)
+ sm_syslog(LOG_WARNING, NOQID,
+ "shared memory does not contain expected data, ignored");
+ }
+#else /* SM_CONF_SHM */
+ i = filesys_setup(true);
+#endif /* SM_CONF_SHM */
+ if (i < 0)
+ ExitStat = EX_CONFIG;
+}
+
+#if SM_CONF_SHM
+/*
+** CLEANUP_SHM -- do some cleanup work for shared memory etc
+**
+** Parameters:
+** owner -- owner of shared memory?
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** detaches shared memory.
+*/
+
+void
+cleanup_shm(owner)
+ bool owner;
+{
+ if (ShmId != SM_SHM_NO_ID)
+ {
+ if (sm_shmstop(Pshm, ShmId, owner) < 0 && LogLevel > 8)
+ sm_syslog(LOG_INFO, NOQID, "sh_shmstop failed=%s",
+ sm_errstring(errno));
+ Pshm = NULL;
+ ShmId = SM_SHM_NO_ID;
+ }
+}
+#endif /* SM_CONF_SHM */
+
+/*
+** CLEANUP_QUEUES -- do some cleanup work for queues
+**
+** Parameters:
+** none.
+**
+** Returns:
+** none.
+**
+*/
+
+void
+cleanup_queues()
+{
+ sync_queue_time();
+}
+/*
+** SET_DEF_QUEUEVAL -- set default values for a queue group.
+**
+** Parameters:
+** qg -- queue group
+** all -- set all values (true for default group)?
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** sets default values for the queue group.
+*/
+
+void
+set_def_queueval(qg, all)
+ QUEUEGRP *qg;
+ bool all;
+{
+ if (bitnset(QD_DEFINED, qg->qg_flags))
+ return;
+ if (all)
+ qg->qg_qdir = QueueDir;
+#if 0
+ qg->qg_sortorder = QueueSortOrder;
+#endif /* 0 */
+ qg->qg_maxqrun = all ? MaxRunnersPerQueue : -1;
+ qg->qg_nice = NiceQueueRun;
+}
+/*
+** MAKEQUEUE -- define a new queue.
+**
+** Parameters:
+** line -- description of queue. This is in labeled fields.
+** The fields are:
+** F -- the flags associated with the queue
+** I -- the interval between running the queue
+** J -- the maximum # of jobs in work list
+** [M -- the maximum # of jobs in a queue run]
+** N -- the niceness at which to run
+** P -- the path to the queue
+** S -- the queue sorting order
+** R -- number of parallel queue runners
+** r -- max recipients per envelope
+** The first word is the canonical name of the queue.
+** qdef -- this is a 'Q' definition from .cf
+**
+** Returns:
+** none.
+**
+** Side Effects:
+** enters the queue into the queue table.
+*/
+
+void
+makequeue(line, qdef)
+ char *line;
+ bool qdef;
+{
+ register char *p;
+ register QUEUEGRP *qg;
+ register STAB *s;
+ int i;
+ char fcode;
+
+ /* allocate a queue and set up defaults */
+ qg = (QUEUEGRP *) xalloc(sizeof *qg);
+ memset((char *) qg, '\0', sizeof *qg);
+
+ if (line[0] == '\0')
+ {
+ syserr("name required for queue");
+ return;
+ }
+
+ /* collect the queue name */
+ for (p = line;
+ *p != '\0' && *p != ',' && !(isascii(*p) && isspace(*p));
+ p++)
+ continue;
+ if (*p != '\0')
+ *p++ = '\0';
+ qg->qg_name = newstr(line);
+
+ /* set default values, can be overridden below */
+ set_def_queueval(qg, false);
+
+ /* now scan through and assign info from the fields */
+ while (*p != '\0')
+ {
+ auto char *delimptr;
+
+ while (*p != '\0' &&
+ (*p == ',' || (isascii(*p) && isspace(*p))))
+ p++;
+
+ /* p now points to field code */
+ fcode = *p;
+ while (*p != '\0' && *p != '=' && *p != ',')
+ p++;
+ if (*p++ != '=')
+ {
+ syserr("queue %s: `=' expected", qg->qg_name);
+ return;
+ }
+ while (isascii(*p) && isspace(*p))
+ p++;
+
+ /* p now points to the field body */
+ p = munchstring(p, &delimptr, ',');
+
+ /* install the field into the queue struct */
+ switch (fcode)
+ {
+ case 'P': /* pathname */
+ if (*p == '\0')
+ syserr("queue %s: empty path name",
+ qg->qg_name);
+ else
+ qg->qg_qdir = newstr(p);
+ break;
+
+ case 'F': /* flags */
+ for (; *p != '\0'; p++)
+ if (!(isascii(*p) && isspace(*p)))
+ setbitn(*p, qg->qg_flags);
+ break;
+
+ /*
+ ** Do we need two intervals here:
+ ** One for persistent queue runners,
+ ** one for "normal" queue runs?
+ */
+
+ case 'I': /* interval between running the queue */
+ qg->qg_queueintvl = convtime(p, 'm');
+ break;
+
+ case 'N': /* run niceness */
+ qg->qg_nice = atoi(p);
+ break;
+
+ case 'R': /* maximum # of runners for the group */
+ i = atoi(p);
+
+ /* can't have more runners than allowed total */
+ if (MaxQueueChildren > 0 && i > MaxQueueChildren)
+ {
+ qg->qg_maxqrun = MaxQueueChildren;
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Q=%s: R=%d exceeds MaxQueueChildren=%d, set to MaxQueueChildren\n",
+ qg->qg_name, i,
+ MaxQueueChildren);
+ }
+ else
+ qg->qg_maxqrun = i;
+ break;
+
+ case 'J': /* maximum # of jobs in work list */
+ qg->qg_maxlist = atoi(p);
+ break;
+
+ case 'r': /* max recipients per envelope */
+ qg->qg_maxrcpt = atoi(p);
+ break;
+
+#if 0
+ case 'S': /* queue sorting order */
+ switch (*p)
+ {
+ case 'h': /* Host first */
+ case 'H':
+ qg->qg_sortorder = QSO_BYHOST;
+ break;
+
+ case 'p': /* Priority order */
+ case 'P':
+ qg->qg_sortorder = QSO_BYPRIORITY;
+ break;
+
+ case 't': /* Submission time */
+ case 'T':
+ qg->qg_sortorder = QSO_BYTIME;
+ break;
+
+ case 'f': /* File name */
+ case 'F':
+ qg->qg_sortorder = QSO_BYFILENAME;
+ break;
+
+ case 'm': /* Modification time */
+ case 'M':
+ qgrp->qg_sortorder = QSO_BYMODTIME;
+ break;
+
+ default:
+ syserr("Invalid queue sort order \"%s\"", p);
+ }
+ break;
+#endif /* 0 */
+
+ default:
+ syserr("Q%s: unknown queue equate %c=",
+ qg->qg_name, fcode);
+ break;
+ }
+
+ p = delimptr;
+ }
+
+#if !HASNICE
+ if (qg->qg_nice != NiceQueueRun)
+ {
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Q%s: Warning: N= set on system that doesn't support nice()\n",
+ qg->qg_name);
+ }
+#endif /* !HASNICE */
+
+ /* do some rationality checking */
+ if (NumQueue >= MAXQUEUEGROUPS)
+ {
+ syserr("too many queue groups defined (%d max)",
+ MAXQUEUEGROUPS);
+ return;
+ }
+
+ if (qg->qg_qdir == NULL)
+ {
+ if (QueueDir == NULL || *QueueDir == '\0')
+ {
+ syserr("QueueDir must be defined before queue groups");
+ return;
+ }
+ qg->qg_qdir = newstr(QueueDir);
+ }
+
+ if (qg->qg_maxqrun > 1 && !bitnset(QD_FORK, qg->qg_flags))
+ {
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Warning: Q=%s: R=%d: multiple queue runners specified\n\tbut flag '%c' is not set\n",
+ qg->qg_name, qg->qg_maxqrun, QD_FORK);
+ }
+
+ /* enter the queue into the symbol table */
+ if (tTd(37, 8))
+ sm_syslog(LOG_INFO, NOQID,
+ "Adding %s to stab, path: %s", qg->qg_name,
+ qg->qg_qdir);
+ s = stab(qg->qg_name, ST_QUEUE, ST_ENTER);
+ if (s->s_quegrp != NULL)
+ {
+ i = s->s_quegrp->qg_index;
+
+ /* XXX what about the pointers inside this struct? */
+ sm_free(s->s_quegrp); /* XXX */
+ }
+ else
+ i = NumQueue++;
+ Queue[i] = s->s_quegrp = qg;
+ qg->qg_index = i;
+
+ /* set default value for max queue runners */
+ if (qg->qg_maxqrun < 0)
+ {
+ if (MaxRunnersPerQueue > 0)
+ qg->qg_maxqrun = MaxRunnersPerQueue;
+ else
+ qg->qg_maxqrun = 1;
+ }
+ if (qdef)
+ setbitn(QD_DEFINED, qg->qg_flags);
+}
+#if 0
+/*
** HASHFQN -- calculate a hash value for a fully qualified host name
**
** Arguments:
@@ -3441,7 +6917,6 @@ hashfqn(fqn, buckets)
{
register char *p;
register int h = 0, hash, cnt;
-# define WATERINC (1000)
if (fqn == NULL)
return -1;
@@ -3465,10 +6940,10 @@ hashfqn(fqn, buckets)
return hash;
}
-# endif /* 0 */
+#endif /* 0 */
-# if _FFR_QUEUEDELAY
- /*
+#if _FFR_QUEUEDELAY
+/*
** QUEUEDELAY -- compute queue delay time
**
** Parameters:
@@ -3503,5 +6978,1330 @@ queuedelay(e)
qd = MinQueueAge;
return qd;
}
-# endif /* _FFR_QUEUEDELAY */
-#endif /* QUEUE */
+#endif /* _FFR_QUEUEDELAY */
+
+/*
+** A structure for sorting Queue according to maxqrun without
+** screwing up Queue itself.
+*/
+
+struct sortqgrp
+{
+ int sg_idx; /* original index */
+ int sg_maxqrun; /* max queue runners */
+};
+typedef struct sortqgrp SORTQGRP_T;
+static int cmpidx __P((const void *, const void *));
+
+static int
+cmpidx(a, b)
+ const void *a;
+ const void *b;
+{
+ /* The sort is highest to lowest, so the comparison is reversed */
+ if (((SORTQGRP_T *)a)->sg_maxqrun < ((SORTQGRP_T *)b)->sg_maxqrun)
+ return 1;
+ else if (((SORTQGRP_T *)a)->sg_maxqrun > ((SORTQGRP_T *)b)->sg_maxqrun)
+ return -1;
+ else
+ return 0;
+}
+
+/*
+** MAKEWORKGROUP -- balance queue groups into work groups per MaxQueueChildren
+**
+** Take the now defined queue groups and assign them to work groups.
+** This is done to balance out the number of concurrently active
+** queue runners such that MaxQueueChildren is not exceeded. This may
+** result in more than one queue group per work group. In such a case
+** the number of running queue groups in that work group will have no
+** more than the work group maximum number of runners (a "fair" portion
+** of MaxQueueRunners). All queue groups within a work group will get a
+** chance at running.
+**
+** Parameters:
+** none.
+**
+** Returns:
+** nothing.
+**
+** Side Effects:
+** Sets up WorkGrp structure.
+*/
+
+void
+makeworkgroups()
+{
+ int i, j, total_runners = 0;
+ int dir;
+ SORTQGRP_T si[MAXQUEUEGROUPS + 1];
+
+ if (NumQueue == 1 && strcmp(Queue[0]->qg_name, "mqueue") == 0)
+ {
+ /*
+ ** There is only the "mqueue" queue group (a default)
+ ** containing all of the queues. We want to provide to
+ ** this queue group the maximum allowable queue runners.
+ ** To match older behavior (8.10/8.11) we'll try for
+ ** 1 runner per queue capping it at MaxQueueChildren.
+ ** So if there are N queues, then there will be N runners
+ ** for the "mqueue" queue group (where N is kept less than
+ ** MaxQueueChildren).
+ */
+
+ NumWorkGroups = 1;
+ WorkGrp[0].wg_numqgrp = 1;
+ WorkGrp[0].wg_qgs = (QUEUEGRP **) xalloc(sizeof(QUEUEGRP *));
+ WorkGrp[0].wg_qgs[0] = Queue[0];
+ if (MaxQueueChildren > 0 &&
+ Queue[0]->qg_numqueues > MaxQueueChildren)
+ WorkGrp[0].wg_runners = MaxQueueChildren;
+ else
+ WorkGrp[0].wg_runners = Queue[0]->qg_numqueues;
+
+ Queue[0]->qg_wgrp = 0;
+
+ /* can't have more runners than allowed total */
+ if (MaxQueueChildren > 0 &&
+ Queue[0]->qg_maxqrun > MaxQueueChildren)
+ Queue[0]->qg_maxqrun = MaxQueueChildren;
+ WorkGrp[0].wg_maxact = Queue[0]->qg_maxqrun;
+ WorkGrp[0].wg_lowqintvl = Queue[0]->qg_queueintvl;
+ return;
+ }
+
+ for (i = 0; i < NumQueue; i++)
+ {
+ si[i].sg_maxqrun = Queue[i]->qg_maxqrun;
+ si[i].sg_idx = i;
+ }
+ qsort(si, NumQueue, sizeof(si[0]), cmpidx);
+
+ NumWorkGroups = 0;
+ for (i = 0; i < NumQueue; i++)
+ {
+ total_runners += si[i].sg_maxqrun;
+ if (MaxQueueChildren <= 0 || total_runners <= MaxQueueChildren)
+ NumWorkGroups++;
+ else
+ break;
+ }
+
+ if (NumWorkGroups < 1)
+ NumWorkGroups = 1; /* gotta have one at least */
+ else if (NumWorkGroups > MAXWORKGROUPS)
+ NumWorkGroups = MAXWORKGROUPS; /* the limit */
+
+ /*
+ ** We now know the number of work groups to pack the queue groups
+ ** into. The queue groups in 'Queue' are sorted from highest
+ ** to lowest for the number of runners per queue group.
+ ** We put the queue groups with the largest number of runners
+ ** into work groups first. Then the smaller ones are fitted in
+ ** where it looks best.
+ */
+
+ j = 0;
+ dir = 1;
+ for (i = 0; i < NumQueue; i++)
+ {
+ /* a to-and-fro packing scheme, continue from last position */
+ if (j >= NumWorkGroups)
+ {
+ dir = -1;
+ j = NumWorkGroups - 1;
+ }
+ else if (j < 0)
+ {
+ j = 0;
+ dir = 1;
+ }
+
+ WorkGrp[j].wg_qgs = (QUEUEGRP **)sm_realloc(WorkGrp[j].wg_qgs,
+ sizeof(QUEUEGRP *) *
+ (WorkGrp[j].wg_numqgrp + 1));
+ if (WorkGrp[j].wg_qgs == NULL)
+ {
+ syserr("@cannot allocate memory for work queues, need %d bytes",
+ (int) (sizeof(QUEUEGRP *) *
+ (WorkGrp[j].wg_numqgrp + 1)));
+ }
+
+ WorkGrp[j].wg_qgs[WorkGrp[j].wg_numqgrp] = Queue[si[i].sg_idx];
+ WorkGrp[j].wg_numqgrp++;
+ WorkGrp[j].wg_runners += Queue[i]->qg_maxqrun;
+ Queue[si[i].sg_idx]->qg_wgrp = j;
+
+ if (WorkGrp[j].wg_maxact == 0)
+ {
+ /* can't have more runners than allowed total */
+ if (MaxQueueChildren > 0 &&
+ Queue[i]->qg_maxqrun > MaxQueueChildren)
+ Queue[i]->qg_maxqrun = MaxQueueChildren;
+ WorkGrp[j].wg_maxact = Queue[i]->qg_maxqrun;
+ }
+
+ /*
+ ** XXX: must wg_lowqintvl be the GCD?
+ ** qg1: 2m, qg2: 3m, minimum: 2m, when do queue runs for
+ ** qg2 occur?
+ */
+
+ /* keep track of the lowest interval for a persistent runner */
+ if (Queue[si[i].sg_idx]->qg_queueintvl > 0 &&
+ WorkGrp[j].wg_lowqintvl < Queue[si[i].sg_idx]->qg_queueintvl)
+ WorkGrp[j].wg_lowqintvl = Queue[si[i].sg_idx]->qg_queueintvl;
+ j += dir;
+ }
+ if (tTd(41, 9))
+ {
+ for (i = 0; i < NumWorkGroups; i++)
+ {
+ sm_dprintf("Workgroup[%d]=", i);
+ for (j = 0; j < WorkGrp[i].wg_numqgrp; j++)
+ {
+ sm_dprintf("%s, ",
+ WorkGrp[i].wg_qgs[j]->qg_name);
+ }
+ sm_dprintf("\n");
+ }
+ }
+}
+
+/*
+** DUP_DF -- duplicate envelope data file
+**
+** Copy the data file from the 'old' envelope to the 'new' envelope
+** in the most efficient way possible.
+**
+** Create a hard link from the 'old' data file to the 'new' data file.
+** If the old and new queue directories are on different file systems,
+** then the new data file link is created in the old queue directory,
+** and the new queue file will contain a 'd' record pointing to the
+** directory containing the new data file.
+**
+** Parameters:
+** old -- old envelope.
+** new -- new envelope.
+**
+** Results:
+** Returns true on success, false on failure.
+**
+** Side Effects:
+** On success, the new data file is created.
+** On fatal failure, EF_FATALERRS is set in old->e_flags.
+*/
+
+static bool dup_df __P((ENVELOPE *, ENVELOPE *));
+
+static bool
+dup_df(old, new)
+ ENVELOPE *old;
+ ENVELOPE *new;
+{
+ int ofs, nfs, r;
+ char opath[MAXPATHLEN];
+ char npath[MAXPATHLEN];
+
+ SM_REQUIRE(bitset(EF_HAS_DF, old->e_flags));
+ SM_REQUIRE(ISVALIDQGRP(old->e_qgrp) && ISVALIDQDIR(old->e_qdir));
+ SM_REQUIRE(ISVALIDQGRP(new->e_qgrp) && ISVALIDQDIR(new->e_qdir));
+
+ (void) sm_strlcpy(opath, queuename(old, DATAFL_LETTER), sizeof opath);
+ (void) sm_strlcpy(npath, queuename(new, DATAFL_LETTER), sizeof npath);
+
+ if (old->e_dfp != NULL)
+ {
+ r = sm_io_setinfo(old->e_dfp, SM_BF_COMMIT, NULL);
+ if (r < 0 && errno != EINVAL)
+ {
+ syserr("@can't commit %s", opath);
+ old->e_flags |= EF_FATALERRS;
+ return false;
+ }
+ }
+
+ /*
+ ** Attempt to create a hard link, if we think both old and new
+ ** are on the same file system, otherwise copy the file.
+ **
+ ** Don't waste time attempting a hard link unless old and new
+ ** are on the same file system.
+ */
+
+ ofs = Queue[old->e_qgrp]->qg_qpaths[old->e_qdir].qp_fsysidx;
+ nfs = Queue[new->e_qgrp]->qg_qpaths[new->e_qdir].qp_fsysidx;
+ if (FILE_SYS_DEV(ofs) == FILE_SYS_DEV(nfs))
+ {
+ if (link(opath, npath) == 0)
+ {
+ new->e_flags |= EF_HAS_DF;
+ SYNC_DIR(npath, true);
+ return true;
+ }
+ goto error;
+ }
+
+ /*
+ ** Can't link across queue directories, so try to create a hard
+ ** link in the same queue directory as the old df file.
+ ** The qf file will refer to the new df file using a 'd' record.
+ */
+
+ new->e_dfqgrp = old->e_dfqgrp;
+ new->e_dfqdir = old->e_dfqdir;
+ (void) sm_strlcpy(npath, queuename(new, DATAFL_LETTER), sizeof npath);
+ if (link(opath, npath) == 0)
+ {
+ new->e_flags |= EF_HAS_DF;
+ SYNC_DIR(npath, true);
+ return true;
+ }
+
+ error:
+ if (LogLevel > 0)
+ sm_syslog(LOG_ERR, old->e_id,
+ "dup_df: can't link %s to %s, error=%s, envelope splitting failed",
+ opath, npath, sm_errstring(errno));
+ return false;
+}
+
+/*
+** SPLIT_ENV -- Allocate a new envelope based on a given envelope.
+**
+** Parameters:
+** e -- envelope.
+** sendqueue -- sendqueue for new envelope.
+** qgrp -- index of queue group.
+** qdir -- queue directory.
+**
+** Results:
+** new envelope.
+**
+*/
+
+static ENVELOPE *split_env __P((ENVELOPE *, ADDRESS *, int, int));
+
+static ENVELOPE *
+split_env(e, sendqueue, qgrp, qdir)
+ ENVELOPE *e;
+ ADDRESS *sendqueue;
+ int qgrp;
+ int qdir;
+{
+ ENVELOPE *ee;
+
+ ee = (ENVELOPE *) sm_rpool_malloc_x(e->e_rpool, sizeof *ee);
+ STRUCTCOPY(*e, *ee);
+ ee->e_message = NULL; /* XXX use original message? */
+ ee->e_id = NULL;
+ assign_queueid(ee);
+ ee->e_sendqueue = sendqueue;
+ ee->e_flags &= ~(EF_INQUEUE|EF_CLRQUEUE|EF_FATALERRS
+ |EF_SENDRECEIPT|EF_RET_PARAM|EF_HAS_DF);
+ ee->e_flags |= EF_NORECEIPT; /* XXX really? */
+ ee->e_from.q_state = QS_SENDER;
+ ee->e_dfp = NULL;
+ ee->e_lockfp = NULL;
+ if (e->e_xfp != NULL)
+ ee->e_xfp = sm_io_dup(e->e_xfp);
+ ee->e_qgrp = ee->e_dfqgrp = qgrp;
+ ee->e_qdir = ee->e_dfqdir = qdir;
+ ee->e_errormode = EM_MAIL;
+ ee->e_statmsg = NULL;
+#if _FFR_QUARANTINE
+ if (e->e_quarmsg != NULL)
+ ee->e_quarmsg = sm_rpool_strdup_x(ee->e_rpool,
+ e->e_quarmsg);
+#endif /* _FFR_QUARANTINE */
+
+ /*
+ ** XXX Not sure if this copying is necessary.
+ ** sendall() does this copying, but I don't know if that is
+ ** because of the storage management discipline we were using
+ ** before rpools were introduced, or if it is because these lists
+ ** can be modified later.
+ */
+
+ ee->e_header = copyheader(e->e_header, ee->e_rpool);
+ ee->e_errorqueue = copyqueue(e->e_errorqueue, ee->e_rpool);
+
+ return ee;
+}
+
+/* return values from split functions, check also below! */
+#define SM_SPLIT_FAIL (0)
+#define SM_SPLIT_NONE (1)
+#define SM_SPLIT_NEW(n) (1 + (n))
+
+/*
+** SPLIT_ACROSS_QUEUE_GROUPS
+**
+** This function splits an envelope across multiple queue groups
+** based on the queue group of each recipient.
+**
+** Parameters:
+** e -- envelope.
+**
+** Results:
+** SM_SPLIT_FAIL on failure
+** SM_SPLIT_NONE if no splitting occurred,
+** or 1 + the number of additional envelopes created.
+**
+** Side Effects:
+** On success, e->e_sibling points to a list of zero or more
+** additional envelopes, and the associated data files exist
+** on disk. But the queue files are not created.
+**
+** On failure, e->e_sibling is not changed.
+** The order of recipients in e->e_sendqueue is permuted.
+** Abandoned data files for additional envelopes that failed
+** to be created may exist on disk.
+*/
+
+static int q_qgrp_compare __P((const void *, const void *));
+static int e_filesys_compare __P((const void *, const void *));
+
+static int
+q_qgrp_compare(p1, p2)
+ const void *p1;
+ const void *p2;
+{
+ ADDRESS **pq1 = (ADDRESS **) p1;
+ ADDRESS **pq2 = (ADDRESS **) p2;
+
+ return (*pq1)->q_qgrp - (*pq2)->q_qgrp;
+}
+
+static int
+e_filesys_compare(p1, p2)
+ const void *p1;
+ const void *p2;
+{
+ ENVELOPE **pe1 = (ENVELOPE **) p1;
+ ENVELOPE **pe2 = (ENVELOPE **) p2;
+ int fs1, fs2;
+
+ fs1 = Queue[(*pe1)->e_qgrp]->qg_qpaths[(*pe1)->e_qdir].qp_fsysidx;
+ fs2 = Queue[(*pe2)->e_qgrp]->qg_qpaths[(*pe2)->e_qdir].qp_fsysidx;
+ if (FILE_SYS_DEV(fs1) < FILE_SYS_DEV(fs2))
+ return -1;
+ if (FILE_SYS_DEV(fs1) > FILE_SYS_DEV(fs2))
+ return 1;
+ return 0;
+}
+
+static int
+split_across_queue_groups(e)
+ ENVELOPE *e;
+{
+ int naddrs, nsplits, i;
+ char **pvp;
+ ADDRESS *q, **addrs;
+ ENVELOPE *ee, *es;
+ ENVELOPE *splits[MAXQUEUEGROUPS];
+ char pvpbuf[PSBUFSIZE];
+
+ SM_REQUIRE(ISVALIDQGRP(e->e_qgrp));
+
+ /* Count addresses and assign queue groups. */
+ naddrs = 0;
+ for (q = e->e_sendqueue; q != NULL; q = q->q_next)
+ {
+ if (QS_IS_DEAD(q->q_state))
+ continue;
+ ++naddrs;
+
+ /* bad addresses and those already sent stay put */
+ if (QS_IS_BADADDR(q->q_state) ||
+ QS_IS_SENT(q->q_state))
+ q->q_qgrp = e->e_qgrp;
+ else if (!ISVALIDQGRP(q->q_qgrp))
+ {
+ /* call ruleset which should return a queue group */
+ i = rscap(RS_QUEUEGROUP, q->q_user, NULL, e, &pvp,
+ pvpbuf, sizeof(pvpbuf));
+ if (i == EX_OK &&
+ pvp != NULL && pvp[0] != NULL &&
+ (pvp[0][0] & 0377) == CANONNET &&
+ pvp[1] != NULL && pvp[1][0] != '\0')
+ {
+ i = name2qid(pvp[1]);
+ if (ISVALIDQGRP(i))
+ {
+ q->q_qgrp = i;
+ if (tTd(20, 4))
+ sm_syslog(LOG_INFO, NOQID,
+ "queue group name %s -> %d",
+ pvp[1], i);
+ continue;
+ }
+ else if (LogLevel > 10)
+ sm_syslog(LOG_INFO, NOQID,
+ "can't find queue group name %s, selection ignored",
+ pvp[1]);
+ }
+ if (q->q_mailer != NULL &&
+ ISVALIDQGRP(q->q_mailer->m_qgrp))
+ q->q_qgrp = q->q_mailer->m_qgrp;
+ else
+ q->q_qgrp = 0;
+ }
+ }
+
+ /* only one address? nothing to split. */
+ if (naddrs <= 1)
+ return SM_SPLIT_NONE;
+
+ /* sort the addresses by queue group */
+ addrs = sm_rpool_malloc_x(e->e_rpool, naddrs * sizeof(ADDRESS *));
+ for (i = 0, q = e->e_sendqueue; q != NULL; q = q->q_next)
+ {
+ if (QS_IS_DEAD(q->q_state))
+ continue;
+ addrs[i++] = q;
+ }
+ qsort(addrs, naddrs, sizeof(ADDRESS *), q_qgrp_compare);
+
+ /* split into multiple envelopes, by queue group */
+ nsplits = 0;
+ es = NULL;
+ e->e_sendqueue = NULL;
+ for (i = 0; i < naddrs; ++i)
+ {
+ if (i == naddrs - 1 || addrs[i]->q_qgrp != addrs[i + 1]->q_qgrp)
+ addrs[i]->q_next = NULL;
+ else
+ addrs[i]->q_next = addrs[i + 1];
+
+ /* same queue group as original envelope? */
+ if (addrs[i]->q_qgrp == e->e_qgrp)
+ {
+ if (e->e_sendqueue == NULL)
+ e->e_sendqueue = addrs[i];
+ continue;
+ }
+
+ /* different queue group than original envelope */
+ if (es == NULL || addrs[i]->q_qgrp != es->e_qgrp)
+ {
+ ee = split_env(e, addrs[i], addrs[i]->q_qgrp, NOQDIR);
+ es = ee;
+ splits[nsplits++] = ee;
+ }
+ }
+
+ /* no splits? return right now. */
+ if (nsplits <= 0)
+ return SM_SPLIT_NONE;
+
+ /* assign a queue directory to each additional envelope */
+ for (i = 0; i < nsplits; ++i)
+ {
+ es = splits[i];
+#if 0
+ es->e_qdir = pickqdir(Queue[es->e_qgrp], es->e_msgsize, es);
+#endif /* 0 */
+ if (!setnewqueue(es))
+ goto failure;
+ }
+
+ /* sort the additional envelopes by queue file system */
+ qsort(splits, nsplits, sizeof(ENVELOPE *), e_filesys_compare);
+
+ /* create data files for each additional envelope */
+ if (!dup_df(e, splits[0]))
+ {
+ i = 0;
+ goto failure;
+ }
+ for (i = 1; i < nsplits; ++i)
+ {
+ /* copy or link to the previous data file */
+ if (!dup_df(splits[i - 1], splits[i]))
+ goto failure;
+ }
+
+ /* success: prepend the new envelopes to the e->e_sibling list */
+ for (i = 0; i < nsplits; ++i)
+ {
+ es = splits[i];
+ es->e_sibling = e->e_sibling;
+ e->e_sibling = es;
+ }
+ return SM_SPLIT_NEW(nsplits);
+
+ /* failure: clean up */
+ failure:
+ if (i > 0)
+ {
+ int j;
+
+ for (j = 0; j < i; j++)
+ (void) unlink(queuename(splits[j], DATAFL_LETTER));
+ }
+ e->e_sendqueue = addrs[0];
+ for (i = 0; i < naddrs - 1; ++i)
+ addrs[i]->q_next = addrs[i + 1];
+ addrs[naddrs - 1]->q_next = NULL;
+ return SM_SPLIT_FAIL;
+}
+
+/*
+** SPLIT_WITHIN_QUEUE
+**
+** Split an envelope with multiple recipients into several
+** envelopes within the same queue directory, if the number of
+** recipients exceeds the limit for the queue group.
+**
+** Parameters:
+** e -- envelope.
+**
+** Results:
+** SM_SPLIT_FAIL on failure
+** SM_SPLIT_NONE if no splitting occurred,
+** or 1 + the number of additional envelopes created.
+*/
+
+#define SPLIT_LOG_LEVEL 8
+
+static int split_within_queue __P((ENVELOPE *));
+
+static int
+split_within_queue(e)
+ ENVELOPE *e;
+{
+ int maxrcpt, nrcpt, ndead, nsplit, i;
+ int j, l;
+ char *lsplits;
+ ADDRESS *q, **addrs;
+ ENVELOPE *ee, *firstsibling;
+
+ if (!ISVALIDQGRP(e->e_qgrp) || bitset(EF_SPLIT, e->e_flags))
+ return SM_SPLIT_NONE;
+
+ /* don't bother if there is no recipient limit */
+ maxrcpt = Queue[e->e_qgrp]->qg_maxrcpt;
+ if (maxrcpt <= 0)
+ return SM_SPLIT_NONE;
+
+ /* count recipients */
+ nrcpt = 0;
+ for (q = e->e_sendqueue; q != NULL; q = q->q_next)
+ {
+ if (QS_IS_DEAD(q->q_state))
+ continue;
+ ++nrcpt;
+ }
+ if (nrcpt <= maxrcpt)
+ return SM_SPLIT_NONE;
+
+ /*
+ ** Preserve the recipient list
+ ** so that we can restore it in case of error.
+ ** (But we discard dead addresses.)
+ */
+
+ addrs = sm_rpool_malloc_x(e->e_rpool, nrcpt * sizeof(ADDRESS *));
+ for (i = 0, q = e->e_sendqueue; q != NULL; q = q->q_next)
+ {
+ if (QS_IS_DEAD(q->q_state))
+ continue;
+ addrs[i++] = q;
+ }
+
+ /*
+ ** Partition the recipient list so that bad and sent addresses
+ ** come first. These will go with the original envelope, and
+ ** do not count towards the maxrcpt limit.
+ ** addrs[] does not contain QS_IS_DEAD() addresses.
+ */
+
+ ndead = 0;
+ for (i = 0; i < nrcpt; ++i)
+ {
+ if (QS_IS_BADADDR(addrs[i]->q_state) ||
+ QS_IS_SENT(addrs[i]->q_state) ||
+ QS_IS_DEAD(addrs[i]->q_state)) /* for paranoia's sake */
+ {
+ if (i > ndead)
+ {
+ ADDRESS *tmp = addrs[i];
+
+ addrs[i] = addrs[ndead];
+ addrs[ndead] = tmp;
+ }
+ ++ndead;
+ }
+ }
+
+ /* Check if no splitting required. */
+ if (nrcpt - ndead <= maxrcpt)
+ return SM_SPLIT_NONE;
+
+ /* fix links */
+ for (i = 0; i < nrcpt - 1; ++i)
+ addrs[i]->q_next = addrs[i + 1];
+ addrs[nrcpt - 1]->q_next = NULL;
+ e->e_sendqueue = addrs[0];
+
+ /* prepare buffer for logging */
+ if (LogLevel > SPLIT_LOG_LEVEL)
+ {
+ l = MAXLINE;
+ lsplits = sm_malloc(l);
+ if (lsplits != NULL)
+ *lsplits = '\0';
+ j = 0;
+ }
+ else
+ {
+ /* get rid of stupid compiler warnings */
+ lsplits = NULL;
+ j = l = 0;
+ }
+
+ /* split the envelope */
+ firstsibling = e->e_sibling;
+ i = maxrcpt + ndead;
+ nsplit = 0;
+ for (;;)
+ {
+ addrs[i - 1]->q_next = NULL;
+ ee = split_env(e, addrs[i], e->e_qgrp, e->e_qdir);
+ if (!dup_df(e, ee))
+ {
+
+ ee = firstsibling;
+ while (ee != NULL)
+ {
+ (void) unlink(queuename(ee, DATAFL_LETTER));
+ ee = ee->e_sibling;
+ }
+
+ /* Error. Restore e's sibling & recipient lists. */
+ e->e_sibling = firstsibling;
+ for (i = 0; i < nrcpt - 1; ++i)
+ addrs[i]->q_next = addrs[i + 1];
+ return SM_SPLIT_FAIL;
+ }
+
+ /* prepend the new envelope to e->e_sibling */
+ ee->e_sibling = e->e_sibling;
+ e->e_sibling = ee;
+ ++nsplit;
+ if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL)
+ {
+ if (j >= l - strlen(ee->e_id) - 3)
+ {
+ char *p;
+
+ l += MAXLINE;
+ p = sm_realloc(lsplits, l);
+ if (p == NULL)
+ {
+ /* let's try to get this done */
+ sm_free(lsplits);
+ lsplits = NULL;
+ }
+ else
+ lsplits = p;
+ }
+ if (lsplits != NULL)
+ {
+ if (j == 0)
+ j += sm_strlcat(lsplits + j,
+ ee->e_id,
+ l - j);
+ else
+ j += sm_strlcat2(lsplits + j,
+ "; ",
+ ee->e_id,
+ l - j);
+ SM_ASSERT(j < l);
+ }
+ }
+ if (nrcpt - i <= maxrcpt)
+ break;
+ i += maxrcpt;
+ }
+ if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL && nsplit > 0)
+ {
+ sm_syslog(LOG_NOTICE, e->e_id,
+ "split: maxrcpts=%d, rcpts=%d, count=%d, id%s=%s",
+ maxrcpt, nrcpt - ndead, nsplit,
+ nsplit > 1 ? "s" : "", lsplits);
+ sm_free(lsplits);
+ }
+ return SM_SPLIT_NEW(nsplit);
+}
+/*
+** SPLIT_BY_RECIPIENT
+**
+** Split an envelope with multiple recipients into multiple
+** envelopes as required by the sendmail configuration.
+**
+** Parameters:
+** e -- envelope.
+**
+** Results:
+** Returns true on success, false on failure.
+**
+** Side Effects:
+** see split_across_queue_groups(), split_within_queue(e)
+*/
+
+bool
+split_by_recipient(e)
+ ENVELOPE *e;
+{
+ int split, n, i, j, l;
+ char *lsplits;
+ ENVELOPE *ee, *next, *firstsibling;
+
+ if (OpMode == SM_VERIFY || !ISVALIDQGRP(e->e_qgrp) ||
+ bitset(EF_SPLIT, e->e_flags))
+ return true;
+ n = split_across_queue_groups(e);
+ if (n == SM_SPLIT_FAIL)
+ return false;
+ firstsibling = ee = e->e_sibling;
+ if (n > 1 && LogLevel > SPLIT_LOG_LEVEL)
+ {
+ l = MAXLINE;
+ lsplits = sm_malloc(l);
+ if (lsplits != NULL)
+ *lsplits = '\0';
+ j = 0;
+ }
+ else
+ {
+ /* get rid of stupid compiler warnings */
+ lsplits = NULL;
+ j = l = 0;
+ }
+ for (i = 1; i < n; ++i)
+ {
+ next = ee->e_sibling;
+ if (split_within_queue(ee) == SM_SPLIT_FAIL)
+ {
+ e->e_sibling = firstsibling;
+ return false;
+ }
+ ee->e_flags |= EF_SPLIT;
+ if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL)
+ {
+ if (j >= l - strlen(ee->e_id) - 3)
+ {
+ char *p;
+
+ l += MAXLINE;
+ p = sm_realloc(lsplits, l);
+ if (p == NULL)
+ {
+ /* let's try to get this done */
+ sm_free(lsplits);
+ lsplits = NULL;
+ }
+ else
+ lsplits = p;
+ }
+ if (lsplits != NULL)
+ {
+ if (j == 0)
+ j += sm_strlcat(lsplits + j,
+ ee->e_id, l - j);
+ else
+ j += sm_strlcat2(lsplits + j, "; ",
+ ee->e_id, l - j);
+ SM_ASSERT(j < l);
+ }
+ }
+ ee = next;
+ }
+ if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL && n > 1)
+ {
+ sm_syslog(LOG_NOTICE, e->e_id, "split: count=%d, id%s=%s",
+ n - 1, n > 2 ? "s" : "", lsplits);
+ sm_free(lsplits);
+ }
+ split = split_within_queue(e) != SM_SPLIT_FAIL;
+ if (split)
+ e->e_flags |= EF_SPLIT;
+ return split;
+}
+
+#if _FFR_QUARANTINE
+/*
+** QUARANTINE_QUEUE_ITEM -- {un,}quarantine a single envelope
+**
+** Add/remove quarantine reason and requeue appropriately.
+**
+** Parameters:
+** qgrp -- queue group for the item
+** qdir -- queue directory in the given queue group
+** e -- envelope information for the item
+** reason -- quarantine reason, NULL means unquarantine.
+**
+** Results:
+** true if item changed, false otherwise
+**
+** Side Effects:
+** Changes quarantine tag in queue file and renames it.
+*/
+
+static bool
+quarantine_queue_item(qgrp, qdir, e, reason)
+ int qgrp;
+ int qdir;
+ ENVELOPE *e;
+ char *reason;
+{
+ bool dirty = false;
+ bool failing = false;
+ bool foundq = false;
+ bool finished = false;
+ int fd;
+ int flags;
+ int oldtype;
+ int newtype;
+ int save_errno;
+ MODE_T oldumask = 0;
+ SM_FILE_T *oldqfp, *tempqfp;
+ char *bp;
+ char oldqf[MAXPATHLEN];
+ char tempqf[MAXPATHLEN];
+ char newqf[MAXPATHLEN];
+ char buf[MAXLINE];
+
+ oldtype = queue_letter(e, ANYQFL_LETTER);
+ (void) sm_strlcpy(oldqf, queuename(e, ANYQFL_LETTER), sizeof oldqf);
+ (void) sm_strlcpy(tempqf, queuename(e, NEWQFL_LETTER), sizeof tempqf);
+
+ /*
+ ** Instead of duplicating all the open
+ ** and lock code here, tell readqf() to
+ ** do that work and return the open
+ ** file pointer in e_lockfp. Note that
+ ** we must release the locks properly when
+ ** we are done.
+ */
+
+ if (!readqf(e, true))
+ {
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s\n", qid_printname(e));
+ return false;
+ }
+ oldqfp = e->e_lockfp;
+
+ /* open the new queue file */
+ flags = O_CREAT|O_WRONLY|O_EXCL;
+ if (bitset(S_IWGRP, QueueFileMode))
+ oldumask = umask(002);
+ fd = open(tempqf, flags, QueueFileMode);
+ if (bitset(S_IWGRP, QueueFileMode))
+ (void) umask(oldumask);
+ RELEASE_QUEUE;
+
+ if (fd < 0)
+ {
+ save_errno = errno;
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s: Could not open %s: %s\n",
+ qid_printname(e), tempqf,
+ sm_errstring(save_errno));
+ (void) sm_io_close(oldqfp, SM_TIME_DEFAULT);
+ return false;
+ }
+ if (!lockfile(fd, tempqf, NULL, LOCK_EX|LOCK_NB))
+ {
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s: Could not lock %s\n",
+ qid_printname(e), tempqf);
+ (void) close(fd);
+ (void) sm_io_close(oldqfp, SM_TIME_DEFAULT);
+ return false;
+ }
+
+ tempqfp = sm_io_open(SmFtStdiofd, SM_TIME_DEFAULT, (void *) &fd,
+ SM_IO_WRONLY, NULL);
+ if (tempqfp == NULL)
+ {
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s: Could not lock %s\n",
+ qid_printname(e), tempqf);
+ (void) close(fd);
+ (void) sm_io_close(oldqfp, SM_TIME_DEFAULT);
+ return false;
+ }
+
+ /* Copy the data over, changing the quarantine reason */
+ while ((bp = fgetfolded(buf, sizeof buf, oldqfp)) != NULL)
+ {
+ if (tTd(40, 4))
+ sm_dprintf("+++++ %s\n", bp);
+ switch (bp[0])
+ {
+ case 'q': /* quarantine reason */
+ foundq = true;
+ if (reason == NULL)
+ {
+ if (Verbose)
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%s: Removed quarantine of \"%s\"\n",
+ e->e_id, &bp[1]);
+ }
+ sm_syslog(LOG_INFO, e->e_id, "unquarantine");
+ dirty = true;
+ continue;
+ }
+ else if (strcmp(reason, &bp[1]) == 0)
+ {
+ if (Verbose)
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%s: Already quarantined with \"%s\"\n",
+ e->e_id, reason);
+ }
+ (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT,
+ "q%s\n", reason);
+ }
+ else
+ {
+ if (Verbose)
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%s: Quarantine changed from \"%s\" to \"%s\"\n",
+ e->e_id, &bp[1],
+ reason);
+ }
+ (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT,
+ "q%s\n", reason);
+ sm_syslog(LOG_INFO, e->e_id, "quarantine=%s",
+ reason);
+ dirty = true;
+ }
+ break;
+
+ case 'R':
+ /*
+ ** If we are quarantining an unquarantined item,
+ ** need to put in a new 'q' line before it's
+ ** too late.
+ */
+
+ if (!foundq && reason != NULL)
+ {
+ if (Verbose)
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT,
+ "%s: Quarantined with \"%s\"\n",
+ e->e_id, reason);
+ }
+ (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT,
+ "q%s\n", reason);
+ sm_syslog(LOG_INFO, e->e_id, "quarantine=%s",
+ reason);
+ foundq = true;
+ dirty = true;
+ }
+
+ /* Copy the line to the new file */
+ (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT,
+ "%s\n", bp);
+ break;
+
+ case '.':
+ finished = true;
+ /* FALLTHROUGH */
+
+ default:
+ /* Copy the line to the new file */
+ (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT,
+ "%s\n", bp);
+ break;
+ }
+ }
+
+ /* Make sure we read the whole old file */
+ errno = sm_io_error(tempqfp);
+ if (errno != 0 && errno != SM_IO_EOF)
+ {
+ save_errno = errno;
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s: Error reading %s: %s\n",
+ qid_printname(e), oldqf,
+ sm_errstring(save_errno));
+ failing = true;
+ }
+
+ if (!failing && !finished)
+ {
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s: Incomplete file: %s\n",
+ qid_printname(e), oldqf);
+ failing = true;
+ }
+
+ /* Check if we actually changed anything or we can just bail now */
+ if (!dirty)
+ {
+ /* pretend we failed, even though we technically didn't */
+ failing = true;
+ }
+
+ /* Make sure we wrote things out safely */
+ if (!failing &&
+ (sm_io_flush(tempqfp, SM_TIME_DEFAULT) != 0 ||
+ ((SuperSafe == SAFE_REALLY || SuperSafe == SAFE_INTERACTIVE) &&
+ fsync(sm_io_getinfo(tempqfp, SM_IO_WHAT_FD, NULL)) < 0) ||
+ ((errno = sm_io_error(tempqfp)) != 0)))
+ {
+ save_errno = errno;
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Skipping %s: Error writing %s: %s\n",
+ qid_printname(e), tempqf,
+ sm_errstring(save_errno));
+ failing = true;
+ }
+
+
+ /* Figure out the new filename */
+ newtype = (reason == NULL ? NORMQF_LETTER : QUARQF_LETTER);
+ if (oldtype == newtype)
+ {
+ /* going to rename tempqf to oldqf */
+ (void) sm_strlcpy(newqf, oldqf, sizeof newqf);
+ }
+ else
+ {
+ /* going to rename tempqf to new name based on newtype */
+ (void) sm_strlcpy(newqf, queuename(e, newtype), sizeof newqf);
+ }
+
+ save_errno = 0;
+
+ /* rename tempqf to newqf */
+ if (!failing &&
+ rename(tempqf, newqf) < 0)
+ save_errno = (errno == 0) ? EINVAL : errno;
+
+ /* Check rename() success */
+ if (!failing && save_errno != 0)
+ {
+ sm_syslog(LOG_DEBUG, e->e_id,
+ "quarantine_queue_item: rename(%s, %s): %s",
+ tempqf, newqf, sm_errstring(save_errno));
+
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Error renaming %s to %s: %s\n",
+ tempqf, newqf,
+ sm_errstring(save_errno));
+ if (oldtype == newtype)
+ {
+ /*
+ ** Bail here since we don't know the state of
+ ** the filesystem and may need to keep tempqf
+ ** for the user to rescue us.
+ */
+
+ RELEASE_QUEUE;
+ errno = save_errno;
+ syserr("!452 Error renaming control file %s", tempqf);
+ /* NOTREACHED */
+ }
+ else
+ {
+ /* remove new file (if rename() half completed) */
+ if (xunlink(newqf) < 0)
+ {
+ save_errno = errno;
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Error removing %s: %s\n",
+ newqf,
+ sm_errstring(save_errno));
+ }
+
+ /* tempqf removed below */
+ failing = true;
+ }
+
+ }
+
+ /* If changing file types, need to remove old type */
+ if (!failing && oldtype != newtype)
+ {
+ if (xunlink(oldqf) < 0)
+ {
+ save_errno = errno;
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Error removing %s: %s\n",
+ oldqf, sm_errstring(save_errno));
+ }
+ }
+
+ /* see if anything above failed */
+ if (failing)
+ {
+ /* Something failed: remove new file, old file still there */
+ (void) xunlink(tempqf);
+ }
+
+ /*
+ ** fsync() after file operations to make sure metadata is
+ ** written to disk on filesystems in which renames are
+ ** not guaranteed. It's ok if they fail, mail won't be lost.
+ */
+
+ if (SuperSafe != SAFE_NO)
+ {
+ /* for soft-updates */
+ (void) fsync(sm_io_getinfo(tempqfp,
+ SM_IO_WHAT_FD, NULL));
+
+ if (!failing)
+ {
+ /* for soft-updates */
+ (void) fsync(sm_io_getinfo(oldqfp,
+ SM_IO_WHAT_FD, NULL));
+ }
+
+ /* for other odd filesystems */
+ SYNC_DIR(tempqf, false);
+ }
+
+ /* Close up shop */
+ RELEASE_QUEUE;
+ if (tempqfp != NULL)
+ (void) sm_io_close(tempqfp, SM_TIME_DEFAULT);
+ if (oldqfp != NULL)
+ (void) sm_io_close(oldqfp, SM_TIME_DEFAULT);
+
+ /* All went well */
+ return !failing;
+}
+
+/*
+** QUARANTINE_QUEUE -- {un,}quarantine matching items in the queue
+**
+** Read all matching queue items, add/remove quarantine
+** reason, and requeue appropriately.
+**
+** Parameters:
+** reason -- quarantine reason, "." means unquarantine.
+** qgrplimit -- limit to single queue group unless NOQGRP
+**
+** Results:
+** none.
+**
+** Side Effects:
+** Lots of changes to the queue.
+*/
+
+void
+quarantine_queue(reason, qgrplimit)
+ char *reason;
+ int qgrplimit;
+{
+ int changed = 0;
+ int qgrp;
+
+ /* Convert internal representation of unquarantine */
+ if (reason != NULL && reason[0] == '.' && reason[1] == '\0')
+ reason = NULL;
+
+ if (reason != NULL)
+ {
+ /* clean it */
+ reason = newstr(denlstring(reason, true, true));
+ }
+
+ for (qgrp = 0; qgrp < NumQueue && Queue[qgrp] != NULL; qgrp++)
+ {
+ int qdir;
+
+ if (qgrplimit != NOQGRP && qgrplimit != qgrp)
+ continue;
+
+ for (qdir = 0; qdir < Queue[qgrp]->qg_numqueues; qdir++)
+ {
+ int i;
+ int nrequests;
+
+ if (StopRequest)
+ stop_sendmail();
+
+ nrequests = gatherq(qgrp, qdir, true, NULL, NULL);
+
+ /* first see if there is anything */
+ if (nrequests <= 0)
+ {
+ if (Verbose)
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT, "%s: no matches\n",
+ qid_printqueue(qgrp, qdir));
+ }
+ continue;
+ }
+
+ if (Verbose)
+ {
+ (void) sm_io_fprintf(smioout,
+ SM_TIME_DEFAULT, "Processing %s:\n",
+ qid_printqueue(qgrp, qdir));
+ }
+
+ for (i = 0; i < WorkListCount; i++)
+ {
+ ENVELOPE e;
+
+ if (StopRequest)
+ stop_sendmail();
+
+ /* setup envelope */
+ clearenvelope(&e, true, sm_rpool_new_x(NULL));
+ e.e_id = WorkList[i].w_name + 2;
+ e.e_qgrp = qgrp;
+ e.e_qdir = qdir;
+
+ if (tTd(70, 101))
+ {
+ sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "Would do %s\n", e.e_id);
+ changed++;
+ }
+ else if (quarantine_queue_item(qgrp, qdir,
+ &e, reason))
+ changed++;
+
+ /* clean up */
+ sm_rpool_free(e.e_rpool);
+ e.e_rpool = NULL;
+ }
+ if (WorkList != NULL)
+ sm_free(WorkList); /* XXX */
+ WorkList = NULL;
+ WorkListSize = 0;
+ WorkListCount = 0;
+ }
+ }
+ if (Verbose)
+ {
+ if (changed == 0)
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "No changes\n");
+ else
+ (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT,
+ "%d change%s\n",
+ changed,
+ changed == 1 ? "" : "s");
+ }
+}
+#endif /* _FFR_QUARANTINE */
OpenPOWER on IntegriCloud