summaryrefslogtreecommitdiffstats
path: root/sys/kern/kern_alq.c
diff options
context:
space:
mode:
authorjeff <jeff@FreeBSD.org>2002-09-22 07:11:14 +0000
committerjeff <jeff@FreeBSD.org>2002-09-22 07:11:14 +0000
commitac508fd5a56dbf8a567a69379d6cdbd1ab586ea2 (patch)
tree3b5c1a4b7dbe83eac2c0713fa153fa2ee4448971 /sys/kern/kern_alq.c
parentcd3515dad9bbfb7bda91400a4b42e2ad68b755e9 (diff)
downloadFreeBSD-src-ac508fd5a56dbf8a567a69379d6cdbd1ab586ea2.zip
FreeBSD-src-ac508fd5a56dbf8a567a69379d6cdbd1ab586ea2.tar.gz
- Add an asynchronous fixed length record logging mechanism called
ALQ (Asynch. Logging Queues). ALQ supports many seperate queues with different record and buffer sizes. It opens and logs to any vnode so it can be used with character devices as well as regular files. Reviewed in part by: phk, jake, markm
Diffstat (limited to 'sys/kern/kern_alq.c')
-rw-r--r--sys/kern/kern_alq.c499
1 files changed, 499 insertions, 0 deletions
diff --git a/sys/kern/kern_alq.c b/sys/kern/kern_alq.c
new file mode 100644
index 0000000..394b6c7
--- /dev/null
+++ b/sys/kern/kern_alq.c
@@ -0,0 +1,499 @@
+/*
+ * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice unmodified, this list of conditions, and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ *
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/kernel.h>
+#include <sys/kthread.h>
+#include <sys/lock.h>
+#include <sys/mutex.h>
+#include <sys/namei.h>
+#include <sys/proc.h>
+#include <sys/vnode.h>
+#include <sys/alq.h>
+#include <sys/malloc.h>
+#include <sys/unistd.h>
+#include <sys/fcntl.h>
+#include <sys/eventhandler.h>
+
+/* Async. Logging Queue */
+struct alq {
+ int aq_entmax; /* Max entries */
+ int aq_entlen; /* Entry length */
+ char *aq_entbuf; /* Buffer for stored entries */
+ int aq_flags; /* Queue flags */
+ struct mtx aq_mtx; /* Queue lock */
+ struct vnode *aq_vp; /* Open vnode handle */
+ struct thread *aq_td; /* Thread that opened the vnode */
+ struct ale *aq_first; /* First ent */
+ struct ale *aq_entfree; /* First free ent */
+ struct ale *aq_entvalid; /* First ent valid for writing */
+ LIST_ENTRY(alq) aq_act; /* List of active queues */
+ LIST_ENTRY(alq) aq_link; /* List of all queues */
+};
+
+#define AQ_WANTED 0x0001 /* Wakeup sleeper when io is done */
+#define AQ_ACTIVE 0x0002 /* on the active list */
+#define AQ_FLUSHING 0x0004 /* doing IO */
+#define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */
+
+#define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx)
+#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
+
+static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
+
+/*
+ * The ald_mtx protects the ald_queues list and the ald_active list.
+ */
+static struct mtx ald_mtx;
+static LIST_HEAD(, alq) ald_queues;
+static LIST_HEAD(, alq) ald_active;
+static struct proc *ald_thread;
+static int ald_shutingdown = 0;
+
+#define ALD_LOCK() mtx_lock(&ald_mtx)
+#define ALD_UNLOCK() mtx_unlock(&ald_mtx)
+
+/* Daemon functions */
+static int ald_add(struct alq *);
+static int ald_rem(struct alq *);
+static void ald_startup(void *);
+static void ald_daemon(void);
+static void ald_shutdown(void *, int);
+static void ald_activate(struct alq *);
+static void ald_deactivate(struct alq *);
+
+/* Internal queue functions */
+static void alq_shutdown(struct alq *);
+static int alq_doio(struct alq *);
+
+
+/*
+ * Add a new queue to the global list. Fail if we're shutting down.
+ */
+static int
+ald_add(struct alq *alq)
+{
+ int error;
+
+ error = 0;
+
+ ALD_LOCK();
+ if (ald_shutingdown) {
+ error = EBUSY;
+ goto done;
+ }
+ LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
+done:
+ ALD_UNLOCK();
+ return (error);
+}
+
+/*
+ * Remove a queue from the global list unless we're shutting down. If so,
+ * the ald will take care of cleaning up it's resources.
+ */
+static int
+ald_rem(struct alq *alq)
+{
+ int error;
+
+ error = 0;
+
+ ALD_LOCK();
+ if (ald_shutingdown) {
+ error = EBUSY;
+ goto done;
+ }
+ LIST_REMOVE(alq, aq_link);
+done:
+ ALD_UNLOCK();
+ return (error);
+}
+
+/*
+ * Put a queue on the active list. This will schedule it for writing.
+ */
+static void
+ald_activate(struct alq *alq)
+{
+ LIST_INSERT_HEAD(&ald_active, alq, aq_act);
+ wakeup(&ald_active);
+}
+
+static void
+ald_deactivate(struct alq *alq)
+{
+ LIST_REMOVE(alq, aq_act);
+ alq->aq_flags &= ~AQ_ACTIVE;
+}
+
+static void
+ald_startup(void *unused)
+{
+ mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
+ LIST_INIT(&ald_queues);
+ LIST_INIT(&ald_active);
+}
+
+static void
+ald_daemon(void)
+{
+ int needwakeup;
+ struct alq *alq;
+
+ mtx_lock(&Giant);
+
+ EVENTHANDLER_REGISTER(shutdown_pre_sync, ald_shutdown, NULL,
+ SHUTDOWN_PRI_FIRST);
+
+ ALD_LOCK();
+
+ for (;;) {
+ while ((alq = LIST_FIRST(&ald_active)) == NULL)
+ msleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
+
+ ALQ_LOCK(alq);
+ ald_deactivate(alq);
+ ALD_UNLOCK();
+ needwakeup = alq_doio(alq);
+ ALQ_UNLOCK(alq);
+ if (needwakeup)
+ wakeup(alq);
+ ALD_LOCK();
+ }
+}
+
+static void
+ald_shutdown(void *arg, int howto)
+{
+ struct alq *alq;
+
+ ALD_LOCK();
+ ald_shutingdown = 1;
+
+ while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
+ LIST_REMOVE(alq, aq_link);
+ ALD_UNLOCK();
+ alq_shutdown(alq);
+ ALD_LOCK();
+ }
+ ALD_UNLOCK();
+}
+
+static void
+alq_shutdown(struct alq *alq)
+{
+ ALQ_LOCK(alq);
+
+ /* Stop any new writers. */
+ alq->aq_flags |= AQ_SHUTDOWN;
+
+ /* Drain IO */
+ while (alq->aq_flags & (AQ_FLUSHING|AQ_ACTIVE)) {
+ alq->aq_flags |= AQ_WANTED;
+ ALQ_UNLOCK(alq);
+ tsleep(alq, PWAIT, "aldclose", 0);
+ ALQ_LOCK(alq);
+ }
+ ALQ_UNLOCK(alq);
+
+ vn_close(alq->aq_vp, FREAD|FWRITE, alq->aq_td->td_ucred,
+ alq->aq_td);
+}
+
+/*
+ * Flush all pending data to disk. This operation will block.
+ */
+static int
+alq_doio(struct alq *alq)
+{
+ struct thread *td;
+ struct mount *mp;
+ struct vnode *vp;
+ struct uio auio;
+ struct iovec aiov[2];
+ struct ale *ale;
+ struct ale *alstart;
+ int totlen;
+ int iov;
+
+ vp = alq->aq_vp;
+ td = curthread;
+ totlen = 0;
+ iov = 0;
+
+ alstart = ale = alq->aq_entvalid;
+ alq->aq_entvalid = NULL;
+
+ bzero(&aiov, sizeof(aiov));
+ bzero(&auio, sizeof(auio));
+
+ do {
+ if (aiov[iov].iov_base == NULL)
+ aiov[iov].iov_base = ale->ae_data;
+ aiov[iov].iov_len += alq->aq_entlen;
+ totlen += alq->aq_entlen;
+ /* Check to see if we're wrapping the buffer */
+ if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data)
+ iov++;
+ ale->ae_flags &= ~AE_VALID;
+ ale = ale->ae_next;
+ } while (ale->ae_flags & AE_VALID);
+
+ alq->aq_flags |= AQ_FLUSHING;
+ ALQ_UNLOCK(alq);
+
+ if (iov == 2 || aiov[iov].iov_base == NULL)
+ iov--;
+
+ auio.uio_iov = &aiov[0];
+ auio.uio_offset = 0;
+ auio.uio_segflg = UIO_SYSSPACE;
+ auio.uio_rw = UIO_WRITE;
+ auio.uio_iovcnt = iov + 1;
+ auio.uio_resid = totlen;
+ auio.uio_td = td;
+
+ /*
+ * Do all of the junk required to write now.
+ */
+ vn_start_write(vp, &mp, V_WAIT);
+ vn_lock(vp, LK_EXCLUSIVE | LK_RETRY, td);
+ VOP_LEASE(vp, td, td->td_ucred, LEASE_WRITE);
+ /* XXX error ignored */
+ VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, td->td_ucred);
+ VOP_UNLOCK(vp, 0, td);
+ vn_finished_write(mp);
+
+ ALQ_LOCK(alq);
+ alq->aq_flags &= ~AQ_FLUSHING;
+
+ if (alq->aq_entfree == NULL)
+ alq->aq_entfree = alstart;
+
+ if (alq->aq_flags & AQ_WANTED) {
+ alq->aq_flags &= ~AQ_WANTED;
+ return (1);
+ }
+
+ return(0);
+}
+
+static struct kproc_desc ald_kp = {
+ "ALQ Daemon",
+ ald_daemon,
+ &ald_thread
+};
+
+SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp)
+SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL)
+
+
+/* User visible queue functions */
+
+/*
+ * Create the queue data structure, allocate the buffer, and open the file.
+ */
+int
+alq_open(struct alq **alqp, const char *file, int size, int count)
+{
+ struct thread *td;
+ struct nameidata nd;
+ struct ale *ale;
+ struct ale *alp;
+ struct alq *alq;
+ char *bufp;
+ int flags;
+ int error;
+ int i;
+
+ *alqp = NULL;
+ td = curthread;
+
+ NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td);
+ flags = FREAD | FWRITE | O_NOFOLLOW | O_CREAT;
+
+ error = vn_open(&nd, &flags, 0);
+ if (error)
+ return (error);
+
+ NDFREE(&nd, NDF_ONLY_PNBUF);
+ /* We just unlock so we hold a reference */
+ VOP_UNLOCK(nd.ni_vp, 0, td);
+
+ alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
+ alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO);
+ alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO);
+ alq->aq_vp = nd.ni_vp;
+ alq->aq_td = td;
+ alq->aq_entmax = count;
+ alq->aq_entlen = size;
+ alq->aq_entfree = alq->aq_first;
+
+ mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
+
+ bufp = alq->aq_entbuf;
+ ale = alq->aq_first;
+ alp = NULL;
+
+ /* Match up entries with buffers */
+ for (i = 0; i < count; i++) {
+ if (alp)
+ alp->ae_next = ale;
+ ale->ae_data = bufp;
+ alp = ale;
+ ale++;
+ bufp += size;
+ }
+
+ alp->ae_next = alq->aq_first;
+
+ if ((error = ald_add(alq)) != 0)
+ return (error);
+ *alqp = alq;
+
+ return (0);
+}
+
+/*
+ * Copy a new entry into the queue. If the operation would block either
+ * wait or return an error depending on the value of waitok.
+ */
+int
+alq_write(struct alq *alq, void *data, int waitok)
+{
+ struct ale *ale;
+
+ if ((ale = alq_get(alq, waitok)) == NULL)
+ return (EWOULDBLOCK);
+
+ bcopy(data, ale->ae_data, alq->aq_entlen);
+ alq_post(alq, ale);
+
+ return (0);
+}
+
+struct ale *
+alq_get(struct alq *alq, int waitok)
+{
+ struct ale *ale;
+ struct ale *aln;
+
+ ale = NULL;
+
+ ALQ_LOCK(alq);
+
+ /* Loop until we get an entry or we're shutting down */
+ while ((alq->aq_flags & AQ_SHUTDOWN) == 0 &&
+ (ale = alq->aq_entfree) == NULL &&
+ (waitok & ALQ_WAITOK)) {
+ alq->aq_flags |= AQ_WANTED;
+ ALQ_UNLOCK(alq);
+ tsleep(alq, PWAIT, "alqget", 0);
+ ALQ_LOCK(alq);
+ }
+
+ if (ale != NULL) {
+ aln = ale->ae_next;
+ if ((aln->ae_flags & AE_VALID) == 0)
+ alq->aq_entfree = aln;
+ } else
+ ALQ_UNLOCK(alq);
+
+
+ return (ale);
+}
+
+void
+alq_post(struct alq *alq, struct ale *ale)
+{
+ int activate;
+
+ ale->ae_flags |= AE_VALID;
+
+ if (alq->aq_entvalid == NULL)
+ alq->aq_entvalid = ale;
+
+ if ((alq->aq_flags & AQ_ACTIVE) == 0) {
+ alq->aq_flags |= AQ_ACTIVE;
+ activate = 1;
+ } else
+ activate = 0;
+
+ ALQ_UNLOCK(alq);
+ if (activate) {
+ ALD_LOCK();
+ ald_activate(alq);
+ ALD_UNLOCK();
+ }
+}
+
+void
+alq_flush(struct alq *alq)
+{
+ int needwakeup = 0;
+
+ ALD_LOCK();
+ ALQ_LOCK(alq);
+ if (alq->aq_flags & AQ_ACTIVE) {
+ ald_deactivate(alq);
+ ALD_UNLOCK();
+ needwakeup = alq_doio(alq);
+ } else
+ ALD_UNLOCK();
+ ALQ_UNLOCK(alq);
+
+ if (needwakeup)
+ wakeup(alq);
+}
+
+/*
+ * Flush remaining data, close the file and free all resources.
+ */
+void
+alq_close(struct alq *alq)
+{
+ /*
+ * If we're already shuting down someone else will flush and close
+ * the vnode.
+ */
+ if (ald_rem(alq) != 0)
+ return;
+
+ /*
+ * Drain all pending IO.
+ */
+ alq_shutdown(alq);
+
+ mtx_destroy(&alq->aq_mtx);
+ free(alq->aq_first, M_ALD);
+ free(alq->aq_entbuf, M_ALD);
+ free(alq, M_ALD);
+}
OpenPOWER on IntegriCloud