diff options
author | gshapiro <gshapiro@FreeBSD.org> | 2002-02-17 21:56:45 +0000 |
---|---|---|
committer | gshapiro <gshapiro@FreeBSD.org> | 2002-02-17 21:56:45 +0000 |
commit | 8449595fe97f4474b9b9a7e4edee1ef35dcff393 (patch) | |
tree | e7a33b132264d449a512ddf4a8685df097669c1d /contrib/sendmail/src/queue.c | |
parent | 289b381b31415647269c7520d881017e2dcb27f1 (diff) | |
download | FreeBSD-src-8449595fe97f4474b9b9a7e4edee1ef35dcff393.zip FreeBSD-src-8449595fe97f4474b9b9a7e4edee1ef35dcff393.tar.gz |
Import sendmail 8.12.2
Diffstat (limited to 'contrib/sendmail/src/queue.c')
-rw-r--r-- | contrib/sendmail/src/queue.c | 6910 |
1 files changed, 5855 insertions, 1055 deletions
diff --git a/contrib/sendmail/src/queue.c b/contrib/sendmail/src/queue.c index aeed7f9..6a2da9c 100644 --- a/contrib/sendmail/src/queue.c +++ b/contrib/sendmail/src/queue.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 1998-2001 Sendmail, Inc. and its suppliers. + * Copyright (c) 1998-2002 Sendmail, Inc. and its suppliers. * All rights reserved. * Copyright (c) 1983, 1995-1997 Eric P. Allman. All rights reserved. * Copyright (c) 1988, 1993 @@ -11,28 +11,41 @@ * */ - #include <sendmail.h> -#ifndef lint -# if QUEUE -static char id[] = "@(#)$Id: queue.c,v 8.343.4.62 2001/07/20 00:53:01 gshapiro Exp $ (with queueing)"; -# else /* QUEUE */ -static char id[] = "@(#)$Id: queue.c,v 8.343.4.62 2001/07/20 00:53:01 gshapiro Exp $ (without queueing)"; -# endif /* QUEUE */ -#endif /* ! lint */ +SM_RCSID("@(#)$Id: queue.c,v 8.834 2002/01/08 23:04:58 ca Exp $") + +#include <dirent.h> + +#if SM_CONF_SHM +# include <sm/shm.h> +#endif /* SM_CONF_SHM */ -# include <dirent.h> +# define RELEASE_QUEUE (void) 0 +# define ST_INODE(st) (st).st_ino -#if QUEUE -# if _FFR_QUEUEDELAY -# define QF_VERSION 5 /* version number of this queue format */ +/* +** Historical notes: +** QF_VERSION==4 was sendmail 8.10/8.11 without _FFR_QUEUEDELAY +** QF_VERSION==5 was sendmail 8.10/8.11 with _FFR_QUEUEDELAY +*/ + +#if _FFR_QUEUEDELAY +# define QF_VERSION 7 /* version number of this queue format */ static time_t queuedelay __P((ENVELOPE *)); -# else /* _FFR_QUEUEDELAY */ -# define QF_VERSION 4 /* version number of this queue format */ -# define queuedelay(e) MinQueueAge -# endif /* _FFR_QUEUEDELAY */ +#define queuedelay_qfver_unsupported(qfver) false +#else /* _FFR_QUEUEDELAY */ +# define QF_VERSION 6 /* version number of this queue format */ +# define queuedelay(e) MinQueueAge +#define queuedelay_qfver_unsupported(qfver) ((qfver) == 5 || (qfver) == 7) +#endif /* _FFR_QUEUEDELAY */ +#if _FFR_QUARANTINE +static char queue_letter __P((ENVELOPE *, int)); +static bool quarantine_queue_item __P((int, int, ENVELOPE *, char *)); +#endif /* _FFR_QUARANTINE */ + +/* Naming convention: qgrp: index of queue group, qg: QUEUEGROUP */ /* ** Work queue. @@ -45,28 +58,214 @@ struct work bool w_lock; /* is message locked? */ bool w_tooyoung; /* is it too young to run? */ long w_pri; /* priority of message, see below */ - time_t w_ctime; /* creation time of message */ + time_t w_ctime; /* creation time */ + time_t w_mtime; /* modification time */ + int w_qgrp; /* queue group located in */ + int w_qdir; /* queue directory located in */ struct work *w_next; /* next in queue */ }; typedef struct work WORK; -static WORK *WorkQ; /* queue of things to be done */ +static WORK *WorkQ; /* queue of things to be done */ +static int NumWorkGroups; /* number of work groups */ + +/* +** use of DoQueueRun: +** NumQueue: indicates that a queue run is needed, look at individual bits +** 0 - NumQueue-1: indicates that a queue run for this queue group +** is needed. +*/ + +static BITMAP256 volatile DoQueueRun; /* non-interrupt time queue run needed */ + +/* +** Work group definition structure. +** Each work group contains one or more queue groups. This is done +** to manage the number of queue group runners active at the same time +** to be within the constraints of MaxQueueChildren (if it is set). +** The number of queue groups that can be run on the next work run +** is kept track of. The queue groups are run in a round robin. +*/ + +struct workgrp +{ + int wg_numqgrp; /* number of queue groups in work grp */ + int wg_runners; /* total runners */ + int wg_curqgrp; /* current queue group */ + QUEUEGRP **wg_qgs; /* array of queue groups */ + int wg_maxact; /* max # of active runners */ + time_t wg_lowqintvl; /* lowest queue interval */ + int wg_restart; /* needs restarting? */ + int wg_restartcnt; /* count of times restarted */ +}; + +typedef struct workgrp WORKGRP; + +static WORKGRP volatile WorkGrp[MAXWORKGROUPS + 1]; /* work groups */ + +#if SM_HEAP_CHECK +static SM_DEBUG_T DebugLeakQ = SM_DEBUG_INITIALIZER("leak_q", + "@(#)$Debug: leak_q - trace memory leaks during queue processing $"); +#endif /* SM_HEAP_CHECK */ -static void grow_wlist __P((int)); -static int orderq __P((int, bool)); -static void printctladdr __P((ADDRESS *, FILE *)); -static int print_single_queue __P((int)); -static bool readqf __P((ENVELOPE *)); -static void runqueueevent __P((void)); -static int run_single_queue __P((int, bool, bool)); +/* +** We use EmptyString instead of "" to avoid +** 'zero-length format string' warnings from gcc +*/ + +static const char EmptyString[] = ""; + +static void grow_wlist __P((int, int)); +static int multiqueue_cache __P((char *, int, QUEUEGRP *, int, unsigned int *)); +static int gatherq __P((int, int, bool, bool *, bool *)); +static int sortq __P((int)); +static void printctladdr __P((ADDRESS *, SM_FILE_T *)); +static bool readqf __P((ENVELOPE *, bool)); +static void restart_work_group __P((int)); +static void runner_work __P((ENVELOPE *, int, bool, int, int)); +static void schedule_queue_runs __P((bool, int)); static char *strrev __P((char *)); -static ADDRESS *setctluser __P((char *, int)); +static ADDRESS *setctluser __P((char *, int, ENVELOPE *)); +#if _FFR_RHS +static int sm_strshufflecmp __P((char *, char *)); +static void init_shuffle_alphabet __P(()); +#endif /* _FFR_RHS */ static int workcmpf0(); static int workcmpf1(); static int workcmpf2(); static int workcmpf3(); static int workcmpf4(); +static int workcmpf5(); +static int workcmpf6(); +#if _FFR_RHS +static int workcmpf7(); +#endif /* _FFR_RHS */ + +#if RANDOMSHIFT +# define get_rand_mod(m) ((get_random() >> RANDOMSHIFT) % (m)) +#else /* RANDOMSHIFT */ +# define get_rand_mod(m) (get_random() % (m)) +#endif /* RANDOMSHIFT */ + +/* +** File system definition. +** Used to keep track of how much free space is available +** on a file system in which one or more queue directories reside. +*/ + +typedef struct filesys_shared FILESYS; + +struct filesys_shared +{ + dev_t fs_dev; /* unique device id */ + long fs_avail; /* number of free blocks available */ + long fs_blksize; /* block size, in bytes */ +}; + +/* probably kept in shared memory */ +static FILESYS FileSys[MAXFILESYS]; /* queue file systems */ +static char *FSPath[MAXFILESYS]; /* pathnames for file systems */ + +#if SM_CONF_SHM + +/* +** Shared memory data +** +** Current layout: +** size -- size of shared memory segment +** pid -- pid of owner, should be a unique id to avoid misinterpretations +** by other processes. +** tag -- should be a unique id to avoid misinterpretations by others. +** idea: hash over configuration data that will be stored here. +** NumFileSys -- number of file systems. +** FileSys -- (arrary of) structure for used file systems. +** RSATmpCnt -- counter for number of uses of ephemeral RSA key. +** QShm -- (array of) structure for information about queue directories. +*/ + +/* +** Queue data in shared memory +*/ + +typedef struct queue_shared QUEUE_SHM_T; + +struct queue_shared +{ + int qs_entries; /* number of entries */ + /* XXX more to follow? */ +}; + +static void *Pshm; /* pointer to shared memory */ +static FILESYS *PtrFileSys; /* pointer to queue file system array */ +int ShmId = SM_SHM_NO_ID; /* shared memory id */ +static QUEUE_SHM_T *QShm; /* pointer to shared queue data */ + +# define SHM_OFF_PID(p) (((char *) (p)) + sizeof(int)) +# define SHM_OFF_TAG(p) (((char *) (p)) + sizeof(pid_t) + sizeof(int)) +# define SHM_OFF_HEAD (sizeof(pid_t) + sizeof(int) * 2) + +/* how to access FileSys */ +# define FILE_SYS(i) (PtrFileSys[i]) + +/* first entry is a tag, for now just the size */ +# define OFF_FILE_SYS(p) (((char *) (p)) + SHM_OFF_HEAD) + +/* offset for PNumFileSys */ +# define OFF_NUM_FILE_SYS(p) (((char *) (p)) + SHM_OFF_HEAD + sizeof(FileSys)) + +/* offset for PRSATmpCnt */ +# define OFF_RSA_TMP_CNT(p) (((char *) (p)) + SHM_OFF_HEAD + sizeof(FileSys) + sizeof(int)) +int *PRSATmpCnt; + +/* offset for queue_shm */ +# define OFF_QUEUE_SHM(p) (((char *) (p)) + SHM_OFF_HEAD + sizeof(FileSys) + sizeof(int) * 2) + +#define QSHM_ENTRIES(i) QShm[i].qs_entries + +/* basic size of shared memory segment */ +# define SM_T_SIZE (SHM_OFF_HEAD + sizeof(FileSys) + sizeof(int) * 2) + +static unsigned int hash_q __P((char *, unsigned int)); + +/* +** HASH_Q -- simple hash function +** +** Parameters: +** p -- string to hash. +** h -- hash start value (from previous run). +** +** Returns: +** hash value. +*/ + +static unsigned int +hash_q(p, h) + char *p; + unsigned int h; +{ + int c, d; + + while (*p != '\0') + { + d = *p++; + c = d; + c ^= c<<6; + h += (c<<11) ^ (c>>1); + h ^= (d<<14) + (d<<7) + (d<<4) + d; + } + return h; +} + +#else /* SM_CONF_SHM */ +# define FILE_SYS(i) FileSys[i] +#endif /* SM_CONF_SHM */ + +/* access to the various components of file system data */ +#define FILE_SYS_NAME(i) FSPath[i] +#define FILE_SYS_AVAIL(i) FILE_SYS(i).fs_avail +#define FILE_SYS_BLKSIZE(i) FILE_SYS(i).fs_blksize +#define FILE_SYS_DEV(i) FILE_SYS(i).fs_dev /* ** Current qf file field assignments: @@ -75,9 +274,10 @@ static int workcmpf4(); ** B body type ** C controlling user ** D data file name +** d data file directory name (added in 8.12) ** E error recipient ** F flag bits -** G queue delay algorithm +** G queue delay algorithm (_FFR_QUEUEDELAY) ** H header ** I data file's inode number ** K time of last delivery attempt @@ -85,43 +285,44 @@ static int workcmpf4(); ** M message (obsolete) ** N number of delivery attempts ** P message priority +** q quarantine reason (_FFR_QUARANTINE) ** Q original recipient (ORCPT=) +** r final recipient (Final-Recipient: DSN field) ** R recipient ** S sender ** T init time ** V queue file version -** X character set (_FFR_SAVE_CHARSET) -** Y current delay +** X free (was: character set if _FFR_SAVE_CHARSET) +** Y current delay (_FFR_QUEUEDELAY) ** Z original envelope id from ESMTP +** ! deliver by (added in 8.12) ** $ define macro ** . terminate file */ -/* +/* ** QUEUEUP -- queue a message up for future transmission. ** ** Parameters: ** e -- the envelope to queue up. -** announce -- if TRUE, tell when you are queueing up. +** announce -- if true, tell when you are queueing up. +** msync -- if true, then fsync() if SuperSafe interactive mode. ** ** Returns: ** none. ** ** Side Effects: -** The current request are saved in a control file. +** The current request is saved in a control file. ** The queue file is left locked. */ -# define TEMPQF_LETTER 'T' -# define LOSEQF_LETTER 'Q' - void -queueup(e, announce) +queueup(e, announce, msync) register ENVELOPE *e; bool announce; + bool msync; { - char *qf; - register FILE *tfp; + register SM_FILE_T *tfp; register HDR *h; register ADDRESS *q; int tfd = -1; @@ -130,7 +331,9 @@ queueup(e, announce) register char *p; MAILER nullmailer; MCI mcibuf; + char qf[MAXPATHLEN]; char tf[MAXPATHLEN]; + char df[MAXPATHLEN]; char buf[MAXLINE]; /* @@ -138,36 +341,28 @@ queueup(e, announce) */ newid = (e->e_id == NULL) || !bitset(EF_INQUEUE, e->e_flags); - - /* if newid, queuename will create a locked qf file in e->lockfp */ - (void) strlcpy(tf, queuename(e, 't'), sizeof tf); + (void) sm_strlcpy(tf, queuename(e, NEWQFL_LETTER), sizeof tf); tfp = e->e_lockfp; if (tfp == NULL) - newid = FALSE; + newid = false; - /* if newid, just write the qf file directly (instead of tf file) */ + /* if newid, write the queue file directly (instead of temp file) */ if (!newid) { - int flags; - - flags = O_CREAT|O_WRONLY|O_EXCL; + const int flags = O_CREAT|O_WRONLY|O_EXCL; /* get a locked tf file */ for (i = 0; i < 128; i++) { if (tfd < 0) { -#if _FFR_QUEUE_FILE_MODE - MODE_T oldumask; + MODE_T oldumask = 0; if (bitset(S_IWGRP, QueueFileMode)) oldumask = umask(002); tfd = open(tf, flags, QueueFileMode); if (bitset(S_IWGRP, QueueFileMode)) (void) umask(oldumask); -#else /* _FFR_QUEUE_FILE_MODE */ - tfd = open(tf, flags, FileMode); -#endif /* _FFR_QUEUE_FILE_MODE */ if (tfd < 0) { @@ -176,7 +371,8 @@ queueup(e, announce) if (LogLevel > 0 && (i % 32) == 0) sm_syslog(LOG_ALERT, e->e_id, "queueup: cannot create %s, uid=%d: %s", - tf, geteuid(), errstring(errno)); + tf, geteuid(), + sm_errstring(errno)); } } if (tfd >= 0) @@ -186,7 +382,7 @@ queueup(e, announce) else if (LogLevel > 0 && (i % 32) == 0) sm_syslog(LOG_ALERT, e->e_id, "queueup: cannot lock %s: %s", - tf, errstring(errno)); + tf, sm_errstring(errno)); if ((i % 32) == 31) { (void) close(tfd); @@ -202,11 +398,13 @@ queueup(e, announce) else (void) sleep(i % 32); } - if (tfd < 0 || (tfp = fdopen(tfd, "w")) == NULL) + if (tfd < 0 || (tfp = sm_io_open(SmFtStdiofd, SM_TIME_DEFAULT, + (void *) &tfd, SM_IO_WRONLY, + NULL)) == NULL) { int save_errno = errno; - printopenfds(TRUE); + printopenfds(true); errno = save_errno; syserr("!queueup: cannot create queue temp file %s, uid=%d", tf, geteuid()); @@ -214,146 +412,192 @@ queueup(e, announce) } if (tTd(40, 1)) - dprintf("\n>>>>> queueing %s/qf%s%s >>>>>\n", - qid_printqueue(e->e_queuedir), e->e_id, - newid ? " (new id)" : ""); + sm_dprintf("\n>>>>> queueing %s/%s%s >>>>>\n", + qid_printqueue(e->e_qgrp, e->e_qdir), + queuename(e, ANYQFL_LETTER), + newid ? " (new id)" : ""); if (tTd(40, 3)) { - dprintf(" e_flags="); + sm_dprintf(" e_flags="); printenvflags(e); } if (tTd(40, 32)) { - dprintf(" sendq="); - printaddr(e->e_sendqueue, TRUE); + sm_dprintf(" sendq="); + printaddr(e->e_sendqueue, true); } if (tTd(40, 9)) { - dprintf(" tfp="); - dumpfd(fileno(tfp), TRUE, FALSE); - dprintf(" lockfp="); + sm_dprintf(" tfp="); + dumpfd(sm_io_getinfo(tfp, SM_IO_WHAT_FD, NULL), true, false); + sm_dprintf(" lockfp="); if (e->e_lockfp == NULL) - dprintf("NULL\n"); + sm_dprintf("NULL\n"); else - dumpfd(fileno(e->e_lockfp), TRUE, FALSE); + dumpfd(sm_io_getinfo(e->e_lockfp, SM_IO_WHAT_FD, NULL), + true, false); } /* ** If there is no data file yet, create one. */ + (void) sm_strlcpy(df, queuename(e, DATAFL_LETTER), sizeof df); if (bitset(EF_HAS_DF, e->e_flags)) { - if (e->e_dfp != NULL && bfcommit(e->e_dfp) < 0) + if (e->e_dfp != NULL && + SuperSafe != SAFE_REALLY && + sm_io_setinfo(e->e_dfp, SM_BF_COMMIT, NULL) < 0 && + errno != EINVAL) + { syserr("!queueup: cannot commit data file %s, uid=%d", - queuename(e, 'd'), geteuid()); + queuename(e, DATAFL_LETTER), geteuid()); + } + if (e->e_dfp != NULL && + SuperSafe == SAFE_INTERACTIVE && msync) + { + if (tTd(40,32)) + sm_syslog(LOG_INFO, e->e_id, + "queueup: fsync(e->e_dfp)"); + + if (fsync(sm_io_getinfo(e->e_dfp, SM_IO_WHAT_FD, + NULL)) < 0) + { + if (newid) + syserr("!552 Error writing data file %s", + df); + else + syserr("!452 Error writing data file %s", + df); + } + } } else { int dfd; - register FILE *dfp = NULL; - char dfname[MAXPATHLEN]; + MODE_T oldumask = 0; + register SM_FILE_T *dfp = NULL; struct stat stbuf; - if (e->e_dfp != NULL && bftest(e->e_dfp)) + if (e->e_dfp != NULL && + sm_io_getinfo(e->e_dfp, SM_IO_WHAT_ISTYPE, BF_FILE_TYPE)) syserr("committing over bf file"); - (void) strlcpy(dfname, queuename(e, 'd'), sizeof dfname); -#if _FFR_QUEUE_FILE_MODE - { - MODE_T oldumask; - - if (bitset(S_IWGRP, QueueFileMode)) - oldumask = umask(002); - dfd = open(dfname, O_WRONLY|O_CREAT|O_TRUNC, - QueueFileMode); - if (bitset(S_IWGRP, QueueFileMode)) - (void) umask(oldumask); - } -#else /* _FFR_QUEUE_FILE_MODE */ - dfd = open(dfname, O_WRONLY|O_CREAT|O_TRUNC, FileMode); -#endif /* _FFR_QUEUE_FILE_MODE */ - if (dfd < 0 || (dfp = fdopen(dfd, "w")) == NULL) + if (bitset(S_IWGRP, QueueFileMode)) + oldumask = umask(002); + dfd = open(df, O_WRONLY|O_CREAT|O_TRUNC, QueueFileMode); + if (bitset(S_IWGRP, QueueFileMode)) + (void) umask(oldumask); + if (dfd < 0 || (dfp = sm_io_open(SmFtStdiofd, SM_TIME_DEFAULT, + (void *) &dfd, SM_IO_WRONLY, + NULL)) == NULL) syserr("!queueup: cannot create data temp file %s, uid=%d", - dfname, geteuid()); + df, geteuid()); if (fstat(dfd, &stbuf) < 0) e->e_dfino = -1; else { e->e_dfdev = stbuf.st_dev; - e->e_dfino = stbuf.st_ino; + e->e_dfino = ST_INODE(stbuf); } e->e_flags |= EF_HAS_DF; memset(&mcibuf, '\0', sizeof mcibuf); mcibuf.mci_out = dfp; mcibuf.mci_mailer = FileMailer; (*e->e_putbody)(&mcibuf, e, NULL); - if (fclose(dfp) < 0) + + if (SuperSafe == SAFE_REALLY || + (SuperSafe == SAFE_INTERACTIVE && msync)) + { + if (tTd(40,32)) + sm_syslog(LOG_INFO, e->e_id, + "queueup: fsync(dfp)"); + + if (fsync(sm_io_getinfo(dfp, SM_IO_WHAT_FD, NULL)) < 0) + { + if (newid) + syserr("!552 Error writing data file %s", + df); + else + syserr("!452 Error writing data file %s", + df); + } + } + + if (sm_io_close(dfp, SM_TIME_DEFAULT) < 0) syserr("!queueup: cannot save data temp file %s, uid=%d", - dfname, geteuid()); + df, geteuid()); e->e_putbody = putbody; } /* ** Output future work requests. ** Priority and creation time should be first, since - ** they are required by orderq. + ** they are required by gatherq. */ /* output queue version number (must be first!) */ - fprintf(tfp, "V%d\n", QF_VERSION); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "V%d\n", QF_VERSION); /* output creation time */ - fprintf(tfp, "T%ld\n", (long) e->e_ctime); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "T%ld\n", (long) e->e_ctime); /* output last delivery time */ -# if _FFR_QUEUEDELAY - fprintf(tfp, "K%ld\n", (long) e->e_dtime); - fprintf(tfp, "G%d\n", e->e_queuealg); - fprintf(tfp, "Y%ld\n", (long) e->e_queuedelay); +#if _FFR_QUEUEDELAY + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "K%ld\n", (long) e->e_dtime); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "G%d\n", e->e_queuealg); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "Y%ld\n", (long) e->e_queuedelay); if (tTd(40, 64)) sm_syslog(LOG_INFO, e->e_id, "queue alg: %d delay %ld next: %ld (now: %ld)\n", e->e_queuealg, e->e_queuedelay, e->e_dtime, curtime()); -# else /* _FFR_QUEUEDELAY */ - fprintf(tfp, "K%ld\n", (long) e->e_dtime); -# endif /* _FFR_QUEUEDELAY */ +#else /* _FFR_QUEUEDELAY */ + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "K%ld\n", (long) e->e_dtime); +#endif /* _FFR_QUEUEDELAY */ /* output number of delivery attempts */ - fprintf(tfp, "N%d\n", e->e_ntries); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "N%d\n", e->e_ntries); /* output message priority */ - fprintf(tfp, "P%ld\n", e->e_msgpriority); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "P%ld\n", e->e_msgpriority); + + /* + ** If data file is in a different directory than the queue file, + ** output a "d" record naming the directory of the data file. + */ + + if (e->e_dfqgrp != e->e_qgrp) + { + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "d%s\n", + Queue[e->e_dfqgrp]->qg_qpaths[e->e_dfqdir].qp_name); + } /* output inode number of data file */ /* XXX should probably include device major/minor too */ if (e->e_dfino != -1) { - /*CONSTCOND*/ - if (sizeof e->e_dfino > sizeof(long)) - fprintf(tfp, "I%ld/%ld/%s\n", - (long) major(e->e_dfdev), - (long) minor(e->e_dfdev), - quad_to_string(e->e_dfino)); - else - fprintf(tfp, "I%ld/%ld/%lu\n", - (long) major(e->e_dfdev), - (long) minor(e->e_dfdev), - (unsigned long) e->e_dfino); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "I%ld/%ld/%llu\n", + (long) major(e->e_dfdev), + (long) minor(e->e_dfdev), + (ULONGLONG_T) e->e_dfino); } /* output body type */ if (e->e_bodytype != NULL) - fprintf(tfp, "B%s\n", denlstring(e->e_bodytype, TRUE, FALSE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "B%s\n", + denlstring(e->e_bodytype, true, false)); -# if _FFR_SAVE_CHARSET - if (e->e_charset != NULL) - fprintf(tfp, "X%s\n", denlstring(e->e_charset, TRUE, FALSE)); -# endif /* _FFR_SAVE_CHARSET */ +#if _FFR_QUARANTINE + /* quarantine reason */ + if (e->e_quarmsg != NULL) + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "q%s\n", + denlstring(e->e_quarmsg, true, false)); +#endif /* _FFR_QUARANTINE */ /* message from envelope, if it exists */ if (e->e_message != NULL) - fprintf(tfp, "M%s\n", denlstring(e->e_message, TRUE, FALSE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "M%s\n", + denlstring(e->e_message, true, false)); /* send various flag bits through */ p = buf; @@ -369,28 +613,35 @@ queueup(e, announce) *p++ = 'd'; if (bitset(EF_NO_BODY_RETN, e->e_flags)) *p++ = 'n'; + if (bitset(EF_SPLIT, e->e_flags)) + *p++ = 's'; *p++ = '\0'; if (buf[0] != '\0') - fprintf(tfp, "F%s\n", buf); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "F%s\n", buf); /* save $={persistentMacros} macro values */ - queueup_macros(macid("{persistentMacros}", NULL), tfp, e); + queueup_macros(macid("{persistentMacros}"), tfp, e); /* output name of sender */ if (bitnset(M_UDBENVELOPE, e->e_from.q_mailer->m_flags)) p = e->e_sender; else p = e->e_from.q_paddr; - fprintf(tfp, "S%s\n", denlstring(p, TRUE, FALSE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "S%s\n", + denlstring(p, true, false)); /* output ESMTP-supplied "original" information */ if (e->e_envid != NULL) - fprintf(tfp, "Z%s\n", denlstring(e->e_envid, TRUE, FALSE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "Z%s\n", + denlstring(e->e_envid, true, false)); /* output AUTH= parameter */ if (e->e_auth_param != NULL) - fprintf(tfp, "A%s\n", denlstring(e->e_auth_param, - TRUE, FALSE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "A%s\n", + denlstring(e->e_auth_param, true, false)); + if (e->e_dlvr_flag != 0) + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "!%c %ld\n", + (char) e->e_dlvr_flag, e->e_deliver_by); /* output list of recipient addresses */ printctladdr(NULL, NULL); @@ -399,40 +650,58 @@ queueup(e, announce) if (!QS_IS_UNDELIVERED(q->q_state)) continue; + /* message for this recipient, if it exists */ + if (q->q_message != NULL) + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "M%s\n", + denlstring(q->q_message, true, + false)); + printctladdr(q, tfp); if (q->q_orcpt != NULL) - fprintf(tfp, "Q%s\n", - denlstring(q->q_orcpt, TRUE, FALSE)); - - (void) putc('R', tfp); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "Q%s\n", + denlstring(q->q_orcpt, true, + false)); + if (q->q_finalrcpt != NULL) + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "r%s\n", + denlstring(q->q_finalrcpt, true, + false)); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'R'); if (bitset(QPRIMARY, q->q_flags)) - (void) putc('P', tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'P'); if (bitset(QHASNOTIFY, q->q_flags)) - (void) putc('N', tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'N'); if (bitset(QPINGONSUCCESS, q->q_flags)) - (void) putc('S', tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'S'); if (bitset(QPINGONFAILURE, q->q_flags)) - (void) putc('F', tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'F'); if (bitset(QPINGONDELAY, q->q_flags)) - (void) putc('D', tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'D'); if (q->q_alias != NULL && bitset(QALIAS, q->q_alias->q_flags)) - (void) putc('A', tfp); - (void) putc(':', tfp); - (void) fprintf(tfp, "%s\n", denlstring(q->q_paddr, TRUE, FALSE)); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, 'A'); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, ':'); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "%s\n", + denlstring(q->q_paddr, true, false)); if (announce) { + char *tag = "queued"; + +#if _FFR_QUARANTINE + if (e->e_quarmsg != NULL) + tag = "quarantined"; +#endif /* _FFR_QUARANTINE */ + e->e_to = q->q_paddr; - message("queued"); + message(tag); if (LogLevel > 8) logdelivery(q->q_mailer, NULL, q->q_status, - "queued", NULL, (time_t) 0, e); + tag, NULL, (time_t) 0, e); e->e_to = NULL; } if (tTd(40, 1)) { - dprintf("queueing "); - printaddr(q, FALSE); + sm_dprintf("queueing "); + printaddr(q, false); } } @@ -454,7 +723,7 @@ queueup(e, announce) mcibuf.mci_mailer = &nullmailer; mcibuf.mci_out = tfp; - define('g', "\201f", e); + macdefine(&e->e_macro, A_PERM, 'g', "\201f"); for (h = e->e_header; h != NULL; h = h->h_link) { if (h->h_value == NULL) @@ -474,16 +743,18 @@ queueup(e, announce) } /* output this header */ - fprintf(tfp, "H?"); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "H?"); /* output conditional macro if present */ if (h->h_macro != '\0') { if (bitset(0200, h->h_macro)) - fprintf(tfp, "${%s}", - macname(bitidx(h->h_macro))); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, + "${%s}", + macname(bitidx(h->h_macro))); else - fprintf(tfp, "$%c", h->h_macro); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, + "$%c", h->h_macro); } else if (!bitzerop(h->h_mflags) && bitset(H_CHECK|H_ACHECK, h->h_flags)) @@ -493,28 +764,29 @@ queueup(e, announce) /* if conditional, output the set of conditions */ for (j = '\0'; j <= '\177'; j++) if (bitnset(j, h->h_mflags)) - (void) putc(j, tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, + j); } - (void) putc('?', tfp); + (void) sm_io_putc(tfp, SM_TIME_DEFAULT, '?'); /* output the header: expand macros, convert addresses */ if (bitset(H_DEFAULT, h->h_flags) && !bitset(H_BINDLATE, h->h_flags)) { - fprintf(tfp, "%s: %s\n", - h->h_field, - denlstring(buf, FALSE, TRUE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "%s: %s\n", + h->h_field, + denlstring(buf, false, true)); } else if (bitset(H_FROM|H_RCPT, h->h_flags) && !bitset(H_BINDLATE, h->h_flags)) { bool oldstyle = bitset(EF_OLDSTYLE, e->e_flags); - FILE *savetrace = TrafficLogFile; + SM_FILE_T *savetrace = TrafficLogFile; TrafficLogFile = NULL; if (bitset(H_FROM, h->h_flags)) - oldstyle = FALSE; + oldstyle = false; commaize(h, h->h_value, oldstyle, &mcibuf, e); @@ -522,9 +794,10 @@ queueup(e, announce) } else { - fprintf(tfp, "%s: %s\n", - h->h_field, - denlstring(h->h_value, FALSE, TRUE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "%s: %s\n", + h->h_field, + denlstring(h->h_value, false, + true)); } } @@ -535,11 +808,13 @@ queueup(e, announce) ** scurrilous crackers from appending any data. */ - fprintf(tfp, ".\n"); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, ".\n"); - if (fflush(tfp) != 0 || - (SuperSafe && fsync(fileno(tfp)) < 0) || - ferror(tfp)) + if (sm_io_flush(tfp, SM_TIME_DEFAULT) != 0 || + ((SuperSafe == SAFE_REALLY || + (SuperSafe == SAFE_INTERACTIVE && msync)) && + fsync(sm_io_getinfo(tfp, SM_IO_WHAT_FD, NULL)) < 0) || + sm_io_error(tfp)) { if (newid) syserr("!552 Error writing control file %s", tf); @@ -549,44 +824,112 @@ queueup(e, announce) if (!newid) { - /* rename (locked) tf to be (locked) qf */ - qf = queuename(e, 'q'); +#if _FFR_QUARANTINE + char new = queue_letter(e, ANYQFL_LETTER); +#endif /* _FFR_QUARANTINE */ + + /* rename (locked) tf to be (locked) [qh]f */ + (void) sm_strlcpy(qf, queuename(e, ANYQFL_LETTER), + sizeof qf); if (rename(tf, qf) < 0) syserr("cannot rename(%s, %s), uid=%d", tf, qf, geteuid()); +# if _FFR_QUARANTINE + else + { + /* + ** Check if type has changed and only + ** remove the old item if the rename above + ** succeeded. + */ + + if (e->e_qfletter != '\0' && + e->e_qfletter != new) + { + if (tTd(40, 5)) + { + sm_dprintf("type changed from %c to %c\n", + e->e_qfletter, new); + } + + if (unlink(queuename(e, e->e_qfletter)) < 0) + { + /* XXX: something more drastic? */ + if (LogLevel > 0) + sm_syslog(LOG_ERR, e->e_id, + "queueup: unlink(%s) failed: %s", + queuename(e, e->e_qfletter), + sm_errstring(errno)); + } + } + } + e->e_qfletter = new; +# endif /* _FFR_QUARANTINE */ + /* - ** fsync() after renaming to make sure - ** metadata is written to disk on - ** filesystems in which renames are - ** not guaranteed such as softupdates. + ** fsync() after renaming to make sure metadata is + ** written to disk on filesystems in which renames are + ** not guaranteed. */ - if (tfd >= 0 && SuperSafe && fsync(tfd) < 0) - syserr("!queueup: cannot fsync queue temp file %s", tf); + if (SuperSafe != SAFE_NO) + { + /* for softupdates */ + if (tfd >= 0 && fsync(tfd) < 0) + { + syserr("!queueup: cannot fsync queue temp file %s", + tf); + } + SYNC_DIR(qf, true); + } - /* close and unlock old (locked) qf */ + /* close and unlock old (locked) queue file */ if (e->e_lockfp != NULL) - (void) fclose(e->e_lockfp); + (void) sm_io_close(e->e_lockfp, SM_TIME_DEFAULT); e->e_lockfp = tfp; + + /* save log info */ + if (LogLevel > 79) + sm_syslog(LOG_DEBUG, e->e_id, "queueup %s", qf); } else - qf = tf; + { + /* save log info */ + if (LogLevel > 79) + sm_syslog(LOG_DEBUG, e->e_id, "queueup %s", tf); + +#if _FFR_QUARANTINE + e->e_qfletter = queue_letter(e, ANYQFL_LETTER); +#endif /* _FFR_QUARANTINE */ + } + errno = 0; e->e_flags |= EF_INQUEUE; - /* save log info */ - if (LogLevel > 79) - sm_syslog(LOG_DEBUG, e->e_id, "queueup, qf=%s", qf); - if (tTd(40, 1)) - dprintf("<<<<< done queueing %s <<<<<\n\n", e->e_id); + sm_dprintf("<<<<< done queueing %s <<<<<\n\n", e->e_id); return; } +/* +** PRINTCTLADDR -- print control address to file. +** +** Parameters: +** a -- address. +** tfp -- file pointer. +** +** Returns: +** none. +** +** Side Effects: +** The control address (if changed) is printed to the file. +** The last control address and uid are saved. +*/ + static void printctladdr(a, tfp) register ADDRESS *a; - FILE *tfp; + SM_FILE_T *tfp; { char *user; register ADDRESS *q; @@ -599,7 +942,7 @@ printctladdr(a, tfp) if (a == NULL || a->q_alias == NULL || tfp == NULL) { if (lastctladdr != NULL && tfp != NULL) - fprintf(tfp, "C\n"); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "C\n"); lastctladdr = NULL; lastuid = 0; return; @@ -629,77 +972,414 @@ printctladdr(a, tfp) lastctladdr = a; if (uid == 0 || user == NULL || user[0] == '\0') - fprintf(tfp, "C"); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "C"); else - fprintf(tfp, "C%s:%ld:%ld", - denlstring(user, TRUE, FALSE), (long) uid, (long) gid); - fprintf(tfp, ":%s\n", denlstring(a->q_paddr, TRUE, FALSE)); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "C%s:%ld:%ld", + denlstring(user, true, false), (long) uid, + (long) gid); + (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, ":%s\n", + denlstring(a->q_paddr, true, false)); } -/* + +/* +** RUNNERS_SIGTERM -- propagate a SIGTERM to queue runner process +** +** This propagates the signal to the child processes that are queue +** runners. This is for a queue runner "cleanup". After all of the +** child queue runner processes are signaled (it should be SIGTERM +** being the sig) then the old signal handler (Oldsh) is called +** to handle any cleanup set for this process (provided it is not +** SIG_DFL or SIG_IGN). The signal may not be handled immediately +** if the BlockOldsh flag is set. If the current process doesn't +** have a parent then handle the signal immediately, regardless of +** BlockOldsh. +** +** Parameters: +** sig -- the signal number being sent +** +** Returns: +** none. +** +** Side Effects: +** Sets the NoMoreRunners boolean to true to stop more runners +** from being started in runqueue(). +** +** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD +** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE +** DOING. +*/ + +static bool volatile NoMoreRunners = false; +static sigfunc_t Oldsh_term = SIG_DFL; +static sigfunc_t Oldsh_hup = SIG_DFL; +static sigfunc_t volatile Oldsh = SIG_DFL; +static bool BlockOldsh = false; +static int volatile Oldsig = 0; +static SIGFUNC_DECL runners_sigterm __P((int)); +static SIGFUNC_DECL runners_sighup __P((int)); + +static SIGFUNC_DECL +runners_sigterm(sig) + int sig; +{ + int save_errno = errno; + + FIX_SYSV_SIGNAL(sig, runners_sigterm); + errno = save_errno; + CHECK_CRITICAL(sig); + NoMoreRunners = true; + Oldsh = Oldsh_term; + Oldsig = sig; + proc_list_signal(PROC_QUEUE, sig); + + if (!BlockOldsh || getppid() <= 1) + { + /* Check that a valid 'old signal handler' is callable */ + if (Oldsh_term != SIG_DFL && Oldsh_term != SIG_IGN && + Oldsh_term != runners_sigterm) + (*Oldsh_term)(sig); + } + errno = save_errno; + return SIGFUNC_RETURN; +} +/* +** RUNNERS_SIGHUP -- propagate a SIGHUP to queue runner process +** +** This propagates the signal to the child processes that are queue +** runners. This is for a queue runner "cleanup". After all of the +** child queue runner processes are signaled (it should be SIGHUP +** being the sig) then the old signal handler (Oldsh) is called to +** handle any cleanup set for this process (provided it is not SIG_DFL +** or SIG_IGN). The signal may not be handled immediately if the +** BlockOldsh flag is set. If the current process doesn't have +** a parent then handle the signal immediately, regardless of +** BlockOldsh. +** +** Parameters: +** sig -- the signal number being sent +** +** Returns: +** none. +** +** Side Effects: +** Sets the NoMoreRunners boolean to true to stop more runners +** from being started in runqueue(). +** +** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD +** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE +** DOING. +*/ + +static SIGFUNC_DECL +runners_sighup(sig) + int sig; +{ + int save_errno = errno; + + FIX_SYSV_SIGNAL(sig, runners_sighup); + errno = save_errno; + CHECK_CRITICAL(sig); + NoMoreRunners = true; + Oldsh = Oldsh_hup; + Oldsig = sig; + proc_list_signal(PROC_QUEUE, sig); + + if (!BlockOldsh || getppid() <= 1) + { + /* Check that a valid 'old signal handler' is callable */ + if (Oldsh_hup != SIG_DFL && Oldsh_hup != SIG_IGN && + Oldsh_hup != runners_sighup) + (*Oldsh_hup)(sig); + } + errno = save_errno; + return SIGFUNC_RETURN; +} +/* +** MARK_WORK_GROUP_RESTART -- mark a work group as needing a restart +** +** Sets a workgroup for restarting. +** +** Parameters: +** wgrp -- the work group id to restart. +** reason -- why (signal?), -1 to turn off restart +** +** Returns: +** none. +** +** Side effects: +** May set global RestartWorkGroup to true. +** +** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD +** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE +** DOING. +*/ + +void +mark_work_group_restart(wgrp, reason) + int wgrp; + int reason; +{ + if (wgrp < 0 || wgrp > NumWorkGroups) + return; + + WorkGrp[wgrp].wg_restart = reason; + if (reason >= 0) + RestartWorkGroup = true; +} +/* +** RESTART_MARKED_WORK_GROUPS -- restart work groups marked as needing restart +** +** Restart any workgroup marked as needing a restart provided more +** runners are allowed. +** +** Parameters: +** none. +** +** Returns: +** none. +** +** Side effects: +** Sets global RestartWorkGroup to false. +*/ + +void +restart_marked_work_groups() +{ + int i; + int wasblocked; + + if (NoMoreRunners) + return; + + /* Block SIGCHLD so reapchild() doesn't mess with us */ + wasblocked = sm_blocksignal(SIGCHLD); + + for (i = 0; i < NumWorkGroups; i++) + { + if (WorkGrp[i].wg_restart >= 0) + { + if (LogLevel > 8) + sm_syslog(LOG_ERR, NOQID, + "restart queue runner=%d due to signal 0x%x", + i, WorkGrp[i].wg_restart); + restart_work_group(i); + } + } + RestartWorkGroup = false; + + if (wasblocked == 0) + (void) sm_releasesignal(SIGCHLD); +} +/* +** RESTART_WORK_GROUP -- restart a specific work group +** +** Restart a specific workgroup provided more runners are allowed. +** If the requested work group has been restarted too many times log +** this and refuse to restart. +** +** Parameters: +** wgrp -- the work group id to restart +** +** Returns: +** none. +** +** Side Effects: +** starts another process doing the work of wgrp +*/ + +#define MAX_PERSIST_RESTART 10 /* max allowed number of restarts */ + +static void +restart_work_group(wgrp) + int wgrp; +{ + if (NoMoreRunners || + wgrp < 0 || wgrp > NumWorkGroups) + return; + + WorkGrp[wgrp].wg_restart = -1; + if (WorkGrp[wgrp].wg_restartcnt < MAX_PERSIST_RESTART) + { + /* avoid overflow; increment here */ + WorkGrp[wgrp].wg_restartcnt++; + (void) run_work_group(wgrp, true, false, true, true); + } + else + { + sm_syslog(LOG_ERR, NOQID, + "ERROR: persistent queue runner=%d restarted too many times, queue runner lost", + wgrp); + } +} +/* +** SCHEDULE_QUEUE_RUNS -- schedule the next queue run for a work group. +** +** Parameters: +** runall -- schedule even if individual bit is not set. +** wgrp -- the work group id to schedule. +** +** Returns: +** nothing +*/ + +#define INCR_MOD(v, m) if (++v >= m) \ + v = 0; \ + else + +static void +schedule_queue_runs(runall, wgrp) + bool runall; + int wgrp; +{ + int qgrp, cgrp, endgrp; + + /* + ** This is a bit ugly since we have to duplicate the + ** code that "walks" through a work queue group. + */ + + cgrp = endgrp = WorkGrp[wgrp].wg_curqgrp; + do + { + time_t qintvl; + + qgrp = WorkGrp[wgrp].wg_qgs[cgrp]->qg_index; + if (Queue[qgrp]->qg_queueintvl > 0) + qintvl = Queue[qgrp]->qg_queueintvl; + else if (QueueIntvl > 0) + qintvl = QueueIntvl; + else + qintvl = (time_t) 0; + if ((runall || bitnset(qgrp, DoQueueRun)) && qintvl > 0) + (void) sm_setevent(qintvl, runqueueevent, qgrp); +#if _FFR_QUEUE_SCHED_DBG + if (tTd(69, 10)) + sm_syslog(LOG_INFO, NOQID, + "sqr: wgrp=%d, cgrp=%d, qgrp=%d, intvl=%ld, QI=%ld, runall=%d, bit=%d, sched=%d", + wgrp, cgrp, qgrp, Queue[qgrp]->qg_queueintvl, + QueueIntvl, runall, bitnset(qgrp, DoQueueRun), + (runall || bitnset(qgrp, DoQueueRun)) && + qintvl > 0); +#endif /* _FFR_QUEUE_SCHED_DBG */ + clrbitn(qgrp, DoQueueRun); + INCR_MOD(cgrp, WorkGrp[wgrp].wg_numqgrp); + } while (endgrp != cgrp); +} +/* ** RUNQUEUE -- run the jobs in the queue. ** ** Gets the stuff out of the queue in some presumably logical ** order and processes them. ** ** Parameters: -** forkflag -- TRUE if the queue scanning should be done in +** forkflag -- true if the queue scanning should be done in ** a child process. We double-fork so it is not our ** child and we don't have to clean up after it. -** FALSE can be ignored if we have multiple queues. -** verbose -- if TRUE, print out status information. +** false can be ignored if we have multiple queues. +** verbose -- if true, print out status information. +** persistent -- persistent queue runner? +** runall -- run all groups or only a subset (DoQueueRun)? ** ** Returns: -** TRUE if the queue run successfully began. +** true if the queue run successfully began. ** ** Side Effects: -** runs things in the mail queue. +** runs things in the mail queue using run_work_group(). +** maybe schedules next queue run. +** */ static ENVELOPE QueueEnvelope; /* the queue run envelope */ -int NumQueues = 0; /* number of queues */ static time_t LastQueueTime = 0; /* last time a queue ID assigned */ static pid_t LastQueuePid = -1; /* last PID which had a queue ID */ -struct qpaths_s -{ - char *qp_name; /* name of queue dir */ - short qp_subdirs; /* use subdirs? */ -}; - -typedef struct qpaths_s QPATHS; - /* values for qp_supdirs */ #define QP_NOSUB 0x0000 /* No subdirectories */ #define QP_SUBDF 0x0001 /* "df" subdirectory */ #define QP_SUBQF 0x0002 /* "qf" subdirectory */ #define QP_SUBXF 0x0004 /* "xf" subdirectory */ -static QPATHS *QPaths = NULL; /* list of queue directories */ - bool -runqueue(forkflag, verbose) +runqueue(forkflag, verbose, persistent, runall) bool forkflag; bool verbose; + bool persistent; + bool runall; { int i; - bool ret = TRUE; + bool ret = true; static int curnum = 0; + sigfunc_t cursh; +#if SM_HEAP_CHECK + SM_NONVOLATILE int oldgroup = 0; + + if (sm_debug_active(&DebugLeakQ, 1)) + { + oldgroup = sm_heap_group(); + sm_heap_newgroup(); + sm_dprintf("runqueue() heap group #%d\n", sm_heap_group()); + } +#endif /* SM_HEAP_CHECK */ + + /* queue run has been started, don't do any more this time */ + clrbitn(NumQueue, DoQueueRun); - DoQueueRun = FALSE; + /* more than one queue or more than one directory per queue */ + if (!forkflag && !verbose && + (WorkGrp[0].wg_qgs[0]->qg_numqueues > 1 || NumWorkGroups > 1 || + WorkGrp[0].wg_numqgrp > 1)) + forkflag = true; + /* + ** For controlling queue runners via signals sent to this process. + ** Oldsh* will get called too by runners_sig* (if it is not SIG_IGN + ** or SIG_DFL) to preserve cleanup behavior. Now that this process + ** will have children (and perhaps grandchildren) this handler will + ** be left in place. This is because this process, once it has + ** finished spinning off queue runners, may go back to doing something + ** else (like being a daemon). And we still want on a SIG{TERM,HUP} to + ** clean up the child queue runners. Only install 'runners_sig*' once + ** else we'll get stuck looping forever. + */ - if (!forkflag && NumQueues > 1 && !verbose) - forkflag = TRUE; + cursh = sm_signal(SIGTERM, runners_sigterm); + if (cursh != runners_sigterm) + Oldsh_term = cursh; + cursh = sm_signal(SIGHUP, runners_sighup); + if (cursh != runners_sighup) + Oldsh_hup = cursh; - for (i = 0; i < NumQueues; i++) + for (i = 0; i < NumWorkGroups && !NoMoreRunners; i++) { /* - ** Pick up where we left off, in case we - ** used up all the children last time - ** without finishing. + ** If MaxQueueChildren active then test whether the start + ** of the next queue group's additional queue runners (maximum) + ** will result in MaxQueueChildren being exceeded. + ** + ** Note: do not use continue; even though another workgroup + ** may have fewer queue runners, this would be "unfair", + ** i.e., this work group might "starve" then. + */ + +#if _FFR_QUEUE_SCHED_DBG + if (tTd(69, 10)) + sm_syslog(LOG_INFO, NOQID, + "rq: curnum=%d, MaxQueueChildren=%d, CurRunners=%d, WorkGrp[curnum].wg_maxact=%d", + curnum, MaxQueueChildren, CurRunners, + WorkGrp[curnum].wg_maxact); +#endif /* _FFR_QUEUE_SCHED_DBG */ + if (MaxQueueChildren > 0 && + CurRunners + WorkGrp[curnum].wg_maxact > MaxQueueChildren) + break; + + /* + ** Pick up where we left off (curnum), in case we + ** used up all the children last time without finishing. + ** This give a round-robin fairness to queue runs. */ - ret = run_single_queue(curnum, forkflag, verbose); + ret = run_work_group(curnum, forkflag, verbose, persistent, + runall); /* ** Failure means a message was printed for ETRN @@ -709,71 +1389,289 @@ runqueue(forkflag, verbose) if (!ret) break; - if (++curnum >= NumQueues) - curnum = 0; + /* Success means the runner count needs to be updated. */ + CurRunners += WorkGrp[curnum].wg_maxact; + if (!persistent) + schedule_queue_runs(runall, curnum); + INCR_MOD(curnum, NumWorkGroups); + } + + /* schedule left over queue runs */ + if (i < NumWorkGroups && !NoMoreRunners && !persistent) + { + int h; + + for (h = curnum; i < NumWorkGroups; i++) + { + schedule_queue_runs(runall, h); + INCR_MOD(h, NumWorkGroups); + } } - if (QueueIntvl != 0) - (void) setevent(QueueIntvl, runqueueevent, 0); + + +#if SM_HEAP_CHECK + if (sm_debug_active(&DebugLeakQ, 1)) + sm_heap_setgroup(oldgroup); +#endif /* SM_HEAP_CHECK */ return ret; } -/* -** RUN_SINGLE_QUEUE -- run the jobs in a single queue. +/* +** RUNNER_WORK -- have a queue runner do its work +** +** Have a queue runner do its work a list of entries. +** When work isn't directly being done then this process can take a signal +** and terminate immediately (in a clean fashion of course). +** When work is directly being done, it's not to be interrupted +** immediately: the work should be allowed to finish at a clean point +** before termination (in a clean fashion of course). +** +** Parameters: +** e -- envelope. +** sequenceno -- 'th process to run WorkQ. +** didfork -- did the calling process fork()? +** skip -- process only each skip'th item. +** njobs -- number of jobs in WorkQ. +** +** Returns: +** none. +** +** Side Effects: +** runs things in the mail queue. +*/ + +/* Get new load average every 30 seconds. */ +#define GET_NEW_LA_TIME 30 + +static void +runner_work(e, sequenceno, didfork, skip, njobs) + register ENVELOPE *e; + int sequenceno; + bool didfork; + int skip; + int njobs; +{ + int n; + WORK *w; + time_t current_la_time, now; + + current_la_time = curtime(); + + /* + ** Here we temporarily block the second calling of the handlers. + ** This allows us to handle the signal without terminating in the + ** middle of direct work. If a signal does come, the test for + ** NoMoreRunners will find it. + */ + + BlockOldsh = true; + + /* process them once at a time */ + while (WorkQ != NULL) + { +#if SM_HEAP_CHECK + SM_NONVOLATILE int oldgroup = 0; + + if (sm_debug_active(&DebugLeakQ, 1)) + { + oldgroup = sm_heap_group(); + sm_heap_newgroup(); + sm_dprintf("run_queue_group() heap group #%d\n", + sm_heap_group()); + } +#endif /* SM_HEAP_CHECK */ + + /* do no more work */ + if (NoMoreRunners) + { + /* Check that a valid signal handler is callable */ + if (Oldsh != SIG_DFL && Oldsh != SIG_IGN && + Oldsh != runners_sighup && + Oldsh != runners_sigterm) + (*Oldsh)(Oldsig); + break; + } + + w = WorkQ; /* assign current work item */ + + /* + ** Set the head of the WorkQ to the next work item. + ** It is set 'skip' ahead (the number of parallel queue + ** runners working on WorkQ together) since each runner + ** works on every 'skip'th (N-th) item. + */ + + for (n = 0; n < skip && WorkQ != NULL; n++) + WorkQ = WorkQ->w_next; + e->e_to = NULL; + + /* + ** Ignore jobs that are too expensive for the moment. + ** + ** Get new load average every GET_NEW_LA_TIME seconds. + */ + + now = curtime(); + if (current_la_time < now - GET_NEW_LA_TIME) + { + sm_getla(); + current_la_time = now; + } + if (shouldqueue(WkRecipFact, current_la_time)) + { + char *msg = "Aborting queue run: load average too high"; + + if (Verbose) + message("%s", msg); + if (LogLevel > 8) + sm_syslog(LOG_INFO, NOQID, "runqueue: %s", msg); + break; + } + if (shouldqueue(w->w_pri, w->w_ctime)) + { + if (Verbose) + message(EmptyString); + if (QueueSortOrder == QSO_BYPRIORITY) + { + if (Verbose) + message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue", + qid_printqueue(w->w_qgrp, + w->w_qdir), + w->w_name + 2, sequenceno, + njobs); + if (LogLevel > 8) + sm_syslog(LOG_INFO, NOQID, + "runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)", + qid_printqueue(w->w_qgrp, + w->w_qdir), + w->w_name + 2, w->w_pri, + CurrentLA, sequenceno, + njobs); + break; + } + else if (Verbose) + message("Skipping %s/%s (sequence %d of %d)", + qid_printqueue(w->w_qgrp, w->w_qdir), + w->w_name + 2, sequenceno, njobs); + } + else + { + if (Verbose) + { + message(EmptyString); + message("Running %s/%s (sequence %d of %d)", + qid_printqueue(w->w_qgrp, w->w_qdir), + w->w_name + 2, sequenceno, njobs); + } + if (didfork && MaxQueueChildren > 0) + { + sm_blocksignal(SIGCHLD); + (void) sm_signal(SIGCHLD, reapchild); + } + if (tTd(63, 100)) + sm_syslog(LOG_DEBUG, NOQID, + "runqueue %s dowork(%s)", + qid_printqueue(w->w_qgrp, w->w_qdir), + w->w_name + 2); + + (void) dowork(w->w_qgrp, w->w_qdir, w->w_name + 2, + false, false, e); + errno = 0; + } + sm_free(w->w_name); /* XXX */ + if (w->w_host != NULL) + sm_free(w->w_host); /* XXX */ + sm_free((char *) w); /* XXX */ + sequenceno += skip; /* next sequence number */ +#if SM_HEAP_CHECK + if (sm_debug_active(&DebugLeakQ, 1)) + sm_heap_setgroup(oldgroup); +#endif /* SM_HEAP_CHECK */ + } + + BlockOldsh = false; + + /* check the signals didn't happen during the revert */ + if (NoMoreRunners) + { + /* Check that a valid signal handler is callable */ + if (Oldsh != SIG_DFL && Oldsh != SIG_IGN && + Oldsh != runners_sighup && Oldsh != runners_sigterm) + (*Oldsh)(Oldsig); + } + + Oldsh = SIG_DFL; /* after the NoMoreRunners check */ +} +/* +** RUN_WORK_GROUP -- run the jobs in a queue group from a work group. ** ** Gets the stuff out of the queue in some presumably logical ** order and processes them. ** ** Parameters: -** queuedir -- queue to process -** forkflag -- TRUE if the queue scanning should be done in +** wgrp -- work group to process. +** forkflag -- true if the queue scanning should be done in ** a child process. We double-fork so it is not our ** child and we don't have to clean up after it. -** verbose -- if TRUE, print out status information. +** verbose -- if true, print out status information. +** persistent -- persistent queue runner? +** runall -- true: run all of the queue groups in this work group ** ** Returns: -** TRUE if the queue run successfully began. +** true if the queue run successfully began. ** ** Side Effects: ** runs things in the mail queue. */ -static bool -run_single_queue(queuedir, forkflag, verbose) - int queuedir; +/* Minimum sleep time for persistent queue runners */ +#define MIN_SLEEP_TIME 5 + +bool +run_work_group(wgrp, forkflag, verbose, persistent, runall) + int wgrp; bool forkflag; bool verbose; + bool persistent; + bool runall; { register ENVELOPE *e; - int njobs; - int sequenceno = 0; - time_t current_la_time, now; + int njobs, qdir; + int sequenceno = 1; + int qgrp, endgrp, h, i; + time_t current_la_time; + bool full, more; + SM_RPOOL_T *rpool; + extern void rmexpstab __P((void)); extern ENVELOPE BlankEnvelope; + extern SIGFUNC_DECL reapchild __P((int)); + + if (wgrp < 0) + return false; /* ** If no work will ever be selected, don't even bother reading ** the queue. */ - CurrentLA = sm_getla(NULL); /* get load average */ + sm_getla(); /* get load average */ current_la_time = curtime(); - if (shouldqueue(WkRecipFact, current_la_time)) + if (!persistent && shouldqueue(WkRecipFact, current_la_time)) { char *msg = "Skipping queue run -- load average too high"; if (verbose) message("458 %s\n", msg); if (LogLevel > 8) - sm_syslog(LOG_INFO, NOQID, - "runqueue: %s", - msg); - return FALSE; + sm_syslog(LOG_INFO, NOQID, "runqueue: %s", msg); + return false; } /* ** See if we already have too many children. */ - if (forkflag && QueueIntvl != 0 && + if (forkflag && WorkGrp[wgrp].wg_lowqintvl > 0 && !persistent && MaxChildren > 0 && CurChildren >= MaxChildren) { char *msg = "Skipping queue run -- too many children"; @@ -781,10 +1679,9 @@ run_single_queue(queuedir, forkflag, verbose) if (verbose) message("458 %s (%d)\n", msg, CurChildren); if (LogLevel > 8) - sm_syslog(LOG_INFO, NOQID, - "runqueue: %s (%d)", + sm_syslog(LOG_INFO, NOQID, "runqueue: %s (%d)", msg, CurChildren); - return FALSE; + return false; } /* @@ -795,81 +1692,84 @@ run_single_queue(queuedir, forkflag, verbose) { pid_t pid; - (void) blocksignal(SIGCHLD); - (void) setsignal(SIGCHLD, reapchild); + (void) sm_blocksignal(SIGCHLD); + (void) sm_signal(SIGCHLD, reapchild); pid = dofork(); if (pid == -1) { const char *msg = "Skipping queue run -- fork() failed"; - const char *err = errstring(errno); + const char *err = sm_errstring(errno); if (verbose) message("458 %s: %s\n", msg, err); if (LogLevel > 8) - sm_syslog(LOG_INFO, NOQID, - "runqueue: %s: %s", + sm_syslog(LOG_INFO, NOQID, "runqueue: %s: %s", msg, err); - (void) releasesignal(SIGCHLD); - return FALSE; + (void) sm_releasesignal(SIGCHLD); + return false; } if (pid != 0) { /* parent -- pick up intermediate zombie */ - (void) blocksignal(SIGALRM); - proc_list_add(pid, "Queue runner", PROC_QUEUE); - (void) releasesignal(SIGALRM); - (void) releasesignal(SIGCHLD); - return TRUE; + (void) sm_blocksignal(SIGALRM); + + /* wgrp only used when queue runners are persistent */ + proc_list_add(pid, "Queue runner", PROC_QUEUE, + WorkGrp[wgrp].wg_maxact, + persistent ? wgrp : -1); + (void) sm_releasesignal(SIGALRM); + (void) sm_releasesignal(SIGCHLD); + return true; } + /* child -- clean up signals */ /* Reset global flags */ RestartRequest = NULL; + RestartWorkGroup = false; ShutdownRequest = NULL; PendingSignal = 0; + CurrentPid = getpid(); + /* + ** Initialize exception stack and default exception + ** handler for child process. + */ + + sm_exc_newthread(fatal_error); clrcontrol(); proc_list_clear(); /* Add parent process as first child item */ - proc_list_add(getpid(), "Queue runner child process", - PROC_QUEUE_CHILD); - (void) releasesignal(SIGCHLD); - (void) setsignal(SIGCHLD, SIG_DFL); - (void) setsignal(SIGHUP, SIG_DFL); - (void) setsignal(SIGTERM, intsig); + proc_list_add(CurrentPid, "Queue runner child process", + PROC_QUEUE_CHILD, 0, -1); + (void) sm_releasesignal(SIGCHLD); + (void) sm_signal(SIGCHLD, SIG_DFL); + (void) sm_signal(SIGHUP, SIG_DFL); + (void) sm_signal(SIGTERM, intsig); } - sm_setproctitle(TRUE, CurEnv, "running queue: %s", - qid_printqueue(queuedir)); - - if (LogLevel > 69 || tTd(63, 99)) - sm_syslog(LOG_DEBUG, NOQID, - "runqueue %s, pid=%d, forkflag=%d", - qid_printqueue(queuedir), (int) getpid(), forkflag); - /* ** Release any resources used by the daemon code. */ -# if DAEMON clrdaemon(); -# endif /* DAEMON */ /* force it to run expensive jobs */ - NoConnect = FALSE; + NoConnect = false; /* drop privileges */ if (geteuid() == (uid_t) 0) - (void) drop_privileges(FALSE); + (void) drop_privileges(false); /* ** Create ourselves an envelope */ CurEnv = &QueueEnvelope; - e = newenvelope(&QueueEnvelope, CurEnv); + rpool = sm_rpool_new_x(NULL); + e = newenvelope(&QueueEnvelope, CurEnv, rpool); e->e_flags = BlankEnvelope.e_flags; e->e_parent = NULL; @@ -877,7 +1777,7 @@ run_single_queue(queuedir, forkflag, verbose) if (forkflag) { disconnect(1, e); - QuickAbort = FALSE; + QuickAbort = false; } /* @@ -886,200 +1786,494 @@ run_single_queue(queuedir, forkflag, verbose) */ if (QueueLimitId != NULL || QueueLimitSender != NULL || +#if _FFR_QUARANTINE + QueueLimitQuarantine != NULL || +#endif /* _FFR_QUARANTINE */ QueueLimitRecipient != NULL) { - IgnoreHostStatus = TRUE; + IgnoreHostStatus = true; MinQueueAge = 0; } /* + ** Here is where we choose the queue group from the work group. + ** The caller of the "domorework" label must setup a new envelope. + */ + + endgrp = WorkGrp[wgrp].wg_curqgrp; /* to not spin endlessly */ + + domorework: + + /* + ** Run a queue group if: + ** runall is set or the bit for this group is set. + */ + + for (;;) + { + /* + ** Find the next queue group within the work group that + ** has been marked as needing a run. + */ + + qgrp = WorkGrp[wgrp].wg_qgs[WorkGrp[wgrp].wg_curqgrp]->qg_index; + WorkGrp[wgrp].wg_curqgrp++; /* advance */ + WorkGrp[wgrp].wg_curqgrp %= WorkGrp[wgrp].wg_numqgrp; /* wrap */ + if (runall || bitnset(qgrp, DoQueueRun)) + break; + if (endgrp == WorkGrp[wgrp].wg_curqgrp) + { + e->e_id = NULL; + if (forkflag) + finis(true, true, ExitStat); + return true; /* we're done */ + } + } + + qdir = Queue[qgrp]->qg_curnum; /* round-robin init of queue position */ +#if _FFR_QUEUE_SCHED_DBG + if (tTd(69, 12)) + sm_syslog(LOG_INFO, NOQID, + "rwg: wgrp=%d, qgrp=%d, qdir=%d, name=%s, curqgrp=%d, numgrps=%d", + wgrp, qgrp, qdir, qid_printqueue(qgrp, qdir), + WorkGrp[wgrp].wg_curqgrp, WorkGrp[wgrp].wg_numqgrp); +#endif /* _FFR_QUEUE_SCHED_DBG */ + +#if HASNICE + /* tweak niceness of queue runs */ + if (Queue[qgrp]->qg_nice > 0) + (void) nice(Queue[qgrp]->qg_nice); +#endif /* HASNICE */ + + /* XXX running queue group... */ + sm_setproctitle(true, CurEnv, "running queue: %s", + qid_printqueue(qgrp, qdir)); + + if (LogLevel > 69 || tTd(63, 99)) + sm_syslog(LOG_DEBUG, NOQID, + "runqueue %s, pid=%d, forkflag=%d", + qid_printqueue(qgrp, qdir), (int) CurrentPid, + forkflag); + + /* ** Start making passes through the queue. ** First, read and sort the entire queue. ** Then, process the work in that order. ** But if you take too long, start over. */ + for (i = 0; i < Queue[qgrp]->qg_numqueues; i++) + { + h = gatherq(qgrp, qdir, false, &full, &more); +#if SM_CONF_SHM + if (ShmId != SM_SHM_NO_ID) + QSHM_ENTRIES(Queue[qgrp]->qg_qpaths[qdir].qp_idx) = h; +#endif /* SM_CONF_SHM */ + /* If there are no more items in this queue advance */ + if (!more) + { + /* A round-robin advance */ + qdir++; + qdir %= Queue[qgrp]->qg_numqueues; + } + + /* Has the WorkList reached the limit? */ + if (full) + break; /* don't try to gather more */ + } + /* order the existing work requests */ - njobs = orderq(queuedir, FALSE); + njobs = sortq(Queue[qgrp]->qg_maxlist); + Queue[qgrp]->qg_curnum = qdir; /* update */ - /* process them once at a time */ - while (WorkQ != NULL) + if (!Verbose && bitnset(QD_FORK, Queue[qgrp]->qg_flags)) { - WORK *w = WorkQ; - - WorkQ = WorkQ->w_next; - e->e_to = NULL; + int loop, maxrunners; + pid_t pid; /* - ** Ignore jobs that are too expensive for the moment. - ** - ** Get new load average every 30 seconds. + ** For this WorkQ we want to fork off N children (maxrunners) + ** at this point. Each child has a copy of WorkQ. Each child + ** will process every N-th item. The parent will wait for all + ** of the children to finish before moving on to the next + ** queue group within the work group. This saves us forking + ** a new runner-child for each work item. + ** It's valid for qg_maxqrun == 0 since this may be an + ** explicit "don't run this queue" setting. */ - now = curtime(); - if (current_la_time < now - 30) + maxrunners = Queue[qgrp]->qg_maxqrun; + + /* No need to have more runners then there are jobs */ + if (maxrunners > njobs) + maxrunners = njobs; + for (loop = 0; loop < maxrunners; loop++) { - CurrentLA = sm_getla(e); - current_la_time = now; + /* + ** Since the delivery may happen in a child and the + ** parent does not wait, the parent may close the + ** maps thereby removing any shared memory used by + ** the map. Therefore, close the maps now so the + ** child will dynamically open them if necessary. + */ + + closemaps(false); + + pid = fork(); + if (pid < 0) + { + syserr("run_work_group: cannot fork"); + return 0; + } + else if (pid > 0) + { + /* parent -- clean out connection cache */ + mci_flush(false, NULL); + WorkQ = WorkQ->w_next; /* for the skip */ + sequenceno++; + proc_list_add(pid, "Queue child runner process", + PROC_QUEUE_CHILD, 0, -1); + + /* No additional work, no additional runners */ + if (WorkQ == NULL) + break; + } + else + { + /* child -- Reset global flags */ + RestartRequest = NULL; + RestartWorkGroup = false; + ShutdownRequest = NULL; + PendingSignal = 0; + CurrentPid = getpid(); + + /* + ** Initialize exception stack and default + ** exception handler for child process. + ** When fork()'d the child now has a private + ** copy of WorkQ at its current position. + */ + + sm_exc_newthread(fatal_error); + + /* + ** SMTP processes (whether -bd or -bs) set + ** SIGCHLD to reapchild to collect + ** children status. However, at delivery + ** time, that status must be collected + ** by sm_wait() to be dealt with properly + ** (check success of delivery based + ** on status code, etc). Therefore, if we + ** are an SMTP process, reset SIGCHLD + ** back to the default so reapchild + ** doesn't collect status before + ** sm_wait(). + */ + + if (OpMode == MD_SMTP || + OpMode == MD_DAEMON || + MaxQueueChildren > 0) + { + proc_list_clear(); + sm_releasesignal(SIGCHLD); + (void) sm_signal(SIGCHLD, SIG_DFL); + } + + /* child -- error messages to the transcript */ + QuickAbort = OnlyOneError = false; + runner_work(e, sequenceno, true, + maxrunners, njobs); + + /* This child is done */ + finis(true, true, ExitStat); + /* NOTREACHED */ + } } - if (shouldqueue(WkRecipFact, current_la_time)) + + sm_releasesignal(SIGCHLD); + + /* + ** Wait until all of the runners have completed before + ** seeing if there is another queue group in the + ** work group to process. + ** XXX Future enhancement: don't wait() for all children + ** here, just go ahead and make sure that overall the number + ** of children is not exceeded. + */ + + while (CurChildren > 0) { - char *msg = "Aborting queue run: load average too high"; + int status; + pid_t ret; - if (Verbose) - message("%s", msg); - if (LogLevel > 8) - sm_syslog(LOG_INFO, NOQID, - "runqueue: %s", - msg); - break; + while ((ret = sm_wait(&status)) <= 0) + continue; + proc_list_drop(ret, status, NULL); } - sequenceno++; - if (shouldqueue(w->w_pri, w->w_ctime)) + } + else + { + /* + ** When current process will not fork children to do the work, + ** it will do the work itself. The 'skip' will be 1 since + ** there are no child runners to divide the work across. + */ + + runner_work(e, sequenceno, false, 1, njobs); + } + + /* free memory allocated by newenvelope() above */ + sm_rpool_free(rpool); + QueueEnvelope.e_rpool = NULL; + + /* Are there still more queues in the work group to process? */ + if (endgrp != WorkGrp[wgrp].wg_curqgrp) + { + rpool = sm_rpool_new_x(NULL); + e = newenvelope(&QueueEnvelope, CurEnv, rpool); + e->e_flags = BlankEnvelope.e_flags; + goto domorework; + } + + /* No more queues in work group to process. Now check persistent. */ + if (persistent) + { + time_t now; + + sequenceno = 1; + sm_setproctitle(true, CurEnv, "running queue: %s", + qid_printqueue(qgrp, qdir)); + + /* + ** close bogus maps, i.e., maps which caused a tempfail, + ** so we get fresh map connections on the next lookup. + ** closemaps() is also called when children are started. + */ + + closemaps(true); + + /* Close any cached connections. */ + mci_flush(true, NULL); + + /* Clean out expired related entries. */ + rmexpstab(); + +#if NAMED_BIND + /* Update MX records for FallBackMX. */ + if (FallBackMX != NULL) + (void) getfallbackmxrr(FallBackMX); +#endif /* NAMED_BIND */ + +#if USERDB + /* close UserDatabase */ + _udbx_close(); +#endif /* USERDB */ + +#if SM_HEAP_CHECK + if (sm_debug_active(&SmHeapCheck, 2) + && access("memdump", F_OK) == 0 + ) { - if (Verbose) - message(""); - if (QueueSortOrder == QSO_BYPRIORITY) + SM_FILE_T *out; + + remove("memdump"); + out = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, + "memdump.out", SM_IO_APPEND, NULL); + if (out != NULL) { - if (Verbose) - message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue", - qid_printqueue(queuedir), - w->w_name + 2, - sequenceno, - njobs); - if (LogLevel > 8) - sm_syslog(LOG_INFO, NOQID, - "runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)", - qid_printqueue(queuedir), - w->w_name + 2, - w->w_pri, - CurrentLA, - sequenceno, - njobs); - break; + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, "----------------------\n"); + sm_heap_report(out, + sm_debug_level(&SmHeapCheck) - 1); + (void) sm_io_close(out, SM_TIME_DEFAULT); } - else if (Verbose) - message("Skipping %s/%s (sequence %d of %d)", - qid_printqueue(queuedir), - w->w_name + 2, - sequenceno, njobs); } +#endif /* SM_HEAP_CHECK */ + + /* let me rest for a second to catch my breath */ + if (njobs == 0 && WorkGrp[wgrp].wg_lowqintvl < MIN_SLEEP_TIME) + sleep(MIN_SLEEP_TIME); + else if (WorkGrp[wgrp].wg_lowqintvl <= 0) + sleep(QueueIntvl > 0 ? QueueIntvl : MIN_SLEEP_TIME); else - { - pid_t pid; + sleep(WorkGrp[wgrp].wg_lowqintvl); - if (Verbose) - { - message(""); - message("Running %s/%s (sequence %d of %d)", - qid_printqueue(queuedir), - w->w_name + 2, - sequenceno, njobs); - } - if (tTd(63, 100)) - sm_syslog(LOG_DEBUG, NOQID, - "runqueue %s dowork(%s)", - qid_printqueue(queuedir), - w->w_name + 2); + /* + ** Get the LA outside the WorkQ loop if necessary. + ** In a persistent queue runner the code is repeated over + ** and over but gatherq() may ignore entries due to + ** shouldqueue() (do we really have to do this twice?). + ** Hence the queue runners would just idle around when once + ** CurrentLA caused all entries in a queue to be ignored. + */ - pid = dowork(queuedir, w->w_name + 2, - ForkQueueRuns, FALSE, e); - errno = 0; - if (pid != 0) - (void) waitfor(pid); + now = curtime(); + if (njobs == 0 && current_la_time < now - GET_NEW_LA_TIME) + { + sm_getla(); + current_la_time = now; } - sm_free(w->w_name); - if (w->w_host) - sm_free(w->w_host); - sm_free((char *) w); + rpool = sm_rpool_new_x(NULL); + e = newenvelope(&QueueEnvelope, CurEnv, rpool); + e->e_flags = BlankEnvelope.e_flags; + goto domorework; } /* exit without the usual cleanup */ e->e_id = NULL; if (forkflag) - finis(TRUE, ExitStat); + finis(true, true, ExitStat); /* NOTREACHED */ - return TRUE; + return true; } /* -** RUNQUEUEEVENT -- stub for use in setevent +** DOQUEUERUN -- do a queue run? +*/ + +bool +doqueuerun() +{ + return bitnset(NumQueue, DoQueueRun); +} + +/* +** RUNQUEUEEVENT -- stub for use in sm_setevent +** +** Sets the bit to indicate that on the next run this queue should be +** processed. The work group that the queue group is a member of has its +** count of queue's to process updated. ** ** Parameters: -** none. +** qgrp -- the index of the queue group. ** ** Returns: ** none. ** +** Side Effects: +** The work group that the queue group is a member of has its +** count of queues to process updated. +** The invocation of this function via an alarm may interrupt +** a set of actions. Thus errno may be set in that context. +** We need to restore errno at the end of this function to ensure +** that any work done here that sets errno doesn't return a +** misleading/false errno value. Errno may be EINTR upon entry to +** this function because of non-restartable/continuable system +** API was active. Iff this is true we will override errno as +** a timeout (as a more accurate error message). +** ** NOTE: THIS CAN BE CALLED FROM A SIGNAL HANDLER. DO NOT ADD ** ANYTHING TO THIS ROUTINE UNLESS YOU KNOW WHAT YOU ARE ** DOING. */ -static void -runqueueevent() +void +runqueueevent(qgrp) + int qgrp; { - DoQueueRun = TRUE; + int i; + int save_errno = errno; + + /* + ** Set the general bit that we want a queue run, + ** tested in doqueuerun() + */ + + setbitn(NumQueue, DoQueueRun); + + /* if it is a specific group: set that bit */ + if (qgrp != NOQGRP) + { + setbitn(qgrp, DoQueueRun); + goto ret; + } + + /* for all others: set the bit if it doesn't have a queue interval */ + for (i = 0; i < NumQueue; i++) + { + if (Queue[i]->qg_queueintvl <= 0) + setbitn(i, DoQueueRun); + } + + ret: + errno = save_errno; + if (errno == EINTR) + errno = ETIMEDOUT; } -/* -** ORDERQ -- order the work queue. +/* +** GATHERQ -- gather messages from the message queue(s) the work queue. ** ** Parameters: -** queuedir -- the index of the queue directory. +** qgrp -- the index of the queue group. +** qdir -- the index of the queue directory. ** doall -- if set, include everything in the queue (even ** the jobs that cannot be run because the load -** average is too high). Otherwise, exclude those -** jobs. +** average is too high, or MaxQueueRun is reached). +** Otherwise, exclude those jobs. +** full -- (optional) to be set 'true' if WorkList is full +** more -- (optional) to be set 'true' if there are still more +** messages in this queue not added to WorkList ** ** Returns: ** The number of request in the queue (not necessarily -** the number of requests in WorkQ however). +** the number of requests in WorkList however). ** ** Side Effects: -** Sets WorkQ to the queue of available work, in order. +** prepares available work into WorkList */ -# define NEED_P 001 -# define NEED_T 002 -# define NEED_R 004 -# define NEED_S 010 -# define NEED_H 020 +#define NEED_P 0001 /* 'P': priority */ +#define NEED_T 0002 /* 'T': time */ +#define NEED_R 0004 /* 'R': recipient */ +#define NEED_S 0010 /* 'S': sender */ +#define NEED_H 0020 /* host */ +#if _FFR_QUARANTINE +# define HAS_QUARANTINE 0040 /* has an unexpected 'q' line */ +# define NEED_QUARANTINE 0100 /* 'q': reason */ +#endif /* _FFR_QUARANTINE */ -static WORK *WorkList = NULL; -static int WorkListSize = 0; +static WORK *WorkList = NULL; /* list of unsort work */ +static int WorkListSize = 0; /* current max size of WorkList */ +static int WorkListCount = 0; /* # of work items in WorkList */ static int -orderq(queuedir, doall) - int queuedir; +gatherq(qgrp, qdir, doall, full, more) + int qgrp; + int qdir; bool doall; + bool *full; + bool *more; { register struct dirent *d; register WORK *w; register char *p; DIR *f; - register int i; - int wn = -1; - int wc; + int i, num_ent; + int wn; QUEUE_CHAR *check; char qd[MAXPATHLEN]; char qf[MAXPATHLEN]; - if (queuedir == NOQDIR) - (void) strlcpy(qd, ".", sizeof qd); + wn = WorkListCount - 1; + num_ent = 0; + if (qdir == NOQDIR) + (void) sm_strlcpy(qd, ".", sizeof qd); else - (void) snprintf(qd, sizeof qd, "%s%s", - QPaths[queuedir].qp_name, - (bitset(QP_SUBQF, QPaths[queuedir].qp_subdirs) ? "/qf" : "")); + (void) sm_strlcpyn(qd, sizeof qd, 2, + Queue[qgrp]->qg_qpaths[qdir].qp_name, + (bitset(QP_SUBQF, + Queue[qgrp]->qg_qpaths[qdir].qp_subdirs) + ? "/qf" : "")); if (tTd(41, 1)) { - dprintf("orderq:\n"); + sm_dprintf("gatherq:\n"); check = QueueLimitId; while (check != NULL) { - dprintf("\tQueueLimitId = %s\n", + sm_dprintf("\tQueueLimitId = %s%s\n", + check->queue_negate ? "!" : "", check->queue_match); check = check->queue_next; } @@ -1087,7 +2281,8 @@ orderq(queuedir, doall) check = QueueLimitSender; while (check != NULL) { - dprintf("\tQueueLimitSender = %s\n", + sm_dprintf("\tQueueLimitSender = %s%s\n", + check->queue_negate ? "!" : "", check->queue_match); check = check->queue_next; } @@ -1095,30 +2290,37 @@ orderq(queuedir, doall) check = QueueLimitRecipient; while (check != NULL) { - dprintf("\tQueueLimitRecipient = %s\n", + sm_dprintf("\tQueueLimitRecipient = %s%s\n", + check->queue_negate ? "!" : "", check->queue_match); check = check->queue_next; } - } - /* clear out old WorkQ */ - for (w = WorkQ; w != NULL; ) - { - register WORK *nw = w->w_next; - - WorkQ = nw; - sm_free(w->w_name); - if (w->w_host != NULL) - sm_free(w->w_host); - sm_free((char *) w); - w = nw; +#if _FFR_QUARANTINE + if (QueueMode == QM_QUARANTINE) + { + check = QueueLimitQuarantine; + while (check != NULL) + { + sm_dprintf("\tQueueLimitQuarantine = %s%s\n", + check->queue_negate ? "!" : "", + check->queue_match); + check = check->queue_next; + } + } +#endif /* _FFR_QUARANTINE */ } /* open the queue directory */ f = opendir(qd); if (f == NULL) { - syserr("orderq: cannot open \"%s\"", qid_printqueue(queuedir)); + syserr("gatherq: cannot open \"%s\"", + qid_printqueue(qgrp, qdir)); + if (full != NULL) + *full = WorkListCount >= MaxQueueRun && MaxQueueRun > 0; + if (more != NULL) + *more = false; return 0; } @@ -1128,26 +2330,43 @@ orderq(queuedir, doall) while ((d = readdir(f)) != NULL) { - FILE *cf; + SM_FILE_T *cf; int qfver = 0; char lbuf[MAXNAME + 1]; struct stat sbuf; if (tTd(41, 50)) - dprintf("orderq: checking %s\n", d->d_name); + sm_dprintf("gatherq: checking %s..", d->d_name); /* is this an interesting entry? */ - if (d->d_name[0] != 'q' || d->d_name[1] != 'f') +#if _FFR_QUARANTINE + if (!(((QueueMode == QM_NORMAL && + d->d_name[0] == NORMQF_LETTER) || + (QueueMode == QM_QUARANTINE && + d->d_name[0] == QUARQF_LETTER) || + (QueueMode == QM_LOST && + d->d_name[0] == LOSEQF_LETTER)) && + d->d_name[1] == 'f')) +#else /* _FFR_QUARANTINE */ + if (d->d_name[0] != NORMQF_LETTER || d->d_name[1] != 'f') +#endif /* _FFR_QUARANTINE */ + { + if (tTd(41, 50)) + sm_dprintf(" skipping\n"); continue; + } + if (tTd(41, 50)) + sm_dprintf("\n"); if (strlen(d->d_name) >= MAXQFNAME) { if (Verbose) - printf("orderq: %s too long, %d max characters\n", - d->d_name, MAXQFNAME); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "gatherq: %s too long, %d max characters\n", + d->d_name, MAXQFNAME); if (LogLevel > 0) sm_syslog(LOG_ALERT, NOQID, - "orderq: %s too long, %d max characters", + "gatherq: %s too long, %d max characters", d->d_name, MAXQFNAME); continue; } @@ -1155,7 +2374,8 @@ orderq(queuedir, doall) check = QueueLimitId; while (check != NULL) { - if (strcontainedin(check->queue_match, d->d_name)) + if (strcontainedin(true, check->queue_match, + d->d_name) != check->queue_negate) break; else check = check->queue_next; @@ -1169,76 +2389,109 @@ orderq(queuedir, doall) if (wn == MaxQueueRun && LogLevel > 0) sm_syslog(LOG_WARNING, NOQID, "WorkList for %s maxed out at %d", - qid_printqueue(queuedir), + qid_printqueue(qgrp, qdir), MaxQueueRun); - continue; + if (doall) + continue; /* just count entries */ + break; } if (wn >= WorkListSize) { - grow_wlist(queuedir); + grow_wlist(qgrp, qdir); if (wn >= WorkListSize) continue; } + SM_ASSERT(wn >= 0); w = &WorkList[wn]; - (void) snprintf(qf, sizeof qf, "%s/%s", qd, d->d_name); + (void) sm_strlcpyn(qf, sizeof qf, 3, qd, "/", d->d_name); if (stat(qf, &sbuf) < 0) { if (errno != ENOENT) sm_syslog(LOG_INFO, NOQID, - "orderq: can't stat %s/%s", - qid_printqueue(queuedir), d->d_name); + "gatherq: can't stat %s/%s", + qid_printqueue(qgrp, qdir), + d->d_name); wn--; continue; } if (!bitset(S_IFREG, sbuf.st_mode)) { /* Yikes! Skip it or we will hang on open! */ - syserr("orderq: %s/%s is not a regular file", - qid_printqueue(queuedir), d->d_name); + if (!((d->d_name[0] == DATAFL_LETTER || + d->d_name[0] == NORMQF_LETTER || +#if _FFR_QUARANTINE + d->d_name[0] == QUARQF_LETTER || + d->d_name[0] == LOSEQF_LETTER || +#endif /* _FFR_QUARANTINE */ + d->d_name[0] == XSCRPT_LETTER) && + d->d_name[1] == 'f' && d->d_name[2] == '\0')) + syserr("gatherq: %s/%s is not a regular file", + qid_printqueue(qgrp, qdir), d->d_name); wn--; continue; } /* avoid work if possible */ - if (QueueSortOrder == QSO_BYFILENAME && + if ((QueueSortOrder == QSO_BYFILENAME || + QueueSortOrder == QSO_BYMODTIME || + QueueSortOrder == QSO_RANDOM) && +#if _FFR_QUARANTINE + QueueLimitQuarantine == NULL && +#endif /* _FFR_QUARANTINE */ QueueLimitSender == NULL && QueueLimitRecipient == NULL) { + w->w_qgrp = qgrp; + w->w_qdir = qdir; w->w_name = newstr(d->d_name); w->w_host = NULL; - w->w_lock = w->w_tooyoung = FALSE; + w->w_lock = w->w_tooyoung = false; w->w_pri = 0; w->w_ctime = 0; + w->w_mtime = sbuf.st_mtime; + ++num_ent; continue; } /* open control file */ - cf = fopen(qf, "r"); - - if (cf == NULL) + cf = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, qf, SM_IO_RDONLY, + NULL); + if (cf == NULL && OpMode != MD_PRINT) { /* this may be some random person sending hir msgs */ - /* syserr("orderq: cannot open %s", cbuf); */ if (tTd(41, 2)) - dprintf("orderq: cannot open %s: %s\n", - d->d_name, errstring(errno)); + sm_dprintf("gatherq: cannot open %s: %s\n", + d->d_name, sm_errstring(errno)); errno = 0; wn--; continue; } + w->w_qgrp = qgrp; + w->w_qdir = qdir; w->w_name = newstr(d->d_name); w->w_host = NULL; - w->w_lock = !lockfile(fileno(cf), w->w_name, NULL, LOCK_SH|LOCK_NB); - w->w_tooyoung = FALSE; + if (cf != NULL) + { + w->w_lock = !lockfile(sm_io_getinfo(cf, SM_IO_WHAT_FD, + NULL), + w->w_name, NULL, + LOCK_SH|LOCK_NB); + } + w->w_tooyoung = false; /* make sure jobs in creation don't clog queue */ w->w_pri = 0x7fffffff; w->w_ctime = 0; + w->w_mtime = sbuf.st_mtime; /* extract useful information */ - i = NEED_P | NEED_T; - if (QueueSortOrder == QSO_BYHOST) + i = NEED_P|NEED_T; + if (QueueSortOrder == QSO_BYHOST +#if _FFR_RHS + || QueueSortOrder == QSO_BYSHUFFLE +#endif /* _FFR_RHS */ + ) { /* need w_host set for host sort order */ i |= NEED_H; @@ -1247,7 +2500,13 @@ orderq(queuedir, doall) i |= NEED_S; if (QueueLimitRecipient != NULL) i |= NEED_R; - while (i != 0 && fgets(lbuf, sizeof lbuf, cf) != NULL) +#if _FFR_QUARANTINE + if (QueueLimitQuarantine != NULL) + i |= NEED_QUARANTINE; +#endif /* _FFR_QUARANTINE */ + while (cf != NULL && i != 0 && + sm_io_fgets(cf, SM_TIME_DEFAULT, lbuf, + sizeof lbuf) != NULL) { int c; time_t age; @@ -1258,7 +2517,8 @@ orderq(queuedir, doall) else { /* flush rest of overly long line */ - while ((c = getc(cf)) != EOF && c != '\n') + while ((c = sm_io_getc(cf, SM_TIME_DEFAULT)) + != SM_IO_EOF && c != '\n') continue; } @@ -1278,11 +2538,51 @@ orderq(queuedir, doall) i &= ~NEED_T; break; +#if _FFR_QUARANTINE + case 'q': + if (QueueMode != QM_QUARANTINE && + QueueMode != QM_LOST) + { + if (tTd(41, 49)) + sm_dprintf("%s not marked as quarantined but has a 'q' line\n", + w->w_name); + i |= HAS_QUARANTINE; + } + else if (QueueMode == QM_QUARANTINE) + { + if (QueueLimitQuarantine == NULL) + { + i &= ~NEED_QUARANTINE; + break; + } + p = &lbuf[1]; + check = QueueLimitQuarantine; + while (check != NULL) + { + if (strcontainedin(false, + check->queue_match, + p) != + check->queue_negate) + break; + else + check = check->queue_next; + } + if (check != NULL) + i &= ~NEED_QUARANTINE; + } + break; +#endif /* _FFR_QUARANTINE */ + case 'R': if (w->w_host == NULL && (p = strrchr(&lbuf[1], '@')) != NULL) { - w->w_host = strrev(&p[1]); +#if _FFR_RHS + if (QueueSortOrder == QSO_BYSHUFFLE) + w->w_host = newstr(&p[1]); + else +#endif /* _FFR_RHS */ + w->w_host = strrev(&p[1]); makelower(w->w_host); i &= ~NEED_H; } @@ -1302,8 +2602,10 @@ orderq(queuedir, doall) check = QueueLimitRecipient; while (check != NULL) { - if (strcontainedin(check->queue_match, - p)) + if (strcontainedin(true, + check->queue_match, + p) != + check->queue_negate) break; else check = check->queue_next; @@ -1316,8 +2618,10 @@ orderq(queuedir, doall) check = QueueLimitSender; while (check != NULL) { - if (strcontainedin(check->queue_match, - &lbuf[1])) + if (strcontainedin(true, + check->queue_match, + &lbuf[1]) != + check->queue_negate) break; else check = check->queue_next; @@ -1330,15 +2634,15 @@ orderq(queuedir, doall) age = curtime() - (time_t) atol(&lbuf[1]); if (age >= 0 && MinQueueAge > 0 && age < MinQueueAge) - w->w_tooyoung = TRUE; + w->w_tooyoung = true; break; case 'N': if (atol(&lbuf[1]) == 0) - w->w_tooyoung = FALSE; + w->w_tooyoung = false; break; -# if _FFR_QUEUEDELAY +#if _FFR_QUEUEDELAY /* case 'G': queuealg = atoi(lbuf[1]); @@ -1347,32 +2651,104 @@ orderq(queuedir, doall) queuedelay = (time_t) atol(&lbuf[1]); break; */ -# endif /* _FFR_QUEUEDELAY */ +#endif /* _FFR_QUEUEDELAY */ } } - (void) fclose(cf); + if (cf != NULL) + (void) sm_io_close(cf, SM_TIME_DEFAULT); if ((!doall && shouldqueue(w->w_pri, w->w_ctime)) || +#if _FFR_QUARANTINE + bitset(HAS_QUARANTINE, i) || + bitset(NEED_QUARANTINE, i) || +#endif /* _FFR_QUARANTINE */ bitset(NEED_R|NEED_S, i)) { /* don't even bother sorting this job in */ if (tTd(41, 49)) - dprintf("skipping %s (%x)\n", w->w_name, i); - sm_free(w->w_name); - if (w->w_host) - sm_free(w->w_host); + sm_dprintf("skipping %s (%x)\n", w->w_name, i); + sm_free(w->w_name); /* XXX */ + if (w->w_host != NULL) + sm_free(w->w_host); /* XXX */ wn--; } + else + ++num_ent; } (void) closedir(f); wn++; - WorkQ = NULL; - if (WorkList == NULL) + i = wn - WorkListCount; + WorkListCount += SM_MIN(num_ent, WorkListSize); + + if (more != NULL) + *more = WorkListCount < wn; + + if (full != NULL) + *full = (wn >= MaxQueueRun && MaxQueueRun > 0) || + (WorkList == NULL && wn > 0); + + return i; +} +/* +** SORTQ -- sort the work list +** +** First the old WorkQ is cleared away. Then the WorkList is sorted +** for all items so that important (higher sorting value) items are not +** trunctated off. Then the most important items are moved from +** WorkList to WorkQ. The lower count of 'max' or MaxListCount items +** are moved. +** +** Parameters: +** max -- maximum number of items to be placed in WorkQ +** +** Returns: +** the number of items in WorkQ +** +** Side Effects: +** WorkQ gets released and filled with new work. WorkList +** gets released. Work items get sorted in order. +*/ + +static int +sortq(max) + int max; +{ + register int i; /* local counter */ + register WORK *w; /* tmp item pointer */ + int wc = WorkListCount; /* trim size for WorkQ */ + + if (WorkQ != NULL) + { + /* Clear out old WorkQ. */ + for (w = WorkQ; w != NULL; ) + { + register WORK *nw = w->w_next; + + WorkQ = nw; + sm_free(w->w_name); /* XXX */ + if (w->w_host != NULL) + sm_free(w->w_host); /* XXX */ + sm_free((char *) w); /* XXX */ + w = nw; + } + sm_free((char *) WorkQ); + WorkQ = NULL; + } + + if (WorkList == NULL || wc <= 0) return 0; - wc = min(wn, WorkListSize); - if (wc > MaxQueueRun && MaxQueueRun > 0) - wc = MaxQueueRun; + + /* Check if the per queue group item limit will be exceeded */ + if (wc > max && max > 0) + wc = max; + + /* + ** The sort now takes place using all of the items in WorkList. + ** The list gets trimmed to the most important items after the sort. + ** If the trim were to happen before the sort then one or more + ** important items might get truncated off -- not what we want. + */ if (QueueSortOrder == QSO_BYHOST) { @@ -1401,11 +2777,12 @@ orderq(queuedir, doall) { if (WorkList[i].w_host == NULL && w->w_host == NULL) - WorkList[i].w_lock = TRUE; + WorkList[i].w_lock = true; else if (WorkList[i].w_host != NULL && w->w_host != NULL && - sm_strcasecmp(WorkList[i].w_host, w->w_host) == 0) - WorkList[i].w_lock = TRUE; + sm_strcasecmp(WorkList[i].w_host, + w->w_host) == 0) + WorkList[i].w_lock = true; else break; } @@ -1429,11 +2806,42 @@ orderq(queuedir, doall) else if (QueueSortOrder == QSO_BYFILENAME) { /* - ** Sort based on qf filename. + ** Sort based on queue filename. */ qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf4); } + else if (QueueSortOrder == QSO_RANDOM) + { + /* + ** Sort randomly. + ** workcmpf5() returns a random 1 or -1. + ** As long as nobody does a verification pass over the + ** sorted list, we should be golden. + */ + + qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf5); + } + else if (QueueSortOrder == QSO_BYMODTIME) + { + /* + ** Simple sort based on modification time of queue file. + ** This puts the oldest items first. + */ + + qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf6); + } +#if _FFR_RHS + else if (QueueSortOrder == QSO_BYSHUFFLE) + { + /* + ** Simple sort based on shuffled host name. + */ + + init_shuffle_alphabet(); + qsort((char *) WorkList, wc, sizeof *WorkList, workcmpf7); + } +#endif /* _FFR_RHS */ else { /* @@ -1446,45 +2854,52 @@ orderq(queuedir, doall) /* ** Convert the work list into canonical form. ** Should be turning it into a list of envelopes here perhaps. + ** Only take the most important items up to the per queue group + ** maximum. */ for (i = wc; --i >= 0; ) { w = (WORK *) xalloc(sizeof *w); + w->w_qgrp = WorkList[i].w_qgrp; + w->w_qdir = WorkList[i].w_qdir; w->w_name = WorkList[i].w_name; w->w_host = WorkList[i].w_host; w->w_lock = WorkList[i].w_lock; w->w_tooyoung = WorkList[i].w_tooyoung; w->w_pri = WorkList[i].w_pri; w->w_ctime = WorkList[i].w_ctime; + w->w_mtime = WorkList[i].w_mtime; w->w_next = WorkQ; WorkQ = w; } if (WorkList != NULL) - sm_free(WorkList); + sm_free(WorkList); /* XXX */ WorkList = NULL; WorkListSize = 0; + WorkListCount = 0; if (tTd(40, 1)) { for (w = WorkQ; w != NULL; w = w->w_next) { if (w->w_host != NULL) - dprintf("%22s: pri=%ld %s\n", + sm_dprintf("%22s: pri=%ld %s\n", w->w_name, w->w_pri, w->w_host); else - dprintf("%32s: pri=%ld\n", + sm_dprintf("%32s: pri=%ld\n", w->w_name, w->w_pri); } } - return wn; + return wc; /* return number of WorkQ items */ } -/* +/* ** GROW_WLIST -- make the work list larger ** ** Parameters: -** queuedir -- the index for the queue directory. +** qgrp -- the index for the queue group. +** qdir -- the index for the queue directory. ** ** Returns: ** none. @@ -1496,11 +2911,12 @@ orderq(queuedir, doall) */ static void -grow_wlist(queuedir) - int queuedir; +grow_wlist(qgrp, qdir) + int qgrp; + int qdir; { if (tTd(41, 1)) - dprintf("grow_wlist: WorkListSize=%d\n", WorkListSize); + sm_dprintf("grow_wlist: WorkListSize=%d\n", WorkListSize); if (WorkList == NULL) { WorkList = (WORK *) xalloc((sizeof *WorkList) * @@ -1510,8 +2926,8 @@ grow_wlist(queuedir) else { int newsize = WorkListSize + QUEUESEGSIZE; - WORK *newlist = (WORK *) xrealloc((char *)WorkList, - (unsigned)sizeof(WORK) * (newsize + 1)); + WORK *newlist = (WORK *) sm_realloc((char *) WorkList, + (unsigned) sizeof(WORK) * (newsize + 1)); if (newlist != NULL) { @@ -1521,7 +2937,7 @@ grow_wlist(queuedir) { sm_syslog(LOG_INFO, NOQID, "grew WorkList for %s to %d", - qid_printqueue(queuedir), + qid_printqueue(qgrp, qdir), WorkListSize); } } @@ -1529,13 +2945,13 @@ grow_wlist(queuedir) { sm_syslog(LOG_ALERT, NOQID, "FAILED to grow WorkList for %s to %d", - qid_printqueue(queuedir), newsize); + qid_printqueue(qgrp, qdir), newsize); } } if (tTd(41, 1)) - dprintf("grow_wlist: WorkListSize now %d\n", WorkListSize); + sm_dprintf("grow_wlist: WorkListSize now %d\n", WorkListSize); } -/* +/* ** WORKCMPF0 -- simple priority-only compare function. ** ** Parameters: @@ -1547,8 +2963,6 @@ grow_wlist(queuedir) ** 0 if a == b ** +1 if a > b ** -** Side Effects: -** none. */ static int @@ -1566,7 +2980,7 @@ workcmpf0(a, b) else return -1; } -/* +/* ** WORKCMPF1 -- first compare function for ordering work based on host name. ** ** Sorts on host name, lock status, and priority in that order. @@ -1580,8 +2994,6 @@ workcmpf0(a, b) ** 0 if a == b ** >0 if a > b ** -** Side Effects: -** none. */ static int @@ -1607,7 +3019,7 @@ workcmpf1(a, b) /* job priority */ return workcmpf0(a, b); } -/* +/* ** WORKCMPF2 -- second compare function for ordering work based on host name. ** ** Sorts on lock status, host name, and priority in that order. @@ -1621,8 +3033,6 @@ workcmpf1(a, b) ** 0 if a == b ** >0 if a > b ** -** Side Effects: -** none. */ static int @@ -1648,7 +3058,7 @@ workcmpf2(a, b) /* job priority */ return workcmpf0(a, b); } -/* +/* ** WORKCMPF3 -- simple submission-time-only compare function. ** ** Parameters: @@ -1660,8 +3070,6 @@ workcmpf2(a, b) ** 0 if a == b ** +1 if a > b ** -** Side Effects: -** none. */ static int @@ -1676,7 +3084,7 @@ workcmpf3(a, b) else return 0; } -/* +/* ** WORKCMPF4 -- compare based on file name ** ** Parameters: @@ -1688,8 +3096,6 @@ workcmpf3(a, b) ** 0 if a == b ** +1 if a > b ** -** Side Effects: -** none. */ static int @@ -1699,7 +3105,93 @@ workcmpf4(a, b) { return strcmp(a->w_name, b->w_name); } -/* +/* +** WORKCMPF5 -- compare based on assigned random number +** +** Parameters: +** a -- the first argument (ignored). +** b -- the second argument (ignored). +** +** Returns: +** randomly 1/-1 +*/ + +/* ARGSUSED0 */ +static int +workcmpf5(a, b) + register WORK *a; + register WORK *b; +{ + return (get_rand_mod(2)) ? 1 : -1; +} +/* +** WORKCMPF6 -- simple modification-time-only compare function. +** +** Parameters: +** a -- the first argument. +** b -- the second argument. +** +** Returns: +** -1 if a < b +** 0 if a == b +** +1 if a > b +** +*/ + +static int +workcmpf6(a, b) + register WORK *a; + register WORK *b; +{ + if (a->w_mtime > b->w_mtime) + return 1; + else if (a->w_mtime < b->w_mtime) + return -1; + else + return 0; +} +#if _FFR_RHS +/* +** WORKCMPF7 -- compare function for ordering work based on shuffled host name. +** +** Sorts on lock status, host name, and priority in that order. +** +** Parameters: +** a -- the first argument. +** b -- the second argument. +** +** Returns: +** <0 if a < b +** 0 if a == b +** >0 if a > b +** +*/ + +static int +workcmpf7(a, b) + register WORK *a; + register WORK *b; +{ + int i; + + /* lock status */ + if (a->w_lock != b->w_lock) + return a->w_lock - b->w_lock; + + /* host name */ + if (a->w_host != NULL && b->w_host == NULL) + return 1; + else if (a->w_host == NULL && b->w_host != NULL) + return -1; + if (a->w_host != NULL && b->w_host != NULL && + (i = sm_strshufflecmp(a->w_host, b->w_host)) != 0) + return i; + + /* job priority */ + return workcmpf0(a, b); +} +#endif /* _FFR_RHS */ +/* ** STRREV -- reverse string ** ** Returns a pointer to a new string that is the reverse of @@ -1727,11 +3219,71 @@ strrev(fwd) rev[len] = '\0'; return rev; } -/* + +#if _FFR_RHS + +#define NASCII 128 +#define NCHAR 256 + +static unsigned char ShuffledAlphabet[NCHAR]; + +void +init_shuffle_alphabet() +{ + static bool init = false; + int i; + + if (init) + return; + + /* fill the ShuffledAlphabet */ + for (i = 0; i < NCHAR; i++) + ShuffledAlphabet[i] = i; + + /* mix it */ + for (i = 1; i < NCHAR; i++) + { + register int j = get_random() % NCHAR; + register int tmp; + + tmp = ShuffledAlphabet[j]; + ShuffledAlphabet[j] = ShuffledAlphabet[i]; + ShuffledAlphabet[i] = tmp; + } + + /* make it case insensitive */ + for (i = 'A'; i <= 'Z'; i++) + ShuffledAlphabet[i] = ShuffledAlphabet[i + 'a' - 'A']; + + /* fill the upper part */ + for (i = 0; i < NCHAR; i++) + ShuffledAlphabet[i + NCHAR] = ShuffledAlphabet[i]; + init = true; +} + +static int +sm_strshufflecmp(a, b) + char *a; + char *b; +{ + const unsigned char *us1 = (const unsigned char *) a; + const unsigned char *us2 = (const unsigned char *) b; + + while (ShuffledAlphabet[*us1] == ShuffledAlphabet[*us2++]) + { + if (*us1++ == '\0') + return 0; + } + return (ShuffledAlphabet[*us1] - ShuffledAlphabet[*--us2]); +} +#endif /* _FFR_RHS */ + +/* ** DOWORK -- do a work request. ** ** Parameters: -** queuedir -- the index of the queue directory for the job. +** qgrp -- the index of the queue group for the job. +** qdir -- the index of the queue directory for the job. ** id -- the ID of the job to run. ** forkflag -- if set, run this in background. ** requeueflag -- if set, reinstantiate the queue quickly. @@ -1748,17 +3300,19 @@ strrev(fwd) */ pid_t -dowork(queuedir, id, forkflag, requeueflag, e) - int queuedir; +dowork(qgrp, qdir, id, forkflag, requeueflag, e) + int qgrp; + int qdir; char *id; bool forkflag; bool requeueflag; register ENVELOPE *e; { register pid_t pid; + SM_RPOOL_T *rpool; if (tTd(40, 1)) - dprintf("dowork(%s/%s)\n", qid_printqueue(queuedir), id); + sm_dprintf("dowork(%s/%s)\n", qid_printqueue(qgrp, qdir), id); /* ** Fork for work. @@ -1774,7 +3328,7 @@ dowork(queuedir, id, forkflag, requeueflag, e) ** child will dynamically open them if necessary. */ - closemaps(); + closemaps(false); pid = fork(); if (pid < 0) @@ -1785,12 +3339,38 @@ dowork(queuedir, id, forkflag, requeueflag, e) else if (pid > 0) { /* parent -- clean out connection cache */ - mci_flush(FALSE, NULL); + mci_flush(false, NULL); } else { + /* + ** Initialize exception stack and default exception + ** handler for child process. + */ + + /* Reset global flags */ + RestartRequest = NULL; + RestartWorkGroup = false; + ShutdownRequest = NULL; + PendingSignal = 0; + CurrentPid = getpid(); + sm_exc_newthread(fatal_error); + + /* + ** See note above about SMTP processes and SIGCHLD. + */ + + if (OpMode == MD_SMTP || + OpMode == MD_DAEMON || + MaxQueueChildren > 0) + { + proc_list_clear(); + sm_releasesignal(SIGCHLD); + (void) sm_signal(SIGCHLD, SIG_DFL); + } + /* child -- error messages to the transcript */ - QuickAbort = OnlyOneError = FALSE; + QuickAbort = OnlyOneError = false; } } else @@ -1808,94 +3388,306 @@ dowork(queuedir, id, forkflag, requeueflag, e) ** can recover on interrupt. */ - /* Reset global flags */ - RestartRequest = NULL; - ShutdownRequest = NULL; - PendingSignal = 0; + if (forkflag) + { + /* Reset global flags */ + RestartRequest = NULL; + RestartWorkGroup = false; + ShutdownRequest = NULL; + PendingSignal = 0; + } /* set basic modes, etc. */ - (void) alarm(0); + sm_clear_events(); clearstats(); - clearenvelope(e, FALSE); + rpool = sm_rpool_new_x(NULL); + clearenvelope(e, false, rpool); e->e_flags |= EF_QUEUERUN|EF_GLOBALERRS; set_delivery_mode(SM_DELIVER, e); e->e_errormode = EM_MAIL; e->e_id = id; - e->e_queuedir = queuedir; - GrabTo = UseErrorsTo = FALSE; + e->e_qgrp = qgrp; + e->e_qdir = qdir; + GrabTo = UseErrorsTo = false; ExitStat = EX_OK; if (forkflag) { disconnect(1, e); - OpMode = MD_QUEUERUN; + set_op_mode(MD_QUEUERUN); } - sm_setproctitle(TRUE, e, "%s: from queue", qid_printname(e)); + sm_setproctitle(true, e, "%s from queue", qid_printname(e)); if (LogLevel > 76) - sm_syslog(LOG_DEBUG, e->e_id, - "dowork, pid=%d", - (int) getpid()); + sm_syslog(LOG_DEBUG, e->e_id, "dowork, pid=%d", + (int) CurrentPid); /* don't use the headers from sendmail.cf... */ e->e_header = NULL; /* read the queue control file -- return if locked */ - if (!readqf(e)) + if (!readqf(e, false)) { if (tTd(40, 4) && e->e_id != NULL) - dprintf("readqf(%s) failed\n", + sm_dprintf("readqf(%s) failed\n", qid_printname(e)); e->e_id = NULL; if (forkflag) - finis(FALSE, EX_OK); + finis(false, true, EX_OK); else + { + /* adding this frees 8 bytes */ + clearenvelope(e, false, rpool); + + /* adding this frees 12 bytes */ + sm_rpool_free(rpool); + e->e_rpool = NULL; return 0; + } } e->e_flags |= EF_INQUEUE; - eatheader(e, requeueflag); + eatheader(e, requeueflag, true); if (requeueflag) - queueup(e, FALSE); + queueup(e, false, false); /* do the delivery */ sendall(e, SM_DELIVER); /* finish up and exit */ if (forkflag) - finis(TRUE, ExitStat); + finis(true, true, ExitStat); else - dropenvelope(e, TRUE); + { + dropenvelope(e, true, false); + sm_rpool_free(rpool); + e->e_rpool = NULL; + } } e->e_id = NULL; return pid; } -/* + +/* +** DOWORKLIST -- process a list of envelopes as work requests +** +** Similar to dowork(), except that after forking, it processes an +** envelope and its siblings, treating each envelope as a work request. +** +** Parameters: +** el -- envelope to be processed including its siblings. +** forkflag -- if set, run this in background. +** requeueflag -- if set, reinstantiate the queue quickly. +** This is used when expanding aliases in the queue. +** If forkflag is also set, it doesn't wait for the +** child. +** +** Returns: +** process id of process that is running the queue job. +** +** Side Effects: +** The work request is satisfied if possible. +*/ + +pid_t +doworklist(el, forkflag, requeueflag) + ENVELOPE *el; + bool forkflag; + bool requeueflag; +{ + register pid_t pid; + ENVELOPE *ei; + + if (tTd(40, 1)) + sm_dprintf("doworklist()\n"); + + /* + ** Fork for work. + */ + + if (forkflag) + { + /* + ** Since the delivery may happen in a child and the + ** parent does not wait, the parent may close the + ** maps thereby removing any shared memory used by + ** the map. Therefore, close the maps now so the + ** child will dynamically open them if necessary. + */ + + closemaps(false); + + pid = fork(); + if (pid < 0) + { + syserr("doworklist: cannot fork"); + return 0; + } + else if (pid > 0) + { + /* parent -- clean out connection cache */ + mci_flush(false, NULL); + } + else + { + /* + ** Initialize exception stack and default exception + ** handler for child process. + */ + + /* Reset global flags */ + RestartRequest = NULL; + RestartWorkGroup = false; + ShutdownRequest = NULL; + PendingSignal = 0; + CurrentPid = getpid(); + sm_exc_newthread(fatal_error); + + /* + ** See note above about SMTP processes and SIGCHLD. + */ + + if (OpMode == MD_SMTP || + OpMode == MD_DAEMON || + MaxQueueChildren > 0) + { + proc_list_clear(); + sm_releasesignal(SIGCHLD); + (void) sm_signal(SIGCHLD, SIG_DFL); + } + + /* child -- error messages to the transcript */ + QuickAbort = OnlyOneError = false; + } + } + else + { + pid = 0; + } + + if (pid != 0) + return pid; + + /* + ** IN CHILD + ** Lock the control file to avoid duplicate deliveries. + ** Then run the file as though we had just read it. + ** We save an idea of the temporary name so we + ** can recover on interrupt. + */ + + if (forkflag) + { + /* Reset global flags */ + RestartRequest = NULL; + RestartWorkGroup = false; + ShutdownRequest = NULL; + PendingSignal = 0; + } + + /* set basic modes, etc. */ + sm_clear_events(); + clearstats(); + GrabTo = UseErrorsTo = false; + ExitStat = EX_OK; + if (forkflag) + { + disconnect(1, el); + set_op_mode(MD_QUEUERUN); + } + if (LogLevel > 76) + sm_syslog(LOG_DEBUG, el->e_id, "doworklist, pid=%d", + (int) CurrentPid); + + for (ei = el; ei != NULL; ei = ei->e_sibling) + { + ENVELOPE e; + SM_RPOOL_T *rpool; + + if (WILL_BE_QUEUED(ei->e_sendmode)) + continue; +#if _FFR_QUARANTINE + else if (QueueMode != QM_QUARANTINE && + ei->e_quarmsg != NULL) + continue; +#endif /* _FFR_QUARANTINE */ + + rpool = sm_rpool_new_x(NULL); + clearenvelope(&e, true, rpool); + e.e_flags |= EF_QUEUERUN|EF_GLOBALERRS; + set_delivery_mode(SM_DELIVER, &e); + e.e_errormode = EM_MAIL; + e.e_id = ei->e_id; + e.e_qgrp = ei->e_qgrp; + e.e_qdir = ei->e_qdir; + openxscript(&e); + sm_setproctitle(true, &e, "%s from queue", qid_printname(&e)); + + /* don't use the headers from sendmail.cf... */ + e.e_header = NULL; + CurEnv = &e; + + /* read the queue control file -- return if locked */ + if (readqf(&e, false)) + { + e.e_flags |= EF_INQUEUE; + eatheader(&e, requeueflag, true); + + if (requeueflag) + queueup(&e, false, false); + + /* do the delivery */ + sendall(&e, SM_DELIVER); + dropenvelope(&e, true, false); + } + else + { + if (tTd(40, 4) && e.e_id != NULL) + sm_dprintf("readqf(%s) failed\n", + qid_printname(&e)); + } + sm_rpool_free(rpool); + ei->e_id = NULL; + } + + /* restore CurEnv */ + CurEnv = el; + + /* finish up and exit */ + if (forkflag) + finis(true, true, ExitStat); + return 0; +} +/* ** READQF -- read queue file and set up environment. ** ** Parameters: ** e -- the envelope of the job to run. +** openonly -- only open the qf (returned as e_lockfp) ** ** Returns: -** TRUE if it successfully read the queue file. -** FALSE otherwise. +** true if it successfully read the queue file. +** false otherwise. ** ** Side Effects: ** The queue file is returned locked. */ static bool -readqf(e) +readqf(e, openonly) register ENVELOPE *e; + bool openonly; { - register FILE *qfp; + register SM_FILE_T *qfp; ADDRESS *ctladdr; struct stat st, stf; char *bp; int qfver = 0; long hdrsize = 0; register char *p; + char *frcpt = NULL; char *orcpt = NULL; - bool nomore = FALSE; + bool nomore = false; + bool bogus = false; MODE_T qsafe; char qf[MAXPATHLEN]; char buf[MAXLINE]; @@ -1904,33 +3696,37 @@ readqf(e) ** Read and process the file. */ - (void) strlcpy(qf, queuename(e, 'q'), sizeof qf); - qfp = fopen(qf, "r+"); + (void) sm_strlcpy(qf, queuename(e, ANYQFL_LETTER), sizeof qf); + qfp = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, qf, SM_IO_RDWR, NULL); if (qfp == NULL) { int save_errno = errno; if (tTd(40, 8)) - dprintf("readqf(%s): fopen failure (%s)\n", - qf, errstring(errno)); + sm_dprintf("readqf(%s): sm_io_open failure (%s)\n", + qf, sm_errstring(errno)); errno = save_errno; if (errno != ENOENT ) syserr("readqf: no control file %s", qf); - return FALSE; + RELEASE_QUEUE; + return false; } - if (!lockfile(fileno(qfp), qf, NULL, LOCK_EX|LOCK_NB)) + if (!lockfile(sm_io_getinfo(qfp, SM_IO_WHAT_FD, NULL), qf, NULL, + LOCK_EX|LOCK_NB)) { /* being processed by another queuer */ if (Verbose) - printf("%s: locked\n", e->e_id); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "%s: locked\n", e->e_id); if (tTd(40, 8)) - dprintf("%s: locked\n", e->e_id); + sm_dprintf("%s: locked\n", e->e_id); if (LogLevel > 19) sm_syslog(LOG_DEBUG, e->e_id, "locked"); - (void) fclose(qfp); - return FALSE; + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } /* @@ -1952,35 +3748,38 @@ readqf(e) */ if (stat(qf, &stf) < 0 || - fstat(fileno(qfp), &st) < 0) + fstat(sm_io_getinfo(qfp, SM_IO_WHAT_FD, NULL), &st) < 0) { /* must have been being processed by someone else */ if (tTd(40, 8)) - dprintf("readqf(%s): [f]stat failure (%s)\n", - qf, errstring(errno)); - (void) fclose(qfp); - return FALSE; + sm_dprintf("readqf(%s): [f]stat failure (%s)\n", + qf, sm_errstring(errno)); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } if (st.st_nlink != stf.st_nlink || st.st_dev != stf.st_dev || - st.st_ino != stf.st_ino || -# if HAS_ST_GEN && 0 /* AFS returns garbage in st_gen */ + ST_INODE(st) != ST_INODE(stf) || +#if HAS_ST_GEN && 0 /* AFS returns garbage in st_gen */ st.st_gen != stf.st_gen || -# endif /* HAS_ST_GEN && 0 */ +#endif /* HAS_ST_GEN && 0 */ st.st_uid != stf.st_uid || st.st_gid != stf.st_gid || st.st_size != stf.st_size) { /* changed after opened */ if (Verbose) - printf("%s: changed\n", e->e_id); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "%s: changed\n", e->e_id); if (tTd(40, 8)) - dprintf("%s: changed\n", e->e_id); + sm_dprintf("%s: changed\n", e->e_id); if (LogLevel > 19) sm_syslog(LOG_DEBUG, e->e_id, "changed"); - (void) fclose(qfp); - return FALSE; + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } /* @@ -1988,39 +3787,78 @@ readqf(e) */ qsafe = S_IWOTH|S_IWGRP; -#if _FFR_QUEUE_FILE_MODE if (bitset(S_IWGRP, QueueFileMode)) qsafe &= ~S_IWGRP; -#endif /* _FFR_QUEUE_FILE_MODE */ - if ((st.st_uid != geteuid() && - st.st_uid != TrustedUid && - geteuid() != RealUid) || - bitset(qsafe, st.st_mode)) + bogus = st.st_uid != geteuid() && + st.st_uid != TrustedUid && + geteuid() != RealUid; + + /* + ** If this qf file results from a set-group-ID binary, then + ** we check whether the directory is group-writable, + ** the queue file mode contains the group-writable bit, and + ** the groups are the same. + ** Notice: this requires that the set-group-ID binary is used to + ** run the queue! + */ + + if (bogus && st.st_gid == getegid() && UseMSP) + { + char delim; + struct stat dst; + + bp = SM_LAST_DIR_DELIM(qf); + if (bp == NULL) + delim = '\0'; + else + { + delim = *bp; + *bp = '\0'; + } + if (stat(delim == '\0' ? "." : qf, &dst) < 0) + syserr("readqf: cannot stat directory %s", + delim == '\0' ? "." : qf); + else + { + bogus = !(bitset(S_IWGRP, QueueFileMode) && + bitset(S_IWGRP, dst.st_mode) && + dst.st_gid == st.st_gid); + } + if (delim != '\0') + *bp = delim; + } + if (!bogus) + bogus = bitset(qsafe, st.st_mode); + if (bogus) { if (LogLevel > 0) { sm_syslog(LOG_ALERT, e->e_id, - "bogus queue file, uid=%d, mode=%o", - st.st_uid, st.st_mode); + "bogus queue file, uid=%d, gid=%d, mode=%o", + st.st_uid, st.st_gid, st.st_mode); } if (tTd(40, 8)) - dprintf("readqf(%s): bogus file\n", qf); - loseqfile(e, "bogus file uid in mqueue"); - (void) fclose(qfp); - return FALSE; + sm_dprintf("readqf(%s): bogus file\n", qf); + e->e_flags |= EF_INQUEUE; + if (!openonly) + loseqfile(e, "bogus file uid/gid in mqueue"); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } if (st.st_size == 0) { /* must be a bogus file -- if also old, just remove it */ - if (st.st_ctime + 10 * 60 < curtime()) + if (!openonly && st.st_ctime + 10 * 60 < curtime()) { - (void) xunlink(queuename(e, 'd')); - (void) xunlink(queuename(e, 'q')); + (void) xunlink(queuename(e, DATAFL_LETTER)); + (void) xunlink(queuename(e, ANYQFL_LETTER)); } - (void) fclose(qfp); - return FALSE; + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } if (st.st_nlink == 0) @@ -2030,151 +3868,203 @@ readqf(e) ** unlinked. Just assume it is zero length. */ - (void) fclose(qfp); - return FALSE; + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } +#if _FFR_TRUSTED_QF + /* + ** If we don't own the file mark it as unsafe. + ** However, allow TrustedUser to own it as well + ** in case TrustedUser manipulates the queue. + */ + + if (st.st_uid != geteuid() && st.st_uid != TrustedUid) + e->e_flags |= EF_UNSAFE; +#else /* _FFR_TRUSTED_QF */ + /* If we don't own the file mark it as unsafe */ + if (st.st_uid != geteuid()) + e->e_flags |= EF_UNSAFE; +#endif /* _FFR_TRUSTED_QF */ + /* good file -- save this lock */ e->e_lockfp = qfp; + /* Just wanted the open file */ + if (openonly) + return true; + /* do basic system initialization */ initsys(e); - define('i', e->e_id, e); + macdefine(&e->e_macro, A_PERM, 'i', e->e_id); LineNumber = 0; e->e_flags |= EF_GLOBALERRS; - OpMode = MD_QUEUERUN; + set_op_mode(MD_QUEUERUN); ctladdr = NULL; +#if _FFR_QUARANTINE + e->e_qfletter = queue_letter(e, ANYQFL_LETTER); +#endif /* _FFR_QUARANTINE */ + e->e_dfqgrp = e->e_qgrp; + e->e_dfqdir = e->e_qdir; +#if _FFR_QUEUE_MACRO + macdefine(&e->e_macro, A_TEMP, macid("{queue}"), + qid_printqueue(e->e_qgrp, e->e_qdir)); +#endif /* _FFR_QUEUE_MACRO */ e->e_dfino = -1; e->e_msgsize = -1; -# if _FFR_QUEUEDELAY +#if _FFR_QUEUEDELAY e->e_queuealg = QD_LINEAR; e->e_queuedelay = (time_t) 0; -# endif /* _FFR_QUEUEDELAY */ +#endif /* _FFR_QUEUEDELAY */ while ((bp = fgetfolded(buf, sizeof buf, qfp)) != NULL) { - u_long qflags; + unsigned long qflags; ADDRESS *q; - int mid; + int r; time_t now; auto char *ep; if (tTd(40, 4)) - dprintf("+++++ %s\n", bp); + sm_dprintf("+++++ %s\n", bp); if (nomore) { /* hack attack */ - syserr("SECURITY ALERT: extra data in qf: %s", bp); - (void) fclose(qfp); + hackattack: + syserr("SECURITY ALERT: extra or bogus data in queue file: %s", + bp); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + + /* the file is already on disk */ + e->e_flags |= EF_INQUEUE; loseqfile(e, "bogus queue line"); - return FALSE; + RELEASE_QUEUE; + return false; } switch (bp[0]) { - case 'V': /* queue file version number */ - qfver = atoi(&bp[1]); - if (qfver <= QF_VERSION) - break; - syserr("Version number in qf (%d) greater than max (%d)", - qfver, QF_VERSION); - (void) fclose(qfp); - loseqfile(e, "unsupported qf file version"); - return FALSE; + case 'A': /* AUTH= parameter */ + if (!xtextok(&bp[1])) + goto hackattack; + e->e_auth_param = sm_rpool_strdup_x(e->e_rpool, &bp[1]); + break; + + case 'B': /* body type */ + r = check_bodytype(&bp[1]); + if (!BODYTYPE_VALID(r)) + goto hackattack; + e->e_bodytype = sm_rpool_strdup_x(e->e_rpool, &bp[1]); + break; case 'C': /* specify controlling user */ - ctladdr = setctluser(&bp[1], qfver); + ctladdr = setctluser(&bp[1], qfver, e); break; - case 'Q': /* original recipient */ - orcpt = newstr(&bp[1]); + case 'D': /* data file name */ + /* obsolete -- ignore */ break; - case 'R': /* specify recipient */ - p = bp; - qflags = 0; - if (qfver >= 1) + case 'd': /* data file directory name */ { - /* get flag bits */ - while (*++p != '\0' && *p != ':') + int qgrp, qdir; + +#if _FFR_MSP_PARANOIA + /* forbid queue groups in MSP? */ + if (UseMSP) + goto hackattack; +#endif /* _FFR_MSP_PARANOIA */ + for (qgrp = 0; + qgrp < NumQueue && Queue[qgrp] != NULL; + ++qgrp) { - switch (*p) + for (qdir = 0; + qdir < Queue[qgrp]->qg_numqueues; + ++qdir) { - case 'N': - qflags |= QHASNOTIFY; - break; - - case 'S': - qflags |= QPINGONSUCCESS; - break; - - case 'F': - qflags |= QPINGONFAILURE; - break; - - case 'D': - qflags |= QPINGONDELAY; - break; - - case 'P': - qflags |= QPRIMARY; - break; - - case 'A': - if (ctladdr != NULL) - ctladdr->q_flags |= QALIAS; - break; + if (strcmp(&bp[1], + Queue[qgrp]->qg_qpaths[qdir].qp_name) + == 0) + { + e->e_dfqgrp = qgrp; + e->e_dfqdir = qdir; + goto done; + } } } + loseqfile(e, "bogus queue file directory"); + RELEASE_QUEUE; + return false; + done: + break; } - else - qflags |= QPRIMARY; - q = parseaddr(++p, NULLADDR, RF_COPYALL, '\0', NULL, e); - if (q != NULL) - { - q->q_alias = ctladdr; - if (qfver >= 1) - q->q_flags &= ~Q_PINGFLAGS; - q->q_flags |= qflags; - q->q_orcpt = orcpt; - (void) recipient(q, &e->e_sendqueue, 0, e); - } - orcpt = NULL; - break; case 'E': /* specify error recipient */ /* no longer used */ break; - case 'H': /* header */ - (void) chompheader(&bp[1], CHHDR_QUEUE, NULL, e); - hdrsize += strlen(&bp[1]); - break; + case 'F': /* flag bits */ + if (strncmp(bp, "From ", 5) == 0) + { + /* we are being spoofed! */ + syserr("SECURITY ALERT: bogus qf line %s", bp); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + loseqfile(e, "bogus queue line"); + RELEASE_QUEUE; + return false; + } + for (p = &bp[1]; *p != '\0'; p++) + { + switch (*p) + { + case '8': /* has 8 bit data */ + e->e_flags |= EF_HAS8BIT; + break; - case 'L': /* Solaris Content-Length: */ - case 'M': /* message */ - /* ignore this; we want a new message next time */ - break; + case 'b': /* delete Bcc: header */ + e->e_flags |= EF_DELETE_BCC; + break; - case 'S': /* sender */ - setsender(newstr(&bp[1]), e, NULL, '\0', TRUE); - break; + case 'd': /* envelope has DSN RET= */ + e->e_flags |= EF_RET_PARAM; + break; - case 'B': /* body type */ - e->e_bodytype = newstr(&bp[1]); + case 'n': /* don't return body */ + e->e_flags |= EF_NO_BODY_RETN; + break; + + case 'r': /* response */ + e->e_flags |= EF_RESPONSE; + break; + + case 's': /* split */ + e->e_flags |= EF_SPLIT; + break; + + case 'w': /* warning sent */ + e->e_flags |= EF_WARNING; + break; + } + } break; -# if _FFR_SAVE_CHARSET - case 'X': /* character set */ - e->e_charset = newstr(&bp[1]); +#if _FFR_QUEUEDELAY + case 'G': /* queue delay algorithm */ + e->e_queuealg = atoi(&buf[1]); break; -# endif /* _FFR_SAVE_CHARSET */ +#endif /* _FFR_QUEUEDELAY */ - case 'D': /* data file name */ - /* obsolete -- ignore */ +#if _FFR_QUARANTINE + case 'q': /* quarantine reason */ + e->e_quarmsg = sm_rpool_strdup_x(e->e_rpool, &bp[1]); + macdefine(&e->e_macro, A_PERM, + macid("{quarantine}"), e->e_quarmsg); break; +#endif /* _FFR_QUARANTINE */ - case 'T': /* init time */ - e->e_ctime = atol(&bp[1]); + case 'H': /* header */ + (void) chompheader(&bp[1], CHHDR_QUEUE, NULL, e); + hdrsize += strlen(&bp[1]); break; case 'I': /* data file's inode number */ @@ -2185,14 +4075,10 @@ readqf(e) e->e_dtime = atol(&buf[1]); break; -# if _FFR_QUEUEDELAY - case 'G': /* queue delay algorithm */ - e->e_queuealg = atoi(&buf[1]); - break; - case 'Y': /* current delay */ - e->e_queuedelay = (time_t) atol(&buf[1]); + case 'L': /* Solaris Content-Length: */ + case 'M': /* message */ + /* ignore this; we want a new message next time */ break; -# endif /* _FFR_QUEUEDELAY */ case 'N': /* number of delivery attempts */ e->e_ntries = atoi(&buf[1]); @@ -2204,12 +4090,14 @@ readqf(e) { char *howlong; - howlong = pintvl(now - e->e_dtime, TRUE); + howlong = pintvl(now - e->e_dtime, true); if (Verbose) - printf("%s: too young (%s)\n", - e->e_id, howlong); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%s: too young (%s)\n", + e->e_id, howlong); if (tTd(40, 8)) - dprintf("%s: too young (%s)\n", + sm_dprintf("%s: too young (%s)\n", e->e_id, howlong); if (LogLevel > 19) sm_syslog(LOG_DEBUG, e->e_id, @@ -2217,11 +4105,13 @@ readqf(e) howlong); e->e_id = NULL; unlockqueue(e); - return FALSE; + RELEASE_QUEUE; + return false; } - define(macid("{ntries}", NULL), newstr(&buf[1]), e); + macdefine(&e->e_macro, A_TEMP, + macid("{ntries}"), &buf[1]); -# if NAMED_BIND +#if NAMED_BIND /* adjust BIND parameters immediately */ if (e->e_ntries == 0) { @@ -2233,108 +4123,151 @@ readqf(e) _res.retry = TimeOuts.res_retry[RES_TO_NORMAL]; _res.retrans = TimeOuts.res_retrans[RES_TO_NORMAL]; } -# endif /* NAMED_BIND */ +#endif /* NAMED_BIND */ break; case 'P': /* message priority */ e->e_msgpriority = atol(&bp[1]) + WkTimeFact; break; - case 'F': /* flag bits */ - if (strncmp(bp, "From ", 5) == 0) - { - /* we are being spoofed! */ - syserr("SECURITY ALERT: bogus qf line %s", bp); - (void) fclose(qfp); - loseqfile(e, "bogus queue line"); - return FALSE; - } - for (p = &bp[1]; *p != '\0'; p++) + case 'Q': /* original recipient */ + orcpt = sm_rpool_strdup_x(e->e_rpool, &bp[1]); + break; + + case 'r': /* original recipient */ + frcpt = sm_rpool_strdup_x(e->e_rpool, &bp[1]); + break; + + case 'R': /* specify recipient */ + p = bp; + qflags = 0; + if (qfver >= 1) { - switch (*p) + /* get flag bits */ + while (*++p != '\0' && *p != ':') { - case 'w': /* warning sent */ - e->e_flags |= EF_WARNING; - break; + switch (*p) + { + case 'N': + qflags |= QHASNOTIFY; + break; - case 'r': /* response */ - e->e_flags |= EF_RESPONSE; - break; + case 'S': + qflags |= QPINGONSUCCESS; + break; - case '8': /* has 8 bit data */ - e->e_flags |= EF_HAS8BIT; - break; + case 'F': + qflags |= QPINGONFAILURE; + break; - case 'b': /* delete Bcc: header */ - e->e_flags |= EF_DELETE_BCC; - break; + case 'D': + qflags |= QPINGONDELAY; + break; - case 'd': /* envelope has DSN RET= */ - e->e_flags |= EF_RET_PARAM; - break; + case 'P': + qflags |= QPRIMARY; + break; - case 'n': /* don't return body */ - e->e_flags |= EF_NO_BODY_RETN; - break; + case 'A': + if (ctladdr != NULL) + ctladdr->q_flags |= QALIAS; + break; + + default: /* ignore or complain? */ + break; + } } } + else + qflags |= QPRIMARY; + q = parseaddr(++p, NULLADDR, RF_COPYALL, '\0', NULL, e, + true); + if (q != NULL) + { + q->q_alias = ctladdr; + if (qfver >= 1) + q->q_flags &= ~Q_PINGFLAGS; + q->q_flags |= qflags; + q->q_finalrcpt = frcpt; + q->q_orcpt = orcpt; + (void) recipient(q, &e->e_sendqueue, 0, e); + } + frcpt = NULL; + orcpt = NULL; break; - case 'Z': /* original envelope id from ESMTP */ - e->e_envid = newstr(&bp[1]); - define(macid("{dsn_envid}", NULL), newstr(&bp[1]), e); + case 'S': /* sender */ + setsender(sm_rpool_strdup_x(e->e_rpool, &bp[1]), + e, NULL, '\0', true); break; - case 'A': /* AUTH= parameter */ - e->e_auth_param = newstr(&bp[1]); + case 'T': /* init time */ + e->e_ctime = atol(&bp[1]); + break; + + case 'V': /* queue file version number */ + qfver = atoi(&bp[1]); + if (queuedelay_qfver_unsupported(qfver)) + syserr("queue file version %d not supported: %s", + qfver, + "sendmail not compiled with _FFR_QUEUEDELAY"); + if (qfver <= QF_VERSION) + break; + syserr("Version number in queue file (%d) greater than max (%d)", + qfver, QF_VERSION); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + loseqfile(e, "unsupported queue file version"); + RELEASE_QUEUE; + return false; + /* NOTREACHED */ + break; + +#if _FFR_QUEUEDELAY + case 'Y': /* current delay */ + e->e_queuedelay = (time_t) atol(&buf[1]); + break; +#endif /* _FFR_QUEUEDELAY */ + + case 'Z': /* original envelope id from ESMTP */ + e->e_envid = sm_rpool_strdup_x(e->e_rpool, &bp[1]); + macdefine(&e->e_macro, A_PERM, + macid("{dsn_envid}"), e->e_envid); break; + case '!': /* deliver by */ + + /* format: flag (1 char) space long-integer */ + e->e_dlvr_flag = buf[1]; + e->e_deliver_by = strtol(&buf[3], NULL, 10); + case '$': /* define macro */ { char *p; - mid = macid(&bp[1], &ep); - if (mid == 0) + /* XXX elimate p? */ + r = macid_parse(&bp[1], &ep); + if (r == 0) break; - - p = newstr(ep); - define(mid, p, e); - - /* - ** HACK ALERT: Unfortunately, 8.10 and - ** 8.11 reused the ${if_addr} and - ** ${if_family} macros for both the incoming - ** interface address/family (getrequests()) - ** and the outgoing interface address/family - ** (makeconnection()). In order for D_BINDIF - ** to work properly, have to preserve the - ** incoming information in the queue file for - ** later delivery attempts. The original - ** information is stored in the envelope - ** in readqf() so it can be stored in - ** queueup_macros(). This should be fixed - ** in 8.12. - */ - - if (strcmp(macname(mid), "if_addr") == 0) - e->e_if_macros[EIF_ADDR] = p; + p = sm_rpool_strdup_x(e->e_rpool, ep); + macdefine(&e->e_macro, A_PERM, r, p); } break; case '.': /* terminate file */ - nomore = TRUE; + nomore = true; break; default: syserr("readqf: %s: line %d: bad line \"%s\"", qf, LineNumber, shortenstring(bp, MAXSHORTSTR)); - (void) fclose(qfp); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); loseqfile(e, "unrecognized line"); - return FALSE; + RELEASE_QUEUE; + return false; } if (bp != buf) - sm_free(bp); + sm_free(bp); /* XXX */ } /* @@ -2345,25 +4278,38 @@ readqf(e) if (LineNumber == 0) { errno = 0; - e->e_flags |= EF_CLRQUEUE | EF_FATALERRS | EF_RESPONSE; - return TRUE; + e->e_flags |= EF_CLRQUEUE|EF_FATALERRS|EF_RESPONSE; + RELEASE_QUEUE; + return true; + } + + /* Check to make sure we have a complete queue file read */ + if (!nomore) + { + syserr("readqf: %s: incomplete queue file read", qf); + (void) sm_io_close(qfp, SM_TIME_DEFAULT); + RELEASE_QUEUE; + return false; } /* possibly set ${dsn_ret} macro */ if (bitset(EF_RET_PARAM, e->e_flags)) { if (bitset(EF_NO_BODY_RETN, e->e_flags)) - define(macid("{dsn_ret}", NULL), "hdrs", e); + macdefine(&e->e_macro, A_PERM, + macid("{dsn_ret}"), "hdrs"); else - define(macid("{dsn_ret}", NULL), "full", e); + macdefine(&e->e_macro, A_PERM, + macid("{dsn_ret}"), "full"); } /* ** Arrange to read the data file. */ - p = queuename(e, 'd'); - e->e_dfp = fopen(p, "r"); + p = queuename(e, DATAFL_LETTER); + e->e_dfp = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, p, SM_IO_RDONLY, + NULL); if (e->e_dfp == NULL) { syserr("readqf: cannot open %s", p); @@ -2371,17 +4317,19 @@ readqf(e) else { e->e_flags |= EF_HAS_DF; - if (fstat(fileno(e->e_dfp), &st) >= 0) + if (fstat(sm_io_getinfo(e->e_dfp, SM_IO_WHAT_FD, NULL), &st) + >= 0) { e->e_msgsize = st.st_size + hdrsize; e->e_dfdev = st.st_dev; - e->e_dfino = st.st_ino; + e->e_dfino = ST_INODE(st); } } - return TRUE; + RELEASE_QUEUE; + return true; } -/* +/* ** PRTSTR -- print a string, "unprintable" characters are shown as \oct ** ** Parameters: @@ -2389,7 +4337,7 @@ readqf(e) ** ml -- maximum length of output ** ** Returns: -** none. +** number of entries ** ** Side Effects: ** Prints a string on stdout. @@ -2400,7 +4348,7 @@ prtstr(s, ml) char *s; int ml; { - char c; + int c; if (s == NULL) return; @@ -2410,20 +4358,104 @@ prtstr(s, ml) { if (ml-- > 0) { - putchar(c); - putchar(c); + (void) sm_io_putc(smioout, SM_TIME_DEFAULT, c); + (void) sm_io_putc(smioout, SM_TIME_DEFAULT, c); } } else if (isascii(c) && isprint(c)) - putchar(c); + (void) sm_io_putc(smioout, SM_TIME_DEFAULT, c); else { if ((ml -= 3) > 0) - printf("\\%03o", c); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "\\%03o", c & 0xFF); } } } -/* +/* +** PRINTNQE -- print out number of entries in the mail queue +** +** Parameters: +** out -- output file pointer. +** prefix -- string to output in front of each line. +** +** Returns: +** none. +*/ + +void +printnqe(out, prefix) + SM_FILE_T *out; + char *prefix; +{ +#if SM_CONF_SHM + int i, k = 0, nrequests = 0; + bool unknown = false; + + if (ShmId == SM_SHM_NO_ID) + { + if (prefix == NULL) + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "Data unavailable: shared memory not updated\n"); + else + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%sNOTCONFIGURED:-1\r\n", prefix); + return; + } + for (i = 0; i < NumQueue && Queue[i] != NULL; i++) + { + int j; + + k++; + for (j = 0; j < Queue[i]->qg_numqueues; j++) + { + int n; + + if (StopRequest) + stop_sendmail(); + + n = QSHM_ENTRIES(Queue[i]->qg_qpaths[j].qp_idx); + if (prefix != NULL) + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%s%s:%d\r\n", + prefix, qid_printqueue(i, j), n); + else if (n < 0) + { + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%s: unknown number of entries\n", + qid_printqueue(i, j)); + unknown = true; + } + else if (n == 0) + { + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%s is empty\n", + qid_printqueue(i, j)); + } + else if (n > 0) + { + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%s: entries=%d\n", + qid_printqueue(i, j), n); + nrequests += n; + k++; + } + } + } + if (prefix == NULL && k > 1) + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "\t\tTotal requests: %d%s\n", + nrequests, unknown ? " (about)" : ""); +#else /* SM_CONF_SHM */ + if (prefix == NULL) + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "Data unavailable without shared memory support\n"); + else + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%sNOTAVAILABLE:-1\r\n", prefix); +#endif /* SM_CONF_SHM */ +} +/* ** PRINTQUEUE -- print out a representation of the mail queue ** ** Parameters: @@ -2439,54 +4471,69 @@ prtstr(s, ml) void printqueue() { - int i, nrequests = 0; + int i, k = 0, nrequests = 0; - for (i = 0; i < NumQueues; i++) + for (i = 0; i < NumQueue && Queue[i] != NULL; i++) { - if (StopRequest) - stop_sendmail(); - nrequests += print_single_queue(i); + int j; + + k++; + for (j = 0; j < Queue[i]->qg_numqueues; j++) + { + if (StopRequest) + stop_sendmail(); + nrequests += print_single_queue(i, j); + k++; + } } - if (NumQueues > 1) - printf("\t\tTotal Requests: %d\n", nrequests); + if (k > 1) + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "\t\tTotal requests: %d\n", + nrequests); } -/* +/* ** PRINT_SINGLE_QUEUE -- print out a representation of a single mail queue ** ** Parameters: -** queuedir -- queue directory +** qgrp -- the index of the queue group. +** qdir -- the queue directory. ** ** Returns: -** number of entries +** number of requests in mail queue. ** ** Side Effects: ** Prints a listing of the mail queue on the standard output. */ -static int -print_single_queue(queuedir) - int queuedir; +int +print_single_queue(qgrp, qdir) + int qgrp; + int qdir; { register WORK *w; - FILE *f; + SM_FILE_T *f; int nrequests; char qd[MAXPATHLEN]; char qddf[MAXPATHLEN]; char buf[MAXLINE]; - if (queuedir == NOQDIR) + if (qdir == NOQDIR) { - (void) strlcpy(qd, ".", sizeof qd); - (void) strlcpy(qddf, ".", sizeof qddf); + (void) sm_strlcpy(qd, ".", sizeof qd); + (void) sm_strlcpy(qddf, ".", sizeof qddf); } else { - (void) snprintf(qd, sizeof qd, "%s%s", - QPaths[queuedir].qp_name, - (bitset(QP_SUBQF, QPaths[queuedir].qp_subdirs) ? "/qf" : "")); - (void) snprintf(qddf, sizeof qddf, "%s%s", - QPaths[queuedir].qp_name, - (bitset(QP_SUBDF, QPaths[queuedir].qp_subdirs) ? "/df" : "")); + (void) sm_strlcpyn(qd, sizeof qd, 2, + Queue[qgrp]->qg_qpaths[qdir].qp_name, + (bitset(QP_SUBQF, + Queue[qgrp]->qg_qpaths[qdir].qp_subdirs) + ? "/qf" : "")); + (void) sm_strlcpyn(qddf, sizeof qddf, 2, + Queue[qgrp]->qg_qpaths[qdir].qp_name, + (bitset(QP_SUBDF, + Queue[qgrp]->qg_qpaths[qdir].qp_subdirs) + ? "/df" : "")); } /* @@ -2496,17 +4543,18 @@ print_single_queue(queuedir) if (bitset(PRIV_RESTRICTMAILQ, PrivacyFlags) && RealUid != 0) { struct stat st; -# ifdef NGROUPS_MAX +#ifdef NGROUPS_MAX int n; extern GIDSET_T InitialGidSet[NGROUPS_MAX]; -# endif /* NGROUPS_MAX */ +#endif /* NGROUPS_MAX */ if (stat(qd, &st) < 0) { - syserr("Cannot stat %s", qid_printqueue(queuedir)); + syserr("Cannot stat %s", + qid_printqueue(qgrp, qdir)); return 0; } -# ifdef NGROUPS_MAX +#ifdef NGROUPS_MAX n = NGROUPS_MAX; while (--n >= 0) { @@ -2514,9 +4562,9 @@ print_single_queue(queuedir) break; } if (n < 0 && RealGid != st.st_gid) -# else /* NGROUPS_MAX */ +#else /* NGROUPS_MAX */ if (RealGid != st.st_gid) -# endif /* NGROUPS_MAX */ +#endif /* NGROUPS_MAX */ { usrerr("510 You are not permitted to see the queue"); setstat(EX_NOPERM); @@ -2528,7 +4576,8 @@ print_single_queue(queuedir) ** Read and order the queue. */ - nrequests = orderq(queuedir, TRUE); + nrequests = gatherq(qgrp, qdir, true, NULL, NULL); + (void) sortq(Queue[qgrp]->qg_maxlist); /* ** Print the work list that we have read. @@ -2537,20 +4586,25 @@ print_single_queue(queuedir) /* first see if there is anything */ if (nrequests <= 0) { - printf("%s is empty\n", qid_printqueue(queuedir)); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "%s is empty\n", + qid_printqueue(qgrp, qdir)); return 0; } - CurrentLA = sm_getla(NULL); /* get load average */ + sm_getla(); /* get load average */ - printf("\t\t%s (%d request%s", qid_printqueue(queuedir), nrequests, - nrequests == 1 ? "" : "s"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "\t\t%s (%d request%s", + qid_printqueue(qgrp, qdir), + nrequests, nrequests == 1 ? "" : "s"); if (MaxQueueRun > 0 && nrequests > MaxQueueRun) - printf(", only %d printed", MaxQueueRun); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + ", only %d printed", MaxQueueRun); if (Verbose) - printf(")\n----Q-ID---- --Size-- -Priority- ---Q-Time--- ---------Sender/Recipient--------\n"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + ")\n-----Q-ID----- --Size-- -Priority- ---Q-Time--- --------Sender/Recipient--------\n"); else - printf(")\n----Q-ID---- --Size-- -----Q-Time----- ------------Sender/Recipient------------\n"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + ")\n-----Q-ID----- --Size-- -----Q-Time----- ------------Sender/Recipient-----------\n"); for (w = WorkQ; w != NULL; w = w->w_next) { struct stat st; @@ -2558,6 +4612,9 @@ print_single_queue(queuedir) long dfsize; int flags = 0; int qfver; +#if _FFR_QUARANTINE + char quarmsg[MAXLINE]; +#endif /* _FFR_QUARANTINE */ char statmsg[MAXLINE]; char bodytype[MAXNAME + 1]; char qf[MAXPATHLEN]; @@ -2565,34 +4622,80 @@ print_single_queue(queuedir) if (StopRequest) stop_sendmail(); - printf("%12s", w->w_name + 2); - (void) snprintf(qf, sizeof qf, "%s/%s", qd, w->w_name); - f = fopen(qf, "r"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "%13s", + w->w_name + 2); + (void) sm_strlcpyn(qf, sizeof qf, 3, qd, "/", w->w_name); + f = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, qf, SM_IO_RDONLY, + NULL); if (f == NULL) { - printf(" (job completed)\n"); + if (errno == EPERM) + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + " (permission denied)\n"); + else if (errno == ENOENT) + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + " (job completed)\n"); + else + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + " (%s)\n", + sm_errstring(errno)); errno = 0; continue; } - w->w_name[0] = 'd'; - (void) snprintf(qf, sizeof qf, "%s/%s", qddf, w->w_name); + w->w_name[0] = DATAFL_LETTER; + (void) sm_strlcpyn(qf, sizeof qf, 3, qddf, "/", w->w_name); if (stat(qf, &st) >= 0) dfsize = st.st_size; else + { + ENVELOPE e; + + /* + ** Maybe the df file can't be statted because + ** it is in a different directory than the qf file. + ** In order to find out, we must read the qf file. + */ + + newenvelope(&e, &BlankEnvelope, sm_rpool_new_x(NULL)); + e.e_id = w->w_name + 2; + e.e_qgrp = qgrp; + e.e_qdir = qdir; dfsize = -1; + if (readqf(&e, false)) + { + char *df = queuename(&e, DATAFL_LETTER); + if (stat(df, &st) >= 0) + dfsize = st.st_size; + } + if (e.e_lockfp != NULL) + { + (void) sm_io_close(e.e_lockfp, SM_TIME_DEFAULT); + e.e_lockfp = NULL; + } + clearenvelope(&e, false, e.e_rpool); + sm_rpool_free(e.e_rpool); + } if (w->w_lock) - printf("*"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "*"); +#if _FFR_QUARANTINE + else if (QueueMode == QM_LOST) + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "?"); +#endif /* _FFR_QUARANTINE */ else if (w->w_tooyoung) - printf("-"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "-"); else if (shouldqueue(w->w_pri, w->w_ctime)) - printf("X"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "X"); else - printf(" "); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, " "); + errno = 0; +#if _FFR_QUARANTINE + quarmsg[0] = '\0'; +#endif /* _FFR_QUARANTINE */ statmsg[0] = bodytype[0] = '\0'; qfver = 0; - while (fgets(buf, sizeof buf, f) != NULL) + while (sm_io_fgets(f, SM_TIME_DEFAULT, buf, sizeof buf) != NULL) { register int i; register char *p; @@ -2600,7 +4703,7 @@ print_single_queue(queuedir) if (StopRequest) stop_sendmail(); - fixcrlf(buf, TRUE); + fixcrlf(buf, true); switch (buf[0]) { case 'V': /* queue file version */ @@ -2614,6 +4717,15 @@ print_single_queue(queuedir) statmsg[i] = '\0'; break; +#if _FFR_QUARANTINE + case 'q': /* quarantine reason */ + if ((i = strlen(&buf[1])) >= sizeof quarmsg) + i = sizeof quarmsg - 1; + memmove(quarmsg, &buf[1], i); + quarmsg[i] = '\0'; + break; +#endif /* _FFR_QUARANTINE */ + case 'B': /* body type */ if ((i = strlen(&buf[1])) >= sizeof bodytype) i = sizeof bodytype - 1; @@ -2624,33 +4736,58 @@ print_single_queue(queuedir) case 'S': /* sender name */ if (Verbose) { - printf("%8ld %10ld%c%.12s ", - dfsize, - w->w_pri, - bitset(EF_WARNING, flags) ? '+' : ' ', - ctime(&submittime) + 4); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%8ld %10ld%c%.12s ", + dfsize, + w->w_pri, + bitset(EF_WARNING, flags) + ? '+' : ' ', + ctime(&submittime) + 4); prtstr(&buf[1], 78); } else { - printf("%8ld %.16s ", dfsize, - ctime(&submittime)); - prtstr(&buf[1], 40); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%8ld %.16s ", + dfsize, + ctime(&submittime)); + prtstr(&buf[1], 39); } +#if _FFR_QUARANTINE + if (quarmsg[0] != '\0') + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "\n QUARANTINE: %.*s", + Verbose ? 100 : 60, + quarmsg); + quarmsg[0] = '\0'; + } +#endif /* _FFR_QUARANTINE */ if (statmsg[0] != '\0' || bodytype[0] != '\0') { - printf("\n %10.10s", bodytype); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "\n %10.10s", + bodytype); if (statmsg[0] != '\0') - printf(" (%.*s)", - Verbose ? 100 : 60, - statmsg); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + " (%.*s)", + Verbose ? 100 : 60, + statmsg); + statmsg[0] = '\0'; } break; case 'C': /* controlling user */ if (Verbose) - printf("\n\t\t\t\t (---%.74s---)", - &buf[1]); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "\n\t\t\t\t\t\t(---%.64s---)", + &buf[1]); break; case 'R': /* recipient name */ @@ -2664,13 +4801,25 @@ print_single_queue(queuedir) } if (Verbose) { - printf("\n\t\t\t\t\t "); - prtstr(p, 73); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "\n\t\t\t\t\t\t"); + prtstr(p, 71); } else { - printf("\n\t\t\t\t "); - prtstr(p, 40); + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "\n\t\t\t\t\t "); + prtstr(p, 38); + } + if (Verbose && statmsg[0] != '\0') + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "\n\t\t (%.100s)", + statmsg); + statmsg[0] = '\0'; } break; @@ -2691,13 +4840,65 @@ print_single_queue(queuedir) } } if (submittime == (time_t) 0) - printf(" (no control file)"); - printf("\n"); - (void) fclose(f); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + " (no control file)"); + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "\n"); + (void) sm_io_close(f, SM_TIME_DEFAULT); } return nrequests; } -/* + +#if _FFR_QUARANTINE +/* +** QUEUE_LETTER -- get the proper queue letter for the current QueueMode. +** +** Parameters: +** e -- envelope to build it in/from. +** type -- the file type, used as the first character +** of the file name. +** +** Returns: +** the letter to use +*/ + +static char +queue_letter(e, type) + ENVELOPE *e; + int type; +{ + /* Change type according to QueueMode */ + if (type == ANYQFL_LETTER) + { + if (e->e_quarmsg != NULL) + type = QUARQF_LETTER; + else + { + switch (QueueMode) + { + case QM_NORMAL: + type = NORMQF_LETTER; + break; + + case QM_QUARANTINE: + type = QUARQF_LETTER; + break; + + case QM_LOST: + type = LOSEQF_LETTER; + break; + + default: + /* should never happen */ + abort(); + /* NOTREACHED */ + } + } + } + return type; +} +#endif /* _FFR_QUARANTINE */ + +/* ** QUEUENAME -- build a file name in the queue directory for this envelope. ** ** Parameters: @@ -2719,60 +4920,121 @@ queuename(e, type) register ENVELOPE *e; int type; { - char *sub = ""; + int qd, qg; + char *sub = "/"; + char pref[3]; static char buf[MAXPATHLEN]; /* Assign an ID if needed */ if (e->e_id == NULL) assign_queueid(e); - /* Assign a queue directory if needed */ - if (e->e_queuedir == NOQDIR) - setnewqueue(e); +#if _FFR_QUARANTINE + type = queue_letter(e, type); +#endif /* _FFR_QUARANTINE */ + + /* begin of filename */ + pref[0] = (char) type; + pref[1] = 'f'; + pref[2] = '\0'; + + /* Assign a queue group/directory if needed */ + if (type == XSCRPT_LETTER) + { + /* + ** We don't want to call setnewqueue() if we are fetching + ** the pathname of the transcript file, because setnewqueue + ** chooses a queue, and sometimes we need to write to the + ** transcript file before we have gathered enough information + ** to choose a queue. + */ + + if (e->e_xfqgrp == NOQGRP || e->e_xfqdir == NOQDIR) + { + if (e->e_qgrp != NOQGRP && e->e_qdir != NOQDIR) + { + e->e_xfqgrp = e->e_qgrp; + e->e_xfqdir = e->e_qdir; + } + else + { + e->e_xfqgrp = 0; + if (Queue[e->e_xfqgrp]->qg_numqueues <= 1) + e->e_xfqdir = 0; + else + { + e->e_xfqdir = get_rand_mod( + Queue[e->e_xfqgrp]->qg_numqueues); + } + } + } + qd = e->e_xfqdir; + qg = e->e_xfqgrp; + } + else + { + if (e->e_qgrp == NOQGRP || e->e_qdir == NOQDIR) + setnewqueue(e); + if (type == DATAFL_LETTER) + { + qd = e->e_dfqdir; + qg = e->e_dfqgrp; + } + else + { + qd = e->e_qdir; + qg = e->e_qgrp; + } + } - if (e->e_queuedir == NOQDIR) - (void) snprintf(buf, sizeof buf, "%cf%s", - type, e->e_id); + if (e->e_qdir == NOQDIR) + (void) sm_strlcpyn(buf, sizeof buf, 2, pref, e->e_id); else { switch (type) { - case 'd': - if (bitset(QP_SUBDF, QPaths[e->e_queuedir].qp_subdirs)) - sub = "/df"; + case DATAFL_LETTER: + if (bitset(QP_SUBDF, Queue[qg]->qg_qpaths[qd].qp_subdirs)) + sub = "/df/"; break; +#if _FFR_QUARANTINE + case QUARQF_LETTER: +#endif /* _FFR_QUARANTINE */ case TEMPQF_LETTER: - case 't': + case NEWQFL_LETTER: case LOSEQF_LETTER: - case 'q': - if (bitset(QP_SUBQF, QPaths[e->e_queuedir].qp_subdirs)) - sub = "/qf"; + case NORMQF_LETTER: + if (bitset(QP_SUBQF, Queue[qg]->qg_qpaths[qd].qp_subdirs)) + sub = "/qf/"; break; - case 'x': - if (bitset(QP_SUBXF, QPaths[e->e_queuedir].qp_subdirs)) - sub = "/xf"; + case XSCRPT_LETTER: + if (bitset(QP_SUBXF, Queue[qg]->qg_qpaths[qd].qp_subdirs)) + sub = "/xf/"; break; + + default: + sm_abort("queuename: bad queue file type %d", type); } - (void) snprintf(buf, sizeof buf, "%s%s/%cf%s", - QPaths[e->e_queuedir].qp_name, - sub, type, e->e_id); + (void) sm_strlcpyn(buf, sizeof buf, 4, + Queue[qg]->qg_qpaths[qd].qp_name, + sub, pref, e->e_id); } if (tTd(7, 2)) - dprintf("queuename: %s\n", buf); + sm_dprintf("queuename: %s\n", buf); return buf; } -/* +/* ** ASSIGN_QUEUEID -- assign a queue ID for this envelope. ** ** Assigns an id code if one does not already exist. ** This code assumes that nothing will remain in the queue for ** longer than 60 years. It is critical that files with the given -** name not already exist in the queue. -** Also initializes e_queuedir to NOQDIR. +** name do not already exist in the queue. +** [No longer initializes e_qdir to NOQDIR.] ** ** Parameters: ** e -- envelope to set it in. @@ -2783,22 +5045,26 @@ queuename(e, type) static const char QueueIdChars[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx"; # define QIC_LEN 60 +# define queuenextid() CurrentPid + void assign_queueid(e) register ENVELOPE *e; { - pid_t pid = getpid(); - static char cX = 0; + pid_t pid = queuenextid(); + static int cX = 0; static long random_offset; struct tm *tm; char idbuf[MAXQFNAME - 2]; + int seq; if (e->e_id != NULL) return; /* see if we need to get a new base time/pid */ - if (cX >= QIC_LEN || LastQueueTime == 0 || LastQueuePid != pid) + if (cX >= QIC_LEN * QIC_LEN || LastQueueTime == 0 || + LastQueuePid != pid) { time_t then = LastQueueTime; @@ -2811,12 +5077,22 @@ assign_queueid(e) { (void) sleep(1); } - LastQueuePid = getpid(); + LastQueuePid = queuenextid(); cX = 0; } + + /* + ** Generate a new sequence number between 0 and QIC_LEN*QIC_LEN-1. + ** This lets us generate up to QIC_LEN*QIC_LEN unique queue ids + ** per second, per process. With envelope splitting, + ** a single message can consume many queue ids. + */ + + seq = (int)((cX + random_offset) % (QIC_LEN * QIC_LEN)); + ++cX; if (tTd(7, 50)) - dprintf("assign_queueid: random_offset = %ld (%d)\n", - random_offset, (int)(cX + random_offset) % QIC_LEN); + sm_dprintf("assign_queueid: random_offset = %ld (%d)\n", + random_offset, seq); tm = gmtime(&LastQueueTime); idbuf[0] = QueueIdChars[tm->tm_year % QIC_LEN]; @@ -2825,19 +5101,29 @@ assign_queueid(e) idbuf[3] = QueueIdChars[tm->tm_hour]; idbuf[4] = QueueIdChars[tm->tm_min]; idbuf[5] = QueueIdChars[tm->tm_sec]; - idbuf[6] = QueueIdChars[((int)cX++ + random_offset) % QIC_LEN]; - (void) snprintf(&idbuf[7], sizeof idbuf - 7, "%05d", - (int) LastQueuePid); - e->e_id = newstr(idbuf); - define('i', e->e_id, e); - e->e_queuedir = NOQDIR; + idbuf[6] = QueueIdChars[seq / QIC_LEN]; + idbuf[7] = QueueIdChars[seq % QIC_LEN]; + (void) sm_snprintf(&idbuf[8], sizeof idbuf - 8, "%06d", + (int) LastQueuePid); + e->e_id = sm_rpool_strdup_x(e->e_rpool, idbuf); + macdefine(&e->e_macro, A_PERM, 'i', e->e_id); +#if 0 + /* XXX: inherited from MainEnvelope */ + e->e_qgrp = NOQGRP; /* too early to do anything else */ + e->e_qdir = NOQDIR; + e->e_xfqgrp = NOQGRP; +#endif /* 0 */ +#if _FFR_QUARANTINE + /* New ID means it's not on disk yet */ + e->e_qfletter = '\0'; +#endif /* _FFR_QUARANTINE */ if (tTd(7, 1)) - dprintf("assign_queueid: assigned id %s, e=%lx\n", - e->e_id, (u_long) e); + sm_dprintf("assign_queueid: assigned id %s, e=%p\n", + e->e_id, e); if (LogLevel > 93) sm_syslog(LOG_DEBUG, e->e_id, "assigned id"); } -/* +/* ** SYNC_QUEUE_TIME -- Assure exclusive PID in any given second ** ** Make sure one PID can't be used by two processes in any one second. @@ -2852,19 +5138,20 @@ assign_queueid(e) ** Returns: ** none */ + void sync_queue_time() { -# if FAST_PID_RECYCLE +#if FAST_PID_RECYCLE if (OpMode != MD_TEST && OpMode != MD_VERIFY && LastQueueTime > 0 && - LastQueuePid == getpid() && + LastQueuePid == CurrentPid && curtime() == LastQueueTime) (void) sleep(1); -# endif /* FAST_PID_RECYCLE */ +#endif /* FAST_PID_RECYCLE */ } -/* +/* ** UNLOCKQUEUE -- unlock the queue entry for a specified envelope ** ** Parameters: @@ -2882,13 +5169,13 @@ unlockqueue(e) ENVELOPE *e; { if (tTd(51, 4)) - dprintf("unlockqueue(%s)\n", + sm_dprintf("unlockqueue(%s)\n", e->e_id == NULL ? "NOQUEUE" : e->e_id); /* if there is a lock file in the envelope, close it */ if (e->e_lockfp != NULL) - (void) fclose(e->e_lockfp); + (void) sm_io_close(e->e_lockfp, SM_TIME_DEFAULT); e->e_lockfp = NULL; /* don't create a queue id if we don't already have one */ @@ -2899,10 +5186,9 @@ unlockqueue(e) if (LogLevel > 87) sm_syslog(LOG_DEBUG, e->e_id, "unlock"); if (!tTd(51, 104)) - xunlink(queuename(e, 'x')); - + (void) xunlink(queuename(e, XSCRPT_LETTER)); } -/* +/* ** SETCTLUSER -- create a controlling address ** ** Create a fake "address" given only a local login name; this is @@ -2910,19 +5196,20 @@ unlockqueue(e) ** ** Parameters: ** user -- the user name of the controlling user. -** qfver -- the version stamp of this qf file. +** qfver -- the version stamp of this queue file. +** e -- envelope ** ** Returns: -** An address descriptor for the controlling user. +** An address descriptor for the controlling user, +** using storage allocated from e->e_rpool. ** -** Side Effects: -** none. */ static ADDRESS * -setctluser(user, qfver) +setctluser(user, qfver, e) char *user; int qfver; + ENVELOPE *e; { register ADDRESS *a; struct passwd *pw; @@ -2939,23 +5226,18 @@ setctluser(user, qfver) ** Set up addr fields for controlling user. */ - a = (ADDRESS *) xalloc(sizeof *a); + a = (ADDRESS *) sm_rpool_malloc_x(e->e_rpool, sizeof *a); memset((char *) a, '\0', sizeof *a); - if (*user == '\0') - { - p = NULL; - a->q_user = newstr(DefUser); - } - else if (*user == ':') + if (*user == ':') { p = &user[1]; - a->q_user = newstr(p); + a->q_user = sm_rpool_strdup_x(e->e_rpool, p); } else { p = strtok(user, ":"); - a->q_user = newstr(user); + a->q_user = sm_rpool_strdup_x(e->e_rpool, user); if (qfver >= 2) { if ((p = strtok(NULL, ":")) != NULL) @@ -2980,7 +5262,7 @@ setctluser(user, qfver) else if (strcmp(pw->pw_dir, "/") == 0) a->q_home = ""; else - a->q_home = newstr(pw->pw_dir); + a->q_home = sm_rpool_strdup_x(e->e_rpool, pw->pw_dir); a->q_uid = pw->pw_uid; a->q_gid = pw->pw_gid; a->q_flags |= QGOODUID; @@ -2990,13 +5272,13 @@ setctluser(user, qfver) a->q_flags |= QPRIMARY; /* flag as a "ctladdr" */ a->q_mailer = LocalMailer; if (p == NULL) - a->q_paddr = newstr(a->q_user); + a->q_paddr = sm_rpool_strdup_x(e->e_rpool, a->q_user); else - a->q_paddr = newstr(p); + a->q_paddr = sm_rpool_strdup_x(e->e_rpool, p); return a; } -/* -** LOSEQFILE -- save the qf as Qf and try to let someone know +/* +** LOSEQFILE -- rename queue file with LOSEQF_LETTER & try to let someone know ** ** Parameters: ** e -- the envelope (e->e_id will be used). @@ -3011,23 +5293,63 @@ loseqfile(e, why) register ENVELOPE *e; char *why; { + bool loseit = true; char *p; char buf[MAXPATHLEN]; if (e == NULL || e->e_id == NULL) return; - p = queuename(e, 'q'); - if (strlen(p) >= (SIZE_T) sizeof buf) + p = queuename(e, ANYQFL_LETTER); + if (sm_strlcpy(buf, p, sizeof buf) >= sizeof buf) return; - (void) strlcpy(buf, p, sizeof buf); - p = queuename(e, LOSEQF_LETTER); - if (rename(buf, p) < 0) - syserr("cannot rename(%s, %s), uid=%d", buf, p, geteuid()); - else if (LogLevel > 0) - sm_syslog(LOG_ALERT, e->e_id, - "Losing %s: %s", buf, why); + if (!bitset(EF_INQUEUE, e->e_flags)) + queueup(e, false, true); +#if _FFR_QUARANTINE + else if (QueueMode == QM_LOST) + loseit = false; +#endif /* _FFR_QUARANTINE */ + + /* if already lost, no need to re-lose */ + if (loseit) + { + p = queuename(e, LOSEQF_LETTER); + if (rename(buf, p) < 0) + syserr("cannot rename(%s, %s), uid=%d", + buf, p, geteuid()); + else if (LogLevel > 0) + sm_syslog(LOG_ALERT, e->e_id, + "Losing %s: %s", buf, why); + } + if (e->e_dfp != NULL) + { + (void) sm_io_close(e->e_dfp, SM_TIME_DEFAULT); + e->e_dfp = NULL; + } + e->e_flags &= ~EF_HAS_DF; +} +/* +** NAME2QID -- translate a queue group name to a queue group id +** +** Parameters: +** queuename -- name of queue group. +** +** Returns: +** queue group id if found. +** NOQGRP otherwise. +*/ + +int +name2qid(queuename) + char *queuename; +{ + register STAB *s; + + s = stab(queuename, ST_QUEUE, ST_FIND); + if (s == NULL) + return NOQGRP; + return s->s_quegrp->qg_index; } -/* +/* ** QID_PRINTNAME -- create externally printable version of queue id ** ** Parameters: @@ -3052,100 +5374,225 @@ qid_printname(e) else id = e->e_id; - if (e->e_queuedir == NOQDIR) + if (e->e_qdir == NOQDIR) return id; - (void) snprintf(idbuf, sizeof idbuf, "%.32s/%s", - QPaths[e->e_queuedir].qp_name, id); + (void) sm_snprintf(idbuf, sizeof idbuf, "%.32s/%s", + Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_name, + id); return idbuf; } -/* -** QID_PRINTQUEUE -- create full version of queue directory for df files +/* +** QID_PRINTQUEUE -- create full version of queue directory for data files ** ** Parameters: -** queuedir -- the short version of the queue directory +** qgrp -- index in queue group. +** qdir -- the short version of the queue directory ** ** Returns: -** the full pathname to the queue (static) +** the full pathname to the queue (might point to a static var) */ char * -qid_printqueue(queuedir) - int queuedir; +qid_printqueue(qgrp, qdir) + int qgrp; + int qdir; { char *subdir; static char dir[MAXPATHLEN]; - if (queuedir == NOQDIR) - return QueueDir; + if (qdir == NOQDIR) + return Queue[qgrp]->qg_qdir; - if (strcmp(QPaths[queuedir].qp_name, ".") == 0) + if (strcmp(Queue[qgrp]->qg_qpaths[qdir].qp_name, ".") == 0) subdir = NULL; else - subdir = QPaths[queuedir].qp_name; + subdir = Queue[qgrp]->qg_qpaths[qdir].qp_name; - (void) snprintf(dir, sizeof dir, "%s%s%s%s", QueueDir, + (void) sm_strlcpyn(dir, sizeof dir, 4, + Queue[qgrp]->qg_qdir, subdir == NULL ? "" : "/", subdir == NULL ? "" : subdir, - (bitset(QP_SUBDF, QPaths[queuedir].qp_subdirs) ? "/df" : "")); + (bitset(QP_SUBDF, + Queue[qgrp]->qg_qpaths[qdir].qp_subdirs) + ? "/df" : "")); return dir; } -/* -** SETNEWQUEUE -- Sets a new queue directory + +/* +** PICKQDIR -- Pick a queue directory from a queue group +** +** Parameters: +** qg -- queue group +** fsize -- file size in bytes +** e -- envelope, or NULL ** -** Assign a queue directory to an envelope and store the directory -** in e->e_queuedir. The queue is chosen at random. +** Result: +** NOQDIR if no queue directory in qg has enough free space to +** hold a file of size 'fsize', otherwise the index of +** a randomly selected queue directory which resides on a +** file system with enough disk space. +** XXX This could be extended to select a queuedir with +** a few (the fewest?) number of entries. That data +** is available if shared memory is used. +** +** Side Effects: +** If the request fails and e != NULL then sm_syslog is called. +*/ + +int +pickqdir(qg, fsize, e) + QUEUEGRP *qg; + long fsize; + ENVELOPE *e; +{ + int qdir; + int i; + long avail = 0; + + /* Pick a random directory, as a starting point. */ + if (qg->qg_numqueues <= 1) + qdir = 0; + else + qdir = get_rand_mod(qg->qg_numqueues); + + if (MinBlocksFree <= 0 && fsize <= 0) + return qdir; + + /* + ** Now iterate over the queue directories, + ** looking for a directory with enough space for this message. + */ + + i = qdir; + do + { + QPATHS *qp = &qg->qg_qpaths[i]; + long needed = 0; + long fsavail = 0; + + if (fsize > 0) + needed += fsize / FILE_SYS_BLKSIZE(qp->qp_fsysidx) + + ((fsize % FILE_SYS_BLKSIZE(qp->qp_fsysidx) + > 0) ? 1 : 0); + if (MinBlocksFree > 0) + needed += MinBlocksFree; + fsavail = FILE_SYS_AVAIL(qp->qp_fsysidx); +#if SM_CONF_SHM + if (fsavail <= 0) + { + long blksize; + + /* + ** might be not correctly updated, + ** let's try to get the info directly. + */ + + fsavail = freediskspace(FILE_SYS_NAME(qp->qp_fsysidx), + &blksize); + if (fsavail < 0) + fsavail = 0; + } +#endif /* SM_CONF_SHM */ + if (needed <= fsavail) + return i; + if (avail < fsavail) + avail = fsavail; + + if (qg->qg_numqueues > 0) + i = (i + 1) % qg->qg_numqueues; + } while (i != qdir); + + if (e != NULL && LogLevel > 0) + sm_syslog(LOG_ALERT, e->e_id, + "low on space (%s needs %ld bytes + %ld blocks in %s), max avail: %ld", + CurHostName == NULL ? "SMTP-DAEMON" : CurHostName, + fsize, MinBlocksFree, + qg->qg_qdir, avail); + return NOQDIR; +} +/* +** SETNEWQUEUE -- Sets a new queue group and directory ** -** This routine may be improved in the future to allow for more -** elaborate queueing schemes. Suggestions and code contributions -** are welcome. +** Assign a queue group and directory to an envelope and store the +** directory in e->e_qdir. ** ** Parameters: ** e -- envelope to assign a queue for. ** ** Returns: -** none. +** true if successful +** false otherwise +** +** Side Effects: +** On success, e->e_qgrp and e->e_qdir are non-negative. +** On failure (not enough disk space), +** e->qgrp = NOQGRP, e->e_qdir = NOQDIR +** and usrerr() is invoked (which could raise an exception). */ -void +bool setnewqueue(e) ENVELOPE *e; { - int idx; - if (tTd(41, 20)) - dprintf("setnewqueue: called\n"); + sm_dprintf("setnewqueue: called\n"); + + /* not set somewhere else */ + if (e->e_qgrp == NOQGRP) + { + /* + ** Use the queue group of the first recipient, as set by + ** the "queuegroup" rule set. If that is not defined, then + ** use the queue group of the mailer of the first recipient. + ** If that is not defined either, then use the default + ** queue group. + */ + + if (e->e_sendqueue == NULL) + e->e_qgrp = 0; + else if (e->e_sendqueue->q_qgrp >= 0) + e->e_qgrp = e->e_sendqueue->q_qgrp; + else if (e->e_sendqueue->q_mailer != NULL && + ISVALIDQGRP(e->e_sendqueue->q_mailer->m_qgrp)) + e->e_qgrp = e->e_sendqueue->q_mailer->m_qgrp; + else + e->e_qgrp = 0; + e->e_dfqgrp = e->e_qgrp; + } - if (e->e_queuedir != NOQDIR) + if (ISVALIDQDIR(e->e_qdir) && ISVALIDQDIR(e->e_dfqdir)) { if (tTd(41, 20)) - dprintf("setnewqueue: e_queuedir already assigned (%s)\n", - qid_printqueue(e->e_queuedir)); - return; + sm_dprintf("setnewqueue: e_qdir already assigned (%s)\n", + qid_printqueue(e->e_qgrp, e->e_qdir)); + return true; } - if (NumQueues <= 1) - idx = 0; - else + filesys_update(); + e->e_qdir = pickqdir(Queue[e->e_qgrp], e->e_msgsize, e); + if (e->e_qdir == NOQDIR) { -#if RANDOMSHIFT - /* lower bits are not random "enough", select others */ - idx = (get_random() >> RANDOMSHIFT) % NumQueues; -#else /* RANDOMSHIFT */ - idx = get_random() % NumQueues; -#endif /* RANDOMSHIFT */ - if (tTd(41, 15)) - dprintf("setnewqueue: get_random() %% %d = %d\n", - NumQueues, idx); + e->e_qgrp = NOQGRP; + if (!bitset(EF_FATALERRS, e->e_flags)) + usrerr("452 4.4.5 Insufficient disk space; try again later"); + e->e_flags |= EF_FATALERRS; + return false; } - e->e_queuedir = idx; if (tTd(41, 3)) - dprintf("setnewqueue: Assigned queue directory %s\n", - qid_printqueue(e->e_queuedir)); -} + sm_dprintf("setnewqueue: Assigned queue directory %s\n", + qid_printqueue(e->e_qgrp, e->e_qdir)); -/* + if (e->e_xfqgrp == NOQGRP || e->e_xfqdir == NOQDIR) + { + e->e_xfqgrp = e->e_qgrp; + e->e_xfqdir = e->e_qdir; + } + e->e_dfqdir = e->e_qdir; + return true; +} +/* ** CHKQDIR -- check a queue directory ** ** Parameters: @@ -3167,52 +5614,53 @@ chkqdir(name, sff) /* skip over . and .. directories */ if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' && name[2] == '\0'))) - return FALSE; -# if HASLSTAT + return false; +#if HASLSTAT if (lstat(name, &statb) < 0) -# else /* HASLSTAT */ +#else /* HASLSTAT */ if (stat(name, &statb) < 0) -# endif /* HASLSTAT */ +#endif /* HASLSTAT */ { if (tTd(41, 2)) - dprintf("multiqueue_cache: stat(\"%s\"): %s\n", - name, errstring(errno)); - return FALSE; + sm_dprintf("chkqdir: stat(\"%s\"): %s\n", + name, sm_errstring(errno)); + return false; } -# if HASLSTAT +#if HASLSTAT if (S_ISLNK(statb.st_mode)) { /* ** For a symlink we need to make sure the ** target is a directory */ + if (stat(name, &statb) < 0) { if (tTd(41, 2)) - dprintf("multiqueue_cache: stat(\"%s\"): %s\n", - name, errstring(errno)); - return FALSE; + sm_dprintf("chkqdir: stat(\"%s\"): %s\n", + name, sm_errstring(errno)); + return false; } } -# endif /* HASLSTAT */ +#endif /* HASLSTAT */ if (!S_ISDIR(statb.st_mode)) { if (tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": Not a directory\n", + sm_dprintf("chkqdir: \"%s\": Not a directory\n", name); - return FALSE; + return false; } /* Print a warning if unsafe (but still use it) */ + /* XXX do this only if we want the warning? */ i = safedirpath(name, RunAsUid, RunAsGid, NULL, sff, 0, 0); if (i != 0 && tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": Not safe: %s\n", - name, errstring(i)); - return TRUE; + sm_dprintf("chkqdir: \"%s\": Not safe: %s\n", + name, sm_errstring(i)); + return true; } - -/* +/* ** MULTIQUEUE_CACHE -- cache a list of paths to queues. ** ** Each potential queue is checked as the cache is built. @@ -3221,206 +5669,1234 @@ chkqdir(name, sff) ** (although code for that is not ready yet). ** ** Parameters: -** none +** basedir -- base of all queue directories. +** blen -- strlen(basedir). +** qg -- queue group. +** qn -- number of queue directories already cached. +** phash -- pointer to hash value over queue dirs. +#if SM_CONF_SHM +** only used if shared memory is active. +#endif * SM_CONF_SHM * ** ** Returns: -** none +** new number of queue directories. */ -void -multiqueue_cache() +#define INITIAL_SLOTS 20 +#define ADD_SLOTS 10 + +static int +multiqueue_cache(basedir, blen, qg, qn, phash) + char *basedir; + int blen; + QUEUEGRP *qg; + int qn; + unsigned int *phash; { - register DIR *dp; - register struct dirent *d; char *cp; int i, len; int slotsleft = 0; long sff = SFF_ANYFILE; char qpath[MAXPATHLEN]; char subdir[MAXPATHLEN]; + char prefix[MAXPATHLEN]; /* dir relative to basedir */ if (tTd(41, 20)) - dprintf("multiqueue_cache: called\n"); + sm_dprintf("multiqueue_cache: called\n"); - if (NumQueues != 0 && QPaths != NULL) + /* Initialize to current directory */ + prefix[0] = '.'; + prefix[1] = '\0'; + if (qg->qg_numqueues != 0 && qg->qg_qpaths != NULL) { - for (i = 0; i < NumQueues; i++) + for (i = 0; i < qg->qg_numqueues; i++) { - if (QPaths[i].qp_name != NULL) - sm_free(QPaths[i].qp_name); + if (qg->qg_qpaths[i].qp_name != NULL) + (void) sm_free(qg->qg_qpaths[i].qp_name); /* XXX */ } - sm_free((char *)QPaths); - QPaths = NULL; - NumQueues = 0; + (void) sm_free((char *) qg->qg_qpaths); /* XXX */ + qg->qg_qpaths = NULL; + qg->qg_numqueues = 0; } /* If running as root, allow safedirpath() checks to use privs */ if (RunAsUid == 0) sff |= SFF_ROOTOK; - (void) snprintf(qpath, sizeof qpath, "%s", QueueDir); - len = strlen(qpath) - 1; - cp = &qpath[len]; + if (!SM_IS_DIR_START(qg->qg_qdir)) + { + /* + ** XXX we could add basedir, but then we have to realloc() + ** the string... Maybe another time. + */ + + syserr("QueuePath %s not absolute", qg->qg_qdir); + ExitStat = EX_CONFIG; + return qn; + } + + /* qpath: directory of current workgroup */ + len = sm_strlcpy(qpath, qg->qg_qdir, sizeof qpath); + if (len >= sizeof qpath) + { + syserr("QueuePath %.256s too long (%d max)", + qg->qg_qdir, (int) sizeof qpath); + ExitStat = EX_CONFIG; + return qn; + } + + /* begin of qpath must be same as basedir */ + if (strncmp(basedir, qpath, blen) != 0 && + (strncmp(basedir, qpath, blen - 1) != 0 || len != blen - 1)) + { + syserr("QueuePath %s not subpath of QueueDirectory %s", + qpath, basedir); + ExitStat = EX_CONFIG; + return qn; + } + + /* Do we have a nested subdirectory? */ + if (blen < len && SM_FIRST_DIR_DELIM(qg->qg_qdir + blen) != NULL) + { + + /* Copy subdirectory into prefix for later use */ + if (sm_strlcpy(prefix, qg->qg_qdir + blen, sizeof prefix) >= + sizeof prefix) + { + syserr("QueuePath %.256s too long (%d max)", + qg->qg_qdir, (int) sizeof qpath); + ExitStat = EX_CONFIG; + return qn; + } + cp = SM_LAST_DIR_DELIM(prefix); + SM_ASSERT(cp != NULL); + *cp = '\0'; /* cut off trailing / */ + } + + /* This is guaranteed by the basedir check above */ + SM_ASSERT(len >= blen - 1); + cp = &qpath[len - 1]; if (*cp == '*') { - *cp = '\0'; - if ((cp = strrchr(qpath, '/')) == NULL) + register DIR *dp; + register struct dirent *d; + int off; + char *delim; + char relpath[MAXPATHLEN]; + + *cp = '\0'; /* Overwrite wildcard */ + if ((cp = SM_LAST_DIR_DELIM(qpath)) == NULL) { syserr("QueueDirectory: can not wildcard relative path"); if (tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": Can not wildcard relative path.\n", + sm_dprintf("multiqueue_cache: \"%s*\": Can not wildcard relative path.\n", qpath); ExitStat = EX_CONFIG; - return; + return qn; } if (cp == qpath) { /* ** Special case of top level wildcard, like /foo* + ** Change to //foo* */ - (void) snprintf(qpath + 1, sizeof qpath - 1, - "%s", qpath); + (void) sm_strlcpy(qpath + 1, qpath, sizeof qpath - 1); ++cp; } - *(cp++) = '\0'; - len = strlen(cp); + delim = cp; + *(cp++) = '\0'; /* Replace / with \0 */ + len = strlen(cp); /* Last component of queue directory */ + + /* + ** Path relative to basedir, with trailing / + ** It will be modified below to specify the subdirectories + ** so they can be opened without chdir(). + */ + + off = sm_strlcpyn(relpath, sizeof relpath, 2, prefix, "/"); + SM_ASSERT(off < sizeof relpath); if (tTd(41, 2)) - dprintf("multiqueue_cache: prefix=\"%s\"\n", cp); + sm_dprintf("multiqueue_cache: prefix=\"%s%s\"\n", + relpath, cp); - QueueDir = newstr(qpath); + /* It is always basedir: we don't need to store it per group */ + /* XXX: optimize this! -> one more global? */ + qg->qg_qdir = newstr(basedir); + qg->qg_qdir[blen - 1] = '\0'; /* cut off trailing / */ /* ** XXX Should probably wrap this whole loop in a timeout ** in case some wag decides to NFS mount the queues. */ - /* test path to get warning messages */ - i= safedirpath(QueueDir, RunAsUid, RunAsGid, NULL, sff, 0, 0); - if (i != 0 && tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": Not safe: %s\n", - QueueDir, errstring(i)); - - if (chdir(QueueDir) < 0) + /* Test path to get warning messages. */ + if (qn == 0) { - syserr("can not chdir(%s)", QueueDir); - if (tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": %s\n", - qpath, errstring(errno)); - ExitStat = EX_CONFIG; - return; + /* XXX qg_runasuid and qg_runasgid for specials? */ + i = safedirpath(basedir, RunAsUid, RunAsGid, NULL, + sff, 0, 0); + if (i != 0 && tTd(41, 2)) + sm_dprintf("multiqueue_cache: \"%s\": Not safe: %s\n", + basedir, sm_errstring(i)); } - if ((dp = opendir(".")) == NULL) + if ((dp = opendir(prefix)) == NULL) { - syserr("can not opendir(%s)", QueueDir); + syserr("can not opendir(%s/%s)", qg->qg_qdir, prefix); if (tTd(41, 2)) - dprintf("multiqueue_cache: opendir(\"%s\"): %s\n", - QueueDir, errstring(errno)); + sm_dprintf("multiqueue_cache: opendir(\"%s/%s\"): %s\n", + qg->qg_qdir, prefix, + sm_errstring(errno)); ExitStat = EX_CONFIG; - return; + return qn; } while ((d = readdir(dp)) != NULL) { - if (strncmp(d->d_name, cp, len) != 0) + i = strlen(d->d_name); + if (i < len || strncmp(d->d_name, cp, len) != 0) { if (tTd(41, 5)) - dprintf("multiqueue_cache: \"%s\", skipped\n", + sm_dprintf("multiqueue_cache: \"%s\", skipped\n", d->d_name); continue; } - if (!chkqdir(d->d_name, sff)) + + /* Create relative pathname: prefix + local directory */ + i = sizeof(relpath) - off; + if (sm_strlcpy(relpath + off, d->d_name, i) >= i) + continue; /* way too long */ + + if (!chkqdir(relpath, sff)) continue; - if (QPaths == NULL) + if (qg->qg_qpaths == NULL) { - slotsleft = 20; - QPaths = (QPATHS *)xalloc((sizeof *QPaths) * - slotsleft); - NumQueues = 0; + slotsleft = INITIAL_SLOTS; + qg->qg_qpaths = (QPATHS *)xalloc((sizeof *qg->qg_qpaths) * + slotsleft); + qg->qg_numqueues = 0; } else if (slotsleft < 1) { - QPaths = (QPATHS *)xrealloc((char *)QPaths, - (sizeof *QPaths) * - (NumQueues + 10)); - if (QPaths == NULL) + qg->qg_qpaths = (QPATHS *)sm_realloc((char *)qg->qg_qpaths, + (sizeof *qg->qg_qpaths) * + (qg->qg_numqueues + + ADD_SLOTS)); + if (qg->qg_qpaths == NULL) { (void) closedir(dp); - return; + return qn; } - slotsleft += 10; + slotsleft += ADD_SLOTS; } /* check subdirs */ - QPaths[NumQueues].qp_subdirs = QP_NOSUB; - (void) snprintf(subdir, sizeof subdir, "%s/%s/%s", - qpath, d->d_name, "qf"); - if (chkqdir(subdir, sff)) - QPaths[NumQueues].qp_subdirs |= QP_SUBQF; - - (void) snprintf(subdir, sizeof subdir, "%s/%s/%s", - qpath, d->d_name, "df"); - if (chkqdir(subdir, sff)) - QPaths[NumQueues].qp_subdirs |= QP_SUBDF; - - (void) snprintf(subdir, sizeof subdir, "%s/%s/%s", - qpath, d->d_name, "xf"); - if (chkqdir(subdir, sff)) - QPaths[NumQueues].qp_subdirs |= QP_SUBXF; + qg->qg_qpaths[qg->qg_numqueues].qp_subdirs = QP_NOSUB; + +#define CHKRSUBDIR(name, flag) \ + (void) sm_strlcpyn(subdir, sizeof subdir, 3, relpath, "/", name); \ + if (chkqdir(subdir, sff)) \ + qg->qg_qpaths[qg->qg_numqueues].qp_subdirs |= flag; \ + else + + + CHKRSUBDIR("qf", QP_SUBQF); + CHKRSUBDIR("df", QP_SUBDF); + CHKRSUBDIR("xf", QP_SUBXF); /* assert(strlen(d->d_name) < MAXPATHLEN - 14) */ /* maybe even - 17 (subdirs) */ - QPaths[NumQueues].qp_name = newstr(d->d_name); + + if (prefix[0] != '.') + qg->qg_qpaths[qg->qg_numqueues].qp_name = + newstr(relpath); + else + qg->qg_qpaths[qg->qg_numqueues].qp_name = + newstr(d->d_name); + if (tTd(41, 2)) - dprintf("multiqueue_cache: %d: \"%s\" cached (%x).\n", - NumQueues, d->d_name, - QPaths[NumQueues].qp_subdirs); - NumQueues++; + sm_dprintf("multiqueue_cache: %d: \"%s\" cached (%x).\n", + qg->qg_numqueues, relpath, + qg->qg_qpaths[qg->qg_numqueues].qp_subdirs); +#if SM_CONF_SHM + qg->qg_qpaths[qg->qg_numqueues].qp_idx = qn; + *phash = hash_q(relpath, *phash); +#endif /* SM_CONF_SHM */ + qg->qg_numqueues++; + ++qn; slotsleft--; } (void) closedir(dp); + + /* undo damage */ + *delim = '/'; } - if (NumQueues == 0) + if (qg->qg_numqueues == 0) { - if (*cp != '*' && tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": No wildcard suffix character\n", - QueueDir); - QPaths = (QPATHS *)xalloc(sizeof *QPaths); - QPaths[0].qp_name = newstr("."); - QPaths[0].qp_subdirs = QP_NOSUB; - NumQueues = 1; + qg->qg_qpaths = (QPATHS *) xalloc(sizeof *qg->qg_qpaths); /* test path to get warning messages */ - (void) safedirpath(QueueDir, RunAsUid, RunAsGid, - NULL, sff, 0, 0); - if (chdir(QueueDir) < 0) + i = safedirpath(qpath, RunAsUid, RunAsGid, NULL, sff, 0, 0); + if (i == ENOENT) { - syserr("can not chdir(%s)", QueueDir); + syserr("can not opendir(%s)", qpath); if (tTd(41, 2)) - dprintf("multiqueue_cache: \"%s\": %s\n", - QueueDir, errstring(errno)); + sm_dprintf("multiqueue_cache: opendir(\"%s\"): %s\n", + qpath, sm_errstring(i)); ExitStat = EX_CONFIG; + return qn; } + qg->qg_qpaths[0].qp_subdirs = QP_NOSUB; + qg->qg_numqueues = 1; + /* check subdirs */ - (void) snprintf(subdir, sizeof subdir, "%s/qf", QueueDir); - if (chkqdir(subdir, sff)) - QPaths[0].qp_subdirs |= QP_SUBQF; +#define CHKSUBDIR(name, flag) \ + (void) sm_strlcpyn(subdir, sizeof subdir, 3, qg->qg_qdir, "/", name); \ + if (chkqdir(subdir, sff)) \ + qg->qg_qpaths[0].qp_subdirs |= flag; \ + else - (void) snprintf(subdir, sizeof subdir, "%s/df", QueueDir); - if (chkqdir(subdir, sff)) - QPaths[0].qp_subdirs |= QP_SUBDF; + CHKSUBDIR("qf", QP_SUBQF); + CHKSUBDIR("df", QP_SUBDF); + CHKSUBDIR("xf", QP_SUBXF); - (void) snprintf(subdir, sizeof subdir, "%s/xf", QueueDir); - if (chkqdir(subdir, sff)) - QPaths[0].qp_subdirs |= QP_SUBXF; + if (qg->qg_qdir[blen - 1] != '\0' && + qg->qg_qdir[blen] != '\0') + { + /* + ** Copy the last component into qpaths and + ** cut off qdir + */ + + qg->qg_qpaths[0].qp_name = newstr(qg->qg_qdir + blen); + qg->qg_qdir[blen - 1] = '\0'; + } + else + qg->qg_qpaths[0].qp_name = newstr("."); + +#if SM_CONF_SHM + qg->qg_qpaths[0].qp_idx = qn; + *phash = hash_q(qg->qg_qpaths[0].qp_name, *phash); +#endif /* SM_CONF_SHM */ + ++qn; + } + return qn; +} + +/* +** FILESYS_FIND -- find entry in FileSys table, or add new one +** +** Given the pathname of a directory, determine the file system +** in which that directory resides, and return a pointer to the +** entry in the FileSys table that describes the file system. +** A new entry is added if necessary (and requested). +** If the directory does not exist, -1 is returned. +** +** Parameters: +** path -- pathname of directory +** add -- add to structure if not found. +** +** Returns: +** >=0: found: index in file system table +** <0: some error, i.e., +** FSF_TOO_MANY: too many filesystems (-> syserr()) +** FSF_STAT_FAIL: can't stat() filesystem (-> syserr()) +** FSF_NOT_FOUND: not in list +*/ + +static short filesys_find __P((char *, bool)); + +#define FSF_NOT_FOUND (-1) +#define FSF_STAT_FAIL (-2) +#define FSF_TOO_MANY (-3) + +static short +filesys_find(path, add) + char *path; + bool add; +{ + struct stat st; + short i; + + if (stat(path, &st) < 0) + { + syserr("cannot stat queue directory %s", path); + return FSF_STAT_FAIL; + } + for (i = 0; i < NumFileSys; ++i) + { + if (FILE_SYS_DEV(i) == st.st_dev) + return i; + } + if (i >= MAXFILESYS) + { + syserr("too many queue file systems (%d max)", MAXFILESYS); + return FSF_TOO_MANY; } + if (!add) + return FSF_NOT_FOUND; + + ++NumFileSys; + FILE_SYS_NAME(i) = path; + FILE_SYS_DEV(i) = st.st_dev; + FILE_SYS_AVAIL(i) = 0; + FILE_SYS_BLKSIZE(i) = 1024; /* avoid divide by zero */ + return i; } -# if 0 -/* +/* +** FILESYS_SETUP -- set up mapping from queue directories to file systems +** +** This data structure is used to efficiently check the amount of +** free space available in a set of queue directories. +** +** Parameters: +** add -- initialize structure if necessary. +** +** Returns: +** 0: success +** <0: some error, i.e., +** FSF_NOT_FOUND: not in list +** FSF_STAT_FAIL: can't stat() filesystem (-> syserr()) +** FSF_TOO_MANY: too many filesystems (-> syserr()) +*/ + +static int filesys_setup __P((bool)); + +static int +filesys_setup(add) + bool add; +{ + int i, j; + short fs; + int ret; + + ret = 0; + for (i = 0; i < NumQueue && Queue[i] != NULL; i++) + { + for (j = 0; j < Queue[i]->qg_numqueues; ++j) + { + QPATHS *qp = &Queue[i]->qg_qpaths[j]; + + fs = filesys_find(qp->qp_name, add); + if (fs >= 0) + qp->qp_fsysidx = fs; + else + qp->qp_fsysidx = 0; + if (fs < ret) + ret = fs; + } + } + return ret; +} + +/* +** FILESYS_UPDATE -- update amount of free space on all file systems +** +** The FileSys table is used to cache the amount of free space +** available on all queue directory file systems. +** This function updates the cached information if it has expired. +** +** Parameters: +** none. +** +** Returns: +** none. +** +** Side Effects: +** Updates FileSys table. +*/ + +void +filesys_update() +{ + int i; + long avail, blksize; + time_t now; + static time_t nextupdate = 0; + +#if SM_CONF_SHM + /* only the daemon updates this structure */ + if (ShmId != SM_SHM_NO_ID && DaemonPid != CurrentPid) + return; +#endif /* SM_CONF_SHM */ + now = curtime(); + if (now < nextupdate) + return; + nextupdate = now + FILESYS_UPDATE_INTERVAL; + for (i = 0; i < NumFileSys; ++i) + { + FILESYS *fs = &FILE_SYS(i); + + avail = freediskspace(FILE_SYS_NAME(i), &blksize); + if (avail < 0 || blksize <= 0) + { + if (LogLevel > 5) + sm_syslog(LOG_ERR, NOQID, + "filesys_update failed: %s, fs=%s, avail=%ld, blocksize=%ld", + sm_errstring(errno), + FILE_SYS_NAME(i), avail, blksize); + fs->fs_avail = 0; + fs->fs_blksize = 1024; /* avoid divide by zero */ + nextupdate = now + 2; /* let's do this soon again */ + } + else + { + fs->fs_avail = avail; + fs->fs_blksize = blksize; + } + } +} + +#if _FFR_ANY_FREE_FS +/* +** FILESYS_FREE -- check whether there is at least one fs with enough space. +** +** Parameters: +** fsize -- file size in bytes +** +** Returns: +** true iff there is one fs with more than fsize bytes free. +*/ + +bool +filesys_free(fsize) + long fsize; +{ + int i; + + if (fsize <= 0) + return true; + for (i = 0; i < NumFileSys; ++i) + { + long needed = 0; + + if (FILE_SYS_AVAIL(i) < 0 || FILE_SYS_BLKSIZE(i) <= 0) + continue; + needed += fsize / FILE_SYS_BLKSIZE(i) + + ((fsize % FILE_SYS_BLKSIZE(i) + > 0) ? 1 : 0) + + MinBlocksFree; + if (needed <= FILE_SYS_AVAIL(i)) + return true; + } + return false; +} +#endif /* _FFR_ANY_FREE_FS */ + +#if _FFR_CONTROL_MSTAT +/* +** DISK_STATUS -- show amount of free space in queue directories +** +** Parameters: +** out -- output file pointer. +** prefix -- string to output in front of each line. +** +** Returns: +** none. +*/ + +void +disk_status(out, prefix) + SM_FILE_T *out; + char *prefix; +{ + int i; + long avail, blksize; + long free; + + for (i = 0; i < NumFileSys; ++i) + { + avail = freediskspace(FILE_SYS_NAME(i), &blksize); + if (avail >= 0 && blksize > 0) + { + free = (long)((double) avail * + ((double) blksize / 1024)); + } + else + free = -1; + (void) sm_io_fprintf(out, SM_TIME_DEFAULT, + "%s%d/%s/%ld\r\n", + prefix, i, + FILE_SYS_NAME(i), + free); + } +} +#endif /* _FFR_CONTROL_MSTAT */ + +#if SM_CONF_SHM +/* +** UPD_QS -- update information about queue when adding/deleting an entry +** +** Parameters: +** e -- envelope. +** delete -- delete/add entry. +** avail -- update the space available as well. +** +** Returns: +** none. +** +** Side Effects: +** Modifies available space in filesystem. +** Changes number of entries in queue directory. +*/ + +void +upd_qs(e, delete, avail) + ENVELOPE *e; + bool delete; + bool avail; +{ + short fidx; + int idx; + long s; + + if (ShmId == SM_SHM_NO_ID || e == NULL) + return; + if (e->e_qgrp == NOQGRP || e->e_qdir == NOQDIR) + return; + idx = Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_idx; + + /* XXX in theory this needs to be protected with a mutex */ + if (QSHM_ENTRIES(idx) >= 0) + { + if (delete) + --QSHM_ENTRIES(idx); + else + ++QSHM_ENTRIES(idx); + } + + fidx = Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_fsysidx; + if (fidx < 0) + return; + + /* update available space also? (might be loseqfile) */ + if (!avail) + return; + + /* convert size to blocks; this causes rounding errors */ + s = e->e_msgsize / FILE_SYS_BLKSIZE(fidx); + if (s == 0) + return; + + /* XXX in theory this needs to be protected with a mutex */ + if (delete) + FILE_SYS_AVAIL(fidx) += s; + else + FILE_SYS_AVAIL(fidx) -= s; + +} +/* +** INIT_SHM -- initialize shared memory structure +** +** Initialize or attach to shared memory segment. +** Currently it is not a fatal error if this doesn't work. +** However, it causes us to have a "fallback" storage location +** for everything that is supposed to be in the shared memory, +** which makes the code slightly ugly. +** +** Parameters: +** qn -- number of queue directories. +** owner -- owner of shared memory. +** hash -- identifies data that is stored in shared memory. +** +** Returns: +** none. +*/ + +static void init_shm __P((int, bool, unsigned int)); + +static void +init_shm(qn, owner, hash) + int qn; + bool owner; + unsigned int hash; +{ + int i; + + PtrFileSys = &FileSys[0]; + PNumFileSys = &Numfilesys; + + /* This allows us to disable shared memory at runtime. */ + if (ShmKey != 0) + { + int count; + int save_errno; + size_t shms; + + count = 0; + shms = SM_T_SIZE + qn * sizeof(QUEUE_SHM_T); + for (;;) + { + /* XXX: maybe allow read access for group? */ + Pshm = sm_shmstart(ShmKey, shms, SHM_R|SHM_W, &ShmId, + owner); + save_errno = errno; + if (Pshm != NULL || save_errno != EEXIST) + break; + if (++count >= 3) + break; + sleep(count); + } + if (Pshm != NULL) + { + int *p; + + p = (int *) Pshm; + if (owner) + { + *p = (int) shms; + *((pid_t *) SHM_OFF_PID(Pshm)) = CurrentPid; + p = (int *) SHM_OFF_TAG(Pshm); + *p = hash; + } + else + { + if (*p != (int) shms) + { + save_errno = EINVAL; + cleanup_shm(false); + goto error; + } + p = (int *) SHM_OFF_TAG(Pshm); + if (*p != (int) hash) + { + save_errno = EINVAL; + cleanup_shm(false); + goto error; + } + + /* + ** XXX how to check the pid? + ** Read it from the pid-file? That does + ** not need to exist. + ** We could disable shm if we can't confirm + ** that it is the right one. + */ + } + + PtrFileSys = (FILESYS *) OFF_FILE_SYS(Pshm); + PNumFileSys = (int *) OFF_NUM_FILE_SYS(Pshm); + QShm = (QUEUE_SHM_T *) OFF_QUEUE_SHM(Pshm); + PRSATmpCnt = (int *) OFF_RSA_TMP_CNT(Pshm); + *PRSATmpCnt = 0; + if (owner) + { + /* initialize values in shared memory */ + NumFileSys = 0; + for (i = 0; i < qn; i++) + QShm[i].qs_entries = -1; + } + return; + } + error: + if (LogLevel > (owner ? 8 : 11)) + { + sm_syslog(owner ? LOG_ERR : LOG_NOTICE, NOQID, + "can't %s shared memory, key=%ld: %s", + owner ? "initialize" : "attach to", + (long) ShmKey, sm_errstring(save_errno)); + } + } +} +#endif /* SM_CONF_SHM */ + +/* +** SETUP_QUEUES -- setup all queue groups +** +** Parameters: +** owner -- owner of shared memory. +** +** Returns: +** none. +** +#if SM_CONF_SHM +** Side Effects: +** attaches shared memory. +#endif * SM_CONF_SHM * +*/ + +void +setup_queues(owner) + bool owner; +{ + int i, qn, len; + unsigned int hashval; + char basedir[MAXPATHLEN]; + struct stat st; + + /* + ** Determine basedir for all queue directories. + ** All queue directories must be (first level) subdirectories + ** of the basedir. The basedir is the QueueDir + ** without wildcards, but with trailing / + */ + + hashval = 0; + errno = 0; + len = sm_strlcpy(basedir, QueueDir, sizeof basedir); + if (len >= sizeof basedir) + { + syserr("QueueDirectory: path too long: %d, max %d", + len, (int) sizeof basedir); + ExitStat = EX_CONFIG; + return; + } + SM_ASSERT(len > 0); + if (basedir[len - 1] == '*') + { + char *cp; + + cp = SM_LAST_DIR_DELIM(basedir); + if (cp == NULL) + { + syserr("QueueDirectory: can not wildcard relative path \"%s\"", + QueueDir); + if (tTd(41, 2)) + sm_dprintf("setup_queues: \"%s\": Can not wildcard relative path.\n", + QueueDir); + ExitStat = EX_CONFIG; + return; + } + + /* cut off wildcard pattern */ + *++cp = '\0'; + len = cp - basedir; + } + else if (!SM_IS_DIR_DELIM(basedir[len - 1])) + { + /* append trailing slash since it is a directory */ + basedir[len] = '/'; + basedir[++len] = '\0'; + } + + /* len counts up to the last directory delimiter */ + SM_ASSERT(basedir[len - 1] == '/'); + + if (chdir(basedir) < 0) + { + int save_errno = errno; + + syserr("can not chdir(%s)", basedir); + if (save_errno == EACCES) + (void) sm_io_fprintf(smioerr, SM_TIME_DEFAULT, + "Program mode requires special privileges, e.g., root or TrustedUser.\n"); + if (tTd(41, 2)) + sm_dprintf("setup_queues: \"%s\": %s\n", + basedir, sm_errstring(errno)); + ExitStat = EX_CONFIG; + return; + } +#if SM_CONF_SHM + hashval = hash_q(basedir, hashval); +#endif /* SM_CONF_SHM */ + + /* initialize map for queue runs */ + clrbitmap(DoQueueRun); + + + if (UseMSP && OpMode != MD_TEST) + { + long sff = SFF_CREAT; + + if (stat(".", &st) < 0) + { + syserr("can not stat(%s)", basedir); + if (tTd(41, 2)) + sm_dprintf("setup_queues: \"%s\": %s\n", + basedir, sm_errstring(errno)); + ExitStat = EX_CONFIG; + return; + } + if (RunAsUid == 0) + sff |= SFF_ROOTOK; + + /* + ** Check queue directory permissions. + ** Can we write to a group writable queue directory? + */ + + if (bitset(S_IWGRP, QueueFileMode) && + bitset(S_IWGRP, st.st_mode) && + safefile(" ", RunAsUid, RunAsGid, RunAsUserName, sff, + QueueFileMode, NULL) != 0) + { + syserr("can not write to queue directory %s (RunAsGid=%d, required=%d)", + basedir, (int) RunAsGid, (int) st.st_gid); + } + if (bitset(S_IWOTH|S_IXOTH, st.st_mode)) + { +#if _FFR_MSP_PARANOIA + syserr("dangerous permissions=%o on queue directory %s", + (int) st.st_mode, basedir); +#else /* _FFR_MSP_PARANOIA */ + if (LogLevel > 0) + sm_syslog(LOG_ERR, NOQID, + "dangerous permissions=%o on queue directory %s", + (int) st.st_mode, basedir); +#endif /* _FFR_MSP_PARANOIA */ + } +#if _FFR_MSP_PARANOIA + if (NumQueue > 1) + syserr("can not use multiple queues for MSP"); +#endif /* _FFR_MSP_PARANOIA */ + } + + /* initial number of queue directories */ + qn = 0; + for (i = 0; i < NumQueue && Queue[i] != NULL; i++) + qn = multiqueue_cache(basedir, len, Queue[i], qn, &hashval); + +#if SM_CONF_SHM + init_shm(qn, owner, hashval); + i = filesys_setup(owner || ShmId == SM_SHM_NO_ID); + if (i == FSF_NOT_FOUND) + { + /* + ** We didn't get the right filesystem data + ** This may happen if we don't have the right shared memory. + ** So let's do this without shared memory. + */ + + SM_ASSERT(!owner); + cleanup_shm(false); /* release shared memory */ + i = filesys_setup(false); + if (i < 0) + syserr("filesys_setup failed twice, result=%d", i); + else if (LogLevel > 8) + sm_syslog(LOG_WARNING, NOQID, + "shared memory does not contain expected data, ignored"); + } +#else /* SM_CONF_SHM */ + i = filesys_setup(true); +#endif /* SM_CONF_SHM */ + if (i < 0) + ExitStat = EX_CONFIG; +} + +#if SM_CONF_SHM +/* +** CLEANUP_SHM -- do some cleanup work for shared memory etc +** +** Parameters: +** owner -- owner of shared memory? +** +** Returns: +** none. +** +** Side Effects: +** detaches shared memory. +*/ + +void +cleanup_shm(owner) + bool owner; +{ + if (ShmId != SM_SHM_NO_ID) + { + if (sm_shmstop(Pshm, ShmId, owner) < 0 && LogLevel > 8) + sm_syslog(LOG_INFO, NOQID, "sh_shmstop failed=%s", + sm_errstring(errno)); + Pshm = NULL; + ShmId = SM_SHM_NO_ID; + } +} +#endif /* SM_CONF_SHM */ + +/* +** CLEANUP_QUEUES -- do some cleanup work for queues +** +** Parameters: +** none. +** +** Returns: +** none. +** +*/ + +void +cleanup_queues() +{ + sync_queue_time(); +} +/* +** SET_DEF_QUEUEVAL -- set default values for a queue group. +** +** Parameters: +** qg -- queue group +** all -- set all values (true for default group)? +** +** Returns: +** none. +** +** Side Effects: +** sets default values for the queue group. +*/ + +void +set_def_queueval(qg, all) + QUEUEGRP *qg; + bool all; +{ + if (bitnset(QD_DEFINED, qg->qg_flags)) + return; + if (all) + qg->qg_qdir = QueueDir; +#if 0 + qg->qg_sortorder = QueueSortOrder; +#endif /* 0 */ + qg->qg_maxqrun = all ? MaxRunnersPerQueue : -1; + qg->qg_nice = NiceQueueRun; +} +/* +** MAKEQUEUE -- define a new queue. +** +** Parameters: +** line -- description of queue. This is in labeled fields. +** The fields are: +** F -- the flags associated with the queue +** I -- the interval between running the queue +** J -- the maximum # of jobs in work list +** [M -- the maximum # of jobs in a queue run] +** N -- the niceness at which to run +** P -- the path to the queue +** S -- the queue sorting order +** R -- number of parallel queue runners +** r -- max recipients per envelope +** The first word is the canonical name of the queue. +** qdef -- this is a 'Q' definition from .cf +** +** Returns: +** none. +** +** Side Effects: +** enters the queue into the queue table. +*/ + +void +makequeue(line, qdef) + char *line; + bool qdef; +{ + register char *p; + register QUEUEGRP *qg; + register STAB *s; + int i; + char fcode; + + /* allocate a queue and set up defaults */ + qg = (QUEUEGRP *) xalloc(sizeof *qg); + memset((char *) qg, '\0', sizeof *qg); + + if (line[0] == '\0') + { + syserr("name required for queue"); + return; + } + + /* collect the queue name */ + for (p = line; + *p != '\0' && *p != ',' && !(isascii(*p) && isspace(*p)); + p++) + continue; + if (*p != '\0') + *p++ = '\0'; + qg->qg_name = newstr(line); + + /* set default values, can be overridden below */ + set_def_queueval(qg, false); + + /* now scan through and assign info from the fields */ + while (*p != '\0') + { + auto char *delimptr; + + while (*p != '\0' && + (*p == ',' || (isascii(*p) && isspace(*p)))) + p++; + + /* p now points to field code */ + fcode = *p; + while (*p != '\0' && *p != '=' && *p != ',') + p++; + if (*p++ != '=') + { + syserr("queue %s: `=' expected", qg->qg_name); + return; + } + while (isascii(*p) && isspace(*p)) + p++; + + /* p now points to the field body */ + p = munchstring(p, &delimptr, ','); + + /* install the field into the queue struct */ + switch (fcode) + { + case 'P': /* pathname */ + if (*p == '\0') + syserr("queue %s: empty path name", + qg->qg_name); + else + qg->qg_qdir = newstr(p); + break; + + case 'F': /* flags */ + for (; *p != '\0'; p++) + if (!(isascii(*p) && isspace(*p))) + setbitn(*p, qg->qg_flags); + break; + + /* + ** Do we need two intervals here: + ** One for persistent queue runners, + ** one for "normal" queue runs? + */ + + case 'I': /* interval between running the queue */ + qg->qg_queueintvl = convtime(p, 'm'); + break; + + case 'N': /* run niceness */ + qg->qg_nice = atoi(p); + break; + + case 'R': /* maximum # of runners for the group */ + i = atoi(p); + + /* can't have more runners than allowed total */ + if (MaxQueueChildren > 0 && i > MaxQueueChildren) + { + qg->qg_maxqrun = MaxQueueChildren; + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Q=%s: R=%d exceeds MaxQueueChildren=%d, set to MaxQueueChildren\n", + qg->qg_name, i, + MaxQueueChildren); + } + else + qg->qg_maxqrun = i; + break; + + case 'J': /* maximum # of jobs in work list */ + qg->qg_maxlist = atoi(p); + break; + + case 'r': /* max recipients per envelope */ + qg->qg_maxrcpt = atoi(p); + break; + +#if 0 + case 'S': /* queue sorting order */ + switch (*p) + { + case 'h': /* Host first */ + case 'H': + qg->qg_sortorder = QSO_BYHOST; + break; + + case 'p': /* Priority order */ + case 'P': + qg->qg_sortorder = QSO_BYPRIORITY; + break; + + case 't': /* Submission time */ + case 'T': + qg->qg_sortorder = QSO_BYTIME; + break; + + case 'f': /* File name */ + case 'F': + qg->qg_sortorder = QSO_BYFILENAME; + break; + + case 'm': /* Modification time */ + case 'M': + qgrp->qg_sortorder = QSO_BYMODTIME; + break; + + default: + syserr("Invalid queue sort order \"%s\"", p); + } + break; +#endif /* 0 */ + + default: + syserr("Q%s: unknown queue equate %c=", + qg->qg_name, fcode); + break; + } + + p = delimptr; + } + +#if !HASNICE + if (qg->qg_nice != NiceQueueRun) + { + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Q%s: Warning: N= set on system that doesn't support nice()\n", + qg->qg_name); + } +#endif /* !HASNICE */ + + /* do some rationality checking */ + if (NumQueue >= MAXQUEUEGROUPS) + { + syserr("too many queue groups defined (%d max)", + MAXQUEUEGROUPS); + return; + } + + if (qg->qg_qdir == NULL) + { + if (QueueDir == NULL || *QueueDir == '\0') + { + syserr("QueueDir must be defined before queue groups"); + return; + } + qg->qg_qdir = newstr(QueueDir); + } + + if (qg->qg_maxqrun > 1 && !bitnset(QD_FORK, qg->qg_flags)) + { + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Warning: Q=%s: R=%d: multiple queue runners specified\n\tbut flag '%c' is not set\n", + qg->qg_name, qg->qg_maxqrun, QD_FORK); + } + + /* enter the queue into the symbol table */ + if (tTd(37, 8)) + sm_syslog(LOG_INFO, NOQID, + "Adding %s to stab, path: %s", qg->qg_name, + qg->qg_qdir); + s = stab(qg->qg_name, ST_QUEUE, ST_ENTER); + if (s->s_quegrp != NULL) + { + i = s->s_quegrp->qg_index; + + /* XXX what about the pointers inside this struct? */ + sm_free(s->s_quegrp); /* XXX */ + } + else + i = NumQueue++; + Queue[i] = s->s_quegrp = qg; + qg->qg_index = i; + + /* set default value for max queue runners */ + if (qg->qg_maxqrun < 0) + { + if (MaxRunnersPerQueue > 0) + qg->qg_maxqrun = MaxRunnersPerQueue; + else + qg->qg_maxqrun = 1; + } + if (qdef) + setbitn(QD_DEFINED, qg->qg_flags); +} +#if 0 +/* ** HASHFQN -- calculate a hash value for a fully qualified host name ** ** Arguments: @@ -3441,7 +6917,6 @@ hashfqn(fqn, buckets) { register char *p; register int h = 0, hash, cnt; -# define WATERINC (1000) if (fqn == NULL) return -1; @@ -3465,10 +6940,10 @@ hashfqn(fqn, buckets) return hash; } -# endif /* 0 */ +#endif /* 0 */ -# if _FFR_QUEUEDELAY -/* +#if _FFR_QUEUEDELAY +/* ** QUEUEDELAY -- compute queue delay time ** ** Parameters: @@ -3503,5 +6978,1330 @@ queuedelay(e) qd = MinQueueAge; return qd; } -# endif /* _FFR_QUEUEDELAY */ -#endif /* QUEUE */ +#endif /* _FFR_QUEUEDELAY */ + +/* +** A structure for sorting Queue according to maxqrun without +** screwing up Queue itself. +*/ + +struct sortqgrp +{ + int sg_idx; /* original index */ + int sg_maxqrun; /* max queue runners */ +}; +typedef struct sortqgrp SORTQGRP_T; +static int cmpidx __P((const void *, const void *)); + +static int +cmpidx(a, b) + const void *a; + const void *b; +{ + /* The sort is highest to lowest, so the comparison is reversed */ + if (((SORTQGRP_T *)a)->sg_maxqrun < ((SORTQGRP_T *)b)->sg_maxqrun) + return 1; + else if (((SORTQGRP_T *)a)->sg_maxqrun > ((SORTQGRP_T *)b)->sg_maxqrun) + return -1; + else + return 0; +} + +/* +** MAKEWORKGROUP -- balance queue groups into work groups per MaxQueueChildren +** +** Take the now defined queue groups and assign them to work groups. +** This is done to balance out the number of concurrently active +** queue runners such that MaxQueueChildren is not exceeded. This may +** result in more than one queue group per work group. In such a case +** the number of running queue groups in that work group will have no +** more than the work group maximum number of runners (a "fair" portion +** of MaxQueueRunners). All queue groups within a work group will get a +** chance at running. +** +** Parameters: +** none. +** +** Returns: +** nothing. +** +** Side Effects: +** Sets up WorkGrp structure. +*/ + +void +makeworkgroups() +{ + int i, j, total_runners = 0; + int dir; + SORTQGRP_T si[MAXQUEUEGROUPS + 1]; + + if (NumQueue == 1 && strcmp(Queue[0]->qg_name, "mqueue") == 0) + { + /* + ** There is only the "mqueue" queue group (a default) + ** containing all of the queues. We want to provide to + ** this queue group the maximum allowable queue runners. + ** To match older behavior (8.10/8.11) we'll try for + ** 1 runner per queue capping it at MaxQueueChildren. + ** So if there are N queues, then there will be N runners + ** for the "mqueue" queue group (where N is kept less than + ** MaxQueueChildren). + */ + + NumWorkGroups = 1; + WorkGrp[0].wg_numqgrp = 1; + WorkGrp[0].wg_qgs = (QUEUEGRP **) xalloc(sizeof(QUEUEGRP *)); + WorkGrp[0].wg_qgs[0] = Queue[0]; + if (MaxQueueChildren > 0 && + Queue[0]->qg_numqueues > MaxQueueChildren) + WorkGrp[0].wg_runners = MaxQueueChildren; + else + WorkGrp[0].wg_runners = Queue[0]->qg_numqueues; + + Queue[0]->qg_wgrp = 0; + + /* can't have more runners than allowed total */ + if (MaxQueueChildren > 0 && + Queue[0]->qg_maxqrun > MaxQueueChildren) + Queue[0]->qg_maxqrun = MaxQueueChildren; + WorkGrp[0].wg_maxact = Queue[0]->qg_maxqrun; + WorkGrp[0].wg_lowqintvl = Queue[0]->qg_queueintvl; + return; + } + + for (i = 0; i < NumQueue; i++) + { + si[i].sg_maxqrun = Queue[i]->qg_maxqrun; + si[i].sg_idx = i; + } + qsort(si, NumQueue, sizeof(si[0]), cmpidx); + + NumWorkGroups = 0; + for (i = 0; i < NumQueue; i++) + { + total_runners += si[i].sg_maxqrun; + if (MaxQueueChildren <= 0 || total_runners <= MaxQueueChildren) + NumWorkGroups++; + else + break; + } + + if (NumWorkGroups < 1) + NumWorkGroups = 1; /* gotta have one at least */ + else if (NumWorkGroups > MAXWORKGROUPS) + NumWorkGroups = MAXWORKGROUPS; /* the limit */ + + /* + ** We now know the number of work groups to pack the queue groups + ** into. The queue groups in 'Queue' are sorted from highest + ** to lowest for the number of runners per queue group. + ** We put the queue groups with the largest number of runners + ** into work groups first. Then the smaller ones are fitted in + ** where it looks best. + */ + + j = 0; + dir = 1; + for (i = 0; i < NumQueue; i++) + { + /* a to-and-fro packing scheme, continue from last position */ + if (j >= NumWorkGroups) + { + dir = -1; + j = NumWorkGroups - 1; + } + else if (j < 0) + { + j = 0; + dir = 1; + } + + WorkGrp[j].wg_qgs = (QUEUEGRP **)sm_realloc(WorkGrp[j].wg_qgs, + sizeof(QUEUEGRP *) * + (WorkGrp[j].wg_numqgrp + 1)); + if (WorkGrp[j].wg_qgs == NULL) + { + syserr("@cannot allocate memory for work queues, need %d bytes", + (int) (sizeof(QUEUEGRP *) * + (WorkGrp[j].wg_numqgrp + 1))); + } + + WorkGrp[j].wg_qgs[WorkGrp[j].wg_numqgrp] = Queue[si[i].sg_idx]; + WorkGrp[j].wg_numqgrp++; + WorkGrp[j].wg_runners += Queue[i]->qg_maxqrun; + Queue[si[i].sg_idx]->qg_wgrp = j; + + if (WorkGrp[j].wg_maxact == 0) + { + /* can't have more runners than allowed total */ + if (MaxQueueChildren > 0 && + Queue[i]->qg_maxqrun > MaxQueueChildren) + Queue[i]->qg_maxqrun = MaxQueueChildren; + WorkGrp[j].wg_maxact = Queue[i]->qg_maxqrun; + } + + /* + ** XXX: must wg_lowqintvl be the GCD? + ** qg1: 2m, qg2: 3m, minimum: 2m, when do queue runs for + ** qg2 occur? + */ + + /* keep track of the lowest interval for a persistent runner */ + if (Queue[si[i].sg_idx]->qg_queueintvl > 0 && + WorkGrp[j].wg_lowqintvl < Queue[si[i].sg_idx]->qg_queueintvl) + WorkGrp[j].wg_lowqintvl = Queue[si[i].sg_idx]->qg_queueintvl; + j += dir; + } + if (tTd(41, 9)) + { + for (i = 0; i < NumWorkGroups; i++) + { + sm_dprintf("Workgroup[%d]=", i); + for (j = 0; j < WorkGrp[i].wg_numqgrp; j++) + { + sm_dprintf("%s, ", + WorkGrp[i].wg_qgs[j]->qg_name); + } + sm_dprintf("\n"); + } + } +} + +/* +** DUP_DF -- duplicate envelope data file +** +** Copy the data file from the 'old' envelope to the 'new' envelope +** in the most efficient way possible. +** +** Create a hard link from the 'old' data file to the 'new' data file. +** If the old and new queue directories are on different file systems, +** then the new data file link is created in the old queue directory, +** and the new queue file will contain a 'd' record pointing to the +** directory containing the new data file. +** +** Parameters: +** old -- old envelope. +** new -- new envelope. +** +** Results: +** Returns true on success, false on failure. +** +** Side Effects: +** On success, the new data file is created. +** On fatal failure, EF_FATALERRS is set in old->e_flags. +*/ + +static bool dup_df __P((ENVELOPE *, ENVELOPE *)); + +static bool +dup_df(old, new) + ENVELOPE *old; + ENVELOPE *new; +{ + int ofs, nfs, r; + char opath[MAXPATHLEN]; + char npath[MAXPATHLEN]; + + SM_REQUIRE(bitset(EF_HAS_DF, old->e_flags)); + SM_REQUIRE(ISVALIDQGRP(old->e_qgrp) && ISVALIDQDIR(old->e_qdir)); + SM_REQUIRE(ISVALIDQGRP(new->e_qgrp) && ISVALIDQDIR(new->e_qdir)); + + (void) sm_strlcpy(opath, queuename(old, DATAFL_LETTER), sizeof opath); + (void) sm_strlcpy(npath, queuename(new, DATAFL_LETTER), sizeof npath); + + if (old->e_dfp != NULL) + { + r = sm_io_setinfo(old->e_dfp, SM_BF_COMMIT, NULL); + if (r < 0 && errno != EINVAL) + { + syserr("@can't commit %s", opath); + old->e_flags |= EF_FATALERRS; + return false; + } + } + + /* + ** Attempt to create a hard link, if we think both old and new + ** are on the same file system, otherwise copy the file. + ** + ** Don't waste time attempting a hard link unless old and new + ** are on the same file system. + */ + + ofs = Queue[old->e_qgrp]->qg_qpaths[old->e_qdir].qp_fsysidx; + nfs = Queue[new->e_qgrp]->qg_qpaths[new->e_qdir].qp_fsysidx; + if (FILE_SYS_DEV(ofs) == FILE_SYS_DEV(nfs)) + { + if (link(opath, npath) == 0) + { + new->e_flags |= EF_HAS_DF; + SYNC_DIR(npath, true); + return true; + } + goto error; + } + + /* + ** Can't link across queue directories, so try to create a hard + ** link in the same queue directory as the old df file. + ** The qf file will refer to the new df file using a 'd' record. + */ + + new->e_dfqgrp = old->e_dfqgrp; + new->e_dfqdir = old->e_dfqdir; + (void) sm_strlcpy(npath, queuename(new, DATAFL_LETTER), sizeof npath); + if (link(opath, npath) == 0) + { + new->e_flags |= EF_HAS_DF; + SYNC_DIR(npath, true); + return true; + } + + error: + if (LogLevel > 0) + sm_syslog(LOG_ERR, old->e_id, + "dup_df: can't link %s to %s, error=%s, envelope splitting failed", + opath, npath, sm_errstring(errno)); + return false; +} + +/* +** SPLIT_ENV -- Allocate a new envelope based on a given envelope. +** +** Parameters: +** e -- envelope. +** sendqueue -- sendqueue for new envelope. +** qgrp -- index of queue group. +** qdir -- queue directory. +** +** Results: +** new envelope. +** +*/ + +static ENVELOPE *split_env __P((ENVELOPE *, ADDRESS *, int, int)); + +static ENVELOPE * +split_env(e, sendqueue, qgrp, qdir) + ENVELOPE *e; + ADDRESS *sendqueue; + int qgrp; + int qdir; +{ + ENVELOPE *ee; + + ee = (ENVELOPE *) sm_rpool_malloc_x(e->e_rpool, sizeof *ee); + STRUCTCOPY(*e, *ee); + ee->e_message = NULL; /* XXX use original message? */ + ee->e_id = NULL; + assign_queueid(ee); + ee->e_sendqueue = sendqueue; + ee->e_flags &= ~(EF_INQUEUE|EF_CLRQUEUE|EF_FATALERRS + |EF_SENDRECEIPT|EF_RET_PARAM|EF_HAS_DF); + ee->e_flags |= EF_NORECEIPT; /* XXX really? */ + ee->e_from.q_state = QS_SENDER; + ee->e_dfp = NULL; + ee->e_lockfp = NULL; + if (e->e_xfp != NULL) + ee->e_xfp = sm_io_dup(e->e_xfp); + ee->e_qgrp = ee->e_dfqgrp = qgrp; + ee->e_qdir = ee->e_dfqdir = qdir; + ee->e_errormode = EM_MAIL; + ee->e_statmsg = NULL; +#if _FFR_QUARANTINE + if (e->e_quarmsg != NULL) + ee->e_quarmsg = sm_rpool_strdup_x(ee->e_rpool, + e->e_quarmsg); +#endif /* _FFR_QUARANTINE */ + + /* + ** XXX Not sure if this copying is necessary. + ** sendall() does this copying, but I don't know if that is + ** because of the storage management discipline we were using + ** before rpools were introduced, or if it is because these lists + ** can be modified later. + */ + + ee->e_header = copyheader(e->e_header, ee->e_rpool); + ee->e_errorqueue = copyqueue(e->e_errorqueue, ee->e_rpool); + + return ee; +} + +/* return values from split functions, check also below! */ +#define SM_SPLIT_FAIL (0) +#define SM_SPLIT_NONE (1) +#define SM_SPLIT_NEW(n) (1 + (n)) + +/* +** SPLIT_ACROSS_QUEUE_GROUPS +** +** This function splits an envelope across multiple queue groups +** based on the queue group of each recipient. +** +** Parameters: +** e -- envelope. +** +** Results: +** SM_SPLIT_FAIL on failure +** SM_SPLIT_NONE if no splitting occurred, +** or 1 + the number of additional envelopes created. +** +** Side Effects: +** On success, e->e_sibling points to a list of zero or more +** additional envelopes, and the associated data files exist +** on disk. But the queue files are not created. +** +** On failure, e->e_sibling is not changed. +** The order of recipients in e->e_sendqueue is permuted. +** Abandoned data files for additional envelopes that failed +** to be created may exist on disk. +*/ + +static int q_qgrp_compare __P((const void *, const void *)); +static int e_filesys_compare __P((const void *, const void *)); + +static int +q_qgrp_compare(p1, p2) + const void *p1; + const void *p2; +{ + ADDRESS **pq1 = (ADDRESS **) p1; + ADDRESS **pq2 = (ADDRESS **) p2; + + return (*pq1)->q_qgrp - (*pq2)->q_qgrp; +} + +static int +e_filesys_compare(p1, p2) + const void *p1; + const void *p2; +{ + ENVELOPE **pe1 = (ENVELOPE **) p1; + ENVELOPE **pe2 = (ENVELOPE **) p2; + int fs1, fs2; + + fs1 = Queue[(*pe1)->e_qgrp]->qg_qpaths[(*pe1)->e_qdir].qp_fsysidx; + fs2 = Queue[(*pe2)->e_qgrp]->qg_qpaths[(*pe2)->e_qdir].qp_fsysidx; + if (FILE_SYS_DEV(fs1) < FILE_SYS_DEV(fs2)) + return -1; + if (FILE_SYS_DEV(fs1) > FILE_SYS_DEV(fs2)) + return 1; + return 0; +} + +static int +split_across_queue_groups(e) + ENVELOPE *e; +{ + int naddrs, nsplits, i; + char **pvp; + ADDRESS *q, **addrs; + ENVELOPE *ee, *es; + ENVELOPE *splits[MAXQUEUEGROUPS]; + char pvpbuf[PSBUFSIZE]; + + SM_REQUIRE(ISVALIDQGRP(e->e_qgrp)); + + /* Count addresses and assign queue groups. */ + naddrs = 0; + for (q = e->e_sendqueue; q != NULL; q = q->q_next) + { + if (QS_IS_DEAD(q->q_state)) + continue; + ++naddrs; + + /* bad addresses and those already sent stay put */ + if (QS_IS_BADADDR(q->q_state) || + QS_IS_SENT(q->q_state)) + q->q_qgrp = e->e_qgrp; + else if (!ISVALIDQGRP(q->q_qgrp)) + { + /* call ruleset which should return a queue group */ + i = rscap(RS_QUEUEGROUP, q->q_user, NULL, e, &pvp, + pvpbuf, sizeof(pvpbuf)); + if (i == EX_OK && + pvp != NULL && pvp[0] != NULL && + (pvp[0][0] & 0377) == CANONNET && + pvp[1] != NULL && pvp[1][0] != '\0') + { + i = name2qid(pvp[1]); + if (ISVALIDQGRP(i)) + { + q->q_qgrp = i; + if (tTd(20, 4)) + sm_syslog(LOG_INFO, NOQID, + "queue group name %s -> %d", + pvp[1], i); + continue; + } + else if (LogLevel > 10) + sm_syslog(LOG_INFO, NOQID, + "can't find queue group name %s, selection ignored", + pvp[1]); + } + if (q->q_mailer != NULL && + ISVALIDQGRP(q->q_mailer->m_qgrp)) + q->q_qgrp = q->q_mailer->m_qgrp; + else + q->q_qgrp = 0; + } + } + + /* only one address? nothing to split. */ + if (naddrs <= 1) + return SM_SPLIT_NONE; + + /* sort the addresses by queue group */ + addrs = sm_rpool_malloc_x(e->e_rpool, naddrs * sizeof(ADDRESS *)); + for (i = 0, q = e->e_sendqueue; q != NULL; q = q->q_next) + { + if (QS_IS_DEAD(q->q_state)) + continue; + addrs[i++] = q; + } + qsort(addrs, naddrs, sizeof(ADDRESS *), q_qgrp_compare); + + /* split into multiple envelopes, by queue group */ + nsplits = 0; + es = NULL; + e->e_sendqueue = NULL; + for (i = 0; i < naddrs; ++i) + { + if (i == naddrs - 1 || addrs[i]->q_qgrp != addrs[i + 1]->q_qgrp) + addrs[i]->q_next = NULL; + else + addrs[i]->q_next = addrs[i + 1]; + + /* same queue group as original envelope? */ + if (addrs[i]->q_qgrp == e->e_qgrp) + { + if (e->e_sendqueue == NULL) + e->e_sendqueue = addrs[i]; + continue; + } + + /* different queue group than original envelope */ + if (es == NULL || addrs[i]->q_qgrp != es->e_qgrp) + { + ee = split_env(e, addrs[i], addrs[i]->q_qgrp, NOQDIR); + es = ee; + splits[nsplits++] = ee; + } + } + + /* no splits? return right now. */ + if (nsplits <= 0) + return SM_SPLIT_NONE; + + /* assign a queue directory to each additional envelope */ + for (i = 0; i < nsplits; ++i) + { + es = splits[i]; +#if 0 + es->e_qdir = pickqdir(Queue[es->e_qgrp], es->e_msgsize, es); +#endif /* 0 */ + if (!setnewqueue(es)) + goto failure; + } + + /* sort the additional envelopes by queue file system */ + qsort(splits, nsplits, sizeof(ENVELOPE *), e_filesys_compare); + + /* create data files for each additional envelope */ + if (!dup_df(e, splits[0])) + { + i = 0; + goto failure; + } + for (i = 1; i < nsplits; ++i) + { + /* copy or link to the previous data file */ + if (!dup_df(splits[i - 1], splits[i])) + goto failure; + } + + /* success: prepend the new envelopes to the e->e_sibling list */ + for (i = 0; i < nsplits; ++i) + { + es = splits[i]; + es->e_sibling = e->e_sibling; + e->e_sibling = es; + } + return SM_SPLIT_NEW(nsplits); + + /* failure: clean up */ + failure: + if (i > 0) + { + int j; + + for (j = 0; j < i; j++) + (void) unlink(queuename(splits[j], DATAFL_LETTER)); + } + e->e_sendqueue = addrs[0]; + for (i = 0; i < naddrs - 1; ++i) + addrs[i]->q_next = addrs[i + 1]; + addrs[naddrs - 1]->q_next = NULL; + return SM_SPLIT_FAIL; +} + +/* +** SPLIT_WITHIN_QUEUE +** +** Split an envelope with multiple recipients into several +** envelopes within the same queue directory, if the number of +** recipients exceeds the limit for the queue group. +** +** Parameters: +** e -- envelope. +** +** Results: +** SM_SPLIT_FAIL on failure +** SM_SPLIT_NONE if no splitting occurred, +** or 1 + the number of additional envelopes created. +*/ + +#define SPLIT_LOG_LEVEL 8 + +static int split_within_queue __P((ENVELOPE *)); + +static int +split_within_queue(e) + ENVELOPE *e; +{ + int maxrcpt, nrcpt, ndead, nsplit, i; + int j, l; + char *lsplits; + ADDRESS *q, **addrs; + ENVELOPE *ee, *firstsibling; + + if (!ISVALIDQGRP(e->e_qgrp) || bitset(EF_SPLIT, e->e_flags)) + return SM_SPLIT_NONE; + + /* don't bother if there is no recipient limit */ + maxrcpt = Queue[e->e_qgrp]->qg_maxrcpt; + if (maxrcpt <= 0) + return SM_SPLIT_NONE; + + /* count recipients */ + nrcpt = 0; + for (q = e->e_sendqueue; q != NULL; q = q->q_next) + { + if (QS_IS_DEAD(q->q_state)) + continue; + ++nrcpt; + } + if (nrcpt <= maxrcpt) + return SM_SPLIT_NONE; + + /* + ** Preserve the recipient list + ** so that we can restore it in case of error. + ** (But we discard dead addresses.) + */ + + addrs = sm_rpool_malloc_x(e->e_rpool, nrcpt * sizeof(ADDRESS *)); + for (i = 0, q = e->e_sendqueue; q != NULL; q = q->q_next) + { + if (QS_IS_DEAD(q->q_state)) + continue; + addrs[i++] = q; + } + + /* + ** Partition the recipient list so that bad and sent addresses + ** come first. These will go with the original envelope, and + ** do not count towards the maxrcpt limit. + ** addrs[] does not contain QS_IS_DEAD() addresses. + */ + + ndead = 0; + for (i = 0; i < nrcpt; ++i) + { + if (QS_IS_BADADDR(addrs[i]->q_state) || + QS_IS_SENT(addrs[i]->q_state) || + QS_IS_DEAD(addrs[i]->q_state)) /* for paranoia's sake */ + { + if (i > ndead) + { + ADDRESS *tmp = addrs[i]; + + addrs[i] = addrs[ndead]; + addrs[ndead] = tmp; + } + ++ndead; + } + } + + /* Check if no splitting required. */ + if (nrcpt - ndead <= maxrcpt) + return SM_SPLIT_NONE; + + /* fix links */ + for (i = 0; i < nrcpt - 1; ++i) + addrs[i]->q_next = addrs[i + 1]; + addrs[nrcpt - 1]->q_next = NULL; + e->e_sendqueue = addrs[0]; + + /* prepare buffer for logging */ + if (LogLevel > SPLIT_LOG_LEVEL) + { + l = MAXLINE; + lsplits = sm_malloc(l); + if (lsplits != NULL) + *lsplits = '\0'; + j = 0; + } + else + { + /* get rid of stupid compiler warnings */ + lsplits = NULL; + j = l = 0; + } + + /* split the envelope */ + firstsibling = e->e_sibling; + i = maxrcpt + ndead; + nsplit = 0; + for (;;) + { + addrs[i - 1]->q_next = NULL; + ee = split_env(e, addrs[i], e->e_qgrp, e->e_qdir); + if (!dup_df(e, ee)) + { + + ee = firstsibling; + while (ee != NULL) + { + (void) unlink(queuename(ee, DATAFL_LETTER)); + ee = ee->e_sibling; + } + + /* Error. Restore e's sibling & recipient lists. */ + e->e_sibling = firstsibling; + for (i = 0; i < nrcpt - 1; ++i) + addrs[i]->q_next = addrs[i + 1]; + return SM_SPLIT_FAIL; + } + + /* prepend the new envelope to e->e_sibling */ + ee->e_sibling = e->e_sibling; + e->e_sibling = ee; + ++nsplit; + if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL) + { + if (j >= l - strlen(ee->e_id) - 3) + { + char *p; + + l += MAXLINE; + p = sm_realloc(lsplits, l); + if (p == NULL) + { + /* let's try to get this done */ + sm_free(lsplits); + lsplits = NULL; + } + else + lsplits = p; + } + if (lsplits != NULL) + { + if (j == 0) + j += sm_strlcat(lsplits + j, + ee->e_id, + l - j); + else + j += sm_strlcat2(lsplits + j, + "; ", + ee->e_id, + l - j); + SM_ASSERT(j < l); + } + } + if (nrcpt - i <= maxrcpt) + break; + i += maxrcpt; + } + if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL && nsplit > 0) + { + sm_syslog(LOG_NOTICE, e->e_id, + "split: maxrcpts=%d, rcpts=%d, count=%d, id%s=%s", + maxrcpt, nrcpt - ndead, nsplit, + nsplit > 1 ? "s" : "", lsplits); + sm_free(lsplits); + } + return SM_SPLIT_NEW(nsplit); +} +/* +** SPLIT_BY_RECIPIENT +** +** Split an envelope with multiple recipients into multiple +** envelopes as required by the sendmail configuration. +** +** Parameters: +** e -- envelope. +** +** Results: +** Returns true on success, false on failure. +** +** Side Effects: +** see split_across_queue_groups(), split_within_queue(e) +*/ + +bool +split_by_recipient(e) + ENVELOPE *e; +{ + int split, n, i, j, l; + char *lsplits; + ENVELOPE *ee, *next, *firstsibling; + + if (OpMode == SM_VERIFY || !ISVALIDQGRP(e->e_qgrp) || + bitset(EF_SPLIT, e->e_flags)) + return true; + n = split_across_queue_groups(e); + if (n == SM_SPLIT_FAIL) + return false; + firstsibling = ee = e->e_sibling; + if (n > 1 && LogLevel > SPLIT_LOG_LEVEL) + { + l = MAXLINE; + lsplits = sm_malloc(l); + if (lsplits != NULL) + *lsplits = '\0'; + j = 0; + } + else + { + /* get rid of stupid compiler warnings */ + lsplits = NULL; + j = l = 0; + } + for (i = 1; i < n; ++i) + { + next = ee->e_sibling; + if (split_within_queue(ee) == SM_SPLIT_FAIL) + { + e->e_sibling = firstsibling; + return false; + } + ee->e_flags |= EF_SPLIT; + if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL) + { + if (j >= l - strlen(ee->e_id) - 3) + { + char *p; + + l += MAXLINE; + p = sm_realloc(lsplits, l); + if (p == NULL) + { + /* let's try to get this done */ + sm_free(lsplits); + lsplits = NULL; + } + else + lsplits = p; + } + if (lsplits != NULL) + { + if (j == 0) + j += sm_strlcat(lsplits + j, + ee->e_id, l - j); + else + j += sm_strlcat2(lsplits + j, "; ", + ee->e_id, l - j); + SM_ASSERT(j < l); + } + } + ee = next; + } + if (LogLevel > SPLIT_LOG_LEVEL && lsplits != NULL && n > 1) + { + sm_syslog(LOG_NOTICE, e->e_id, "split: count=%d, id%s=%s", + n - 1, n > 2 ? "s" : "", lsplits); + sm_free(lsplits); + } + split = split_within_queue(e) != SM_SPLIT_FAIL; + if (split) + e->e_flags |= EF_SPLIT; + return split; +} + +#if _FFR_QUARANTINE +/* +** QUARANTINE_QUEUE_ITEM -- {un,}quarantine a single envelope +** +** Add/remove quarantine reason and requeue appropriately. +** +** Parameters: +** qgrp -- queue group for the item +** qdir -- queue directory in the given queue group +** e -- envelope information for the item +** reason -- quarantine reason, NULL means unquarantine. +** +** Results: +** true if item changed, false otherwise +** +** Side Effects: +** Changes quarantine tag in queue file and renames it. +*/ + +static bool +quarantine_queue_item(qgrp, qdir, e, reason) + int qgrp; + int qdir; + ENVELOPE *e; + char *reason; +{ + bool dirty = false; + bool failing = false; + bool foundq = false; + bool finished = false; + int fd; + int flags; + int oldtype; + int newtype; + int save_errno; + MODE_T oldumask = 0; + SM_FILE_T *oldqfp, *tempqfp; + char *bp; + char oldqf[MAXPATHLEN]; + char tempqf[MAXPATHLEN]; + char newqf[MAXPATHLEN]; + char buf[MAXLINE]; + + oldtype = queue_letter(e, ANYQFL_LETTER); + (void) sm_strlcpy(oldqf, queuename(e, ANYQFL_LETTER), sizeof oldqf); + (void) sm_strlcpy(tempqf, queuename(e, NEWQFL_LETTER), sizeof tempqf); + + /* + ** Instead of duplicating all the open + ** and lock code here, tell readqf() to + ** do that work and return the open + ** file pointer in e_lockfp. Note that + ** we must release the locks properly when + ** we are done. + */ + + if (!readqf(e, true)) + { + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s\n", qid_printname(e)); + return false; + } + oldqfp = e->e_lockfp; + + /* open the new queue file */ + flags = O_CREAT|O_WRONLY|O_EXCL; + if (bitset(S_IWGRP, QueueFileMode)) + oldumask = umask(002); + fd = open(tempqf, flags, QueueFileMode); + if (bitset(S_IWGRP, QueueFileMode)) + (void) umask(oldumask); + RELEASE_QUEUE; + + if (fd < 0) + { + save_errno = errno; + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s: Could not open %s: %s\n", + qid_printname(e), tempqf, + sm_errstring(save_errno)); + (void) sm_io_close(oldqfp, SM_TIME_DEFAULT); + return false; + } + if (!lockfile(fd, tempqf, NULL, LOCK_EX|LOCK_NB)) + { + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s: Could not lock %s\n", + qid_printname(e), tempqf); + (void) close(fd); + (void) sm_io_close(oldqfp, SM_TIME_DEFAULT); + return false; + } + + tempqfp = sm_io_open(SmFtStdiofd, SM_TIME_DEFAULT, (void *) &fd, + SM_IO_WRONLY, NULL); + if (tempqfp == NULL) + { + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s: Could not lock %s\n", + qid_printname(e), tempqf); + (void) close(fd); + (void) sm_io_close(oldqfp, SM_TIME_DEFAULT); + return false; + } + + /* Copy the data over, changing the quarantine reason */ + while ((bp = fgetfolded(buf, sizeof buf, oldqfp)) != NULL) + { + if (tTd(40, 4)) + sm_dprintf("+++++ %s\n", bp); + switch (bp[0]) + { + case 'q': /* quarantine reason */ + foundq = true; + if (reason == NULL) + { + if (Verbose) + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%s: Removed quarantine of \"%s\"\n", + e->e_id, &bp[1]); + } + sm_syslog(LOG_INFO, e->e_id, "unquarantine"); + dirty = true; + continue; + } + else if (strcmp(reason, &bp[1]) == 0) + { + if (Verbose) + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%s: Already quarantined with \"%s\"\n", + e->e_id, reason); + } + (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT, + "q%s\n", reason); + } + else + { + if (Verbose) + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%s: Quarantine changed from \"%s\" to \"%s\"\n", + e->e_id, &bp[1], + reason); + } + (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT, + "q%s\n", reason); + sm_syslog(LOG_INFO, e->e_id, "quarantine=%s", + reason); + dirty = true; + } + break; + + case 'R': + /* + ** If we are quarantining an unquarantined item, + ** need to put in a new 'q' line before it's + ** too late. + */ + + if (!foundq && reason != NULL) + { + if (Verbose) + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, + "%s: Quarantined with \"%s\"\n", + e->e_id, reason); + } + (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT, + "q%s\n", reason); + sm_syslog(LOG_INFO, e->e_id, "quarantine=%s", + reason); + foundq = true; + dirty = true; + } + + /* Copy the line to the new file */ + (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT, + "%s\n", bp); + break; + + case '.': + finished = true; + /* FALLTHROUGH */ + + default: + /* Copy the line to the new file */ + (void) sm_io_fprintf(tempqfp, SM_TIME_DEFAULT, + "%s\n", bp); + break; + } + } + + /* Make sure we read the whole old file */ + errno = sm_io_error(tempqfp); + if (errno != 0 && errno != SM_IO_EOF) + { + save_errno = errno; + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s: Error reading %s: %s\n", + qid_printname(e), oldqf, + sm_errstring(save_errno)); + failing = true; + } + + if (!failing && !finished) + { + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s: Incomplete file: %s\n", + qid_printname(e), oldqf); + failing = true; + } + + /* Check if we actually changed anything or we can just bail now */ + if (!dirty) + { + /* pretend we failed, even though we technically didn't */ + failing = true; + } + + /* Make sure we wrote things out safely */ + if (!failing && + (sm_io_flush(tempqfp, SM_TIME_DEFAULT) != 0 || + ((SuperSafe == SAFE_REALLY || SuperSafe == SAFE_INTERACTIVE) && + fsync(sm_io_getinfo(tempqfp, SM_IO_WHAT_FD, NULL)) < 0) || + ((errno = sm_io_error(tempqfp)) != 0))) + { + save_errno = errno; + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Skipping %s: Error writing %s: %s\n", + qid_printname(e), tempqf, + sm_errstring(save_errno)); + failing = true; + } + + + /* Figure out the new filename */ + newtype = (reason == NULL ? NORMQF_LETTER : QUARQF_LETTER); + if (oldtype == newtype) + { + /* going to rename tempqf to oldqf */ + (void) sm_strlcpy(newqf, oldqf, sizeof newqf); + } + else + { + /* going to rename tempqf to new name based on newtype */ + (void) sm_strlcpy(newqf, queuename(e, newtype), sizeof newqf); + } + + save_errno = 0; + + /* rename tempqf to newqf */ + if (!failing && + rename(tempqf, newqf) < 0) + save_errno = (errno == 0) ? EINVAL : errno; + + /* Check rename() success */ + if (!failing && save_errno != 0) + { + sm_syslog(LOG_DEBUG, e->e_id, + "quarantine_queue_item: rename(%s, %s): %s", + tempqf, newqf, sm_errstring(save_errno)); + + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Error renaming %s to %s: %s\n", + tempqf, newqf, + sm_errstring(save_errno)); + if (oldtype == newtype) + { + /* + ** Bail here since we don't know the state of + ** the filesystem and may need to keep tempqf + ** for the user to rescue us. + */ + + RELEASE_QUEUE; + errno = save_errno; + syserr("!452 Error renaming control file %s", tempqf); + /* NOTREACHED */ + } + else + { + /* remove new file (if rename() half completed) */ + if (xunlink(newqf) < 0) + { + save_errno = errno; + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Error removing %s: %s\n", + newqf, + sm_errstring(save_errno)); + } + + /* tempqf removed below */ + failing = true; + } + + } + + /* If changing file types, need to remove old type */ + if (!failing && oldtype != newtype) + { + if (xunlink(oldqf) < 0) + { + save_errno = errno; + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Error removing %s: %s\n", + oldqf, sm_errstring(save_errno)); + } + } + + /* see if anything above failed */ + if (failing) + { + /* Something failed: remove new file, old file still there */ + (void) xunlink(tempqf); + } + + /* + ** fsync() after file operations to make sure metadata is + ** written to disk on filesystems in which renames are + ** not guaranteed. It's ok if they fail, mail won't be lost. + */ + + if (SuperSafe != SAFE_NO) + { + /* for soft-updates */ + (void) fsync(sm_io_getinfo(tempqfp, + SM_IO_WHAT_FD, NULL)); + + if (!failing) + { + /* for soft-updates */ + (void) fsync(sm_io_getinfo(oldqfp, + SM_IO_WHAT_FD, NULL)); + } + + /* for other odd filesystems */ + SYNC_DIR(tempqf, false); + } + + /* Close up shop */ + RELEASE_QUEUE; + if (tempqfp != NULL) + (void) sm_io_close(tempqfp, SM_TIME_DEFAULT); + if (oldqfp != NULL) + (void) sm_io_close(oldqfp, SM_TIME_DEFAULT); + + /* All went well */ + return !failing; +} + +/* +** QUARANTINE_QUEUE -- {un,}quarantine matching items in the queue +** +** Read all matching queue items, add/remove quarantine +** reason, and requeue appropriately. +** +** Parameters: +** reason -- quarantine reason, "." means unquarantine. +** qgrplimit -- limit to single queue group unless NOQGRP +** +** Results: +** none. +** +** Side Effects: +** Lots of changes to the queue. +*/ + +void +quarantine_queue(reason, qgrplimit) + char *reason; + int qgrplimit; +{ + int changed = 0; + int qgrp; + + /* Convert internal representation of unquarantine */ + if (reason != NULL && reason[0] == '.' && reason[1] == '\0') + reason = NULL; + + if (reason != NULL) + { + /* clean it */ + reason = newstr(denlstring(reason, true, true)); + } + + for (qgrp = 0; qgrp < NumQueue && Queue[qgrp] != NULL; qgrp++) + { + int qdir; + + if (qgrplimit != NOQGRP && qgrplimit != qgrp) + continue; + + for (qdir = 0; qdir < Queue[qgrp]->qg_numqueues; qdir++) + { + int i; + int nrequests; + + if (StopRequest) + stop_sendmail(); + + nrequests = gatherq(qgrp, qdir, true, NULL, NULL); + + /* first see if there is anything */ + if (nrequests <= 0) + { + if (Verbose) + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, "%s: no matches\n", + qid_printqueue(qgrp, qdir)); + } + continue; + } + + if (Verbose) + { + (void) sm_io_fprintf(smioout, + SM_TIME_DEFAULT, "Processing %s:\n", + qid_printqueue(qgrp, qdir)); + } + + for (i = 0; i < WorkListCount; i++) + { + ENVELOPE e; + + if (StopRequest) + stop_sendmail(); + + /* setup envelope */ + clearenvelope(&e, true, sm_rpool_new_x(NULL)); + e.e_id = WorkList[i].w_name + 2; + e.e_qgrp = qgrp; + e.e_qdir = qdir; + + if (tTd(70, 101)) + { + sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "Would do %s\n", e.e_id); + changed++; + } + else if (quarantine_queue_item(qgrp, qdir, + &e, reason)) + changed++; + + /* clean up */ + sm_rpool_free(e.e_rpool); + e.e_rpool = NULL; + } + if (WorkList != NULL) + sm_free(WorkList); /* XXX */ + WorkList = NULL; + WorkListSize = 0; + WorkListCount = 0; + } + } + if (Verbose) + { + if (changed == 0) + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "No changes\n"); + else + (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, + "%d change%s\n", + changed, + changed == 1 ? "" : "s"); + } +} +#endif /* _FFR_QUARANTINE */ |