summaryrefslogtreecommitdiffstats
path: root/sys/kern
diff options
context:
space:
mode:
authorjhb <jhb@FreeBSD.org>2016-03-01 18:12:14 +0000
committerjhb <jhb@FreeBSD.org>2016-03-01 18:12:14 +0000
commitbe47bc68fb065fc834ff51fea7df108abeae031c (patch)
treed445e19e349faf1791e2ec0922554b84037d618e /sys/kern
parent15b2caff0f7170f0c4bb31748f12833744f7985c (diff)
downloadFreeBSD-src-be47bc68fb065fc834ff51fea7df108abeae031c.zip
FreeBSD-src-be47bc68fb065fc834ff51fea7df108abeae031c.tar.gz
Refactor the AIO subsystem to permit file-type-specific handling and
improve cancellation robustness. Introduce a new file operation, fo_aio_queue, which is responsible for queueing and completing an asynchronous I/O request for a given file. The AIO subystem now exports library of routines to manipulate AIO requests as well as the ability to run a handler function in the "default" pool of AIO daemons to service a request. A default implementation for file types which do not include an fo_aio_queue method queues requests to the "default" pool invoking the fo_read or fo_write methods as before. The AIO subsystem permits file types to install a private "cancel" routine when a request is queued to permit safe dequeueing and cleanup of cancelled requests. Sockets now use their own pool of AIO daemons and service per-socket requests in FIFO order. Socket requests will not block indefinitely permitting timely cancellation of all requests. Due to the now-tight coupling of the AIO subsystem with file types, the AIO subsystem is now a standard part of all kernels. The VFS_AIO kernel option and aio.ko module are gone. Many file types may block indefinitely in their fo_read or fo_write callbacks resulting in a hung AIO daemon. This can result in hung user processes (when processes attempt to cancel all outstanding requests during exit) or a hung system. To protect against this, AIO requests are only permitted for known "safe" files by default. AIO requests for all file types can be enabled by setting the new vfs.aio.enable_usafe sysctl to a non-zero value. The AIO tests have been updated to skip operations on unsafe file types if the sysctl is zero. Currently, AIO requests on sockets and raw disks are considered safe and are enabled by default. aio_mlock() is also enabled by default. Reviewed by: cem, jilles Discussed with: kib (earlier version) Sponsored by: Chelsio Communications Differential Revision: https://reviews.freebsd.org/D5289
Diffstat (limited to 'sys/kern')
-rw-r--r--sys/kern/sys_socket.c397
-rw-r--r--sys/kern/uipc_debug.c4
-rw-r--r--sys/kern/uipc_sockbuf.c2
-rw-r--r--sys/kern/uipc_socket.c6
-rw-r--r--sys/kern/vfs_aio.c737
5 files changed, 755 insertions, 391 deletions
diff --git a/sys/kern/sys_socket.c b/sys/kern/sys_socket.c
index dd831ae..ffdb003 100644
--- a/sys/kern/sys_socket.c
+++ b/sys/kern/sys_socket.c
@@ -34,9 +34,12 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/aio.h>
#include <sys/domain.h>
#include <sys/file.h>
#include <sys/filedesc.h>
+#include <sys/kernel.h>
+#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/protosw.h>
@@ -48,6 +51,9 @@ __FBSDID("$FreeBSD$");
#include <sys/filio.h> /* XXX */
#include <sys/sockio.h>
#include <sys/stat.h>
+#include <sys/sysctl.h>
+#include <sys/sysproto.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/ucred.h>
#include <sys/un.h>
@@ -64,6 +70,22 @@ __FBSDID("$FreeBSD$");
#include <security/mac/mac_framework.h>
+#include <vm/vm.h>
+#include <vm/pmap.h>
+#include <vm/vm_extern.h>
+#include <vm/vm_map.h>
+
+static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL,
+ "socket AIO stats");
+
+static int empty_results;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results,
+ 0, "socket operation returned EAGAIN");
+
+static int empty_retries;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries,
+ 0, "socket operation retries");
+
static fo_rdwr_t soo_read;
static fo_rdwr_t soo_write;
static fo_ioctl_t soo_ioctl;
@@ -72,6 +94,9 @@ extern fo_kqfilter_t soo_kqfilter;
static fo_stat_t soo_stat;
static fo_close_t soo_close;
static fo_fill_kinfo_t soo_fill_kinfo;
+static fo_aio_queue_t soo_aio_queue;
+
+static void soo_aio_cancel(struct kaiocb *job);
struct fileops socketops = {
.fo_read = soo_read,
@@ -86,6 +111,7 @@ struct fileops socketops = {
.fo_chown = invfo_chown,
.fo_sendfile = invfo_sendfile,
.fo_fill_kinfo = soo_fill_kinfo,
+ .fo_aio_queue = soo_aio_queue,
.fo_flags = DFLAG_PASSABLE
};
@@ -363,3 +389,374 @@ soo_fill_kinfo(struct file *fp, struct kinfo_file *kif, struct filedesc *fdp)
sizeof(kif->kf_path));
return (0);
}
+
+static STAILQ_HEAD(, task) soaio_jobs;
+static struct mtx soaio_jobs_lock;
+static struct task soaio_kproc_task;
+static int soaio_starting, soaio_idle, soaio_queued;
+static struct unrhdr *soaio_kproc_unr;
+
+static int soaio_max_procs = MAX_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0,
+ "Maximum number of kernel processes to use for async socket IO");
+
+static int soaio_num_procs;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0,
+ "Number of active kernel processes for async socket IO");
+
+static int soaio_target_procs = TARGET_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD,
+ &soaio_target_procs, 0,
+ "Preferred number of ready kernel processes for async socket IO");
+
+static int soaio_lifetime;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0,
+ "Maximum lifetime for idle aiod");
+
+static void
+soaio_kproc_loop(void *arg)
+{
+ struct proc *p;
+ struct vmspace *myvm;
+ struct task *task;
+ int error, id, pending;
+
+ id = (intptr_t)arg;
+
+ /*
+ * Grab an extra reference on the daemon's vmspace so that it
+ * doesn't get freed by jobs that switch to a different
+ * vmspace.
+ */
+ p = curproc;
+ myvm = vmspace_acquire_ref(p);
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(soaio_starting > 0);
+ soaio_starting--;
+ for (;;) {
+ while (!STAILQ_EMPTY(&soaio_jobs)) {
+ task = STAILQ_FIRST(&soaio_jobs);
+ STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link);
+ soaio_queued--;
+ pending = task->ta_pending;
+ task->ta_pending = 0;
+ mtx_unlock(&soaio_jobs_lock);
+
+ task->ta_func(task->ta_context, pending);
+
+ mtx_lock(&soaio_jobs_lock);
+ }
+ MPASS(soaio_queued == 0);
+
+ if (p->p_vmspace != myvm) {
+ mtx_unlock(&soaio_jobs_lock);
+ vmspace_switch_aio(myvm);
+ mtx_lock(&soaio_jobs_lock);
+ continue;
+ }
+
+ soaio_idle++;
+ error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-",
+ soaio_lifetime);
+ soaio_idle--;
+ if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) &&
+ soaio_num_procs > soaio_target_procs)
+ break;
+ }
+ soaio_num_procs--;
+ mtx_unlock(&soaio_jobs_lock);
+ free_unr(soaio_kproc_unr, id);
+ kproc_exit(0);
+}
+
+static void
+soaio_kproc_create(void *context, int pending)
+{
+ struct proc *p;
+ int error, id;
+
+ mtx_lock(&soaio_jobs_lock);
+ for (;;) {
+ if (soaio_num_procs < soaio_target_procs) {
+ /* Must create */
+ } else if (soaio_num_procs >= soaio_max_procs) {
+ /*
+ * Hit the limit on kernel processes, don't
+ * create another one.
+ */
+ break;
+ } else if (soaio_queued <= soaio_idle + soaio_starting) {
+ /*
+ * No more AIO jobs waiting for a process to be
+ * created, so stop.
+ */
+ break;
+ }
+ soaio_starting++;
+ mtx_unlock(&soaio_jobs_lock);
+
+ id = alloc_unr(soaio_kproc_unr);
+ error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id,
+ &p, 0, 0, "soaiod%d", id);
+ if (error != 0) {
+ free_unr(soaio_kproc_unr, id);
+ mtx_lock(&soaio_jobs_lock);
+ soaio_starting--;
+ break;
+ }
+
+ mtx_lock(&soaio_jobs_lock);
+ soaio_num_procs++;
+ }
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_enqueue(struct task *task)
+{
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(task->ta_pending == 0);
+ task->ta_pending++;
+ STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link);
+ soaio_queued++;
+ if (soaio_queued <= soaio_idle)
+ wakeup_one(&soaio_idle);
+ else if (soaio_num_procs < soaio_max_procs)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_init(void)
+{
+
+ soaio_lifetime = AIOD_LIFETIME_DEFAULT;
+ STAILQ_INIT(&soaio_jobs);
+ mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF);
+ soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL);
+ TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL);
+ if (soaio_target_procs > 0)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+}
+SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL);
+
+static __inline int
+soaio_ready(struct socket *so, struct sockbuf *sb)
+{
+ return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so));
+}
+
+static void
+soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
+{
+ struct ucred *td_savedcred;
+ struct thread *td;
+ struct file *fp;
+ struct uio uio;
+ struct iovec iov;
+ size_t cnt;
+ int error, flags;
+
+ SOCKBUF_UNLOCK(sb);
+ aio_switch_vmspace(job);
+ td = curthread;
+ fp = job->fd_file;
+retry:
+ td_savedcred = td->td_ucred;
+ td->td_ucred = job->cred;
+
+ cnt = job->uaiocb.aio_nbytes;
+ iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf;
+ iov.iov_len = cnt;
+ uio.uio_iov = &iov;
+ uio.uio_iovcnt = 1;
+ uio.uio_offset = 0;
+ uio.uio_resid = cnt;
+ uio.uio_segflg = UIO_USERSPACE;
+ uio.uio_td = td;
+ flags = MSG_NBIO;
+
+ /* TODO: Charge ru_msg* to job. */
+
+ if (sb == &so->so_rcv) {
+ uio.uio_rw = UIO_READ;
+#ifdef MAC
+ error = mac_socket_check_receive(fp->f_cred, so);
+ if (error == 0)
+
+#endif
+ error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
+ } else {
+ uio.uio_rw = UIO_WRITE;
+#ifdef MAC
+ error = mac_socket_check_send(fp->f_cred, so);
+ if (error == 0)
+#endif
+ error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
+ if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
+ }
+ }
+
+ cnt -= uio.uio_resid;
+ td->td_ucred = td_savedcred;
+
+ /* XXX: Not sure if this is needed? */
+ if (cnt != 0 && (error == ERESTART || error == EINTR ||
+ error == EWOULDBLOCK))
+ error = 0;
+ if (error == EWOULDBLOCK) {
+ /*
+ * A read() or write() on the socket raced with this
+ * request. If the socket is now ready, try again.
+ * If it is not, place this request at the head of the
+ * queue to try again when the socket is ready.
+ */
+ SOCKBUF_LOCK(sb);
+ empty_results++;
+ if (soaio_ready(so, sb)) {
+ empty_retries++;
+ SOCKBUF_UNLOCK(sb);
+ goto retry;
+ }
+
+ if (!aio_set_cancel_function(job, soo_aio_cancel)) {
+ MPASS(cnt == 0);
+ SOCKBUF_UNLOCK(sb);
+ aio_cancel(job);
+ SOCKBUF_LOCK(sb);
+ } else {
+ TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
+ }
+ } else {
+ aio_complete(job, cnt, error);
+ SOCKBUF_LOCK(sb);
+ }
+}
+
+static void
+soaio_process_sb(struct socket *so, struct sockbuf *sb)
+{
+ struct kaiocb *job;
+
+ SOCKBUF_LOCK(sb);
+ while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) {
+ job = TAILQ_FIRST(&sb->sb_aiojobq);
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (!aio_clear_cancel_function(job))
+ continue;
+
+ soaio_process_job(so, sb, job);
+ }
+
+ /*
+ * If there are still pending requests, the socket must not be
+ * ready so set SB_AIO to request a wakeup when the socket
+ * becomes ready.
+ */
+ if (!TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags |= SB_AIO;
+ sb->sb_flags &= ~SB_AIO_RUNNING;
+ SOCKBUF_UNLOCK(sb);
+
+ ACCEPT_LOCK();
+ SOCK_LOCK(so);
+ sorele(so);
+}
+
+void
+soaio_rcv(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_rcv);
+}
+
+void
+soaio_snd(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_snd);
+}
+
+void
+sowakeup_aio(struct socket *so, struct sockbuf *sb)
+{
+
+ SOCKBUF_LOCK_ASSERT(sb);
+ sb->sb_flags &= ~SB_AIO;
+ if (sb->sb_flags & SB_AIO_RUNNING)
+ return;
+ sb->sb_flags |= SB_AIO_RUNNING;
+ if (sb == &so->so_snd)
+ SOCK_LOCK(so);
+ soref(so);
+ if (sb == &so->so_snd)
+ SOCK_UNLOCK(so);
+ soaio_enqueue(&sb->sb_aiotask);
+}
+
+static void
+soo_aio_cancel(struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+ int opcode;
+
+ so = job->fd_file->f_data;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_READ)
+ sb = &so->so_rcv;
+ else {
+ MPASS(opcode == LIO_WRITE);
+ sb = &so->so_snd;
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags &= ~SB_AIO;
+ SOCKBUF_UNLOCK(sb);
+
+ aio_cancel(job);
+}
+
+static int
+soo_aio_queue(struct file *fp, struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+
+ so = fp->f_data;
+ switch (job->uaiocb.aio_lio_opcode) {
+ case LIO_READ:
+ sb = &so->so_rcv;
+ break;
+ case LIO_WRITE:
+ sb = &so->so_snd;
+ break;
+ default:
+ return (EINVAL);
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_set_cancel_function(job, soo_aio_cancel))
+ panic("new job was cancelled");
+ TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
+ if (!(sb->sb_flags & SB_AIO_RUNNING)) {
+ if (soaio_ready(so, sb))
+ sowakeup_aio(so, sb);
+ else
+ sb->sb_flags |= SB_AIO;
+ }
+ SOCKBUF_UNLOCK(sb);
+ return (0);
+}
diff --git a/sys/kern/uipc_debug.c b/sys/kern/uipc_debug.c
index 7c8b93c..fb4ee52 100644
--- a/sys/kern/uipc_debug.c
+++ b/sys/kern/uipc_debug.c
@@ -416,6 +416,9 @@ db_print_sockbuf(struct sockbuf *sb, const char *sockbufname, int indent)
db_printf("sb_flags: 0x%x (", sb->sb_flags);
db_print_sbflags(sb->sb_flags);
db_printf(")\n");
+
+ db_print_indent(indent);
+ db_printf("sb_aiojobq first: %p\n", TAILQ_FIRST(&sb->sb_aiojobq));
}
static void
@@ -470,7 +473,6 @@ db_print_socket(struct socket *so, const char *socketname, int indent)
db_print_indent(indent);
db_printf("so_sigio: %p ", so->so_sigio);
db_printf("so_oobmark: %lu ", so->so_oobmark);
- db_printf("so_aiojobq first: %p\n", TAILQ_FIRST(&so->so_aiojobq));
db_print_sockbuf(&so->so_rcv, "so_rcv", indent);
db_print_sockbuf(&so->so_snd, "so_snd", indent);
diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c
index edf03a3..0cdd43b 100644
--- a/sys/kern/uipc_sockbuf.c
+++ b/sys/kern/uipc_sockbuf.c
@@ -332,7 +332,7 @@ sowakeup(struct socket *so, struct sockbuf *sb)
} else
ret = SU_OK;
if (sb->sb_flags & SB_AIO)
- aio_swake(so, sb);
+ sowakeup_aio(so, sb);
SOCKBUF_UNLOCK(sb);
if (ret == SU_ISCONNECTED)
soisconnected(so);
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
index 5d2247f..35d17b1 100644
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -134,6 +134,7 @@ __FBSDID("$FreeBSD$");
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/jail.h>
#include <sys/syslog.h>
@@ -396,7 +397,10 @@ soalloc(struct vnet *vnet)
SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv");
sx_init(&so->so_snd.sb_sx, "so_snd_sx");
sx_init(&so->so_rcv.sb_sx, "so_rcv_sx");
- TAILQ_INIT(&so->so_aiojobq);
+ TAILQ_INIT(&so->so_snd.sb_aiojobq);
+ TAILQ_INIT(&so->so_rcv.sb_aiojobq);
+ TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so);
+ TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so);
#ifdef VIMAGE
VNET_ASSERT(vnet != NULL, ("%s:%d vnet is NULL, so=%p",
__func__, __LINE__, so));
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
index 5b2083c..59dd57c 100644
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -72,8 +72,6 @@ __FBSDID("$FreeBSD$");
#include <vm/uma.h>
#include <sys/aio.h>
-#include "opt_vfs_aio.h"
-
/*
* Counter for allocating reference ids to new jobs. Wrapped to 1 on
* overflow. (XXX will be removed soon.)
@@ -85,14 +83,6 @@ static u_long jobrefid;
*/
static uint64_t jobseqno;
-#define JOBST_NULL 0
-#define JOBST_JOBQSOCK 1
-#define JOBST_JOBQGLOBAL 2
-#define JOBST_JOBRUNNING 3
-#define JOBST_JOBFINISHED 4
-#define JOBST_JOBQBUF 5
-#define JOBST_JOBQSYNC 6
-
#ifndef MAX_AIO_PER_PROC
#define MAX_AIO_PER_PROC 32
#endif
@@ -101,26 +91,14 @@ static uint64_t jobseqno;
#define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef MAX_AIO_PROCS
-#define MAX_AIO_PROCS 32
-#endif
-
#ifndef MAX_AIO_QUEUE
#define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef TARGET_AIO_PROCS
-#define TARGET_AIO_PROCS 4
-#endif
-
#ifndef MAX_BUF_AIO
#define MAX_BUF_AIO 16
#endif
-#ifndef AIOD_LIFETIME_DEFAULT
-#define AIOD_LIFETIME_DEFAULT (30 * hz)
-#endif
-
FEATURE(aio, "Asynchronous I/O");
static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list");
@@ -128,6 +106,10 @@ static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list");
static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0,
"Async IO management");
+static int enable_aio_unsafe = 0;
+SYSCTL_INT(_vfs_aio, OID_AUTO, enable_unsafe, CTLFLAG_RW, &enable_aio_unsafe, 0,
+ "Permit asynchronous IO on all file types, not just known-safe types");
+
static int max_aio_procs = MAX_AIO_PROCS;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs, CTLFLAG_RW, &max_aio_procs, 0,
"Maximum number of kernel processes to use for handling async IO ");
@@ -165,11 +147,6 @@ static int aiod_lifetime;
SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
"Maximum lifetime for idle aiod");
-static int unloadable = 0;
-SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
- "Allow unload of aio (not recommended)");
-
-
static int max_aio_per_proc = MAX_AIO_PER_PROC;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
0,
@@ -208,46 +185,27 @@ typedef struct oaiocb {
*/
/*
- * Current, there is only two backends: BIO and generic file I/O.
- * socket I/O is served by generic file I/O, this is not a good idea, since
- * disk file I/O and any other types without O_NONBLOCK flag can block daemon
- * processes, if there is no thread to serve socket I/O, the socket I/O will be
- * delayed too long or starved, we should create some processes dedicated to
- * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O
- * systems we really need non-blocking interface, fiddling O_NONBLOCK in file
- * structure is not safe because there is race between userland and aio
- * daemons.
+ * If the routine that services an AIO request blocks while running in an
+ * AIO kernel process it can starve other I/O requests. BIO requests
+ * queued via aio_qphysio() complete in GEOM and do not use AIO kernel
+ * processes at all. Socket I/O requests use a separate pool of
+ * kprocs and also force non-blocking I/O. Other file I/O requests
+ * use the generic fo_read/fo_write operations which can block. The
+ * fsync and mlock operations can also block while executing. Ideally
+ * none of these requests would block while executing.
+ *
+ * Note that the service routines cannot toggle O_NONBLOCK in the file
+ * structure directly while handling a request due to races with
+ * userland threads.
*/
-struct kaiocb {
- TAILQ_ENTRY(kaiocb) list; /* (b) internal list of for backend */
- TAILQ_ENTRY(kaiocb) plist; /* (a) list of jobs for each backend */
- TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
- int jobflags; /* (a) job flags */
- int jobstate; /* (b) job state */
- int inputcharge; /* (*) input blockes */
- int outputcharge; /* (*) output blockes */
- struct bio *bp; /* (*) BIO backend BIO pointer */
- struct buf *pbuf; /* (*) BIO backend buffer pointer */
- struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */
- int npages; /* BIO backend number of pages */
- struct proc *userproc; /* (*) user process */
- struct ucred *cred; /* (*) active credential when created */
- struct file *fd_file; /* (*) pointer to file structure */
- struct aioliojob *lio; /* (*) optional lio job */
- struct aiocb *ujob; /* (*) pointer in userspace of aiocb */
- struct knlist klist; /* (a) list of knotes */
- struct aiocb uaiocb; /* (*) kernel I/O control block */
- ksiginfo_t ksi; /* (a) realtime signal info */
- uint64_t seqno; /* (*) job number */
- int pending; /* (a) number of pending I/O, aio_fsync only */
-};
-
/* jobflags */
-#define KAIOCB_DONE 0x01
-#define KAIOCB_BUFDONE 0x02
-#define KAIOCB_RUNDOWN 0x04
+#define KAIOCB_QUEUEING 0x01
+#define KAIOCB_CANCELLED 0x02
+#define KAIOCB_CANCELLING 0x04
#define KAIOCB_CHECKSYNC 0x08
+#define KAIOCB_CLEARED 0x10
+#define KAIOCB_FINISHED 0x20
/*
* AIO process info
@@ -293,9 +251,10 @@ struct kaioinfo {
TAILQ_HEAD(,kaiocb) kaio_done; /* (a) done queue for process */
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
TAILQ_HEAD(,kaiocb) kaio_jobqueue; /* (a) job queue for process */
- TAILQ_HEAD(,kaiocb) kaio_bufqueue; /* (a) buffer job queue */
TAILQ_HEAD(,kaiocb) kaio_syncqueue; /* (a) queue for aio_fsync */
+ TAILQ_HEAD(,kaiocb) kaio_syncready; /* (a) second q for aio_fsync */
struct task kaio_task; /* (*) task to kick aio processes */
+ struct task kaio_sync_task; /* (*) task to schedule fsync jobs */
};
#define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx)
@@ -332,21 +291,18 @@ static int aio_free_entry(struct kaiocb *job);
static void aio_process_rw(struct kaiocb *job);
static void aio_process_sync(struct kaiocb *job);
static void aio_process_mlock(struct kaiocb *job);
+static void aio_schedule_fsync(void *context, int pending);
static int aio_newproc(int *);
int aio_aqueue(struct thread *td, struct aiocb *ujob,
struct aioliojob *lio, int type, struct aiocb_ops *ops);
+static int aio_queue_file(struct file *fp, struct kaiocb *job);
static void aio_physwakeup(struct bio *bp);
static void aio_proc_rundown(void *arg, struct proc *p);
static void aio_proc_rundown_exec(void *arg, struct proc *p,
struct image_params *imgp);
static int aio_qphysio(struct proc *p, struct kaiocb *job);
static void aio_daemon(void *param);
-static void aio_swake_cb(struct socket *, struct sockbuf *);
-static int aio_unload(void);
-static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job,
- int type);
-#define DONE_BUF 1
-#define DONE_QUEUE 2
+static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job);
static int aio_kick(struct proc *userp);
static void aio_kick_nowait(struct proc *userp);
static void aio_kick_helper(void *context, int pending);
@@ -397,13 +353,10 @@ aio_modload(struct module *module, int cmd, void *arg)
case MOD_LOAD:
aio_onceonly();
break;
- case MOD_UNLOAD:
- error = aio_unload();
- break;
case MOD_SHUTDOWN:
break;
default:
- error = EINVAL;
+ error = EOPNOTSUPP;
break;
}
return (error);
@@ -471,8 +424,6 @@ aio_onceonly(void)
{
int error;
- /* XXX: should probably just use so->callback */
- aio_swake = &aio_swake_cb;
exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
EVENTHANDLER_PRI_ANY);
exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown_exec,
@@ -513,55 +464,6 @@ aio_onceonly(void)
}
/*
- * Callback for unload of AIO when used as a module.
- */
-static int
-aio_unload(void)
-{
- int error;
-
- /*
- * XXX: no unloads by default, it's too dangerous.
- * perhaps we could do it if locked out callers and then
- * did an aio_proc_rundown() on each process.
- *
- * jhb: aio_proc_rundown() needs to run on curproc though,
- * so I don't think that would fly.
- */
- if (!unloadable)
- return (EOPNOTSUPP);
-
-#ifdef COMPAT_FREEBSD32
- syscall32_helper_unregister(aio32_syscalls);
-#endif
- syscall_helper_unregister(aio_syscalls);
-
- error = kqueue_del_filteropts(EVFILT_AIO);
- if (error)
- return error;
- error = kqueue_del_filteropts(EVFILT_LIO);
- if (error)
- return error;
- async_io_version = 0;
- aio_swake = NULL;
- taskqueue_free(taskqueue_aiod_kick);
- delete_unrhdr(aiod_unr);
- uma_zdestroy(kaio_zone);
- uma_zdestroy(aiop_zone);
- uma_zdestroy(aiocb_zone);
- uma_zdestroy(aiol_zone);
- uma_zdestroy(aiolio_zone);
- EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
- EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
- mtx_destroy(&aio_job_mtx);
- sema_destroy(&aio_newproc_sem);
- p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
- return (0);
-}
-
-/*
* Init the per-process aioinfo structure. The aioinfo limits are set
* per-process for user limit (resource) management.
*/
@@ -582,10 +484,11 @@ aio_init_aioinfo(struct proc *p)
TAILQ_INIT(&ki->kaio_all);
TAILQ_INIT(&ki->kaio_done);
TAILQ_INIT(&ki->kaio_jobqueue);
- TAILQ_INIT(&ki->kaio_bufqueue);
TAILQ_INIT(&ki->kaio_liojoblist);
TAILQ_INIT(&ki->kaio_syncqueue);
+ TAILQ_INIT(&ki->kaio_syncready);
TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p);
+ TASK_INIT(&ki->kaio_sync_task, 0, aio_schedule_fsync, ki);
PROC_LOCK(p);
if (p->p_aioinfo == NULL) {
p->p_aioinfo = ki;
@@ -637,7 +540,7 @@ aio_free_entry(struct kaiocb *job)
MPASS(ki != NULL);
AIO_LOCK_ASSERT(ki, MA_OWNED);
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
atomic_subtract_int(&num_queue_count, 1);
@@ -670,7 +573,6 @@ aio_free_entry(struct kaiocb *job)
PROC_UNLOCK(p);
MPASS(job->bp == NULL);
- job->jobstate = JOBST_NULL;
AIO_UNLOCK(ki);
/*
@@ -709,6 +611,57 @@ aio_proc_rundown_exec(void *arg, struct proc *p,
aio_proc_rundown(arg, p);
}
+static int
+aio_cancel_job(struct proc *p, struct kaioinfo *ki, struct kaiocb *job)
+{
+ aio_cancel_fn_t *func;
+ int cancelled;
+
+ AIO_LOCK_ASSERT(ki, MA_OWNED);
+ if (job->jobflags & (KAIOCB_CANCELLED | KAIOCB_FINISHED))
+ return (0);
+ MPASS((job->jobflags & KAIOCB_CANCELLING) == 0);
+ job->jobflags |= KAIOCB_CANCELLED;
+
+ func = job->cancel_fn;
+
+ /*
+ * If there is no cancel routine, just leave the job marked as
+ * cancelled. The job should be in active use by a caller who
+ * should complete it normally or when it fails to install a
+ * cancel routine.
+ */
+ if (func == NULL)
+ return (0);
+
+ /*
+ * Set the CANCELLING flag so that aio_complete() will defer
+ * completions of this job. This prevents the job from being
+ * freed out from under the cancel callback. After the
+ * callback any deferred completion (whether from the callback
+ * or any other source) will be completed.
+ */
+ job->jobflags |= KAIOCB_CANCELLING;
+ AIO_UNLOCK(ki);
+ func(job);
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_CANCELLING;
+ if (job->jobflags & KAIOCB_FINISHED) {
+ cancelled = job->uaiocb._aiocb_private.error == ECANCELED;
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(p, job);
+ } else {
+ /*
+ * The cancel callback might have scheduled an
+ * operation to cancel this request, but it is
+ * only counted as cancelled if the request is
+ * cancelled when the callback returns.
+ */
+ cancelled = 0;
+ }
+ return (cancelled);
+}
+
/*
* Rundown the jobs for a given process.
*/
@@ -718,9 +671,6 @@ aio_proc_rundown(void *arg, struct proc *p)
struct kaioinfo *ki;
struct aioliojob *lj;
struct kaiocb *job, *jobn;
- struct file *fp;
- struct socket *so;
- int remove;
KASSERT(curthread->td_proc == p,
("%s: called on non-curproc", __func__));
@@ -738,35 +688,11 @@ restart:
* aio_cancel on all pending I/O requests.
*/
TAILQ_FOREACH_SAFE(job, &ki->kaio_jobqueue, plist, jobn) {
- remove = 0;
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- fp = job->fd_file;
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- job->jobstate = JOBST_JOBFINISHED;
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(p, job, DONE_QUEUE);
- }
+ aio_cancel_job(p, ki, job);
}
/* Wait for all running I/O to be finished */
- if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
- TAILQ_FIRST(&ki->kaio_jobqueue)) {
+ if (TAILQ_FIRST(&ki->kaio_jobqueue) || ki->kaio_active_count != 0) {
ki->kaio_flags |= KAIO_WAKEUP;
msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz);
goto restart;
@@ -791,6 +717,7 @@ restart:
}
AIO_UNLOCK(ki);
taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_task);
+ taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_sync_task);
mtx_destroy(&ki->kaio_mtx);
uma_zfree(kaio_zone, ki);
p->p_aioinfo = NULL;
@@ -807,15 +734,18 @@ aio_selectjob(struct aioproc *aiop)
struct proc *userp;
mtx_assert(&aio_job_mtx, MA_OWNED);
+restart:
TAILQ_FOREACH(job, &aio_jobs, list) {
userp = job->userproc;
ki = userp->p_aioinfo;
if (ki->kaio_active_count < ki->kaio_maxactive_count) {
TAILQ_REMOVE(&aio_jobs, job, list);
+ if (!aio_clear_cancel_function(job))
+ goto restart;
+
/* Account for currently active jobs. */
ki->kaio_active_count++;
- job->jobstate = JOBST_JOBRUNNING;
break;
}
}
@@ -863,7 +793,6 @@ aio_process_rw(struct kaiocb *job)
struct thread *td;
struct aiocb *cb;
struct file *fp;
- struct socket *so;
struct uio auio;
struct iovec aiov;
int cnt;
@@ -875,6 +804,7 @@ aio_process_rw(struct kaiocb *job)
job->uaiocb.aio_lio_opcode == LIO_WRITE,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
td = curthread;
td_savedcred = td->td_ucred;
td->td_ucred = job->cred;
@@ -920,24 +850,15 @@ aio_process_rw(struct kaiocb *job)
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
error = 0;
if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
- int sigpipe = 1;
- if (fp->f_type == DTYPE_SOCKET) {
- so = fp->f_data;
- if (so->so_options & SO_NOSIGPIPE)
- sigpipe = 0;
- }
- if (sigpipe) {
- PROC_LOCK(job->userproc);
- kern_psignal(job->userproc, SIGPIPE);
- PROC_UNLOCK(job->userproc);
- }
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
}
}
cnt -= auio.uio_resid;
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = cnt;
td->td_ucred = td_savedcred;
+ aio_complete(job, cnt, error);
}
static void
@@ -945,7 +866,6 @@ aio_process_sync(struct kaiocb *job)
{
struct thread *td = curthread;
struct ucred *td_savedcred = td->td_ucred;
- struct aiocb *cb = &job->uaiocb;
struct file *fp = job->fd_file;
int error = 0;
@@ -955,9 +875,8 @@ aio_process_sync(struct kaiocb *job)
td->td_ucred = job->cred;
if (fp->f_vnode != NULL)
error = aio_fsync_vnode(td, fp->f_vnode);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
td->td_ucred = td_savedcred;
+ aio_complete(job, 0, error);
}
static void
@@ -969,19 +888,20 @@ aio_process_mlock(struct kaiocb *job)
KASSERT(job->uaiocb.aio_lio_opcode == LIO_MLOCK,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
error = vm_mlock(job->userproc, job->cred,
__DEVOLATILE(void *, cb->aio_buf), cb->aio_nbytes);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
+ aio_complete(job, 0, error);
}
static void
-aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
+aio_bio_done_notify(struct proc *userp, struct kaiocb *job)
{
struct aioliojob *lj;
struct kaioinfo *ki;
struct kaiocb *sjob, *sjobn;
int lj_done;
+ bool schedule_fsync;
ki = userp->p_aioinfo;
AIO_LOCK_ASSERT(ki, MA_OWNED);
@@ -992,13 +912,8 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
if (lj->lioj_count == lj->lioj_finished_count)
lj_done = 1;
}
- if (type == DONE_QUEUE) {
- job->jobflags |= KAIOCB_DONE;
- } else {
- job->jobflags |= KAIOCB_BUFDONE;
- }
TAILQ_INSERT_TAIL(&ki->kaio_done, job, plist);
- job->jobstate = JOBST_JOBFINISHED;
+ MPASS(job->jobflags & KAIOCB_FINISHED);
if (ki->kaio_flags & KAIO_RUNDOWN)
goto notification_done;
@@ -1025,20 +940,24 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
notification_done:
if (job->jobflags & KAIOCB_CHECKSYNC) {
+ schedule_fsync = false;
TAILQ_FOREACH_SAFE(sjob, &ki->kaio_syncqueue, list, sjobn) {
if (job->fd_file == sjob->fd_file &&
job->seqno < sjob->seqno) {
if (--sjob->pending == 0) {
- mtx_lock(&aio_job_mtx);
- sjob->jobstate = JOBST_JOBQGLOBAL;
TAILQ_REMOVE(&ki->kaio_syncqueue, sjob,
list);
- TAILQ_INSERT_TAIL(&aio_jobs, sjob, list);
- aio_kick_nowait(userp);
- mtx_unlock(&aio_job_mtx);
+ if (!aio_clear_cancel_function(sjob))
+ continue;
+ TAILQ_INSERT_TAIL(&ki->kaio_syncready,
+ sjob, list);
+ schedule_fsync = true;
}
}
}
+ if (schedule_fsync)
+ taskqueue_enqueue(taskqueue_aiod_kick,
+ &ki->kaio_sync_task);
}
if (ki->kaio_flags & KAIO_WAKEUP) {
ki->kaio_flags &= ~KAIO_WAKEUP;
@@ -1047,6 +966,103 @@ notification_done:
}
static void
+aio_schedule_fsync(void *context, int pending)
+{
+ struct kaioinfo *ki;
+ struct kaiocb *job;
+
+ ki = context;
+ AIO_LOCK(ki);
+ while (!TAILQ_EMPTY(&ki->kaio_syncready)) {
+ job = TAILQ_FIRST(&ki->kaio_syncready);
+ TAILQ_REMOVE(&ki->kaio_syncready, job, list);
+ AIO_UNLOCK(ki);
+ aio_schedule(job, aio_process_sync);
+ AIO_LOCK(ki);
+ }
+ AIO_UNLOCK(ki);
+}
+
+bool
+aio_cancel_cleared(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ /*
+ * The caller should hold the same queue lock held when
+ * aio_clear_cancel_function() was called and set this flag
+ * ensuring this check sees an up-to-date value. However,
+ * there is no way to assert that.
+ */
+ ki = job->userproc->p_aioinfo;
+ return ((job->jobflags & KAIOCB_CLEARED) != 0);
+}
+
+bool
+aio_clear_cancel_function(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ MPASS(job->cancel_fn != NULL);
+ if (job->jobflags & KAIOCB_CANCELLING) {
+ job->jobflags |= KAIOCB_CLEARED;
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = NULL;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+bool
+aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ if (job->jobflags & KAIOCB_CANCELLED) {
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = func;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+void
+aio_complete(struct kaiocb *job, long status, int error)
+{
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ job->uaiocb._aiocb_private.error = error;
+ job->uaiocb._aiocb_private.status = status;
+
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
+
+ AIO_LOCK(ki);
+ KASSERT(!(job->jobflags & KAIOCB_FINISHED),
+ ("duplicate aio_complete"));
+ job->jobflags |= KAIOCB_FINISHED;
+ if ((job->jobflags & (KAIOCB_QUEUEING | KAIOCB_CANCELLING)) == 0) {
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(userp, job);
+ }
+ AIO_UNLOCK(ki);
+}
+
+void
+aio_cancel(struct kaiocb *job)
+{
+
+ aio_complete(job, -1, ECANCELED);
+}
+
+void
aio_switch_vmspace(struct kaiocb *job)
{
@@ -1063,7 +1079,7 @@ aio_daemon(void *_id)
struct kaiocb *job;
struct aioproc *aiop;
struct kaioinfo *ki;
- struct proc *p, *userp;
+ struct proc *p;
struct vmspace *myvm;
struct thread *td = curthread;
int id = (intptr_t)_id;
@@ -1107,40 +1123,13 @@ aio_daemon(void *_id)
*/
while ((job = aio_selectjob(aiop)) != NULL) {
mtx_unlock(&aio_job_mtx);
- userp = job->userproc;
-
- /*
- * Connect to process address space for user program.
- */
- aio_switch_vmspace(job);
- ki = userp->p_aioinfo;
-
- /* Do the I/O function. */
- switch(job->uaiocb.aio_lio_opcode) {
- case LIO_READ:
- case LIO_WRITE:
- aio_process_rw(job);
- break;
- case LIO_SYNC:
- aio_process_sync(job);
- break;
- case LIO_MLOCK:
- aio_process_mlock(job);
- break;
- }
+ ki = job->userproc->p_aioinfo;
+ job->handle_fn(job);
mtx_lock(&aio_job_mtx);
/* Decrement the active job count. */
ki->kaio_active_count--;
- mtx_unlock(&aio_job_mtx);
-
- AIO_LOCK(ki);
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(userp, job, DONE_QUEUE);
- AIO_UNLOCK(ki);
-
- mtx_lock(&aio_job_mtx);
}
/*
@@ -1236,7 +1225,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
struct cdevsw *csw;
struct cdev *dev;
struct kaioinfo *ki;
- struct aioliojob *lj;
int error, ref, unmap, poff;
vm_prot_t prot;
@@ -1293,16 +1281,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
}
AIO_LOCK(ki);
- ki->kaio_count++;
if (!unmap)
ki->kaio_buffer_count++;
- lj = job->lio;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- job->jobstate = JOBST_JOBQBUF;
- cb->_aiocb_private.status = cb->aio_nbytes;
AIO_UNLOCK(ki);
bp->bio_length = cb->aio_nbytes;
@@ -1336,7 +1316,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
bp->bio_flags |= BIO_UNMAPPED;
}
- atomic_add_int(&num_queue_count, 1);
if (!unmap)
atomic_add_int(&num_buf_aio, 1);
@@ -1347,14 +1326,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
doerror:
AIO_LOCK(ki);
- job->jobstate = JOBST_NULL;
- TAILQ_REMOVE(&ki->kaio_bufqueue, job, plist);
- TAILQ_REMOVE(&ki->kaio_all, job, allist);
- ki->kaio_count--;
if (!unmap)
ki->kaio_buffer_count--;
- if (lj)
- lj->lioj_count--;
AIO_UNLOCK(ki);
if (pbuf) {
relpbuf(pbuf, NULL);
@@ -1367,40 +1340,6 @@ unref:
return (error);
}
-/*
- * Wake up aio requests that may be serviceable now.
- */
-static void
-aio_swake_cb(struct socket *so, struct sockbuf *sb)
-{
- struct kaiocb *job, *jobn;
- int opcode;
-
- SOCKBUF_LOCK_ASSERT(sb);
- if (sb == &so->so_snd)
- opcode = LIO_WRITE;
- else
- opcode = LIO_READ;
-
- sb->sb_flags &= ~SB_AIO;
- mtx_lock(&aio_job_mtx);
- TAILQ_FOREACH_SAFE(job, &so->so_aiojobq, list, jobn) {
- if (opcode == job->uaiocb.aio_lio_opcode) {
- if (job->jobstate != JOBST_JOBQSOCK)
- panic("invalid queue value");
- /* XXX
- * We don't have actual sockets backend yet,
- * so we simply move the requests to the generic
- * file I/O backend.
- */
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- aio_kick_nowait(job->userproc);
- }
- }
- mtx_unlock(&aio_job_mtx);
-}
-
static int
convert_old_sigevent(struct osigevent *osig, struct sigevent *nsig)
{
@@ -1521,11 +1460,9 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj,
struct proc *p = td->td_proc;
cap_rights_t rights;
struct file *fp;
- struct socket *so;
- struct kaiocb *job, *job2;
+ struct kaiocb *job;
struct kaioinfo *ki;
struct kevent kev;
- struct sockbuf *sb;
int opcode;
int error;
int fd, kqfd;
@@ -1668,86 +1605,128 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj,
kev.data = (intptr_t)job;
kev.udata = job->uaiocb.aio_sigevent.sigev_value.sival_ptr;
error = kqfd_register(kqfd, &kev, td, 1);
-aqueue_fail:
- if (error) {
- if (fp)
- fdrop(fp, td);
- uma_zfree(aiocb_zone, job);
- ops->store_error(ujob, error);
- goto done;
- }
+ if (error)
+ goto aqueue_fail;
+
no_kqueue:
ops->store_error(ujob, EINPROGRESS);
job->uaiocb._aiocb_private.error = EINPROGRESS;
job->userproc = p;
job->cred = crhold(td->td_ucred);
- job->jobflags = 0;
+ job->jobflags = KAIOCB_QUEUEING;
job->lio = lj;
- if (opcode == LIO_SYNC)
- goto queueit;
+ if (opcode == LIO_MLOCK) {
+ aio_schedule(job, aio_process_mlock);
+ error = 0;
+ } else if (fp->f_ops->fo_aio_queue == NULL)
+ error = aio_queue_file(fp, job);
+ else
+ error = fo_aio_queue(fp, job);
+ if (error)
+ goto aqueue_fail;
- if (fp && fp->f_type == DTYPE_SOCKET) {
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_QUEUEING;
+ TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
+ ki->kaio_count++;
+ if (lj)
+ lj->lioj_count++;
+ atomic_add_int(&num_queue_count, 1);
+ if (job->jobflags & KAIOCB_FINISHED) {
/*
- * Alternate queueing for socket ops: Reach down into the
- * descriptor to get the socket data. Then check to see if the
- * socket is ready to be read or written (based on the requested
- * operation).
- *
- * If it is not ready for io, then queue the job on the
- * socket, and set the flags so we get a call when sbnotify()
- * happens.
- *
- * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
- * and unlock the snd sockbuf for no reason.
+ * The queue callback completed the request synchronously.
+ * The bulk of the completion is deferred in that case
+ * until this point.
*/
- so = fp->f_data;
- sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
- SOCKBUF_LOCK(sb);
- if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
- LIO_WRITE) && (!sowriteable(so)))) {
- sb->sb_flags |= SB_AIO;
+ aio_bio_done_notify(p, job);
+ } else
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
+ AIO_UNLOCK(ki);
+ return (0);
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&so->so_aiojobq, job, list);
- mtx_unlock(&aio_job_mtx);
+aqueue_fail:
+ knlist_delete(&job->klist, curthread, 0);
+ if (fp)
+ fdrop(fp, td);
+ uma_zfree(aiocb_zone, job);
+ ops->store_error(ujob, error);
+ return (error);
+}
- AIO_LOCK(ki);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- job->jobstate = JOBST_JOBQSOCK;
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- AIO_UNLOCK(ki);
- SOCKBUF_UNLOCK(sb);
- atomic_add_int(&num_queue_count, 1);
- error = 0;
- goto done;
- }
- SOCKBUF_UNLOCK(sb);
+static void
+aio_cancel_daemon_job(struct kaiocb *job)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&aio_jobs, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+void
+aio_schedule(struct kaiocb *job, aio_handle_fn_t *func)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) {
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+ return;
}
+ job->handle_fn = func;
+ TAILQ_INSERT_TAIL(&aio_jobs, job, list);
+ aio_kick_nowait(job->userproc);
+ mtx_unlock(&aio_job_mtx);
+}
+
+static void
+aio_cancel_sync(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+int
+aio_queue_file(struct file *fp, struct kaiocb *job)
+{
+ struct aioliojob *lj;
+ struct kaioinfo *ki;
+ struct kaiocb *job2;
+ int error, opcode;
+
+ lj = job->lio;
+ ki = job->userproc->p_aioinfo;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_SYNC)
+ goto queueit;
- if ((error = aio_qphysio(p, job)) == 0)
+ if ((error = aio_qphysio(job->userproc, job)) == 0)
goto done;
#if 0
- if (error > 0) {
- job->uaiocb._aiocb_private.error = error;
- ops->store_error(ujob, error);
+ /*
+ * XXX: This means qphysio() failed with EFAULT. The current
+ * behavior is to retry the operation via fo_read/fo_write.
+ * Wouldn't it be better to just complete the request with an
+ * error here?
+ */
+ if (error > 0)
goto done;
- }
#endif
queueit:
- atomic_add_int(&num_queue_count, 1);
+ if (!enable_aio_unsafe)
+ return (EOPNOTSUPP);
- AIO_LOCK(ki);
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
if (opcode == LIO_SYNC) {
+ AIO_LOCK(ki);
TAILQ_FOREACH(job2, &ki->kaio_jobqueue, plist) {
if (job2->fd_file == job->fd_file &&
job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
@@ -1756,28 +1735,32 @@ queueit:
job->pending++;
}
}
- TAILQ_FOREACH(job2, &ki->kaio_bufqueue, plist) {
- if (job2->fd_file == job->fd_file &&
- job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
- job2->seqno < job->seqno) {
- job2->jobflags |= KAIOCB_CHECKSYNC;
- job->pending++;
- }
- }
if (job->pending != 0) {
+ if (!aio_set_cancel_function(job, aio_cancel_sync)) {
+ AIO_UNLOCK(ki);
+ aio_cancel(job);
+ return (0);
+ }
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, job, list);
- job->jobstate = JOBST_JOBQSYNC;
AIO_UNLOCK(ki);
- goto done;
+ return (0);
}
+ AIO_UNLOCK(ki);
+ }
+
+ switch (opcode) {
+ case LIO_READ:
+ case LIO_WRITE:
+ aio_schedule(job, aio_process_rw);
+ error = 0;
+ break;
+ case LIO_SYNC:
+ aio_schedule(job, aio_process_sync);
+ error = 0;
+ break;
+ default:
+ error = EINVAL;
}
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- job->jobstate = JOBST_JOBQGLOBAL;
- aio_kick_nowait(p);
- mtx_unlock(&aio_job_mtx);
- AIO_UNLOCK(ki);
- error = 0;
done:
return (error);
}
@@ -1864,7 +1847,7 @@ kern_aio_return(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
break;
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
@@ -1933,7 +1916,7 @@ kern_aio_suspend(struct thread *td, int njoblist, struct aiocb **ujoblist,
if (job->ujob == ujoblist[i]) {
if (firstjob == NULL)
firstjob = job;
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
goto RETURN;
}
}
@@ -1992,10 +1975,8 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap)
struct kaioinfo *ki;
struct kaiocb *job, *jobn;
struct file *fp;
- struct socket *so;
cap_rights_t rights;
int error;
- int remove;
int cancelled = 0;
int notcancelled = 0;
struct vnode *vp;
@@ -2023,28 +2004,7 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap)
if ((uap->fd == job->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
(uap->aiocbp == job->ujob))) {
- remove = 0;
-
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- aio_bio_done_notify(p, job, DONE_QUEUE);
+ if (aio_cancel_job(p, ki, job)) {
cancelled++;
} else {
notcancelled++;
@@ -2102,7 +2062,7 @@ kern_aio_error(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
AIO_LOCK(ki);
TAILQ_FOREACH(job, &ki->kaio_all, allist) {
if (job->ujob == ujob) {
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
td->td_retval[0] =
job->uaiocb._aiocb_private.error;
else
@@ -2382,35 +2342,36 @@ aio_physwakeup(struct bio *bp)
struct kaiocb *job = (struct kaiocb *)bp->bio_caller1;
struct proc *userp;
struct kaioinfo *ki;
- int nblks;
+ size_t nbytes;
+ int error, nblks;
/* Release mapping into kernel space. */
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
if (job->pbuf) {
pmap_qremove((vm_offset_t)job->pbuf->b_data, job->npages);
relpbuf(job->pbuf, NULL);
job->pbuf = NULL;
atomic_subtract_int(&num_buf_aio, 1);
+ AIO_LOCK(ki);
+ ki->kaio_buffer_count--;
+ AIO_UNLOCK(ki);
}
vm_page_unhold_pages(job->pages, job->npages);
bp = job->bp;
job->bp = NULL;
- userp = job->userproc;
- ki = userp->p_aioinfo;
- AIO_LOCK(ki);
- job->uaiocb._aiocb_private.status -= bp->bio_resid;
- job->uaiocb._aiocb_private.error = 0;
+ nbytes = job->uaiocb.aio_nbytes - bp->bio_resid;
+ error = 0;
if (bp->bio_flags & BIO_ERROR)
- job->uaiocb._aiocb_private.error = bp->bio_error;
- nblks = btodb(job->uaiocb.aio_nbytes);
+ error = bp->bio_error;
+ nblks = btodb(nbytes);
if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
job->outputcharge += nblks;
else
job->inputcharge += nblks;
- TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, job, plist);
- ki->kaio_buffer_count--;
- aio_bio_done_notify(userp, job, DONE_BUF);
- AIO_UNLOCK(ki);
+
+ aio_complete(job, nbytes, error);
g_destroy_bio(bp);
}
@@ -2465,7 +2426,7 @@ kern_aio_waitcomplete(struct thread *td, struct aiocb **ujobp,
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
ujob = job->ujob;
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
@@ -2570,7 +2531,7 @@ filt_aio(struct knote *kn, long hint)
struct kaiocb *job = kn->kn_ptr.p_aio;
kn->kn_data = job->uaiocb._aiocb_private.error;
- if (job->jobstate != JOBST_JOBFINISHED)
+ if (!(job->jobflags & KAIOCB_FINISHED))
return (0);
kn->kn_flags |= EV_EOF;
return (1);
OpenPOWER on IntegriCloud