Diffstat (limited to 'sys/kern')
 -rw-r--r--  sys/kern/sys_socket.c   | 397
 -rw-r--r--  sys/kern/uipc_debug.c   |   4
 -rw-r--r--  sys/kern/uipc_sockbuf.c |   2
 -rw-r--r--  sys/kern/uipc_socket.c  |   6
 -rw-r--r--  sys/kern/vfs_aio.c      | 737
 5 files changed, 755 insertions, 391 deletions
diff --git a/sys/kern/sys_socket.c b/sys/kern/sys_socket.c
index dd831ae..ffdb003 100644
--- a/sys/kern/sys_socket.c
+++ b/sys/kern/sys_socket.c
@@ -34,9 +34,12 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/aio.h>
#include <sys/domain.h>
#include <sys/file.h>
#include <sys/filedesc.h>
+#include <sys/kernel.h>
+#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/protosw.h>
@@ -48,6 +51,9 @@ __FBSDID("$FreeBSD$");
#include <sys/filio.h> /* XXX */
#include <sys/sockio.h>
#include <sys/stat.h>
+#include <sys/sysctl.h>
+#include <sys/sysproto.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/ucred.h>
#include <sys/un.h>
@@ -64,6 +70,22 @@ __FBSDID("$FreeBSD$");
#include <security/mac/mac_framework.h>
+#include <vm/vm.h>
+#include <vm/pmap.h>
+#include <vm/vm_extern.h>
+#include <vm/vm_map.h>
+
+static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL,
+ "socket AIO stats");
+
+static int empty_results;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results,
+ 0, "socket operation returned EAGAIN");
+
+static int empty_retries;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries,
+ 0, "socket operation retries");
+
static fo_rdwr_t soo_read;
static fo_rdwr_t soo_write;
static fo_ioctl_t soo_ioctl;
@@ -72,6 +94,9 @@ extern fo_kqfilter_t soo_kqfilter;
static fo_stat_t soo_stat;
static fo_close_t soo_close;
static fo_fill_kinfo_t soo_fill_kinfo;
+static fo_aio_queue_t soo_aio_queue;
+
+static void soo_aio_cancel(struct kaiocb *job);
struct fileops socketops = {
.fo_read = soo_read,
@@ -86,6 +111,7 @@ struct fileops socketops = {
.fo_chown = invfo_chown,
.fo_sendfile = invfo_sendfile,
.fo_fill_kinfo = soo_fill_kinfo,
+ .fo_aio_queue = soo_aio_queue,
.fo_flags = DFLAG_PASSABLE
};
@@ -363,3 +389,374 @@ soo_fill_kinfo(struct file *fp, struct kinfo_file *kif, struct filedesc *fdp)
sizeof(kif->kf_path));
return (0);
}
+
+static STAILQ_HEAD(, task) soaio_jobs;
+static struct mtx soaio_jobs_lock;
+static struct task soaio_kproc_task;
+static int soaio_starting, soaio_idle, soaio_queued;
+static struct unrhdr *soaio_kproc_unr;
+
+static int soaio_max_procs = MAX_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0,
+ "Maximum number of kernel processes to use for async socket IO");
+
+static int soaio_num_procs;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0,
+ "Number of active kernel processes for async socket IO");
+
+static int soaio_target_procs = TARGET_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD,
+ &soaio_target_procs, 0,
+ "Preferred number of ready kernel processes for async socket IO");
+
+static int soaio_lifetime;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0,
+ "Maximum lifetime for idle aiod");
+
+static void
+soaio_kproc_loop(void *arg)
+{
+ struct proc *p;
+ struct vmspace *myvm;
+ struct task *task;
+ int error, id, pending;
+
+ id = (intptr_t)arg;
+
+ /*
+ * Grab an extra reference on the daemon's vmspace so that it
+ * doesn't get freed by jobs that switch to a different
+ * vmspace.
+ */
+ p = curproc;
+ myvm = vmspace_acquire_ref(p);
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(soaio_starting > 0);
+ soaio_starting--;
+ for (;;) {
+ while (!STAILQ_EMPTY(&soaio_jobs)) {
+ task = STAILQ_FIRST(&soaio_jobs);
+ STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link);
+ soaio_queued--;
+ pending = task->ta_pending;
+ task->ta_pending = 0;
+ mtx_unlock(&soaio_jobs_lock);
+
+ task->ta_func(task->ta_context, pending);
+
+ mtx_lock(&soaio_jobs_lock);
+ }
+ MPASS(soaio_queued == 0);
+
+ if (p->p_vmspace != myvm) {
+ mtx_unlock(&soaio_jobs_lock);
+ vmspace_switch_aio(myvm);
+ mtx_lock(&soaio_jobs_lock);
+ continue;
+ }
+
+ soaio_idle++;
+ error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-",
+ soaio_lifetime);
+ soaio_idle--;
+ if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) &&
+ soaio_num_procs > soaio_target_procs)
+ break;
+ }
+ soaio_num_procs--;
+ mtx_unlock(&soaio_jobs_lock);
+ free_unr(soaio_kproc_unr, id);
+ kproc_exit(0);
+}
+
+static void
+soaio_kproc_create(void *context, int pending)
+{
+ struct proc *p;
+ int error, id;
+
+ mtx_lock(&soaio_jobs_lock);
+ for (;;) {
+ if (soaio_num_procs < soaio_target_procs) {
+ /* Must create */
+ } else if (soaio_num_procs >= soaio_max_procs) {
+ /*
+ * Hit the limit on kernel processes, don't
+ * create another one.
+ */
+ break;
+ } else if (soaio_queued <= soaio_idle + soaio_starting) {
+ /*
+ * No more AIO jobs waiting for a process to be
+ * created, so stop.
+ */
+ break;
+ }
+ soaio_starting++;
+ mtx_unlock(&soaio_jobs_lock);
+
+ id = alloc_unr(soaio_kproc_unr);
+ error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id,
+ &p, 0, 0, "soaiod%d", id);
+ if (error != 0) {
+ free_unr(soaio_kproc_unr, id);
+ mtx_lock(&soaio_jobs_lock);
+ soaio_starting--;
+ break;
+ }
+
+ mtx_lock(&soaio_jobs_lock);
+ soaio_num_procs++;
+ }
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_enqueue(struct task *task)
+{
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(task->ta_pending == 0);
+ task->ta_pending++;
+ STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link);
+ soaio_queued++;
+ if (soaio_queued <= soaio_idle)
+ wakeup_one(&soaio_idle);
+ else if (soaio_num_procs < soaio_max_procs)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_init(void)
+{
+
+ soaio_lifetime = AIOD_LIFETIME_DEFAULT;
+ STAILQ_INIT(&soaio_jobs);
+ mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF);
+ soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL);
+ TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL);
+ if (soaio_target_procs > 0)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+}
+SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL);
+
+static __inline int
+soaio_ready(struct socket *so, struct sockbuf *sb)
+{
+ return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so));
+}
+
+static void
+soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
+{
+ struct ucred *td_savedcred;
+ struct thread *td;
+ struct file *fp;
+ struct uio uio;
+ struct iovec iov;
+ size_t cnt;
+ int error, flags;
+
+ SOCKBUF_UNLOCK(sb);
+ aio_switch_vmspace(job);
+ td = curthread;
+ fp = job->fd_file;
+retry:
+ td_savedcred = td->td_ucred;
+ td->td_ucred = job->cred;
+
+ cnt = job->uaiocb.aio_nbytes;
+ iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf;
+ iov.iov_len = cnt;
+ uio.uio_iov = &iov;
+ uio.uio_iovcnt = 1;
+ uio.uio_offset = 0;
+ uio.uio_resid = cnt;
+ uio.uio_segflg = UIO_USERSPACE;
+ uio.uio_td = td;
+ flags = MSG_NBIO;
+
+ /* TODO: Charge ru_msg* to job. */
+
+ if (sb == &so->so_rcv) {
+ uio.uio_rw = UIO_READ;
+#ifdef MAC
+ error = mac_socket_check_receive(fp->f_cred, so);
+ if (error == 0)
+#endif
+ error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
+ } else {
+ uio.uio_rw = UIO_WRITE;
+#ifdef MAC
+ error = mac_socket_check_send(fp->f_cred, so);
+ if (error == 0)
+#endif
+ error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
+ if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
+ }
+ }
+
+ cnt -= uio.uio_resid;
+ td->td_ucred = td_savedcred;
+
+ /* XXX: Not sure if this is needed? */
+ if (cnt != 0 && (error == ERESTART || error == EINTR ||
+ error == EWOULDBLOCK))
+ error = 0;
+ if (error == EWOULDBLOCK) {
+ /*
+ * A read() or write() on the socket raced with this
+ * request. If the socket is now ready, try again.
+ * If it is not, place this request at the head of the
+ * queue to try again when the socket is ready.
+ */
+ SOCKBUF_LOCK(sb);
+ empty_results++;
+ if (soaio_ready(so, sb)) {
+ empty_retries++;
+ SOCKBUF_UNLOCK(sb);
+ goto retry;
+ }
+
+ if (!aio_set_cancel_function(job, soo_aio_cancel)) {
+ MPASS(cnt == 0);
+ SOCKBUF_UNLOCK(sb);
+ aio_cancel(job);
+ SOCKBUF_LOCK(sb);
+ } else {
+ TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
+ }
+ } else {
+ aio_complete(job, cnt, error);
+ SOCKBUF_LOCK(sb);
+ }
+}
+
+static void
+soaio_process_sb(struct socket *so, struct sockbuf *sb)
+{
+ struct kaiocb *job;
+
+ SOCKBUF_LOCK(sb);
+ while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) {
+ job = TAILQ_FIRST(&sb->sb_aiojobq);
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (!aio_clear_cancel_function(job))
+ continue;
+
+ soaio_process_job(so, sb, job);
+ }
+
+ /*
+ * If there are still pending requests, the socket must not be
+	 * ready, so set SB_AIO to request a wakeup when the socket
+ * becomes ready.
+ */
+ if (!TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags |= SB_AIO;
+ sb->sb_flags &= ~SB_AIO_RUNNING;
+ SOCKBUF_UNLOCK(sb);
+
+ ACCEPT_LOCK();
+ SOCK_LOCK(so);
+ sorele(so);
+}
+
+void
+soaio_rcv(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_rcv);
+}
+
+void
+soaio_snd(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_snd);
+}
+
+void
+sowakeup_aio(struct socket *so, struct sockbuf *sb)
+{
+
+ SOCKBUF_LOCK_ASSERT(sb);
+ sb->sb_flags &= ~SB_AIO;
+ if (sb->sb_flags & SB_AIO_RUNNING)
+ return;
+ sb->sb_flags |= SB_AIO_RUNNING;
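+	/*
+	 * soref() requires the socket lock.  For the receive buffer the
+	 * sockbuf lock held here is the same mutex, so only the send
+	 * side needs to take SOCK_LOCK() explicitly.
+	 */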
+ if (sb == &so->so_snd)
+ SOCK_LOCK(so);
+ soref(so);
+ if (sb == &so->so_snd)
+ SOCK_UNLOCK(so);
+ soaio_enqueue(&sb->sb_aiotask);
+}
+
+static void
+soo_aio_cancel(struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+ int opcode;
+
+ so = job->fd_file->f_data;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_READ)
+ sb = &so->so_rcv;
+ else {
+ MPASS(opcode == LIO_WRITE);
+ sb = &so->so_snd;
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags &= ~SB_AIO;
+ SOCKBUF_UNLOCK(sb);
+
+ aio_cancel(job);
+}
+
+static int
+soo_aio_queue(struct file *fp, struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+
+ so = fp->f_data;
+ switch (job->uaiocb.aio_lio_opcode) {
+ case LIO_READ:
+ sb = &so->so_rcv;
+ break;
+ case LIO_WRITE:
+ sb = &so->so_snd;
+ break;
+ default:
+ return (EINVAL);
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_set_cancel_function(job, soo_aio_cancel))
+ panic("new job was cancelled");
+ TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
+ if (!(sb->sb_flags & SB_AIO_RUNNING)) {
+ if (soaio_ready(so, sb))
+ sowakeup_aio(so, sb);
+ else
+ sb->sb_flags |= SB_AIO;
+ }
+ SOCKBUF_UNLOCK(sb);
+ return (0);
+}
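
With soo_aio_queue() in place, plain POSIX AIO calls on sockets are serviced
by the soaiod kernel processes above instead of the generic file path.  A
minimal userspace sketch, assuming an already-connected socket and using
aio_error() polling in place of a sigevent-based completion:

#include <aio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

static ssize_t
async_socket_read(int sock, void *buf, size_t len)
{
	struct aiocb cb;
	int error;

	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = sock;		/* assumed to be a connected socket */
	cb.aio_buf = buf;
	cb.aio_nbytes = len;
	if (aio_read(&cb) != 0)
		return (-1);		/* errno set by aio_read(2) */
	while ((error = aio_error(&cb)) == EINPROGRESS)
		usleep(1000);		/* a sigevent would avoid polling */
	if (error != 0) {
		errno = error;
		return (-1);
	}
	return (aio_return(&cb));	/* bytes transferred */
}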
diff --git a/sys/kern/uipc_debug.c b/sys/kern/uipc_debug.c
index 7c8b93c..fb4ee52 100644
--- a/sys/kern/uipc_debug.c
+++ b/sys/kern/uipc_debug.c
@@ -416,6 +416,9 @@ db_print_sockbuf(struct sockbuf *sb, const char *sockbufname, int indent)
db_printf("sb_flags: 0x%x (", sb->sb_flags);
db_print_sbflags(sb->sb_flags);
db_printf(")\n");
+
+ db_print_indent(indent);
+ db_printf("sb_aiojobq first: %p\n", TAILQ_FIRST(&sb->sb_aiojobq));
}
static void
@@ -470,7 +473,6 @@ db_print_socket(struct socket *so, const char *socketname, int indent)
db_print_indent(indent);
db_printf("so_sigio: %p ", so->so_sigio);
db_printf("so_oobmark: %lu ", so->so_oobmark);
- db_printf("so_aiojobq first: %p\n", TAILQ_FIRST(&so->so_aiojobq));
db_print_sockbuf(&so->so_rcv, "so_rcv", indent);
db_print_sockbuf(&so->so_snd, "so_snd", indent);
diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c
index edf03a3..0cdd43b 100644
--- a/sys/kern/uipc_sockbuf.c
+++ b/sys/kern/uipc_sockbuf.c
@@ -332,7 +332,7 @@ sowakeup(struct socket *so, struct sockbuf *sb)
} else
ret = SU_OK;
if (sb->sb_flags & SB_AIO)
- aio_swake(so, sb);
+ sowakeup_aio(so, sb);
SOCKBUF_UNLOCK(sb);
if (ret == SU_ISCONNECTED)
soisconnected(so);
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
index 5d2247f..35d17b1 100644
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -134,6 +134,7 @@ __FBSDID("$FreeBSD$");
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/jail.h>
#include <sys/syslog.h>
@@ -396,7 +397,10 @@ soalloc(struct vnet *vnet)
SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv");
sx_init(&so->so_snd.sb_sx, "so_snd_sx");
sx_init(&so->so_rcv.sb_sx, "so_rcv_sx");
- TAILQ_INIT(&so->so_aiojobq);
+ TAILQ_INIT(&so->so_snd.sb_aiojobq);
+ TAILQ_INIT(&so->so_rcv.sb_aiojobq);
+ TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so);
+ TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so);
#ifdef VIMAGE
VNET_ASSERT(vnet != NULL, ("%s:%d vnet is NULL, so=%p",
__func__, __LINE__, so));
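
Each sockbuf now owns its own AIO job queue and task, with the socket as the
task context.  When soaio_enqueue() hands sb_aiotask to a soaiod kernel
process, soaio_kproc_loop() runs it with the taskqueue(9) calling convention.
A sketch of that dispatch contract; run_task() is a hypothetical helper, and
the real code does this under its job-queue lock:

#include <sys/param.h>
#include <sys/taskqueue.h>

static void
run_task(struct task *task)
{
	int pending;

	/* ta_pending counts enqueues coalesced since the last run. */
	pending = task->ta_pending;
	task->ta_pending = 0;
	/* For sb_aiotask this invokes soaio_rcv() or soaio_snd(). */
	task->ta_func(task->ta_context, pending);
}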
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
index 5b2083c..59dd57c 100644
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -72,8 +72,6 @@ __FBSDID("$FreeBSD$");
#include <vm/uma.h>
#include <sys/aio.h>
-#include "opt_vfs_aio.h"
-
/*
* Counter for allocating reference ids to new jobs. Wrapped to 1 on
* overflow. (XXX will be removed soon.)
@@ -85,14 +83,6 @@ static u_long jobrefid;
*/
static uint64_t jobseqno;
-#define JOBST_NULL 0
-#define JOBST_JOBQSOCK 1
-#define JOBST_JOBQGLOBAL 2
-#define JOBST_JOBRUNNING 3
-#define JOBST_JOBFINISHED 4
-#define JOBST_JOBQBUF 5
-#define JOBST_JOBQSYNC 6
-
#ifndef MAX_AIO_PER_PROC
#define MAX_AIO_PER_PROC 32
#endif
@@ -101,26 +91,14 @@ static uint64_t jobseqno;
#define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef MAX_AIO_PROCS
-#define MAX_AIO_PROCS 32
-#endif
-
#ifndef MAX_AIO_QUEUE
#define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef TARGET_AIO_PROCS
-#define TARGET_AIO_PROCS 4
-#endif
-
#ifndef MAX_BUF_AIO
#define MAX_BUF_AIO 16
#endif
-#ifndef AIOD_LIFETIME_DEFAULT
-#define AIOD_LIFETIME_DEFAULT (30 * hz)
-#endif
-
FEATURE(aio, "Asynchronous I/O");
static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list");
@@ -128,6 +106,10 @@ static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list");
static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0,
"Async IO management");
+static int enable_aio_unsafe = 0;
+SYSCTL_INT(_vfs_aio, OID_AUTO, enable_unsafe, CTLFLAG_RW, &enable_aio_unsafe, 0,
+ "Permit asynchronous IO on all file types, not just known-safe types");
+
static int max_aio_procs = MAX_AIO_PROCS;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs, CTLFLAG_RW, &max_aio_procs, 0,
"Maximum number of kernel processes to use for handling async IO ");
@@ -165,11 +147,6 @@ static int aiod_lifetime;
SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
"Maximum lifetime for idle aiod");
-static int unloadable = 0;
-SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
- "Allow unload of aio (not recommended)");
-
-
static int max_aio_per_proc = MAX_AIO_PER_PROC;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
0,
@@ -208,46 +185,27 @@ typedef struct oaiocb {
*/
/*
- * Current, there is only two backends: BIO and generic file I/O.
- * socket I/O is served by generic file I/O, this is not a good idea, since
- * disk file I/O and any other types without O_NONBLOCK flag can block daemon
- * processes, if there is no thread to serve socket I/O, the socket I/O will be
- * delayed too long or starved, we should create some processes dedicated to
- * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O
- * systems we really need non-blocking interface, fiddling O_NONBLOCK in file
- * structure is not safe because there is race between userland and aio
- * daemons.
+ * If the routine that services an AIO request blocks while running in an
+ * AIO kernel process, it can starve other I/O requests. BIO requests
+ * queued via aio_qphysio() complete in GEOM and do not use AIO kernel
+ * processes at all. Socket I/O requests use a separate pool of
+ * kprocs and also force non-blocking I/O. Other file I/O requests
+ * use the generic fo_read/fo_write operations which can block. The
+ * fsync and mlock operations can also block while executing. Ideally
+ * none of these requests would block while executing.
+ *
+ * Note that the service routines cannot toggle O_NONBLOCK in the file
+ * structure directly while handling a request due to races with
+ * userland threads.
*/
-struct kaiocb {
- TAILQ_ENTRY(kaiocb) list; /* (b) internal list of for backend */
- TAILQ_ENTRY(kaiocb) plist; /* (a) list of jobs for each backend */
- TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
- int jobflags; /* (a) job flags */
- int jobstate; /* (b) job state */
- int inputcharge; /* (*) input blockes */
- int outputcharge; /* (*) output blockes */
- struct bio *bp; /* (*) BIO backend BIO pointer */
- struct buf *pbuf; /* (*) BIO backend buffer pointer */
- struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */
- int npages; /* BIO backend number of pages */
- struct proc *userproc; /* (*) user process */
- struct ucred *cred; /* (*) active credential when created */
- struct file *fd_file; /* (*) pointer to file structure */
- struct aioliojob *lio; /* (*) optional lio job */
- struct aiocb *ujob; /* (*) pointer in userspace of aiocb */
- struct knlist klist; /* (a) list of knotes */
- struct aiocb uaiocb; /* (*) kernel I/O control block */
- ksiginfo_t ksi; /* (a) realtime signal info */
- uint64_t seqno; /* (*) job number */
- int pending; /* (a) number of pending I/O, aio_fsync only */
-};
-
/* jobflags */
-#define KAIOCB_DONE 0x01
-#define KAIOCB_BUFDONE 0x02
-#define KAIOCB_RUNDOWN 0x04
+#define KAIOCB_QUEUEING 0x01
+#define KAIOCB_CANCELLED 0x02
+#define KAIOCB_CANCELLING 0x04
#define KAIOCB_CHECKSYNC 0x08
+#define KAIOCB_CLEARED 0x10
+#define KAIOCB_FINISHED 0x20
/*
* AIO process info
@@ -293,9 +251,10 @@ struct kaioinfo {
TAILQ_HEAD(,kaiocb) kaio_done; /* (a) done queue for process */
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
TAILQ_HEAD(,kaiocb) kaio_jobqueue; /* (a) job queue for process */
- TAILQ_HEAD(,kaiocb) kaio_bufqueue; /* (a) buffer job queue */
TAILQ_HEAD(,kaiocb) kaio_syncqueue; /* (a) queue for aio_fsync */
+ TAILQ_HEAD(,kaiocb) kaio_syncready; /* (a) second q for aio_fsync */
struct task kaio_task; /* (*) task to kick aio processes */
+ struct task kaio_sync_task; /* (*) task to schedule fsync jobs */
};
#define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx)
@@ -332,21 +291,18 @@ static int aio_free_entry(struct kaiocb *job);
static void aio_process_rw(struct kaiocb *job);
static void aio_process_sync(struct kaiocb *job);
static void aio_process_mlock(struct kaiocb *job);
+static void aio_schedule_fsync(void *context, int pending);
static int aio_newproc(int *);
int aio_aqueue(struct thread *td, struct aiocb *ujob,
struct aioliojob *lio, int type, struct aiocb_ops *ops);
+static int aio_queue_file(struct file *fp, struct kaiocb *job);
static void aio_physwakeup(struct bio *bp);
static void aio_proc_rundown(void *arg, struct proc *p);
static void aio_proc_rundown_exec(void *arg, struct proc *p,
struct image_params *imgp);
static int aio_qphysio(struct proc *p, struct kaiocb *job);
static void aio_daemon(void *param);
-static void aio_swake_cb(struct socket *, struct sockbuf *);
-static int aio_unload(void);
-static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job,
- int type);
-#define DONE_BUF 1
-#define DONE_QUEUE 2
+static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job);
static int aio_kick(struct proc *userp);
static void aio_kick_nowait(struct proc *userp);
static void aio_kick_helper(void *context, int pending);
@@ -397,13 +353,10 @@ aio_modload(struct module *module, int cmd, void *arg)
case MOD_LOAD:
aio_onceonly();
break;
- case MOD_UNLOAD:
- error = aio_unload();
- break;
case MOD_SHUTDOWN:
break;
default:
- error = EINVAL;
+ error = EOPNOTSUPP;
break;
}
return (error);
@@ -471,8 +424,6 @@ aio_onceonly(void)
{
int error;
- /* XXX: should probably just use so->callback */
- aio_swake = &aio_swake_cb;
exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
EVENTHANDLER_PRI_ANY);
exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown_exec,
@@ -513,55 +464,6 @@ aio_onceonly(void)
}
/*
- * Callback for unload of AIO when used as a module.
- */
-static int
-aio_unload(void)
-{
- int error;
-
- /*
- * XXX: no unloads by default, it's too dangerous.
- * perhaps we could do it if locked out callers and then
- * did an aio_proc_rundown() on each process.
- *
- * jhb: aio_proc_rundown() needs to run on curproc though,
- * so I don't think that would fly.
- */
- if (!unloadable)
- return (EOPNOTSUPP);
-
-#ifdef COMPAT_FREEBSD32
- syscall32_helper_unregister(aio32_syscalls);
-#endif
- syscall_helper_unregister(aio_syscalls);
-
- error = kqueue_del_filteropts(EVFILT_AIO);
- if (error)
- return error;
- error = kqueue_del_filteropts(EVFILT_LIO);
- if (error)
- return error;
- async_io_version = 0;
- aio_swake = NULL;
- taskqueue_free(taskqueue_aiod_kick);
- delete_unrhdr(aiod_unr);
- uma_zdestroy(kaio_zone);
- uma_zdestroy(aiop_zone);
- uma_zdestroy(aiocb_zone);
- uma_zdestroy(aiol_zone);
- uma_zdestroy(aiolio_zone);
- EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
- EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
- mtx_destroy(&aio_job_mtx);
- sema_destroy(&aio_newproc_sem);
- p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
- return (0);
-}
-
-/*
* Init the per-process aioinfo structure. The aioinfo limits are set
* per-process for user limit (resource) management.
*/
@@ -582,10 +484,11 @@ aio_init_aioinfo(struct proc *p)
TAILQ_INIT(&ki->kaio_all);
TAILQ_INIT(&ki->kaio_done);
TAILQ_INIT(&ki->kaio_jobqueue);
- TAILQ_INIT(&ki->kaio_bufqueue);
TAILQ_INIT(&ki->kaio_liojoblist);
TAILQ_INIT(&ki->kaio_syncqueue);
+ TAILQ_INIT(&ki->kaio_syncready);
TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p);
+ TASK_INIT(&ki->kaio_sync_task, 0, aio_schedule_fsync, ki);
PROC_LOCK(p);
if (p->p_aioinfo == NULL) {
p->p_aioinfo = ki;
@@ -637,7 +540,7 @@ aio_free_entry(struct kaiocb *job)
MPASS(ki != NULL);
AIO_LOCK_ASSERT(ki, MA_OWNED);
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
atomic_subtract_int(&num_queue_count, 1);
@@ -670,7 +573,6 @@ aio_free_entry(struct kaiocb *job)
PROC_UNLOCK(p);
MPASS(job->bp == NULL);
- job->jobstate = JOBST_NULL;
AIO_UNLOCK(ki);
/*
@@ -709,6 +611,57 @@ aio_proc_rundown_exec(void *arg, struct proc *p,
aio_proc_rundown(arg, p);
}
+static int
+aio_cancel_job(struct proc *p, struct kaioinfo *ki, struct kaiocb *job)
+{
+ aio_cancel_fn_t *func;
+ int cancelled;
+
+ AIO_LOCK_ASSERT(ki, MA_OWNED);
+ if (job->jobflags & (KAIOCB_CANCELLED | KAIOCB_FINISHED))
+ return (0);
+ MPASS((job->jobflags & KAIOCB_CANCELLING) == 0);
+ job->jobflags |= KAIOCB_CANCELLED;
+
+ func = job->cancel_fn;
+
+ /*
+ * If there is no cancel routine, just leave the job marked as
+ * cancelled. The job should be in active use by a caller who
+	 * will either complete it normally or detect the cancellation
+	 * when it next fails to install a cancel routine.
+ */
+ if (func == NULL)
+ return (0);
+
+ /*
+ * Set the CANCELLING flag so that aio_complete() will defer
+ * completions of this job. This prevents the job from being
+	 * freed out from under the cancel callback.  After the callback
+	 * returns, any deferred completion (whether from the callback
+	 * or any other source) is then performed.
+ */
+ job->jobflags |= KAIOCB_CANCELLING;
+ AIO_UNLOCK(ki);
+ func(job);
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_CANCELLING;
+ if (job->jobflags & KAIOCB_FINISHED) {
+ cancelled = job->uaiocb._aiocb_private.error == ECANCELED;
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(p, job);
+ } else {
+ /*
+ * The cancel callback might have scheduled an
+ * operation to cancel this request, but it is
+ * only counted as cancelled if the request is
+ * cancelled when the callback returns.
+ */
+ cancelled = 0;
+ }
+ return (cancelled);
+}
+
/*
* Rundown the jobs for a given process.
*/
@@ -718,9 +671,6 @@
struct kaioinfo *ki;
struct aioliojob *lj;
struct kaiocb *job, *jobn;
- struct file *fp;
- struct socket *so;
- int remove;
KASSERT(curthread->td_proc == p,
("%s: called on non-curproc", __func__));
@@ -738,35 +688,11 @@ restart:
* aio_cancel on all pending I/O requests.
*/
TAILQ_FOREACH_SAFE(job, &ki->kaio_jobqueue, plist, jobn) {
- remove = 0;
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- fp = job->fd_file;
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- job->jobstate = JOBST_JOBFINISHED;
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(p, job, DONE_QUEUE);
- }
+ aio_cancel_job(p, ki, job);
}
/* Wait for all running I/O to be finished */
- if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
- TAILQ_FIRST(&ki->kaio_jobqueue)) {
+ if (TAILQ_FIRST(&ki->kaio_jobqueue) || ki->kaio_active_count != 0) {
ki->kaio_flags |= KAIO_WAKEUP;
msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz);
goto restart;
@@ -791,6 +717,7 @@ restart:
}
AIO_UNLOCK(ki);
taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_task);
+ taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_sync_task);
mtx_destroy(&ki->kaio_mtx);
uma_zfree(kaio_zone, ki);
p->p_aioinfo = NULL;
@@ -807,15 +734,18 @@ aio_selectjob(struct aioproc *aiop)
struct proc *userp;
mtx_assert(&aio_job_mtx, MA_OWNED);
+restart:
TAILQ_FOREACH(job, &aio_jobs, list) {
userp = job->userproc;
ki = userp->p_aioinfo;
if (ki->kaio_active_count < ki->kaio_maxactive_count) {
TAILQ_REMOVE(&aio_jobs, job, list);
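+			/*
+			 * If the cancel function cannot be cleared, a
+			 * cancel callback is running; let it complete
+			 * the job and rescan the queue.
+			 */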
+ if (!aio_clear_cancel_function(job))
+ goto restart;
+
/* Account for currently active jobs. */
ki->kaio_active_count++;
- job->jobstate = JOBST_JOBRUNNING;
break;
}
}
@@ -863,7 +793,6 @@ aio_process_rw(struct kaiocb *job)
struct thread *td;
struct aiocb *cb;
struct file *fp;
- struct socket *so;
struct uio auio;
struct iovec aiov;
int cnt;
@@ -875,6 +804,7 @@ aio_process_rw(struct kaiocb *job)
job->uaiocb.aio_lio_opcode == LIO_WRITE,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
td = curthread;
td_savedcred = td->td_ucred;
td->td_ucred = job->cred;
@@ -920,24 +850,15 @@ aio_process_rw(struct kaiocb *job)
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
error = 0;
if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
- int sigpipe = 1;
- if (fp->f_type == DTYPE_SOCKET) {
- so = fp->f_data;
- if (so->so_options & SO_NOSIGPIPE)
- sigpipe = 0;
- }
- if (sigpipe) {
- PROC_LOCK(job->userproc);
- kern_psignal(job->userproc, SIGPIPE);
- PROC_UNLOCK(job->userproc);
- }
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
}
}
cnt -= auio.uio_resid;
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = cnt;
td->td_ucred = td_savedcred;
+ aio_complete(job, cnt, error);
}
static void
@@ -945,7 +866,6 @@ aio_process_sync(struct kaiocb *job)
{
struct thread *td = curthread;
struct ucred *td_savedcred = td->td_ucred;
- struct aiocb *cb = &job->uaiocb;
struct file *fp = job->fd_file;
int error = 0;
@@ -955,9 +875,8 @@ aio_process_sync(struct kaiocb *job)
td->td_ucred = job->cred;
if (fp->f_vnode != NULL)
error = aio_fsync_vnode(td, fp->f_vnode);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
td->td_ucred = td_savedcred;
+ aio_complete(job, 0, error);
}
static void
@@ -969,19 +888,20 @@ aio_process_mlock(struct kaiocb *job)
KASSERT(job->uaiocb.aio_lio_opcode == LIO_MLOCK,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
error = vm_mlock(job->userproc, job->cred,
__DEVOLATILE(void *, cb->aio_buf), cb->aio_nbytes);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
+ aio_complete(job, 0, error);
}
static void
-aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
+aio_bio_done_notify(struct proc *userp, struct kaiocb *job)
{
struct aioliojob *lj;
struct kaioinfo *ki;
struct kaiocb *sjob, *sjobn;
int lj_done;
+ bool schedule_fsync;
ki = userp->p_aioinfo;
AIO_LOCK_ASSERT(ki, MA_OWNED);
@@ -992,13 +912,8 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
if (lj->lioj_count == lj->lioj_finished_count)
lj_done = 1;
}
- if (type == DONE_QUEUE) {
- job->jobflags |= KAIOCB_DONE;
- } else {
- job->jobflags |= KAIOCB_BUFDONE;
- }
TAILQ_INSERT_TAIL(&ki->kaio_done, job, plist);
- job->jobstate = JOBST_JOBFINISHED;
+ MPASS(job->jobflags & KAIOCB_FINISHED);
if (ki->kaio_flags & KAIO_RUNDOWN)
goto notification_done;
@@ -1025,20 +940,24 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
notification_done:
if (job->jobflags & KAIOCB_CHECKSYNC) {
+ schedule_fsync = false;
TAILQ_FOREACH_SAFE(sjob, &ki->kaio_syncqueue, list, sjobn) {
if (job->fd_file == sjob->fd_file &&
job->seqno < sjob->seqno) {
if (--sjob->pending == 0) {
- mtx_lock(&aio_job_mtx);
- sjob->jobstate = JOBST_JOBQGLOBAL;
TAILQ_REMOVE(&ki->kaio_syncqueue, sjob,
list);
- TAILQ_INSERT_TAIL(&aio_jobs, sjob, list);
- aio_kick_nowait(userp);
- mtx_unlock(&aio_job_mtx);
+ if (!aio_clear_cancel_function(sjob))
+ continue;
+ TAILQ_INSERT_TAIL(&ki->kaio_syncready,
+ sjob, list);
+ schedule_fsync = true;
}
}
}
+ if (schedule_fsync)
+ taskqueue_enqueue(taskqueue_aiod_kick,
+ &ki->kaio_sync_task);
}
if (ki->kaio_flags & KAIO_WAKEUP) {
ki->kaio_flags &= ~KAIO_WAKEUP;
@@ -1047,6 +966,103 @@ notification_done:
}
static void
+aio_schedule_fsync(void *context, int pending)
+{
+ struct kaioinfo *ki;
+ struct kaiocb *job;
+
+ ki = context;
+ AIO_LOCK(ki);
+ while (!TAILQ_EMPTY(&ki->kaio_syncready)) {
+ job = TAILQ_FIRST(&ki->kaio_syncready);
+ TAILQ_REMOVE(&ki->kaio_syncready, job, list);
+ AIO_UNLOCK(ki);
+ aio_schedule(job, aio_process_sync);
+ AIO_LOCK(ki);
+ }
+ AIO_UNLOCK(ki);
+}
+
+bool
+aio_cancel_cleared(struct kaiocb *job)
+{
+
+ /*
+	 * The caller should hold the same queue lock that was held
+	 * when aio_clear_cancel_function() set this flag, ensuring
+	 * that this check sees an up-to-date value.  However, there
+	 * is no way to assert that.
+ */
+ return ((job->jobflags & KAIOCB_CLEARED) != 0);
+}
+
+bool
+aio_clear_cancel_function(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ MPASS(job->cancel_fn != NULL);
+ if (job->jobflags & KAIOCB_CANCELLING) {
+ job->jobflags |= KAIOCB_CLEARED;
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = NULL;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+bool
+aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ if (job->jobflags & KAIOCB_CANCELLED) {
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = func;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+void
+aio_complete(struct kaiocb *job, long status, int error)
+{
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ job->uaiocb._aiocb_private.error = error;
+ job->uaiocb._aiocb_private.status = status;
+
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
+
+ AIO_LOCK(ki);
+ KASSERT(!(job->jobflags & KAIOCB_FINISHED),
+ ("duplicate aio_complete"));
+ job->jobflags |= KAIOCB_FINISHED;
+ if ((job->jobflags & (KAIOCB_QUEUEING | KAIOCB_CANCELLING)) == 0) {
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(userp, job);
+ }
+ AIO_UNLOCK(ki);
+}
+
+void
+aio_cancel(struct kaiocb *job)
+{
+
+ aio_complete(job, -1, ECANCELED);
+}
+
+void
aio_switch_vmspace(struct kaiocb *job)
{
@@ -1063,7 +1079,7 @@ aio_daemon(void *_id)
struct kaiocb *job;
struct aioproc *aiop;
struct kaioinfo *ki;
- struct proc *p, *userp;
+ struct proc *p;
struct vmspace *myvm;
struct thread *td = curthread;
int id = (intptr_t)_id;
@@ -1107,40 +1123,13 @@ aio_daemon(void *_id)
*/
while ((job = aio_selectjob(aiop)) != NULL) {
mtx_unlock(&aio_job_mtx);
- userp = job->userproc;
-
- /*
- * Connect to process address space for user program.
- */
- aio_switch_vmspace(job);
- ki = userp->p_aioinfo;
-
- /* Do the I/O function. */
- switch(job->uaiocb.aio_lio_opcode) {
- case LIO_READ:
- case LIO_WRITE:
- aio_process_rw(job);
- break;
- case LIO_SYNC:
- aio_process_sync(job);
- break;
- case LIO_MLOCK:
- aio_process_mlock(job);
- break;
- }
+ ki = job->userproc->p_aioinfo;
+ job->handle_fn(job);
mtx_lock(&aio_job_mtx);
/* Decrement the active job count. */
ki->kaio_active_count--;
- mtx_unlock(&aio_job_mtx);
-
- AIO_LOCK(ki);
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(userp, job, DONE_QUEUE);
- AIO_UNLOCK(ki);
-
- mtx_lock(&aio_job_mtx);
}
/*
@@ -1236,7 +1225,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
struct cdevsw *csw;
struct cdev *dev;
struct kaioinfo *ki;
- struct aioliojob *lj;
int error, ref, unmap, poff;
vm_prot_t prot;
@@ -1293,16 +1281,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
}
AIO_LOCK(ki);
- ki->kaio_count++;
if (!unmap)
ki->kaio_buffer_count++;
- lj = job->lio;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- job->jobstate = JOBST_JOBQBUF;
- cb->_aiocb_private.status = cb->aio_nbytes;
AIO_UNLOCK(ki);
bp->bio_length = cb->aio_nbytes;
@@ -1336,7 +1316,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
bp->bio_flags |= BIO_UNMAPPED;
}
- atomic_add_int(&num_queue_count, 1);
if (!unmap)
atomic_add_int(&num_buf_aio, 1);
@@ -1347,14 +1326,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job)
doerror:
AIO_LOCK(ki);
- job->jobstate = JOBST_NULL;
- TAILQ_REMOVE(&ki->kaio_bufqueue, job, plist);
- TAILQ_REMOVE(&ki->kaio_all, job, allist);
- ki->kaio_count--;
if (!unmap)
ki->kaio_buffer_count--;
- if (lj)
- lj->lioj_count--;
AIO_UNLOCK(ki);
if (pbuf) {
relpbuf(pbuf, NULL);
@@ -1367,40 +1340,6 @@ unref:
return (error);
}
-/*
- * Wake up aio requests that may be serviceable now.
- */
-static void
-aio_swake_cb(struct socket *so, struct sockbuf *sb)
-{
- struct kaiocb *job, *jobn;
- int opcode;
-
- SOCKBUF_LOCK_ASSERT(sb);
- if (sb == &so->so_snd)
- opcode = LIO_WRITE;
- else
- opcode = LIO_READ;
-
- sb->sb_flags &= ~SB_AIO;
- mtx_lock(&aio_job_mtx);
- TAILQ_FOREACH_SAFE(job, &so->so_aiojobq, list, jobn) {
- if (opcode == job->uaiocb.aio_lio_opcode) {
- if (job->jobstate != JOBST_JOBQSOCK)
- panic("invalid queue value");
- /* XXX
- * We don't have actual sockets backend yet,
- * so we simply move the requests to the generic
- * file I/O backend.
- */
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- aio_kick_nowait(job->userproc);
- }
- }
- mtx_unlock(&aio_job_mtx);
-}
-
static int
convert_old_sigevent(struct osigevent *osig, struct sigevent *nsig)
{
@@ -1521,11 +1460,9 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj,
struct proc *p = td->td_proc;
cap_rights_t rights;
struct file *fp;
- struct socket *so;
- struct kaiocb *job, *job2;
+ struct kaiocb *job;
struct kaioinfo *ki;
struct kevent kev;
- struct sockbuf *sb;
int opcode;
int error;
int fd, kqfd;
@@ -1668,86 +1605,128 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj,
kev.data = (intptr_t)job;
kev.udata = job->uaiocb.aio_sigevent.sigev_value.sival_ptr;
error = kqfd_register(kqfd, &kev, td, 1);
-aqueue_fail:
- if (error) {
- if (fp)
- fdrop(fp, td);
- uma_zfree(aiocb_zone, job);
- ops->store_error(ujob, error);
- goto done;
- }
+ if (error)
+ goto aqueue_fail;
+
no_kqueue:
ops->store_error(ujob, EINPROGRESS);
job->uaiocb._aiocb_private.error = EINPROGRESS;
job->userproc = p;
job->cred = crhold(td->td_ucred);
- job->jobflags = 0;
+ job->jobflags = KAIOCB_QUEUEING;
job->lio = lj;
- if (opcode == LIO_SYNC)
- goto queueit;
+ if (opcode == LIO_MLOCK) {
+ aio_schedule(job, aio_process_mlock);
+ error = 0;
+ } else if (fp->f_ops->fo_aio_queue == NULL)
+ error = aio_queue_file(fp, job);
+ else
+ error = fo_aio_queue(fp, job);
+ if (error)
+ goto aqueue_fail;
- if (fp && fp->f_type == DTYPE_SOCKET) {
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_QUEUEING;
+ TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
+ ki->kaio_count++;
+ if (lj)
+ lj->lioj_count++;
+ atomic_add_int(&num_queue_count, 1);
+ if (job->jobflags & KAIOCB_FINISHED) {
/*
- * Alternate queueing for socket ops: Reach down into the
- * descriptor to get the socket data. Then check to see if the
- * socket is ready to be read or written (based on the requested
- * operation).
- *
- * If it is not ready for io, then queue the job on the
- * socket, and set the flags so we get a call when sbnotify()
- * happens.
- *
- * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
- * and unlock the snd sockbuf for no reason.
+ * The queue callback completed the request synchronously.
+ * The bulk of the completion is deferred in that case
+ * until this point.
*/
- so = fp->f_data;
- sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
- SOCKBUF_LOCK(sb);
- if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
- LIO_WRITE) && (!sowriteable(so)))) {
- sb->sb_flags |= SB_AIO;
+ aio_bio_done_notify(p, job);
+ } else
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
+ AIO_UNLOCK(ki);
+ return (0);
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&so->so_aiojobq, job, list);
- mtx_unlock(&aio_job_mtx);
+aqueue_fail:
+ knlist_delete(&job->klist, curthread, 0);
+ if (fp)
+ fdrop(fp, td);
+ uma_zfree(aiocb_zone, job);
+ ops->store_error(ujob, error);
+ return (error);
+}
- AIO_LOCK(ki);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- job->jobstate = JOBST_JOBQSOCK;
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- AIO_UNLOCK(ki);
- SOCKBUF_UNLOCK(sb);
- atomic_add_int(&num_queue_count, 1);
- error = 0;
- goto done;
- }
- SOCKBUF_UNLOCK(sb);
+static void
+aio_cancel_daemon_job(struct kaiocb *job)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&aio_jobs, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+void
+aio_schedule(struct kaiocb *job, aio_handle_fn_t *func)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) {
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+ return;
}
+ job->handle_fn = func;
+ TAILQ_INSERT_TAIL(&aio_jobs, job, list);
+ aio_kick_nowait(job->userproc);
+ mtx_unlock(&aio_job_mtx);
+}
+
+static void
+aio_cancel_sync(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+int
+aio_queue_file(struct file *fp, struct kaiocb *job)
+{
+ struct aioliojob *lj;
+ struct kaioinfo *ki;
+ struct kaiocb *job2;
+ int error, opcode;
+
+ lj = job->lio;
+ ki = job->userproc->p_aioinfo;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_SYNC)
+ goto queueit;
- if ((error = aio_qphysio(p, job)) == 0)
+ if ((error = aio_qphysio(job->userproc, job)) == 0)
goto done;
#if 0
- if (error > 0) {
- job->uaiocb._aiocb_private.error = error;
- ops->store_error(ujob, error);
+ /*
+ * XXX: This means qphysio() failed with EFAULT. The current
+ * behavior is to retry the operation via fo_read/fo_write.
+ * Wouldn't it be better to just complete the request with an
+ * error here?
+ */
+ if (error > 0)
goto done;
- }
#endif
queueit:
- atomic_add_int(&num_queue_count, 1);
+ if (!enable_aio_unsafe)
+ return (EOPNOTSUPP);
- AIO_LOCK(ki);
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
if (opcode == LIO_SYNC) {
+ AIO_LOCK(ki);
TAILQ_FOREACH(job2, &ki->kaio_jobqueue, plist) {
if (job2->fd_file == job->fd_file &&
job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
@@ -1756,28 +1735,32 @@ queueit:
job->pending++;
}
}
- TAILQ_FOREACH(job2, &ki->kaio_bufqueue, plist) {
- if (job2->fd_file == job->fd_file &&
- job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
- job2->seqno < job->seqno) {
- job2->jobflags |= KAIOCB_CHECKSYNC;
- job->pending++;
- }
- }
if (job->pending != 0) {
+ if (!aio_set_cancel_function(job, aio_cancel_sync)) {
+ AIO_UNLOCK(ki);
+ aio_cancel(job);
+ return (0);
+ }
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, job, list);
- job->jobstate = JOBST_JOBQSYNC;
AIO_UNLOCK(ki);
- goto done;
+ return (0);
}
+ AIO_UNLOCK(ki);
+ }
+
+ switch (opcode) {
+ case LIO_READ:
+ case LIO_WRITE:
+ aio_schedule(job, aio_process_rw);
+ error = 0;
+ break;
+ case LIO_SYNC:
+ aio_schedule(job, aio_process_sync);
+ error = 0;
+ break;
+ default:
+ error = EINVAL;
}
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- job->jobstate = JOBST_JOBQGLOBAL;
- aio_kick_nowait(p);
- mtx_unlock(&aio_job_mtx);
- AIO_UNLOCK(ki);
- error = 0;
done:
return (error);
}
@@ -1864,7 +1847,7 @@ kern_aio_return(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
break;
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
@@ -1933,7 +1916,7 @@ kern_aio_suspend(struct thread *td, int njoblist, struct aiocb **ujoblist,
if (job->ujob == ujoblist[i]) {
if (firstjob == NULL)
firstjob = job;
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
goto RETURN;
}
}
@@ -1992,10 +1975,8 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap)
struct kaioinfo *ki;
struct kaiocb *job, *jobn;
struct file *fp;
- struct socket *so;
cap_rights_t rights;
int error;
- int remove;
int cancelled = 0;
int notcancelled = 0;
struct vnode *vp;
@@ -2023,28 +2004,7 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap)
if ((uap->fd == job->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
(uap->aiocbp == job->ujob))) {
- remove = 0;
-
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- aio_bio_done_notify(p, job, DONE_QUEUE);
+ if (aio_cancel_job(p, ki, job)) {
cancelled++;
} else {
notcancelled++;
@@ -2102,7 +2062,7 @@ kern_aio_error(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
AIO_LOCK(ki);
TAILQ_FOREACH(job, &ki->kaio_all, allist) {
if (job->ujob == ujob) {
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
td->td_retval[0] =
job->uaiocb._aiocb_private.error;
else
@@ -2382,35 +2342,36 @@ aio_physwakeup(struct bio *bp)
struct kaiocb *job = (struct kaiocb *)bp->bio_caller1;
struct proc *userp;
struct kaioinfo *ki;
- int nblks;
+ size_t nbytes;
+ int error, nblks;
/* Release mapping into kernel space. */
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
if (job->pbuf) {
pmap_qremove((vm_offset_t)job->pbuf->b_data, job->npages);
relpbuf(job->pbuf, NULL);
job->pbuf = NULL;
atomic_subtract_int(&num_buf_aio, 1);
+ AIO_LOCK(ki);
+ ki->kaio_buffer_count--;
+ AIO_UNLOCK(ki);
}
vm_page_unhold_pages(job->pages, job->npages);
bp = job->bp;
job->bp = NULL;
- userp = job->userproc;
- ki = userp->p_aioinfo;
- AIO_LOCK(ki);
- job->uaiocb._aiocb_private.status -= bp->bio_resid;
- job->uaiocb._aiocb_private.error = 0;
+ nbytes = job->uaiocb.aio_nbytes - bp->bio_resid;
+ error = 0;
if (bp->bio_flags & BIO_ERROR)
- job->uaiocb._aiocb_private.error = bp->bio_error;
- nblks = btodb(job->uaiocb.aio_nbytes);
+ error = bp->bio_error;
+ nblks = btodb(nbytes);
if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
job->outputcharge += nblks;
else
job->inputcharge += nblks;
- TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, job, plist);
- ki->kaio_buffer_count--;
- aio_bio_done_notify(userp, job, DONE_BUF);
- AIO_UNLOCK(ki);
+
+ aio_complete(job, nbytes, error);
g_destroy_bio(bp);
}
@@ -2465,7 +2426,7 @@ kern_aio_waitcomplete(struct thread *td, struct aiocb **ujobp,
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
ujob = job->ujob;
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
@@ -2570,7 +2531,7 @@ filt_aio(struct knote *kn, long hint)
struct kaiocb *job = kn->kn_ptr.p_aio;
kn->kn_data = job->uaiocb._aiocb_private.error;
- if (job->jobstate != JOBST_JOBFINISHED)
+ if (!(job->jobflags & KAIOCB_FINISHED))
return (0);
kn->kn_flags |= EV_EOF;
return (1);