diff options
-rw-r--r-- | sys/kern/kern_alq.c | 499 | ||||
-rw-r--r-- | sys/sys/alq.h | 117 |
2 files changed, 616 insertions, 0 deletions
diff --git a/sys/kern/kern_alq.c b/sys/kern/kern_alq.c new file mode 100644 index 0000000..394b6c7 --- /dev/null +++ b/sys/kern/kern_alq.c @@ -0,0 +1,499 @@ +/* + * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * $FreeBSD$ + * + */ + +#include <sys/param.h> +#include <sys/systm.h> +#include <sys/kernel.h> +#include <sys/kthread.h> +#include <sys/lock.h> +#include <sys/mutex.h> +#include <sys/namei.h> +#include <sys/proc.h> +#include <sys/vnode.h> +#include <sys/alq.h> +#include <sys/malloc.h> +#include <sys/unistd.h> +#include <sys/fcntl.h> +#include <sys/eventhandler.h> + +/* Async. Logging Queue */ +struct alq { + int aq_entmax; /* Max entries */ + int aq_entlen; /* Entry length */ + char *aq_entbuf; /* Buffer for stored entries */ + int aq_flags; /* Queue flags */ + struct mtx aq_mtx; /* Queue lock */ + struct vnode *aq_vp; /* Open vnode handle */ + struct thread *aq_td; /* Thread that opened the vnode */ + struct ale *aq_first; /* First ent */ + struct ale *aq_entfree; /* First free ent */ + struct ale *aq_entvalid; /* First ent valid for writing */ + LIST_ENTRY(alq) aq_act; /* List of active queues */ + LIST_ENTRY(alq) aq_link; /* List of all queues */ +}; + +#define AQ_WANTED 0x0001 /* Wakeup sleeper when io is done */ +#define AQ_ACTIVE 0x0002 /* on the active list */ +#define AQ_FLUSHING 0x0004 /* doing IO */ +#define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */ + +#define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx) +#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx) + +static MALLOC_DEFINE(M_ALD, "ALD", "ALD"); + +/* + * The ald_mtx protects the ald_queues list and the ald_active list. + */ +static struct mtx ald_mtx; +static LIST_HEAD(, alq) ald_queues; +static LIST_HEAD(, alq) ald_active; +static struct proc *ald_thread; +static int ald_shutingdown = 0; + +#define ALD_LOCK() mtx_lock(&ald_mtx) +#define ALD_UNLOCK() mtx_unlock(&ald_mtx) + +/* Daemon functions */ +static int ald_add(struct alq *); +static int ald_rem(struct alq *); +static void ald_startup(void *); +static void ald_daemon(void); +static void ald_shutdown(void *, int); +static void ald_activate(struct alq *); +static void ald_deactivate(struct alq *); + +/* Internal queue functions */ +static void alq_shutdown(struct alq *); +static int alq_doio(struct alq *); + + +/* + * Add a new queue to the global list. Fail if we're shutting down. + */ +static int +ald_add(struct alq *alq) +{ + int error; + + error = 0; + + ALD_LOCK(); + if (ald_shutingdown) { + error = EBUSY; + goto done; + } + LIST_INSERT_HEAD(&ald_queues, alq, aq_link); +done: + ALD_UNLOCK(); + return (error); +} + +/* + * Remove a queue from the global list unless we're shutting down. If so, + * the ald will take care of cleaning up it's resources. + */ +static int +ald_rem(struct alq *alq) +{ + int error; + + error = 0; + + ALD_LOCK(); + if (ald_shutingdown) { + error = EBUSY; + goto done; + } + LIST_REMOVE(alq, aq_link); +done: + ALD_UNLOCK(); + return (error); +} + +/* + * Put a queue on the active list. This will schedule it for writing. + */ +static void +ald_activate(struct alq *alq) +{ + LIST_INSERT_HEAD(&ald_active, alq, aq_act); + wakeup(&ald_active); +} + +static void +ald_deactivate(struct alq *alq) +{ + LIST_REMOVE(alq, aq_act); + alq->aq_flags &= ~AQ_ACTIVE; +} + +static void +ald_startup(void *unused) +{ + mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET); + LIST_INIT(&ald_queues); + LIST_INIT(&ald_active); +} + +static void +ald_daemon(void) +{ + int needwakeup; + struct alq *alq; + + mtx_lock(&Giant); + + EVENTHANDLER_REGISTER(shutdown_pre_sync, ald_shutdown, NULL, + SHUTDOWN_PRI_FIRST); + + ALD_LOCK(); + + for (;;) { + while ((alq = LIST_FIRST(&ald_active)) == NULL) + msleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0); + + ALQ_LOCK(alq); + ald_deactivate(alq); + ALD_UNLOCK(); + needwakeup = alq_doio(alq); + ALQ_UNLOCK(alq); + if (needwakeup) + wakeup(alq); + ALD_LOCK(); + } +} + +static void +ald_shutdown(void *arg, int howto) +{ + struct alq *alq; + + ALD_LOCK(); + ald_shutingdown = 1; + + while ((alq = LIST_FIRST(&ald_queues)) != NULL) { + LIST_REMOVE(alq, aq_link); + ALD_UNLOCK(); + alq_shutdown(alq); + ALD_LOCK(); + } + ALD_UNLOCK(); +} + +static void +alq_shutdown(struct alq *alq) +{ + ALQ_LOCK(alq); + + /* Stop any new writers. */ + alq->aq_flags |= AQ_SHUTDOWN; + + /* Drain IO */ + while (alq->aq_flags & (AQ_FLUSHING|AQ_ACTIVE)) { + alq->aq_flags |= AQ_WANTED; + ALQ_UNLOCK(alq); + tsleep(alq, PWAIT, "aldclose", 0); + ALQ_LOCK(alq); + } + ALQ_UNLOCK(alq); + + vn_close(alq->aq_vp, FREAD|FWRITE, alq->aq_td->td_ucred, + alq->aq_td); +} + +/* + * Flush all pending data to disk. This operation will block. + */ +static int +alq_doio(struct alq *alq) +{ + struct thread *td; + struct mount *mp; + struct vnode *vp; + struct uio auio; + struct iovec aiov[2]; + struct ale *ale; + struct ale *alstart; + int totlen; + int iov; + + vp = alq->aq_vp; + td = curthread; + totlen = 0; + iov = 0; + + alstart = ale = alq->aq_entvalid; + alq->aq_entvalid = NULL; + + bzero(&aiov, sizeof(aiov)); + bzero(&auio, sizeof(auio)); + + do { + if (aiov[iov].iov_base == NULL) + aiov[iov].iov_base = ale->ae_data; + aiov[iov].iov_len += alq->aq_entlen; + totlen += alq->aq_entlen; + /* Check to see if we're wrapping the buffer */ + if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data) + iov++; + ale->ae_flags &= ~AE_VALID; + ale = ale->ae_next; + } while (ale->ae_flags & AE_VALID); + + alq->aq_flags |= AQ_FLUSHING; + ALQ_UNLOCK(alq); + + if (iov == 2 || aiov[iov].iov_base == NULL) + iov--; + + auio.uio_iov = &aiov[0]; + auio.uio_offset = 0; + auio.uio_segflg = UIO_SYSSPACE; + auio.uio_rw = UIO_WRITE; + auio.uio_iovcnt = iov + 1; + auio.uio_resid = totlen; + auio.uio_td = td; + + /* + * Do all of the junk required to write now. + */ + vn_start_write(vp, &mp, V_WAIT); + vn_lock(vp, LK_EXCLUSIVE | LK_RETRY, td); + VOP_LEASE(vp, td, td->td_ucred, LEASE_WRITE); + /* XXX error ignored */ + VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, td->td_ucred); + VOP_UNLOCK(vp, 0, td); + vn_finished_write(mp); + + ALQ_LOCK(alq); + alq->aq_flags &= ~AQ_FLUSHING; + + if (alq->aq_entfree == NULL) + alq->aq_entfree = alstart; + + if (alq->aq_flags & AQ_WANTED) { + alq->aq_flags &= ~AQ_WANTED; + return (1); + } + + return(0); +} + +static struct kproc_desc ald_kp = { + "ALQ Daemon", + ald_daemon, + &ald_thread +}; + +SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp) +SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL) + + +/* User visible queue functions */ + +/* + * Create the queue data structure, allocate the buffer, and open the file. + */ +int +alq_open(struct alq **alqp, const char *file, int size, int count) +{ + struct thread *td; + struct nameidata nd; + struct ale *ale; + struct ale *alp; + struct alq *alq; + char *bufp; + int flags; + int error; + int i; + + *alqp = NULL; + td = curthread; + + NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td); + flags = FREAD | FWRITE | O_NOFOLLOW | O_CREAT; + + error = vn_open(&nd, &flags, 0); + if (error) + return (error); + + NDFREE(&nd, NDF_ONLY_PNBUF); + /* We just unlock so we hold a reference */ + VOP_UNLOCK(nd.ni_vp, 0, td); + + alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO); + alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO); + alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO); + alq->aq_vp = nd.ni_vp; + alq->aq_td = td; + alq->aq_entmax = count; + alq->aq_entlen = size; + alq->aq_entfree = alq->aq_first; + + mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET); + + bufp = alq->aq_entbuf; + ale = alq->aq_first; + alp = NULL; + + /* Match up entries with buffers */ + for (i = 0; i < count; i++) { + if (alp) + alp->ae_next = ale; + ale->ae_data = bufp; + alp = ale; + ale++; + bufp += size; + } + + alp->ae_next = alq->aq_first; + + if ((error = ald_add(alq)) != 0) + return (error); + *alqp = alq; + + return (0); +} + +/* + * Copy a new entry into the queue. If the operation would block either + * wait or return an error depending on the value of waitok. + */ +int +alq_write(struct alq *alq, void *data, int waitok) +{ + struct ale *ale; + + if ((ale = alq_get(alq, waitok)) == NULL) + return (EWOULDBLOCK); + + bcopy(data, ale->ae_data, alq->aq_entlen); + alq_post(alq, ale); + + return (0); +} + +struct ale * +alq_get(struct alq *alq, int waitok) +{ + struct ale *ale; + struct ale *aln; + + ale = NULL; + + ALQ_LOCK(alq); + + /* Loop until we get an entry or we're shutting down */ + while ((alq->aq_flags & AQ_SHUTDOWN) == 0 && + (ale = alq->aq_entfree) == NULL && + (waitok & ALQ_WAITOK)) { + alq->aq_flags |= AQ_WANTED; + ALQ_UNLOCK(alq); + tsleep(alq, PWAIT, "alqget", 0); + ALQ_LOCK(alq); + } + + if (ale != NULL) { + aln = ale->ae_next; + if ((aln->ae_flags & AE_VALID) == 0) + alq->aq_entfree = aln; + } else + ALQ_UNLOCK(alq); + + + return (ale); +} + +void +alq_post(struct alq *alq, struct ale *ale) +{ + int activate; + + ale->ae_flags |= AE_VALID; + + if (alq->aq_entvalid == NULL) + alq->aq_entvalid = ale; + + if ((alq->aq_flags & AQ_ACTIVE) == 0) { + alq->aq_flags |= AQ_ACTIVE; + activate = 1; + } else + activate = 0; + + ALQ_UNLOCK(alq); + if (activate) { + ALD_LOCK(); + ald_activate(alq); + ALD_UNLOCK(); + } +} + +void +alq_flush(struct alq *alq) +{ + int needwakeup = 0; + + ALD_LOCK(); + ALQ_LOCK(alq); + if (alq->aq_flags & AQ_ACTIVE) { + ald_deactivate(alq); + ALD_UNLOCK(); + needwakeup = alq_doio(alq); + } else + ALD_UNLOCK(); + ALQ_UNLOCK(alq); + + if (needwakeup) + wakeup(alq); +} + +/* + * Flush remaining data, close the file and free all resources. + */ +void +alq_close(struct alq *alq) +{ + /* + * If we're already shuting down someone else will flush and close + * the vnode. + */ + if (ald_rem(alq) != 0) + return; + + /* + * Drain all pending IO. + */ + alq_shutdown(alq); + + mtx_destroy(&alq->aq_mtx); + free(alq->aq_first, M_ALD); + free(alq->aq_entbuf, M_ALD); + free(alq, M_ALD); +} diff --git a/sys/sys/alq.h b/sys/sys/alq.h new file mode 100644 index 0000000..58d0cb4 --- /dev/null +++ b/sys/sys/alq.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * $FreeBSD$ + * + */ +#ifndef _SYS_ALD_H_ +#define _SYS_ALD_H_ + +/* + * Opaque type for the Async. Logging Queue + */ +struct alq; + +/* + * Async. Logging Entry + */ +struct ale { + struct ale *ae_next; /* Next Entry */ + char *ae_data; /* Entry buffer */ + int ae_flags; /* Entry flags */ +}; + +#define AE_VALID 0x0001 /* Entry has valid data */ + + +/* waitok options */ +#define ALQ_NOWAIT 0x0001 +#define ALQ_WAITOK 0x0002 + +/* + * alq_open: Creates a new queue + * + * Arguments: + * alq Storage for a pointer to the newly created queue. + * file The filename to open for logging. + * size The size of each entry in the queue. + * count The number of items in the buffer, this should be large enough + * to store items over the period of a disk write. + * Returns: + * error from open or 0 on success + */ +int alq_open(struct alq **, const char *file, int size, int count); + +/* + * alq_write: Write data into the queue + * + * Arguments: + * alq The queue we're writing to + * data The entry to be recorded + * waitok Are we permitted to wait? + * + * Returns: + * EWOULDBLOCK if: + * Waitok is ALQ_NOWAIT and the queue is full. + * The system is shutting down. + * 0 on success. + */ +int alq_write(struct alq *alq, void *data, int waitok); + +/* + * alq_flush: Flush the queue out to disk + */ +void alq_flush(struct alq *alq); + +/* + * alq_close: Flush the queue and free all resources. + */ +void alq_close(struct alq *alq); + +/* + * alq_get: Return an entry for direct access + * + * Arguments: + * alq The queue to retrieve an entry from + * waitok Are we permitted to wait? + * + * Returns: + * The next available ale on success. + * NULL if: + * Waitok is ALQ_NOWAIT and the queue is full. + * The system is shutting down. + * + * This leaves the queue locked until a subsequent alq_post. + */ +struct ale *alq_get(struct alq *alq, int waitok); + +/* + * alq_post: Schedule the ale retrieved by alq_get for writing. + * alq The queue to post the entry to. + * ale An asynch logging entry returned by alq_get. + */ +void alq_post(struct alq *, struct ale *); + +#endif /* _SYS_ALD_H_ */ |