diff options
Diffstat (limited to 'sys/kern')
-rw-r--r-- | sys/kern/sys_socket.c | 397 | ||||
-rw-r--r-- | sys/kern/uipc_debug.c | 4 | ||||
-rw-r--r-- | sys/kern/uipc_sockbuf.c | 2 | ||||
-rw-r--r-- | sys/kern/uipc_socket.c | 6 | ||||
-rw-r--r-- | sys/kern/vfs_aio.c | 737 |
5 files changed, 755 insertions, 391 deletions
diff --git a/sys/kern/sys_socket.c b/sys/kern/sys_socket.c index dd831ae..ffdb003 100644 --- a/sys/kern/sys_socket.c +++ b/sys/kern/sys_socket.c @@ -34,9 +34,12 @@ __FBSDID("$FreeBSD$"); #include <sys/param.h> #include <sys/systm.h> +#include <sys/aio.h> #include <sys/domain.h> #include <sys/file.h> #include <sys/filedesc.h> +#include <sys/kernel.h> +#include <sys/kthread.h> #include <sys/malloc.h> #include <sys/proc.h> #include <sys/protosw.h> @@ -48,6 +51,9 @@ __FBSDID("$FreeBSD$"); #include <sys/filio.h> /* XXX */ #include <sys/sockio.h> #include <sys/stat.h> +#include <sys/sysctl.h> +#include <sys/sysproto.h> +#include <sys/taskqueue.h> #include <sys/uio.h> #include <sys/ucred.h> #include <sys/un.h> @@ -64,6 +70,22 @@ __FBSDID("$FreeBSD$"); #include <security/mac/mac_framework.h> +#include <vm/vm.h> +#include <vm/pmap.h> +#include <vm/vm_extern.h> +#include <vm/vm_map.h> + +static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL, + "socket AIO stats"); + +static int empty_results; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results, + 0, "socket operation returned EAGAIN"); + +static int empty_retries; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries, + 0, "socket operation retries"); + static fo_rdwr_t soo_read; static fo_rdwr_t soo_write; static fo_ioctl_t soo_ioctl; @@ -72,6 +94,9 @@ extern fo_kqfilter_t soo_kqfilter; static fo_stat_t soo_stat; static fo_close_t soo_close; static fo_fill_kinfo_t soo_fill_kinfo; +static fo_aio_queue_t soo_aio_queue; + +static void soo_aio_cancel(struct kaiocb *job); struct fileops socketops = { .fo_read = soo_read, @@ -86,6 +111,7 @@ struct fileops socketops = { .fo_chown = invfo_chown, .fo_sendfile = invfo_sendfile, .fo_fill_kinfo = soo_fill_kinfo, + .fo_aio_queue = soo_aio_queue, .fo_flags = DFLAG_PASSABLE }; @@ -363,3 +389,374 @@ soo_fill_kinfo(struct file *fp, struct kinfo_file *kif, struct filedesc *fdp) sizeof(kif->kf_path)); return (0); } + +static STAILQ_HEAD(, task) soaio_jobs; +static struct mtx soaio_jobs_lock; +static struct task soaio_kproc_task; +static int soaio_starting, soaio_idle, soaio_queued; +static struct unrhdr *soaio_kproc_unr; + +static int soaio_max_procs = MAX_AIO_PROCS; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0, + "Maximum number of kernel processes to use for async socket IO"); + +static int soaio_num_procs; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0, + "Number of active kernel processes for async socket IO"); + +static int soaio_target_procs = TARGET_AIO_PROCS; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD, + &soaio_target_procs, 0, + "Preferred number of ready kernel processes for async socket IO"); + +static int soaio_lifetime; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0, + "Maximum lifetime for idle aiod"); + +static void +soaio_kproc_loop(void *arg) +{ + struct proc *p; + struct vmspace *myvm; + struct task *task; + int error, id, pending; + + id = (intptr_t)arg; + + /* + * Grab an extra reference on the daemon's vmspace so that it + * doesn't get freed by jobs that switch to a different + * vmspace. + */ + p = curproc; + myvm = vmspace_acquire_ref(p); + + mtx_lock(&soaio_jobs_lock); + MPASS(soaio_starting > 0); + soaio_starting--; + for (;;) { + while (!STAILQ_EMPTY(&soaio_jobs)) { + task = STAILQ_FIRST(&soaio_jobs); + STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link); + soaio_queued--; + pending = task->ta_pending; + task->ta_pending = 0; + mtx_unlock(&soaio_jobs_lock); + + task->ta_func(task->ta_context, pending); + + mtx_lock(&soaio_jobs_lock); + } + MPASS(soaio_queued == 0); + + if (p->p_vmspace != myvm) { + mtx_unlock(&soaio_jobs_lock); + vmspace_switch_aio(myvm); + mtx_lock(&soaio_jobs_lock); + continue; + } + + soaio_idle++; + error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-", + soaio_lifetime); + soaio_idle--; + if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) && + soaio_num_procs > soaio_target_procs) + break; + } + soaio_num_procs--; + mtx_unlock(&soaio_jobs_lock); + free_unr(soaio_kproc_unr, id); + kproc_exit(0); +} + +static void +soaio_kproc_create(void *context, int pending) +{ + struct proc *p; + int error, id; + + mtx_lock(&soaio_jobs_lock); + for (;;) { + if (soaio_num_procs < soaio_target_procs) { + /* Must create */ + } else if (soaio_num_procs >= soaio_max_procs) { + /* + * Hit the limit on kernel processes, don't + * create another one. + */ + break; + } else if (soaio_queued <= soaio_idle + soaio_starting) { + /* + * No more AIO jobs waiting for a process to be + * created, so stop. + */ + break; + } + soaio_starting++; + mtx_unlock(&soaio_jobs_lock); + + id = alloc_unr(soaio_kproc_unr); + error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id, + &p, 0, 0, "soaiod%d", id); + if (error != 0) { + free_unr(soaio_kproc_unr, id); + mtx_lock(&soaio_jobs_lock); + soaio_starting--; + break; + } + + mtx_lock(&soaio_jobs_lock); + soaio_num_procs++; + } + mtx_unlock(&soaio_jobs_lock); +} + +static void +soaio_enqueue(struct task *task) +{ + + mtx_lock(&soaio_jobs_lock); + MPASS(task->ta_pending == 0); + task->ta_pending++; + STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link); + soaio_queued++; + if (soaio_queued <= soaio_idle) + wakeup_one(&soaio_idle); + else if (soaio_num_procs < soaio_max_procs) + taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task); + mtx_unlock(&soaio_jobs_lock); +} + +static void +soaio_init(void) +{ + + soaio_lifetime = AIOD_LIFETIME_DEFAULT; + STAILQ_INIT(&soaio_jobs); + mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF); + soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL); + TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL); + if (soaio_target_procs > 0) + taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task); +} +SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL); + +static __inline int +soaio_ready(struct socket *so, struct sockbuf *sb) +{ + return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so)); +} + +static void +soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job) +{ + struct ucred *td_savedcred; + struct thread *td; + struct file *fp; + struct uio uio; + struct iovec iov; + size_t cnt; + int error, flags; + + SOCKBUF_UNLOCK(sb); + aio_switch_vmspace(job); + td = curthread; + fp = job->fd_file; +retry: + td_savedcred = td->td_ucred; + td->td_ucred = job->cred; + + cnt = job->uaiocb.aio_nbytes; + iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf; + iov.iov_len = cnt; + uio.uio_iov = &iov; + uio.uio_iovcnt = 1; + uio.uio_offset = 0; + uio.uio_resid = cnt; + uio.uio_segflg = UIO_USERSPACE; + uio.uio_td = td; + flags = MSG_NBIO; + + /* TODO: Charge ru_msg* to job. */ + + if (sb == &so->so_rcv) { + uio.uio_rw = UIO_READ; +#ifdef MAC + error = mac_socket_check_receive(fp->f_cred, so); + if (error == 0) + +#endif + error = soreceive(so, NULL, &uio, NULL, NULL, &flags); + } else { + uio.uio_rw = UIO_WRITE; +#ifdef MAC + error = mac_socket_check_send(fp->f_cred, so); + if (error == 0) +#endif + error = sosend(so, NULL, &uio, NULL, NULL, flags, td); + if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) { + PROC_LOCK(job->userproc); + kern_psignal(job->userproc, SIGPIPE); + PROC_UNLOCK(job->userproc); + } + } + + cnt -= uio.uio_resid; + td->td_ucred = td_savedcred; + + /* XXX: Not sure if this is needed? */ + if (cnt != 0 && (error == ERESTART || error == EINTR || + error == EWOULDBLOCK)) + error = 0; + if (error == EWOULDBLOCK) { + /* + * A read() or write() on the socket raced with this + * request. If the socket is now ready, try again. + * If it is not, place this request at the head of the + * queue to try again when the socket is ready. + */ + SOCKBUF_LOCK(sb); + empty_results++; + if (soaio_ready(so, sb)) { + empty_retries++; + SOCKBUF_UNLOCK(sb); + goto retry; + } + + if (!aio_set_cancel_function(job, soo_aio_cancel)) { + MPASS(cnt == 0); + SOCKBUF_UNLOCK(sb); + aio_cancel(job); + SOCKBUF_LOCK(sb); + } else { + TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list); + } + } else { + aio_complete(job, cnt, error); + SOCKBUF_LOCK(sb); + } +} + +static void +soaio_process_sb(struct socket *so, struct sockbuf *sb) +{ + struct kaiocb *job; + + SOCKBUF_LOCK(sb); + while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) { + job = TAILQ_FIRST(&sb->sb_aiojobq); + TAILQ_REMOVE(&sb->sb_aiojobq, job, list); + if (!aio_clear_cancel_function(job)) + continue; + + soaio_process_job(so, sb, job); + } + + /* + * If there are still pending requests, the socket must not be + * ready so set SB_AIO to request a wakeup when the socket + * becomes ready. + */ + if (!TAILQ_EMPTY(&sb->sb_aiojobq)) + sb->sb_flags |= SB_AIO; + sb->sb_flags &= ~SB_AIO_RUNNING; + SOCKBUF_UNLOCK(sb); + + ACCEPT_LOCK(); + SOCK_LOCK(so); + sorele(so); +} + +void +soaio_rcv(void *context, int pending) +{ + struct socket *so; + + so = context; + soaio_process_sb(so, &so->so_rcv); +} + +void +soaio_snd(void *context, int pending) +{ + struct socket *so; + + so = context; + soaio_process_sb(so, &so->so_snd); +} + +void +sowakeup_aio(struct socket *so, struct sockbuf *sb) +{ + + SOCKBUF_LOCK_ASSERT(sb); + sb->sb_flags &= ~SB_AIO; + if (sb->sb_flags & SB_AIO_RUNNING) + return; + sb->sb_flags |= SB_AIO_RUNNING; + if (sb == &so->so_snd) + SOCK_LOCK(so); + soref(so); + if (sb == &so->so_snd) + SOCK_UNLOCK(so); + soaio_enqueue(&sb->sb_aiotask); +} + +static void +soo_aio_cancel(struct kaiocb *job) +{ + struct socket *so; + struct sockbuf *sb; + int opcode; + + so = job->fd_file->f_data; + opcode = job->uaiocb.aio_lio_opcode; + if (opcode == LIO_READ) + sb = &so->so_rcv; + else { + MPASS(opcode == LIO_WRITE); + sb = &so->so_snd; + } + + SOCKBUF_LOCK(sb); + if (!aio_cancel_cleared(job)) + TAILQ_REMOVE(&sb->sb_aiojobq, job, list); + if (TAILQ_EMPTY(&sb->sb_aiojobq)) + sb->sb_flags &= ~SB_AIO; + SOCKBUF_UNLOCK(sb); + + aio_cancel(job); +} + +static int +soo_aio_queue(struct file *fp, struct kaiocb *job) +{ + struct socket *so; + struct sockbuf *sb; + + so = fp->f_data; + switch (job->uaiocb.aio_lio_opcode) { + case LIO_READ: + sb = &so->so_rcv; + break; + case LIO_WRITE: + sb = &so->so_snd; + break; + default: + return (EINVAL); + } + + SOCKBUF_LOCK(sb); + if (!aio_set_cancel_function(job, soo_aio_cancel)) + panic("new job was cancelled"); + TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list); + if (!(sb->sb_flags & SB_AIO_RUNNING)) { + if (soaio_ready(so, sb)) + sowakeup_aio(so, sb); + else + sb->sb_flags |= SB_AIO; + } + SOCKBUF_UNLOCK(sb); + return (0); +} diff --git a/sys/kern/uipc_debug.c b/sys/kern/uipc_debug.c index 7c8b93c..fb4ee52 100644 --- a/sys/kern/uipc_debug.c +++ b/sys/kern/uipc_debug.c @@ -416,6 +416,9 @@ db_print_sockbuf(struct sockbuf *sb, const char *sockbufname, int indent) db_printf("sb_flags: 0x%x (", sb->sb_flags); db_print_sbflags(sb->sb_flags); db_printf(")\n"); + + db_print_indent(indent); + db_printf("sb_aiojobq first: %p\n", TAILQ_FIRST(&sb->sb_aiojobq)); } static void @@ -470,7 +473,6 @@ db_print_socket(struct socket *so, const char *socketname, int indent) db_print_indent(indent); db_printf("so_sigio: %p ", so->so_sigio); db_printf("so_oobmark: %lu ", so->so_oobmark); - db_printf("so_aiojobq first: %p\n", TAILQ_FIRST(&so->so_aiojobq)); db_print_sockbuf(&so->so_rcv, "so_rcv", indent); db_print_sockbuf(&so->so_snd, "so_snd", indent); diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c index edf03a3..0cdd43b 100644 --- a/sys/kern/uipc_sockbuf.c +++ b/sys/kern/uipc_sockbuf.c @@ -332,7 +332,7 @@ sowakeup(struct socket *so, struct sockbuf *sb) } else ret = SU_OK; if (sb->sb_flags & SB_AIO) - aio_swake(so, sb); + sowakeup_aio(so, sb); SOCKBUF_UNLOCK(sb); if (ret == SU_ISCONNECTED) soisconnected(so); diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c index 5d2247f..35d17b1 100644 --- a/sys/kern/uipc_socket.c +++ b/sys/kern/uipc_socket.c @@ -134,6 +134,7 @@ __FBSDID("$FreeBSD$"); #include <sys/stat.h> #include <sys/sx.h> #include <sys/sysctl.h> +#include <sys/taskqueue.h> #include <sys/uio.h> #include <sys/jail.h> #include <sys/syslog.h> @@ -396,7 +397,10 @@ soalloc(struct vnet *vnet) SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv"); sx_init(&so->so_snd.sb_sx, "so_snd_sx"); sx_init(&so->so_rcv.sb_sx, "so_rcv_sx"); - TAILQ_INIT(&so->so_aiojobq); + TAILQ_INIT(&so->so_snd.sb_aiojobq); + TAILQ_INIT(&so->so_rcv.sb_aiojobq); + TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so); + TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so); #ifdef VIMAGE VNET_ASSERT(vnet != NULL, ("%s:%d vnet is NULL, so=%p", __func__, __LINE__, so)); diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c index 5b2083c..59dd57c 100644 --- a/sys/kern/vfs_aio.c +++ b/sys/kern/vfs_aio.c @@ -72,8 +72,6 @@ __FBSDID("$FreeBSD$"); #include <vm/uma.h> #include <sys/aio.h> -#include "opt_vfs_aio.h" - /* * Counter for allocating reference ids to new jobs. Wrapped to 1 on * overflow. (XXX will be removed soon.) @@ -85,14 +83,6 @@ static u_long jobrefid; */ static uint64_t jobseqno; -#define JOBST_NULL 0 -#define JOBST_JOBQSOCK 1 -#define JOBST_JOBQGLOBAL 2 -#define JOBST_JOBRUNNING 3 -#define JOBST_JOBFINISHED 4 -#define JOBST_JOBQBUF 5 -#define JOBST_JOBQSYNC 6 - #ifndef MAX_AIO_PER_PROC #define MAX_AIO_PER_PROC 32 #endif @@ -101,26 +91,14 @@ static uint64_t jobseqno; #define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */ #endif -#ifndef MAX_AIO_PROCS -#define MAX_AIO_PROCS 32 -#endif - #ifndef MAX_AIO_QUEUE #define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */ #endif -#ifndef TARGET_AIO_PROCS -#define TARGET_AIO_PROCS 4 -#endif - #ifndef MAX_BUF_AIO #define MAX_BUF_AIO 16 #endif -#ifndef AIOD_LIFETIME_DEFAULT -#define AIOD_LIFETIME_DEFAULT (30 * hz) -#endif - FEATURE(aio, "Asynchronous I/O"); static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list"); @@ -128,6 +106,10 @@ static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list"); static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management"); +static int enable_aio_unsafe = 0; +SYSCTL_INT(_vfs_aio, OID_AUTO, enable_unsafe, CTLFLAG_RW, &enable_aio_unsafe, 0, + "Permit asynchronous IO on all file types, not just known-safe types"); + static int max_aio_procs = MAX_AIO_PROCS; SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs, CTLFLAG_RW, &max_aio_procs, 0, "Maximum number of kernel processes to use for handling async IO "); @@ -165,11 +147,6 @@ static int aiod_lifetime; SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0, "Maximum lifetime for idle aiod"); -static int unloadable = 0; -SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0, - "Allow unload of aio (not recommended)"); - - static int max_aio_per_proc = MAX_AIO_PER_PROC; SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc, 0, @@ -208,46 +185,27 @@ typedef struct oaiocb { */ /* - * Current, there is only two backends: BIO and generic file I/O. - * socket I/O is served by generic file I/O, this is not a good idea, since - * disk file I/O and any other types without O_NONBLOCK flag can block daemon - * processes, if there is no thread to serve socket I/O, the socket I/O will be - * delayed too long or starved, we should create some processes dedicated to - * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O - * systems we really need non-blocking interface, fiddling O_NONBLOCK in file - * structure is not safe because there is race between userland and aio - * daemons. + * If the routine that services an AIO request blocks while running in an + * AIO kernel process it can starve other I/O requests. BIO requests + * queued via aio_qphysio() complete in GEOM and do not use AIO kernel + * processes at all. Socket I/O requests use a separate pool of + * kprocs and also force non-blocking I/O. Other file I/O requests + * use the generic fo_read/fo_write operations which can block. The + * fsync and mlock operations can also block while executing. Ideally + * none of these requests would block while executing. + * + * Note that the service routines cannot toggle O_NONBLOCK in the file + * structure directly while handling a request due to races with + * userland threads. */ -struct kaiocb { - TAILQ_ENTRY(kaiocb) list; /* (b) internal list of for backend */ - TAILQ_ENTRY(kaiocb) plist; /* (a) list of jobs for each backend */ - TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */ - int jobflags; /* (a) job flags */ - int jobstate; /* (b) job state */ - int inputcharge; /* (*) input blockes */ - int outputcharge; /* (*) output blockes */ - struct bio *bp; /* (*) BIO backend BIO pointer */ - struct buf *pbuf; /* (*) BIO backend buffer pointer */ - struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */ - int npages; /* BIO backend number of pages */ - struct proc *userproc; /* (*) user process */ - struct ucred *cred; /* (*) active credential when created */ - struct file *fd_file; /* (*) pointer to file structure */ - struct aioliojob *lio; /* (*) optional lio job */ - struct aiocb *ujob; /* (*) pointer in userspace of aiocb */ - struct knlist klist; /* (a) list of knotes */ - struct aiocb uaiocb; /* (*) kernel I/O control block */ - ksiginfo_t ksi; /* (a) realtime signal info */ - uint64_t seqno; /* (*) job number */ - int pending; /* (a) number of pending I/O, aio_fsync only */ -}; - /* jobflags */ -#define KAIOCB_DONE 0x01 -#define KAIOCB_BUFDONE 0x02 -#define KAIOCB_RUNDOWN 0x04 +#define KAIOCB_QUEUEING 0x01 +#define KAIOCB_CANCELLED 0x02 +#define KAIOCB_CANCELLING 0x04 #define KAIOCB_CHECKSYNC 0x08 +#define KAIOCB_CLEARED 0x10 +#define KAIOCB_FINISHED 0x20 /* * AIO process info @@ -293,9 +251,10 @@ struct kaioinfo { TAILQ_HEAD(,kaiocb) kaio_done; /* (a) done queue for process */ TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */ TAILQ_HEAD(,kaiocb) kaio_jobqueue; /* (a) job queue for process */ - TAILQ_HEAD(,kaiocb) kaio_bufqueue; /* (a) buffer job queue */ TAILQ_HEAD(,kaiocb) kaio_syncqueue; /* (a) queue for aio_fsync */ + TAILQ_HEAD(,kaiocb) kaio_syncready; /* (a) second q for aio_fsync */ struct task kaio_task; /* (*) task to kick aio processes */ + struct task kaio_sync_task; /* (*) task to schedule fsync jobs */ }; #define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx) @@ -332,21 +291,18 @@ static int aio_free_entry(struct kaiocb *job); static void aio_process_rw(struct kaiocb *job); static void aio_process_sync(struct kaiocb *job); static void aio_process_mlock(struct kaiocb *job); +static void aio_schedule_fsync(void *context, int pending); static int aio_newproc(int *); int aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lio, int type, struct aiocb_ops *ops); +static int aio_queue_file(struct file *fp, struct kaiocb *job); static void aio_physwakeup(struct bio *bp); static void aio_proc_rundown(void *arg, struct proc *p); static void aio_proc_rundown_exec(void *arg, struct proc *p, struct image_params *imgp); static int aio_qphysio(struct proc *p, struct kaiocb *job); static void aio_daemon(void *param); -static void aio_swake_cb(struct socket *, struct sockbuf *); -static int aio_unload(void); -static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job, - int type); -#define DONE_BUF 1 -#define DONE_QUEUE 2 +static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job); static int aio_kick(struct proc *userp); static void aio_kick_nowait(struct proc *userp); static void aio_kick_helper(void *context, int pending); @@ -397,13 +353,10 @@ aio_modload(struct module *module, int cmd, void *arg) case MOD_LOAD: aio_onceonly(); break; - case MOD_UNLOAD: - error = aio_unload(); - break; case MOD_SHUTDOWN: break; default: - error = EINVAL; + error = EOPNOTSUPP; break; } return (error); @@ -471,8 +424,6 @@ aio_onceonly(void) { int error; - /* XXX: should probably just use so->callback */ - aio_swake = &aio_swake_cb; exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL, EVENTHANDLER_PRI_ANY); exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown_exec, @@ -513,55 +464,6 @@ aio_onceonly(void) } /* - * Callback for unload of AIO when used as a module. - */ -static int -aio_unload(void) -{ - int error; - - /* - * XXX: no unloads by default, it's too dangerous. - * perhaps we could do it if locked out callers and then - * did an aio_proc_rundown() on each process. - * - * jhb: aio_proc_rundown() needs to run on curproc though, - * so I don't think that would fly. - */ - if (!unloadable) - return (EOPNOTSUPP); - -#ifdef COMPAT_FREEBSD32 - syscall32_helper_unregister(aio32_syscalls); -#endif - syscall_helper_unregister(aio_syscalls); - - error = kqueue_del_filteropts(EVFILT_AIO); - if (error) - return error; - error = kqueue_del_filteropts(EVFILT_LIO); - if (error) - return error; - async_io_version = 0; - aio_swake = NULL; - taskqueue_free(taskqueue_aiod_kick); - delete_unrhdr(aiod_unr); - uma_zdestroy(kaio_zone); - uma_zdestroy(aiop_zone); - uma_zdestroy(aiocb_zone); - uma_zdestroy(aiol_zone); - uma_zdestroy(aiolio_zone); - EVENTHANDLER_DEREGISTER(process_exit, exit_tag); - EVENTHANDLER_DEREGISTER(process_exec, exec_tag); - mtx_destroy(&aio_job_mtx); - sema_destroy(&aio_newproc_sem); - p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1); - p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1); - p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1); - return (0); -} - -/* * Init the per-process aioinfo structure. The aioinfo limits are set * per-process for user limit (resource) management. */ @@ -582,10 +484,11 @@ aio_init_aioinfo(struct proc *p) TAILQ_INIT(&ki->kaio_all); TAILQ_INIT(&ki->kaio_done); TAILQ_INIT(&ki->kaio_jobqueue); - TAILQ_INIT(&ki->kaio_bufqueue); TAILQ_INIT(&ki->kaio_liojoblist); TAILQ_INIT(&ki->kaio_syncqueue); + TAILQ_INIT(&ki->kaio_syncready); TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p); + TASK_INIT(&ki->kaio_sync_task, 0, aio_schedule_fsync, ki); PROC_LOCK(p); if (p->p_aioinfo == NULL) { p->p_aioinfo = ki; @@ -637,7 +540,7 @@ aio_free_entry(struct kaiocb *job) MPASS(ki != NULL); AIO_LOCK_ASSERT(ki, MA_OWNED); - MPASS(job->jobstate == JOBST_JOBFINISHED); + MPASS(job->jobflags & KAIOCB_FINISHED); atomic_subtract_int(&num_queue_count, 1); @@ -670,7 +573,6 @@ aio_free_entry(struct kaiocb *job) PROC_UNLOCK(p); MPASS(job->bp == NULL); - job->jobstate = JOBST_NULL; AIO_UNLOCK(ki); /* @@ -709,6 +611,57 @@ aio_proc_rundown_exec(void *arg, struct proc *p, aio_proc_rundown(arg, p); } +static int +aio_cancel_job(struct proc *p, struct kaioinfo *ki, struct kaiocb *job) +{ + aio_cancel_fn_t *func; + int cancelled; + + AIO_LOCK_ASSERT(ki, MA_OWNED); + if (job->jobflags & (KAIOCB_CANCELLED | KAIOCB_FINISHED)) + return (0); + MPASS((job->jobflags & KAIOCB_CANCELLING) == 0); + job->jobflags |= KAIOCB_CANCELLED; + + func = job->cancel_fn; + + /* + * If there is no cancel routine, just leave the job marked as + * cancelled. The job should be in active use by a caller who + * should complete it normally or when it fails to install a + * cancel routine. + */ + if (func == NULL) + return (0); + + /* + * Set the CANCELLING flag so that aio_complete() will defer + * completions of this job. This prevents the job from being + * freed out from under the cancel callback. After the + * callback any deferred completion (whether from the callback + * or any other source) will be completed. + */ + job->jobflags |= KAIOCB_CANCELLING; + AIO_UNLOCK(ki); + func(job); + AIO_LOCK(ki); + job->jobflags &= ~KAIOCB_CANCELLING; + if (job->jobflags & KAIOCB_FINISHED) { + cancelled = job->uaiocb._aiocb_private.error == ECANCELED; + TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); + aio_bio_done_notify(p, job); + } else { + /* + * The cancel callback might have scheduled an + * operation to cancel this request, but it is + * only counted as cancelled if the request is + * cancelled when the callback returns. + */ + cancelled = 0; + } + return (cancelled); +} + /* * Rundown the jobs for a given process. */ @@ -718,9 +671,6 @@ aio_proc_rundown(void *arg, struct proc *p) struct kaioinfo *ki; struct aioliojob *lj; struct kaiocb *job, *jobn; - struct file *fp; - struct socket *so; - int remove; KASSERT(curthread->td_proc == p, ("%s: called on non-curproc", __func__)); @@ -738,35 +688,11 @@ restart: * aio_cancel on all pending I/O requests. */ TAILQ_FOREACH_SAFE(job, &ki->kaio_jobqueue, plist, jobn) { - remove = 0; - mtx_lock(&aio_job_mtx); - if (job->jobstate == JOBST_JOBQGLOBAL) { - TAILQ_REMOVE(&aio_jobs, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSOCK) { - fp = job->fd_file; - MPASS(fp->f_type == DTYPE_SOCKET); - so = fp->f_data; - TAILQ_REMOVE(&so->so_aiojobq, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSYNC) { - TAILQ_REMOVE(&ki->kaio_syncqueue, job, list); - remove = 1; - } - mtx_unlock(&aio_job_mtx); - - if (remove) { - job->jobstate = JOBST_JOBFINISHED; - job->uaiocb._aiocb_private.status = -1; - job->uaiocb._aiocb_private.error = ECANCELED; - TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); - aio_bio_done_notify(p, job, DONE_QUEUE); - } + aio_cancel_job(p, ki, job); } /* Wait for all running I/O to be finished */ - if (TAILQ_FIRST(&ki->kaio_bufqueue) || - TAILQ_FIRST(&ki->kaio_jobqueue)) { + if (TAILQ_FIRST(&ki->kaio_jobqueue) || ki->kaio_active_count != 0) { ki->kaio_flags |= KAIO_WAKEUP; msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz); goto restart; @@ -791,6 +717,7 @@ restart: } AIO_UNLOCK(ki); taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_task); + taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_sync_task); mtx_destroy(&ki->kaio_mtx); uma_zfree(kaio_zone, ki); p->p_aioinfo = NULL; @@ -807,15 +734,18 @@ aio_selectjob(struct aioproc *aiop) struct proc *userp; mtx_assert(&aio_job_mtx, MA_OWNED); +restart: TAILQ_FOREACH(job, &aio_jobs, list) { userp = job->userproc; ki = userp->p_aioinfo; if (ki->kaio_active_count < ki->kaio_maxactive_count) { TAILQ_REMOVE(&aio_jobs, job, list); + if (!aio_clear_cancel_function(job)) + goto restart; + /* Account for currently active jobs. */ ki->kaio_active_count++; - job->jobstate = JOBST_JOBRUNNING; break; } } @@ -863,7 +793,6 @@ aio_process_rw(struct kaiocb *job) struct thread *td; struct aiocb *cb; struct file *fp; - struct socket *so; struct uio auio; struct iovec aiov; int cnt; @@ -875,6 +804,7 @@ aio_process_rw(struct kaiocb *job) job->uaiocb.aio_lio_opcode == LIO_WRITE, ("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode)); + aio_switch_vmspace(job); td = curthread; td_savedcred = td->td_ucred; td->td_ucred = job->cred; @@ -920,24 +850,15 @@ aio_process_rw(struct kaiocb *job) if (error == ERESTART || error == EINTR || error == EWOULDBLOCK) error = 0; if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) { - int sigpipe = 1; - if (fp->f_type == DTYPE_SOCKET) { - so = fp->f_data; - if (so->so_options & SO_NOSIGPIPE) - sigpipe = 0; - } - if (sigpipe) { - PROC_LOCK(job->userproc); - kern_psignal(job->userproc, SIGPIPE); - PROC_UNLOCK(job->userproc); - } + PROC_LOCK(job->userproc); + kern_psignal(job->userproc, SIGPIPE); + PROC_UNLOCK(job->userproc); } } cnt -= auio.uio_resid; - cb->_aiocb_private.error = error; - cb->_aiocb_private.status = cnt; td->td_ucred = td_savedcred; + aio_complete(job, cnt, error); } static void @@ -945,7 +866,6 @@ aio_process_sync(struct kaiocb *job) { struct thread *td = curthread; struct ucred *td_savedcred = td->td_ucred; - struct aiocb *cb = &job->uaiocb; struct file *fp = job->fd_file; int error = 0; @@ -955,9 +875,8 @@ aio_process_sync(struct kaiocb *job) td->td_ucred = job->cred; if (fp->f_vnode != NULL) error = aio_fsync_vnode(td, fp->f_vnode); - cb->_aiocb_private.error = error; - cb->_aiocb_private.status = 0; td->td_ucred = td_savedcred; + aio_complete(job, 0, error); } static void @@ -969,19 +888,20 @@ aio_process_mlock(struct kaiocb *job) KASSERT(job->uaiocb.aio_lio_opcode == LIO_MLOCK, ("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode)); + aio_switch_vmspace(job); error = vm_mlock(job->userproc, job->cred, __DEVOLATILE(void *, cb->aio_buf), cb->aio_nbytes); - cb->_aiocb_private.error = error; - cb->_aiocb_private.status = 0; + aio_complete(job, 0, error); } static void -aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type) +aio_bio_done_notify(struct proc *userp, struct kaiocb *job) { struct aioliojob *lj; struct kaioinfo *ki; struct kaiocb *sjob, *sjobn; int lj_done; + bool schedule_fsync; ki = userp->p_aioinfo; AIO_LOCK_ASSERT(ki, MA_OWNED); @@ -992,13 +912,8 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type) if (lj->lioj_count == lj->lioj_finished_count) lj_done = 1; } - if (type == DONE_QUEUE) { - job->jobflags |= KAIOCB_DONE; - } else { - job->jobflags |= KAIOCB_BUFDONE; - } TAILQ_INSERT_TAIL(&ki->kaio_done, job, plist); - job->jobstate = JOBST_JOBFINISHED; + MPASS(job->jobflags & KAIOCB_FINISHED); if (ki->kaio_flags & KAIO_RUNDOWN) goto notification_done; @@ -1025,20 +940,24 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type) notification_done: if (job->jobflags & KAIOCB_CHECKSYNC) { + schedule_fsync = false; TAILQ_FOREACH_SAFE(sjob, &ki->kaio_syncqueue, list, sjobn) { if (job->fd_file == sjob->fd_file && job->seqno < sjob->seqno) { if (--sjob->pending == 0) { - mtx_lock(&aio_job_mtx); - sjob->jobstate = JOBST_JOBQGLOBAL; TAILQ_REMOVE(&ki->kaio_syncqueue, sjob, list); - TAILQ_INSERT_TAIL(&aio_jobs, sjob, list); - aio_kick_nowait(userp); - mtx_unlock(&aio_job_mtx); + if (!aio_clear_cancel_function(sjob)) + continue; + TAILQ_INSERT_TAIL(&ki->kaio_syncready, + sjob, list); + schedule_fsync = true; } } } + if (schedule_fsync) + taskqueue_enqueue(taskqueue_aiod_kick, + &ki->kaio_sync_task); } if (ki->kaio_flags & KAIO_WAKEUP) { ki->kaio_flags &= ~KAIO_WAKEUP; @@ -1047,6 +966,103 @@ notification_done: } static void +aio_schedule_fsync(void *context, int pending) +{ + struct kaioinfo *ki; + struct kaiocb *job; + + ki = context; + AIO_LOCK(ki); + while (!TAILQ_EMPTY(&ki->kaio_syncready)) { + job = TAILQ_FIRST(&ki->kaio_syncready); + TAILQ_REMOVE(&ki->kaio_syncready, job, list); + AIO_UNLOCK(ki); + aio_schedule(job, aio_process_sync); + AIO_LOCK(ki); + } + AIO_UNLOCK(ki); +} + +bool +aio_cancel_cleared(struct kaiocb *job) +{ + struct kaioinfo *ki; + + /* + * The caller should hold the same queue lock held when + * aio_clear_cancel_function() was called and set this flag + * ensuring this check sees an up-to-date value. However, + * there is no way to assert that. + */ + ki = job->userproc->p_aioinfo; + return ((job->jobflags & KAIOCB_CLEARED) != 0); +} + +bool +aio_clear_cancel_function(struct kaiocb *job) +{ + struct kaioinfo *ki; + + ki = job->userproc->p_aioinfo; + AIO_LOCK(ki); + MPASS(job->cancel_fn != NULL); + if (job->jobflags & KAIOCB_CANCELLING) { + job->jobflags |= KAIOCB_CLEARED; + AIO_UNLOCK(ki); + return (false); + } + job->cancel_fn = NULL; + AIO_UNLOCK(ki); + return (true); +} + +bool +aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func) +{ + struct kaioinfo *ki; + + ki = job->userproc->p_aioinfo; + AIO_LOCK(ki); + if (job->jobflags & KAIOCB_CANCELLED) { + AIO_UNLOCK(ki); + return (false); + } + job->cancel_fn = func; + AIO_UNLOCK(ki); + return (true); +} + +void +aio_complete(struct kaiocb *job, long status, int error) +{ + struct kaioinfo *ki; + struct proc *userp; + + job->uaiocb._aiocb_private.error = error; + job->uaiocb._aiocb_private.status = status; + + userp = job->userproc; + ki = userp->p_aioinfo; + + AIO_LOCK(ki); + KASSERT(!(job->jobflags & KAIOCB_FINISHED), + ("duplicate aio_complete")); + job->jobflags |= KAIOCB_FINISHED; + if ((job->jobflags & (KAIOCB_QUEUEING | KAIOCB_CANCELLING)) == 0) { + TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); + aio_bio_done_notify(userp, job); + } + AIO_UNLOCK(ki); +} + +void +aio_cancel(struct kaiocb *job) +{ + + aio_complete(job, -1, ECANCELED); +} + +void aio_switch_vmspace(struct kaiocb *job) { @@ -1063,7 +1079,7 @@ aio_daemon(void *_id) struct kaiocb *job; struct aioproc *aiop; struct kaioinfo *ki; - struct proc *p, *userp; + struct proc *p; struct vmspace *myvm; struct thread *td = curthread; int id = (intptr_t)_id; @@ -1107,40 +1123,13 @@ aio_daemon(void *_id) */ while ((job = aio_selectjob(aiop)) != NULL) { mtx_unlock(&aio_job_mtx); - userp = job->userproc; - - /* - * Connect to process address space for user program. - */ - aio_switch_vmspace(job); - ki = userp->p_aioinfo; - - /* Do the I/O function. */ - switch(job->uaiocb.aio_lio_opcode) { - case LIO_READ: - case LIO_WRITE: - aio_process_rw(job); - break; - case LIO_SYNC: - aio_process_sync(job); - break; - case LIO_MLOCK: - aio_process_mlock(job); - break; - } + ki = job->userproc->p_aioinfo; + job->handle_fn(job); mtx_lock(&aio_job_mtx); /* Decrement the active job count. */ ki->kaio_active_count--; - mtx_unlock(&aio_job_mtx); - - AIO_LOCK(ki); - TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); - aio_bio_done_notify(userp, job, DONE_QUEUE); - AIO_UNLOCK(ki); - - mtx_lock(&aio_job_mtx); } /* @@ -1236,7 +1225,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job) struct cdevsw *csw; struct cdev *dev; struct kaioinfo *ki; - struct aioliojob *lj; int error, ref, unmap, poff; vm_prot_t prot; @@ -1293,16 +1281,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job) } AIO_LOCK(ki); - ki->kaio_count++; if (!unmap) ki->kaio_buffer_count++; - lj = job->lio; - if (lj) - lj->lioj_count++; - TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, job, plist); - TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); - job->jobstate = JOBST_JOBQBUF; - cb->_aiocb_private.status = cb->aio_nbytes; AIO_UNLOCK(ki); bp->bio_length = cb->aio_nbytes; @@ -1336,7 +1316,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job) bp->bio_flags |= BIO_UNMAPPED; } - atomic_add_int(&num_queue_count, 1); if (!unmap) atomic_add_int(&num_buf_aio, 1); @@ -1347,14 +1326,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job) doerror: AIO_LOCK(ki); - job->jobstate = JOBST_NULL; - TAILQ_REMOVE(&ki->kaio_bufqueue, job, plist); - TAILQ_REMOVE(&ki->kaio_all, job, allist); - ki->kaio_count--; if (!unmap) ki->kaio_buffer_count--; - if (lj) - lj->lioj_count--; AIO_UNLOCK(ki); if (pbuf) { relpbuf(pbuf, NULL); @@ -1367,40 +1340,6 @@ unref: return (error); } -/* - * Wake up aio requests that may be serviceable now. - */ -static void -aio_swake_cb(struct socket *so, struct sockbuf *sb) -{ - struct kaiocb *job, *jobn; - int opcode; - - SOCKBUF_LOCK_ASSERT(sb); - if (sb == &so->so_snd) - opcode = LIO_WRITE; - else - opcode = LIO_READ; - - sb->sb_flags &= ~SB_AIO; - mtx_lock(&aio_job_mtx); - TAILQ_FOREACH_SAFE(job, &so->so_aiojobq, list, jobn) { - if (opcode == job->uaiocb.aio_lio_opcode) { - if (job->jobstate != JOBST_JOBQSOCK) - panic("invalid queue value"); - /* XXX - * We don't have actual sockets backend yet, - * so we simply move the requests to the generic - * file I/O backend. - */ - TAILQ_REMOVE(&so->so_aiojobq, job, list); - TAILQ_INSERT_TAIL(&aio_jobs, job, list); - aio_kick_nowait(job->userproc); - } - } - mtx_unlock(&aio_job_mtx); -} - static int convert_old_sigevent(struct osigevent *osig, struct sigevent *nsig) { @@ -1521,11 +1460,9 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj, struct proc *p = td->td_proc; cap_rights_t rights; struct file *fp; - struct socket *so; - struct kaiocb *job, *job2; + struct kaiocb *job; struct kaioinfo *ki; struct kevent kev; - struct sockbuf *sb; int opcode; int error; int fd, kqfd; @@ -1668,86 +1605,128 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj, kev.data = (intptr_t)job; kev.udata = job->uaiocb.aio_sigevent.sigev_value.sival_ptr; error = kqfd_register(kqfd, &kev, td, 1); -aqueue_fail: - if (error) { - if (fp) - fdrop(fp, td); - uma_zfree(aiocb_zone, job); - ops->store_error(ujob, error); - goto done; - } + if (error) + goto aqueue_fail; + no_kqueue: ops->store_error(ujob, EINPROGRESS); job->uaiocb._aiocb_private.error = EINPROGRESS; job->userproc = p; job->cred = crhold(td->td_ucred); - job->jobflags = 0; + job->jobflags = KAIOCB_QUEUEING; job->lio = lj; - if (opcode == LIO_SYNC) - goto queueit; + if (opcode == LIO_MLOCK) { + aio_schedule(job, aio_process_mlock); + error = 0; + } else if (fp->f_ops->fo_aio_queue == NULL) + error = aio_queue_file(fp, job); + else + error = fo_aio_queue(fp, job); + if (error) + goto aqueue_fail; - if (fp && fp->f_type == DTYPE_SOCKET) { + AIO_LOCK(ki); + job->jobflags &= ~KAIOCB_QUEUEING; + TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); + ki->kaio_count++; + if (lj) + lj->lioj_count++; + atomic_add_int(&num_queue_count, 1); + if (job->jobflags & KAIOCB_FINISHED) { /* - * Alternate queueing for socket ops: Reach down into the - * descriptor to get the socket data. Then check to see if the - * socket is ready to be read or written (based on the requested - * operation). - * - * If it is not ready for io, then queue the job on the - * socket, and set the flags so we get a call when sbnotify() - * happens. - * - * Note if opcode is neither LIO_WRITE nor LIO_READ we lock - * and unlock the snd sockbuf for no reason. + * The queue callback completed the request synchronously. + * The bulk of the completion is deferred in that case + * until this point. */ - so = fp->f_data; - sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd; - SOCKBUF_LOCK(sb); - if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode == - LIO_WRITE) && (!sowriteable(so)))) { - sb->sb_flags |= SB_AIO; + aio_bio_done_notify(p, job); + } else + TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist); + AIO_UNLOCK(ki); + return (0); - mtx_lock(&aio_job_mtx); - TAILQ_INSERT_TAIL(&so->so_aiojobq, job, list); - mtx_unlock(&aio_job_mtx); +aqueue_fail: + knlist_delete(&job->klist, curthread, 0); + if (fp) + fdrop(fp, td); + uma_zfree(aiocb_zone, job); + ops->store_error(ujob, error); + return (error); +} - AIO_LOCK(ki); - TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); - TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist); - job->jobstate = JOBST_JOBQSOCK; - ki->kaio_count++; - if (lj) - lj->lioj_count++; - AIO_UNLOCK(ki); - SOCKBUF_UNLOCK(sb); - atomic_add_int(&num_queue_count, 1); - error = 0; - goto done; - } - SOCKBUF_UNLOCK(sb); +static void +aio_cancel_daemon_job(struct kaiocb *job) +{ + + mtx_lock(&aio_job_mtx); + if (!aio_cancel_cleared(job)) + TAILQ_REMOVE(&aio_jobs, job, list); + mtx_unlock(&aio_job_mtx); + aio_cancel(job); +} + +void +aio_schedule(struct kaiocb *job, aio_handle_fn_t *func) +{ + + mtx_lock(&aio_job_mtx); + if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) { + mtx_unlock(&aio_job_mtx); + aio_cancel(job); + return; } + job->handle_fn = func; + TAILQ_INSERT_TAIL(&aio_jobs, job, list); + aio_kick_nowait(job->userproc); + mtx_unlock(&aio_job_mtx); +} + +static void +aio_cancel_sync(struct kaiocb *job) +{ + struct kaioinfo *ki; + + ki = job->userproc->p_aioinfo; + mtx_lock(&aio_job_mtx); + if (!aio_cancel_cleared(job)) + TAILQ_REMOVE(&ki->kaio_syncqueue, job, list); + mtx_unlock(&aio_job_mtx); + aio_cancel(job); +} + +int +aio_queue_file(struct file *fp, struct kaiocb *job) +{ + struct aioliojob *lj; + struct kaioinfo *ki; + struct kaiocb *job2; + int error, opcode; + + lj = job->lio; + ki = job->userproc->p_aioinfo; + opcode = job->uaiocb.aio_lio_opcode; + if (opcode == LIO_SYNC) + goto queueit; - if ((error = aio_qphysio(p, job)) == 0) + if ((error = aio_qphysio(job->userproc, job)) == 0) goto done; #if 0 - if (error > 0) { - job->uaiocb._aiocb_private.error = error; - ops->store_error(ujob, error); + /* + * XXX: This means qphysio() failed with EFAULT. The current + * behavior is to retry the operation via fo_read/fo_write. + * Wouldn't it be better to just complete the request with an + * error here? + */ + if (error > 0) goto done; - } #endif queueit: - atomic_add_int(&num_queue_count, 1); + if (!enable_aio_unsafe) + return (EOPNOTSUPP); - AIO_LOCK(ki); - ki->kaio_count++; - if (lj) - lj->lioj_count++; - TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist); - TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); if (opcode == LIO_SYNC) { + AIO_LOCK(ki); TAILQ_FOREACH(job2, &ki->kaio_jobqueue, plist) { if (job2->fd_file == job->fd_file && job2->uaiocb.aio_lio_opcode != LIO_SYNC && @@ -1756,28 +1735,32 @@ queueit: job->pending++; } } - TAILQ_FOREACH(job2, &ki->kaio_bufqueue, plist) { - if (job2->fd_file == job->fd_file && - job2->uaiocb.aio_lio_opcode != LIO_SYNC && - job2->seqno < job->seqno) { - job2->jobflags |= KAIOCB_CHECKSYNC; - job->pending++; - } - } if (job->pending != 0) { + if (!aio_set_cancel_function(job, aio_cancel_sync)) { + AIO_UNLOCK(ki); + aio_cancel(job); + return (0); + } TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, job, list); - job->jobstate = JOBST_JOBQSYNC; AIO_UNLOCK(ki); - goto done; + return (0); } + AIO_UNLOCK(ki); + } + + switch (opcode) { + case LIO_READ: + case LIO_WRITE: + aio_schedule(job, aio_process_rw); + error = 0; + break; + case LIO_SYNC: + aio_schedule(job, aio_process_sync); + error = 0; + break; + default: + error = EINVAL; } - mtx_lock(&aio_job_mtx); - TAILQ_INSERT_TAIL(&aio_jobs, job, list); - job->jobstate = JOBST_JOBQGLOBAL; - aio_kick_nowait(p); - mtx_unlock(&aio_job_mtx); - AIO_UNLOCK(ki); - error = 0; done: return (error); } @@ -1864,7 +1847,7 @@ kern_aio_return(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops) break; } if (job != NULL) { - MPASS(job->jobstate == JOBST_JOBFINISHED); + MPASS(job->jobflags & KAIOCB_FINISHED); status = job->uaiocb._aiocb_private.status; error = job->uaiocb._aiocb_private.error; td->td_retval[0] = status; @@ -1933,7 +1916,7 @@ kern_aio_suspend(struct thread *td, int njoblist, struct aiocb **ujoblist, if (job->ujob == ujoblist[i]) { if (firstjob == NULL) firstjob = job; - if (job->jobstate == JOBST_JOBFINISHED) + if (job->jobflags & KAIOCB_FINISHED) goto RETURN; } } @@ -1992,10 +1975,8 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap) struct kaioinfo *ki; struct kaiocb *job, *jobn; struct file *fp; - struct socket *so; cap_rights_t rights; int error; - int remove; int cancelled = 0; int notcancelled = 0; struct vnode *vp; @@ -2023,28 +2004,7 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap) if ((uap->fd == job->uaiocb.aio_fildes) && ((uap->aiocbp == NULL) || (uap->aiocbp == job->ujob))) { - remove = 0; - - mtx_lock(&aio_job_mtx); - if (job->jobstate == JOBST_JOBQGLOBAL) { - TAILQ_REMOVE(&aio_jobs, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSOCK) { - MPASS(fp->f_type == DTYPE_SOCKET); - so = fp->f_data; - TAILQ_REMOVE(&so->so_aiojobq, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSYNC) { - TAILQ_REMOVE(&ki->kaio_syncqueue, job, list); - remove = 1; - } - mtx_unlock(&aio_job_mtx); - - if (remove) { - TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); - job->uaiocb._aiocb_private.status = -1; - job->uaiocb._aiocb_private.error = ECANCELED; - aio_bio_done_notify(p, job, DONE_QUEUE); + if (aio_cancel_job(p, ki, job)) { cancelled++; } else { notcancelled++; @@ -2102,7 +2062,7 @@ kern_aio_error(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops) AIO_LOCK(ki); TAILQ_FOREACH(job, &ki->kaio_all, allist) { if (job->ujob == ujob) { - if (job->jobstate == JOBST_JOBFINISHED) + if (job->jobflags & KAIOCB_FINISHED) td->td_retval[0] = job->uaiocb._aiocb_private.error; else @@ -2382,35 +2342,36 @@ aio_physwakeup(struct bio *bp) struct kaiocb *job = (struct kaiocb *)bp->bio_caller1; struct proc *userp; struct kaioinfo *ki; - int nblks; + size_t nbytes; + int error, nblks; /* Release mapping into kernel space. */ + userp = job->userproc; + ki = userp->p_aioinfo; if (job->pbuf) { pmap_qremove((vm_offset_t)job->pbuf->b_data, job->npages); relpbuf(job->pbuf, NULL); job->pbuf = NULL; atomic_subtract_int(&num_buf_aio, 1); + AIO_LOCK(ki); + ki->kaio_buffer_count--; + AIO_UNLOCK(ki); } vm_page_unhold_pages(job->pages, job->npages); bp = job->bp; job->bp = NULL; - userp = job->userproc; - ki = userp->p_aioinfo; - AIO_LOCK(ki); - job->uaiocb._aiocb_private.status -= bp->bio_resid; - job->uaiocb._aiocb_private.error = 0; + nbytes = job->uaiocb.aio_nbytes - bp->bio_resid; + error = 0; if (bp->bio_flags & BIO_ERROR) - job->uaiocb._aiocb_private.error = bp->bio_error; - nblks = btodb(job->uaiocb.aio_nbytes); + error = bp->bio_error; + nblks = btodb(nbytes); if (job->uaiocb.aio_lio_opcode == LIO_WRITE) job->outputcharge += nblks; else job->inputcharge += nblks; - TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, job, plist); - ki->kaio_buffer_count--; - aio_bio_done_notify(userp, job, DONE_BUF); - AIO_UNLOCK(ki); + + aio_complete(job, nbytes, error); g_destroy_bio(bp); } @@ -2465,7 +2426,7 @@ kern_aio_waitcomplete(struct thread *td, struct aiocb **ujobp, } if (job != NULL) { - MPASS(job->jobstate == JOBST_JOBFINISHED); + MPASS(job->jobflags & KAIOCB_FINISHED); ujob = job->ujob; status = job->uaiocb._aiocb_private.status; error = job->uaiocb._aiocb_private.error; @@ -2570,7 +2531,7 @@ filt_aio(struct knote *kn, long hint) struct kaiocb *job = kn->kn_ptr.p_aio; kn->kn_data = job->uaiocb._aiocb_private.error; - if (job->jobstate != JOBST_JOBFINISHED) + if (!(job->jobflags & KAIOCB_FINISHED)) return (0); kn->kn_flags |= EV_EOF; return (1); |