diff options
author | jhb <jhb@FreeBSD.org> | 2016-03-01 18:12:14 +0000 |
---|---|---|
committer | jhb <jhb@FreeBSD.org> | 2016-03-01 18:12:14 +0000 |
commit | be47bc68fb065fc834ff51fea7df108abeae031c (patch) | |
tree | d445e19e349faf1791e2ec0922554b84037d618e | |
parent | 15b2caff0f7170f0c4bb31748f12833744f7985c (diff) | |
download | FreeBSD-src-be47bc68fb065fc834ff51fea7df108abeae031c.zip FreeBSD-src-be47bc68fb065fc834ff51fea7df108abeae031c.tar.gz |
Refactor the AIO subsystem to permit file-type-specific handling and
improve cancellation robustness.
Introduce a new file operation, fo_aio_queue, which is responsible for
queueing and completing an asynchronous I/O request for a given file.
The AIO subystem now exports library of routines to manipulate AIO
requests as well as the ability to run a handler function in the
"default" pool of AIO daemons to service a request.
A default implementation for file types which do not include an
fo_aio_queue method queues requests to the "default" pool invoking the
fo_read or fo_write methods as before.
The AIO subsystem permits file types to install a private "cancel"
routine when a request is queued to permit safe dequeueing and cleanup
of cancelled requests.
Sockets now use their own pool of AIO daemons and service per-socket
requests in FIFO order. Socket requests will not block indefinitely
permitting timely cancellation of all requests.
Due to the now-tight coupling of the AIO subsystem with file types,
the AIO subsystem is now a standard part of all kernels. The VFS_AIO
kernel option and aio.ko module are gone.
Many file types may block indefinitely in their fo_read or fo_write
callbacks resulting in a hung AIO daemon. This can result in hung
user processes (when processes attempt to cancel all outstanding
requests during exit) or a hung system. To protect against this, AIO
requests are only permitted for known "safe" files by default. AIO
requests for all file types can be enabled by setting the new
vfs.aio.enable_usafe sysctl to a non-zero value. The AIO tests have
been updated to skip operations on unsafe file types if the sysctl is
zero.
Currently, AIO requests on sockets and raw disks are considered safe
and are enabled by default. aio_mlock() is also enabled by default.
Reviewed by: cem, jilles
Discussed with: kib (earlier version)
Sponsored by: Chelsio Communications
Differential Revision: https://reviews.freebsd.org/D5289
-rw-r--r-- | share/man/man4/aio.4 | 118 | ||||
-rw-r--r-- | sys/conf/NOTES | 5 | ||||
-rw-r--r-- | sys/conf/files | 2 | ||||
-rw-r--r-- | sys/conf/options | 1 | ||||
-rw-r--r-- | sys/kern/sys_socket.c | 397 | ||||
-rw-r--r-- | sys/kern/uipc_debug.c | 4 | ||||
-rw-r--r-- | sys/kern/uipc_sockbuf.c | 2 | ||||
-rw-r--r-- | sys/kern/uipc_socket.c | 6 | ||||
-rw-r--r-- | sys/kern/vfs_aio.c | 737 | ||||
-rw-r--r-- | sys/modules/Makefile | 1 | ||||
-rw-r--r-- | sys/modules/aio/Makefile | 10 | ||||
-rw-r--r-- | sys/sys/aio.h | 121 | ||||
-rw-r--r-- | sys/sys/file.h | 10 | ||||
-rw-r--r-- | sys/sys/sockbuf.h | 4 | ||||
-rw-r--r-- | sys/sys/socketvar.h | 4 | ||||
-rw-r--r-- | tests/sys/aio/aio_kqueue_test.c | 2 | ||||
-rw-r--r-- | tests/sys/aio/aio_test.c | 5 | ||||
-rw-r--r-- | tests/sys/aio/lio_kqueue_test.c | 2 | ||||
-rw-r--r-- | tests/sys/aio/local.h | 74 |
19 files changed, 1072 insertions, 433 deletions
diff --git a/share/man/man4/aio.4 b/share/man/man4/aio.4 index 8e773d9..f973639 100644 --- a/share/man/man4/aio.4 +++ b/share/man/man4/aio.4 @@ -27,24 +27,116 @@ .\" .\" $FreeBSD$ .\" -.Dd October 24, 2002 +.Dd March 1, 2016 .Dt AIO 4 .Os .Sh NAME .Nm aio .Nd asynchronous I/O -.Sh SYNOPSIS -To link into the kernel: -.Cd "options VFS_AIO" -.Pp -To load as a kernel loadable module: -.Dl kldload aio .Sh DESCRIPTION The .Nm facility provides system calls for asynchronous I/O. -It is available both as a kernel option for static inclusion and as a -dynamic kernel module. +However, asynchronous I/O operations are only enabled for certain file +types by default. +Asynchronous I/O operations for other file types may block an AIO daemon +indefinitely resulting in process and/or system hangs. +Asynchronous I/O operations can be enabled for all file types by setting +the +.Va vfs.aio.enable_unsafe +sysctl node to a non-zero value. +.Pp +Asynchronous I/O operations on sockets and raw disk devices do not block +indefinitely and are enabled by default. +.Pp +The +.Nm +facility uses kernel processes +(also known as AIO daemons) +to service most asynchronous I/O requests. +These processes are grouped into pools containing a variable number of +processes. +Each pool will add or remove processes to the pool based on load. +Pools can be configured by sysctl nodes that define the minimum +and maximum number of processes as well as the amount of time an idle +process will wait before exiting. +.Pp +One pool of AIO daemons is used to service asynchronous I/O requests for +sockets. +These processes are named +.Dq soaiod<N> . +The following sysctl nodes are used with this pool: +.Bl -tag -width indent +.It Va kern.ipc.aio.num_procs +The current number of processes in the pool. +.It Va kern.ipc.aio.target_procs +The minimum number of processes that should be present in the pool. +.It Va kern.ipc.aio.max_procs +The maximum number of processes permitted in the pool. +.It Va kern.ipc.aio.lifetime +The amount of time a process is permitted to idle in clock ticks. +If a process is idle for this amount of time and there are more processes +in the pool than the target minimum, +the process will exit. +.El +.Pp +A second pool of AIO daemons is used to service all other asynchronous I/O +requests except for I/O requests to raw disks. +These processes are named +.Dq aiod<N> . +The following sysctl nodes are used with this pool: +.Bl -tag -width indent +.It Va vfs.aio.num_aio_procs +The current number of processes in the pool. +.It Va vfs.aio.target_aio_procs +The minimum number of processes that should be present in the pool. +.It Va vfs.aio.max_aio_procs +The maximum number of processes permitted in the pool. +.It Va vfs.aio.aiod_lifetime +The amount of time a process is permitted to idle in clock ticks. +If a process is idle for this amount of time and there are more processes +in the pool than the target minimum, +the process will exit. +.El +.Pp +Asynchronous I/O requests for raw disks are queued directly to the disk +device layer after temporarily wiring the user pages associated with the +request. +These requests are not serviced by any of the AIO daemon pools. +.Pp +Several limits on the number of asynchronous I/O requests are imposed both +system-wide and per-process. +These limits are configured via the following sysctls: +.Bl -tag -width indent +.It Va vfs.aio.max_buf_aio +The maximum number of queued asynchronous I/O requests for raw disks permitted +for a single process. +Asynchronous I/O requests that have completed but whose status has not been +retrieved via +.Xr aio_return 2 +or +.Xr aio_waitcomplete 2 +are not counted against this limit. +.It Va vfs.aio.num_buf_aio +The number of queued asynchronous I/O requests for raw disks system-wide. +.It Va vfs.aio.max_aio_queue_per_proc +The maximum number of asynchronous I/O requests for a single process +serviced concurrently by the default AIO daemon pool. +.It Va vfs.aio.max_aio_per_proc +The maximum number of outstanding asynchronous I/O requests permitted for a +single process. +This includes requests that have not been serviced, +requests currently being serviced, +and requests that have completed but whose status has not been retrieved via +.Xr aio_return 2 +or +.Xr aio_waitcomplete 2 . +.It Va vfs.aio.num_queue_count +The number of outstanding asynchronous I/O requests system-wide. +.It Va vfs.aio.max_aio_queue +The maximum number of outstanding asynchronous I/O requests permitted +system-wide. +.El .Sh SEE ALSO .Xr aio_cancel 2 , .Xr aio_error 2 , @@ -54,9 +146,7 @@ dynamic kernel module. .Xr aio_waitcomplete 2 , .Xr aio_write 2 , .Xr lio_listio 2 , -.Xr config 8 , -.Xr kldload 8 , -.Xr kldunload 8 +.Xr sysctl 8 .Sh HISTORY The .Nm @@ -66,3 +156,7 @@ The .Nm kernel module appeared in .Fx 5.0 . +The +.Nm +facility was integrated into all kernels in +.Fx 11.0 . diff --git a/sys/conf/NOTES b/sys/conf/NOTES index efef9e4..5a8014c 100644 --- a/sys/conf/NOTES +++ b/sys/conf/NOTES @@ -1130,11 +1130,6 @@ options EXT2FS # options REISERFS -# Use real implementations of the aio_* system calls. There are numerous -# stability and security issues in the current aio code that make it -# unsuitable for inclusion on machines with untrusted local users. -options VFS_AIO - # Cryptographically secure random number generator; /dev/random device random diff --git a/sys/conf/files b/sys/conf/files index 5f6201a..47b68b9 100644 --- a/sys/conf/files +++ b/sys/conf/files @@ -3332,7 +3332,7 @@ kern/uipc_socket.c standard kern/uipc_syscalls.c standard kern/uipc_usrreq.c standard kern/vfs_acl.c standard -kern/vfs_aio.c optional vfs_aio +kern/vfs_aio.c standard kern/vfs_bio.c standard kern/vfs_cache.c standard kern/vfs_cluster.c standard diff --git a/sys/conf/options b/sys/conf/options index 0bc9225..4df24a5 100644 --- a/sys/conf/options +++ b/sys/conf/options @@ -213,7 +213,6 @@ SYSVSHM opt_sysvipc.h SW_WATCHDOG opt_watchdog.h TURNSTILE_PROFILING UMTX_PROFILING -VFS_AIO VERBOSE_SYSINIT WLCACHE opt_wavelan.h WLDEBUG opt_wavelan.h diff --git a/sys/kern/sys_socket.c b/sys/kern/sys_socket.c index dd831ae..ffdb003 100644 --- a/sys/kern/sys_socket.c +++ b/sys/kern/sys_socket.c @@ -34,9 +34,12 @@ __FBSDID("$FreeBSD$"); #include <sys/param.h> #include <sys/systm.h> +#include <sys/aio.h> #include <sys/domain.h> #include <sys/file.h> #include <sys/filedesc.h> +#include <sys/kernel.h> +#include <sys/kthread.h> #include <sys/malloc.h> #include <sys/proc.h> #include <sys/protosw.h> @@ -48,6 +51,9 @@ __FBSDID("$FreeBSD$"); #include <sys/filio.h> /* XXX */ #include <sys/sockio.h> #include <sys/stat.h> +#include <sys/sysctl.h> +#include <sys/sysproto.h> +#include <sys/taskqueue.h> #include <sys/uio.h> #include <sys/ucred.h> #include <sys/un.h> @@ -64,6 +70,22 @@ __FBSDID("$FreeBSD$"); #include <security/mac/mac_framework.h> +#include <vm/vm.h> +#include <vm/pmap.h> +#include <vm/vm_extern.h> +#include <vm/vm_map.h> + +static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL, + "socket AIO stats"); + +static int empty_results; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results, + 0, "socket operation returned EAGAIN"); + +static int empty_retries; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries, + 0, "socket operation retries"); + static fo_rdwr_t soo_read; static fo_rdwr_t soo_write; static fo_ioctl_t soo_ioctl; @@ -72,6 +94,9 @@ extern fo_kqfilter_t soo_kqfilter; static fo_stat_t soo_stat; static fo_close_t soo_close; static fo_fill_kinfo_t soo_fill_kinfo; +static fo_aio_queue_t soo_aio_queue; + +static void soo_aio_cancel(struct kaiocb *job); struct fileops socketops = { .fo_read = soo_read, @@ -86,6 +111,7 @@ struct fileops socketops = { .fo_chown = invfo_chown, .fo_sendfile = invfo_sendfile, .fo_fill_kinfo = soo_fill_kinfo, + .fo_aio_queue = soo_aio_queue, .fo_flags = DFLAG_PASSABLE }; @@ -363,3 +389,374 @@ soo_fill_kinfo(struct file *fp, struct kinfo_file *kif, struct filedesc *fdp) sizeof(kif->kf_path)); return (0); } + +static STAILQ_HEAD(, task) soaio_jobs; +static struct mtx soaio_jobs_lock; +static struct task soaio_kproc_task; +static int soaio_starting, soaio_idle, soaio_queued; +static struct unrhdr *soaio_kproc_unr; + +static int soaio_max_procs = MAX_AIO_PROCS; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0, + "Maximum number of kernel processes to use for async socket IO"); + +static int soaio_num_procs; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0, + "Number of active kernel processes for async socket IO"); + +static int soaio_target_procs = TARGET_AIO_PROCS; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD, + &soaio_target_procs, 0, + "Preferred number of ready kernel processes for async socket IO"); + +static int soaio_lifetime; +SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0, + "Maximum lifetime for idle aiod"); + +static void +soaio_kproc_loop(void *arg) +{ + struct proc *p; + struct vmspace *myvm; + struct task *task; + int error, id, pending; + + id = (intptr_t)arg; + + /* + * Grab an extra reference on the daemon's vmspace so that it + * doesn't get freed by jobs that switch to a different + * vmspace. + */ + p = curproc; + myvm = vmspace_acquire_ref(p); + + mtx_lock(&soaio_jobs_lock); + MPASS(soaio_starting > 0); + soaio_starting--; + for (;;) { + while (!STAILQ_EMPTY(&soaio_jobs)) { + task = STAILQ_FIRST(&soaio_jobs); + STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link); + soaio_queued--; + pending = task->ta_pending; + task->ta_pending = 0; + mtx_unlock(&soaio_jobs_lock); + + task->ta_func(task->ta_context, pending); + + mtx_lock(&soaio_jobs_lock); + } + MPASS(soaio_queued == 0); + + if (p->p_vmspace != myvm) { + mtx_unlock(&soaio_jobs_lock); + vmspace_switch_aio(myvm); + mtx_lock(&soaio_jobs_lock); + continue; + } + + soaio_idle++; + error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-", + soaio_lifetime); + soaio_idle--; + if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) && + soaio_num_procs > soaio_target_procs) + break; + } + soaio_num_procs--; + mtx_unlock(&soaio_jobs_lock); + free_unr(soaio_kproc_unr, id); + kproc_exit(0); +} + +static void +soaio_kproc_create(void *context, int pending) +{ + struct proc *p; + int error, id; + + mtx_lock(&soaio_jobs_lock); + for (;;) { + if (soaio_num_procs < soaio_target_procs) { + /* Must create */ + } else if (soaio_num_procs >= soaio_max_procs) { + /* + * Hit the limit on kernel processes, don't + * create another one. + */ + break; + } else if (soaio_queued <= soaio_idle + soaio_starting) { + /* + * No more AIO jobs waiting for a process to be + * created, so stop. + */ + break; + } + soaio_starting++; + mtx_unlock(&soaio_jobs_lock); + + id = alloc_unr(soaio_kproc_unr); + error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id, + &p, 0, 0, "soaiod%d", id); + if (error != 0) { + free_unr(soaio_kproc_unr, id); + mtx_lock(&soaio_jobs_lock); + soaio_starting--; + break; + } + + mtx_lock(&soaio_jobs_lock); + soaio_num_procs++; + } + mtx_unlock(&soaio_jobs_lock); +} + +static void +soaio_enqueue(struct task *task) +{ + + mtx_lock(&soaio_jobs_lock); + MPASS(task->ta_pending == 0); + task->ta_pending++; + STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link); + soaio_queued++; + if (soaio_queued <= soaio_idle) + wakeup_one(&soaio_idle); + else if (soaio_num_procs < soaio_max_procs) + taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task); + mtx_unlock(&soaio_jobs_lock); +} + +static void +soaio_init(void) +{ + + soaio_lifetime = AIOD_LIFETIME_DEFAULT; + STAILQ_INIT(&soaio_jobs); + mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF); + soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL); + TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL); + if (soaio_target_procs > 0) + taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task); +} +SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL); + +static __inline int +soaio_ready(struct socket *so, struct sockbuf *sb) +{ + return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so)); +} + +static void +soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job) +{ + struct ucred *td_savedcred; + struct thread *td; + struct file *fp; + struct uio uio; + struct iovec iov; + size_t cnt; + int error, flags; + + SOCKBUF_UNLOCK(sb); + aio_switch_vmspace(job); + td = curthread; + fp = job->fd_file; +retry: + td_savedcred = td->td_ucred; + td->td_ucred = job->cred; + + cnt = job->uaiocb.aio_nbytes; + iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf; + iov.iov_len = cnt; + uio.uio_iov = &iov; + uio.uio_iovcnt = 1; + uio.uio_offset = 0; + uio.uio_resid = cnt; + uio.uio_segflg = UIO_USERSPACE; + uio.uio_td = td; + flags = MSG_NBIO; + + /* TODO: Charge ru_msg* to job. */ + + if (sb == &so->so_rcv) { + uio.uio_rw = UIO_READ; +#ifdef MAC + error = mac_socket_check_receive(fp->f_cred, so); + if (error == 0) + +#endif + error = soreceive(so, NULL, &uio, NULL, NULL, &flags); + } else { + uio.uio_rw = UIO_WRITE; +#ifdef MAC + error = mac_socket_check_send(fp->f_cred, so); + if (error == 0) +#endif + error = sosend(so, NULL, &uio, NULL, NULL, flags, td); + if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) { + PROC_LOCK(job->userproc); + kern_psignal(job->userproc, SIGPIPE); + PROC_UNLOCK(job->userproc); + } + } + + cnt -= uio.uio_resid; + td->td_ucred = td_savedcred; + + /* XXX: Not sure if this is needed? */ + if (cnt != 0 && (error == ERESTART || error == EINTR || + error == EWOULDBLOCK)) + error = 0; + if (error == EWOULDBLOCK) { + /* + * A read() or write() on the socket raced with this + * request. If the socket is now ready, try again. + * If it is not, place this request at the head of the + * queue to try again when the socket is ready. + */ + SOCKBUF_LOCK(sb); + empty_results++; + if (soaio_ready(so, sb)) { + empty_retries++; + SOCKBUF_UNLOCK(sb); + goto retry; + } + + if (!aio_set_cancel_function(job, soo_aio_cancel)) { + MPASS(cnt == 0); + SOCKBUF_UNLOCK(sb); + aio_cancel(job); + SOCKBUF_LOCK(sb); + } else { + TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list); + } + } else { + aio_complete(job, cnt, error); + SOCKBUF_LOCK(sb); + } +} + +static void +soaio_process_sb(struct socket *so, struct sockbuf *sb) +{ + struct kaiocb *job; + + SOCKBUF_LOCK(sb); + while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) { + job = TAILQ_FIRST(&sb->sb_aiojobq); + TAILQ_REMOVE(&sb->sb_aiojobq, job, list); + if (!aio_clear_cancel_function(job)) + continue; + + soaio_process_job(so, sb, job); + } + + /* + * If there are still pending requests, the socket must not be + * ready so set SB_AIO to request a wakeup when the socket + * becomes ready. + */ + if (!TAILQ_EMPTY(&sb->sb_aiojobq)) + sb->sb_flags |= SB_AIO; + sb->sb_flags &= ~SB_AIO_RUNNING; + SOCKBUF_UNLOCK(sb); + + ACCEPT_LOCK(); + SOCK_LOCK(so); + sorele(so); +} + +void +soaio_rcv(void *context, int pending) +{ + struct socket *so; + + so = context; + soaio_process_sb(so, &so->so_rcv); +} + +void +soaio_snd(void *context, int pending) +{ + struct socket *so; + + so = context; + soaio_process_sb(so, &so->so_snd); +} + +void +sowakeup_aio(struct socket *so, struct sockbuf *sb) +{ + + SOCKBUF_LOCK_ASSERT(sb); + sb->sb_flags &= ~SB_AIO; + if (sb->sb_flags & SB_AIO_RUNNING) + return; + sb->sb_flags |= SB_AIO_RUNNING; + if (sb == &so->so_snd) + SOCK_LOCK(so); + soref(so); + if (sb == &so->so_snd) + SOCK_UNLOCK(so); + soaio_enqueue(&sb->sb_aiotask); +} + +static void +soo_aio_cancel(struct kaiocb *job) +{ + struct socket *so; + struct sockbuf *sb; + int opcode; + + so = job->fd_file->f_data; + opcode = job->uaiocb.aio_lio_opcode; + if (opcode == LIO_READ) + sb = &so->so_rcv; + else { + MPASS(opcode == LIO_WRITE); + sb = &so->so_snd; + } + + SOCKBUF_LOCK(sb); + if (!aio_cancel_cleared(job)) + TAILQ_REMOVE(&sb->sb_aiojobq, job, list); + if (TAILQ_EMPTY(&sb->sb_aiojobq)) + sb->sb_flags &= ~SB_AIO; + SOCKBUF_UNLOCK(sb); + + aio_cancel(job); +} + +static int +soo_aio_queue(struct file *fp, struct kaiocb *job) +{ + struct socket *so; + struct sockbuf *sb; + + so = fp->f_data; + switch (job->uaiocb.aio_lio_opcode) { + case LIO_READ: + sb = &so->so_rcv; + break; + case LIO_WRITE: + sb = &so->so_snd; + break; + default: + return (EINVAL); + } + + SOCKBUF_LOCK(sb); + if (!aio_set_cancel_function(job, soo_aio_cancel)) + panic("new job was cancelled"); + TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list); + if (!(sb->sb_flags & SB_AIO_RUNNING)) { + if (soaio_ready(so, sb)) + sowakeup_aio(so, sb); + else + sb->sb_flags |= SB_AIO; + } + SOCKBUF_UNLOCK(sb); + return (0); +} diff --git a/sys/kern/uipc_debug.c b/sys/kern/uipc_debug.c index 7c8b93c..fb4ee52 100644 --- a/sys/kern/uipc_debug.c +++ b/sys/kern/uipc_debug.c @@ -416,6 +416,9 @@ db_print_sockbuf(struct sockbuf *sb, const char *sockbufname, int indent) db_printf("sb_flags: 0x%x (", sb->sb_flags); db_print_sbflags(sb->sb_flags); db_printf(")\n"); + + db_print_indent(indent); + db_printf("sb_aiojobq first: %p\n", TAILQ_FIRST(&sb->sb_aiojobq)); } static void @@ -470,7 +473,6 @@ db_print_socket(struct socket *so, const char *socketname, int indent) db_print_indent(indent); db_printf("so_sigio: %p ", so->so_sigio); db_printf("so_oobmark: %lu ", so->so_oobmark); - db_printf("so_aiojobq first: %p\n", TAILQ_FIRST(&so->so_aiojobq)); db_print_sockbuf(&so->so_rcv, "so_rcv", indent); db_print_sockbuf(&so->so_snd, "so_snd", indent); diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c index edf03a3..0cdd43b 100644 --- a/sys/kern/uipc_sockbuf.c +++ b/sys/kern/uipc_sockbuf.c @@ -332,7 +332,7 @@ sowakeup(struct socket *so, struct sockbuf *sb) } else ret = SU_OK; if (sb->sb_flags & SB_AIO) - aio_swake(so, sb); + sowakeup_aio(so, sb); SOCKBUF_UNLOCK(sb); if (ret == SU_ISCONNECTED) soisconnected(so); diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c index 5d2247f..35d17b1 100644 --- a/sys/kern/uipc_socket.c +++ b/sys/kern/uipc_socket.c @@ -134,6 +134,7 @@ __FBSDID("$FreeBSD$"); #include <sys/stat.h> #include <sys/sx.h> #include <sys/sysctl.h> +#include <sys/taskqueue.h> #include <sys/uio.h> #include <sys/jail.h> #include <sys/syslog.h> @@ -396,7 +397,10 @@ soalloc(struct vnet *vnet) SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv"); sx_init(&so->so_snd.sb_sx, "so_snd_sx"); sx_init(&so->so_rcv.sb_sx, "so_rcv_sx"); - TAILQ_INIT(&so->so_aiojobq); + TAILQ_INIT(&so->so_snd.sb_aiojobq); + TAILQ_INIT(&so->so_rcv.sb_aiojobq); + TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so); + TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so); #ifdef VIMAGE VNET_ASSERT(vnet != NULL, ("%s:%d vnet is NULL, so=%p", __func__, __LINE__, so)); diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c index 5b2083c..59dd57c 100644 --- a/sys/kern/vfs_aio.c +++ b/sys/kern/vfs_aio.c @@ -72,8 +72,6 @@ __FBSDID("$FreeBSD$"); #include <vm/uma.h> #include <sys/aio.h> -#include "opt_vfs_aio.h" - /* * Counter for allocating reference ids to new jobs. Wrapped to 1 on * overflow. (XXX will be removed soon.) @@ -85,14 +83,6 @@ static u_long jobrefid; */ static uint64_t jobseqno; -#define JOBST_NULL 0 -#define JOBST_JOBQSOCK 1 -#define JOBST_JOBQGLOBAL 2 -#define JOBST_JOBRUNNING 3 -#define JOBST_JOBFINISHED 4 -#define JOBST_JOBQBUF 5 -#define JOBST_JOBQSYNC 6 - #ifndef MAX_AIO_PER_PROC #define MAX_AIO_PER_PROC 32 #endif @@ -101,26 +91,14 @@ static uint64_t jobseqno; #define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */ #endif -#ifndef MAX_AIO_PROCS -#define MAX_AIO_PROCS 32 -#endif - #ifndef MAX_AIO_QUEUE #define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */ #endif -#ifndef TARGET_AIO_PROCS -#define TARGET_AIO_PROCS 4 -#endif - #ifndef MAX_BUF_AIO #define MAX_BUF_AIO 16 #endif -#ifndef AIOD_LIFETIME_DEFAULT -#define AIOD_LIFETIME_DEFAULT (30 * hz) -#endif - FEATURE(aio, "Asynchronous I/O"); static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list"); @@ -128,6 +106,10 @@ static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list"); static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management"); +static int enable_aio_unsafe = 0; +SYSCTL_INT(_vfs_aio, OID_AUTO, enable_unsafe, CTLFLAG_RW, &enable_aio_unsafe, 0, + "Permit asynchronous IO on all file types, not just known-safe types"); + static int max_aio_procs = MAX_AIO_PROCS; SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs, CTLFLAG_RW, &max_aio_procs, 0, "Maximum number of kernel processes to use for handling async IO "); @@ -165,11 +147,6 @@ static int aiod_lifetime; SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0, "Maximum lifetime for idle aiod"); -static int unloadable = 0; -SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0, - "Allow unload of aio (not recommended)"); - - static int max_aio_per_proc = MAX_AIO_PER_PROC; SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc, 0, @@ -208,46 +185,27 @@ typedef struct oaiocb { */ /* - * Current, there is only two backends: BIO and generic file I/O. - * socket I/O is served by generic file I/O, this is not a good idea, since - * disk file I/O and any other types without O_NONBLOCK flag can block daemon - * processes, if there is no thread to serve socket I/O, the socket I/O will be - * delayed too long or starved, we should create some processes dedicated to - * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O - * systems we really need non-blocking interface, fiddling O_NONBLOCK in file - * structure is not safe because there is race between userland and aio - * daemons. + * If the routine that services an AIO request blocks while running in an + * AIO kernel process it can starve other I/O requests. BIO requests + * queued via aio_qphysio() complete in GEOM and do not use AIO kernel + * processes at all. Socket I/O requests use a separate pool of + * kprocs and also force non-blocking I/O. Other file I/O requests + * use the generic fo_read/fo_write operations which can block. The + * fsync and mlock operations can also block while executing. Ideally + * none of these requests would block while executing. + * + * Note that the service routines cannot toggle O_NONBLOCK in the file + * structure directly while handling a request due to races with + * userland threads. */ -struct kaiocb { - TAILQ_ENTRY(kaiocb) list; /* (b) internal list of for backend */ - TAILQ_ENTRY(kaiocb) plist; /* (a) list of jobs for each backend */ - TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */ - int jobflags; /* (a) job flags */ - int jobstate; /* (b) job state */ - int inputcharge; /* (*) input blockes */ - int outputcharge; /* (*) output blockes */ - struct bio *bp; /* (*) BIO backend BIO pointer */ - struct buf *pbuf; /* (*) BIO backend buffer pointer */ - struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */ - int npages; /* BIO backend number of pages */ - struct proc *userproc; /* (*) user process */ - struct ucred *cred; /* (*) active credential when created */ - struct file *fd_file; /* (*) pointer to file structure */ - struct aioliojob *lio; /* (*) optional lio job */ - struct aiocb *ujob; /* (*) pointer in userspace of aiocb */ - struct knlist klist; /* (a) list of knotes */ - struct aiocb uaiocb; /* (*) kernel I/O control block */ - ksiginfo_t ksi; /* (a) realtime signal info */ - uint64_t seqno; /* (*) job number */ - int pending; /* (a) number of pending I/O, aio_fsync only */ -}; - /* jobflags */ -#define KAIOCB_DONE 0x01 -#define KAIOCB_BUFDONE 0x02 -#define KAIOCB_RUNDOWN 0x04 +#define KAIOCB_QUEUEING 0x01 +#define KAIOCB_CANCELLED 0x02 +#define KAIOCB_CANCELLING 0x04 #define KAIOCB_CHECKSYNC 0x08 +#define KAIOCB_CLEARED 0x10 +#define KAIOCB_FINISHED 0x20 /* * AIO process info @@ -293,9 +251,10 @@ struct kaioinfo { TAILQ_HEAD(,kaiocb) kaio_done; /* (a) done queue for process */ TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */ TAILQ_HEAD(,kaiocb) kaio_jobqueue; /* (a) job queue for process */ - TAILQ_HEAD(,kaiocb) kaio_bufqueue; /* (a) buffer job queue */ TAILQ_HEAD(,kaiocb) kaio_syncqueue; /* (a) queue for aio_fsync */ + TAILQ_HEAD(,kaiocb) kaio_syncready; /* (a) second q for aio_fsync */ struct task kaio_task; /* (*) task to kick aio processes */ + struct task kaio_sync_task; /* (*) task to schedule fsync jobs */ }; #define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx) @@ -332,21 +291,18 @@ static int aio_free_entry(struct kaiocb *job); static void aio_process_rw(struct kaiocb *job); static void aio_process_sync(struct kaiocb *job); static void aio_process_mlock(struct kaiocb *job); +static void aio_schedule_fsync(void *context, int pending); static int aio_newproc(int *); int aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lio, int type, struct aiocb_ops *ops); +static int aio_queue_file(struct file *fp, struct kaiocb *job); static void aio_physwakeup(struct bio *bp); static void aio_proc_rundown(void *arg, struct proc *p); static void aio_proc_rundown_exec(void *arg, struct proc *p, struct image_params *imgp); static int aio_qphysio(struct proc *p, struct kaiocb *job); static void aio_daemon(void *param); -static void aio_swake_cb(struct socket *, struct sockbuf *); -static int aio_unload(void); -static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job, - int type); -#define DONE_BUF 1 -#define DONE_QUEUE 2 +static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job); static int aio_kick(struct proc *userp); static void aio_kick_nowait(struct proc *userp); static void aio_kick_helper(void *context, int pending); @@ -397,13 +353,10 @@ aio_modload(struct module *module, int cmd, void *arg) case MOD_LOAD: aio_onceonly(); break; - case MOD_UNLOAD: - error = aio_unload(); - break; case MOD_SHUTDOWN: break; default: - error = EINVAL; + error = EOPNOTSUPP; break; } return (error); @@ -471,8 +424,6 @@ aio_onceonly(void) { int error; - /* XXX: should probably just use so->callback */ - aio_swake = &aio_swake_cb; exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL, EVENTHANDLER_PRI_ANY); exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown_exec, @@ -513,55 +464,6 @@ aio_onceonly(void) } /* - * Callback for unload of AIO when used as a module. - */ -static int -aio_unload(void) -{ - int error; - - /* - * XXX: no unloads by default, it's too dangerous. - * perhaps we could do it if locked out callers and then - * did an aio_proc_rundown() on each process. - * - * jhb: aio_proc_rundown() needs to run on curproc though, - * so I don't think that would fly. - */ - if (!unloadable) - return (EOPNOTSUPP); - -#ifdef COMPAT_FREEBSD32 - syscall32_helper_unregister(aio32_syscalls); -#endif - syscall_helper_unregister(aio_syscalls); - - error = kqueue_del_filteropts(EVFILT_AIO); - if (error) - return error; - error = kqueue_del_filteropts(EVFILT_LIO); - if (error) - return error; - async_io_version = 0; - aio_swake = NULL; - taskqueue_free(taskqueue_aiod_kick); - delete_unrhdr(aiod_unr); - uma_zdestroy(kaio_zone); - uma_zdestroy(aiop_zone); - uma_zdestroy(aiocb_zone); - uma_zdestroy(aiol_zone); - uma_zdestroy(aiolio_zone); - EVENTHANDLER_DEREGISTER(process_exit, exit_tag); - EVENTHANDLER_DEREGISTER(process_exec, exec_tag); - mtx_destroy(&aio_job_mtx); - sema_destroy(&aio_newproc_sem); - p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1); - p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1); - p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1); - return (0); -} - -/* * Init the per-process aioinfo structure. The aioinfo limits are set * per-process for user limit (resource) management. */ @@ -582,10 +484,11 @@ aio_init_aioinfo(struct proc *p) TAILQ_INIT(&ki->kaio_all); TAILQ_INIT(&ki->kaio_done); TAILQ_INIT(&ki->kaio_jobqueue); - TAILQ_INIT(&ki->kaio_bufqueue); TAILQ_INIT(&ki->kaio_liojoblist); TAILQ_INIT(&ki->kaio_syncqueue); + TAILQ_INIT(&ki->kaio_syncready); TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p); + TASK_INIT(&ki->kaio_sync_task, 0, aio_schedule_fsync, ki); PROC_LOCK(p); if (p->p_aioinfo == NULL) { p->p_aioinfo = ki; @@ -637,7 +540,7 @@ aio_free_entry(struct kaiocb *job) MPASS(ki != NULL); AIO_LOCK_ASSERT(ki, MA_OWNED); - MPASS(job->jobstate == JOBST_JOBFINISHED); + MPASS(job->jobflags & KAIOCB_FINISHED); atomic_subtract_int(&num_queue_count, 1); @@ -670,7 +573,6 @@ aio_free_entry(struct kaiocb *job) PROC_UNLOCK(p); MPASS(job->bp == NULL); - job->jobstate = JOBST_NULL; AIO_UNLOCK(ki); /* @@ -709,6 +611,57 @@ aio_proc_rundown_exec(void *arg, struct proc *p, aio_proc_rundown(arg, p); } +static int +aio_cancel_job(struct proc *p, struct kaioinfo *ki, struct kaiocb *job) +{ + aio_cancel_fn_t *func; + int cancelled; + + AIO_LOCK_ASSERT(ki, MA_OWNED); + if (job->jobflags & (KAIOCB_CANCELLED | KAIOCB_FINISHED)) + return (0); + MPASS((job->jobflags & KAIOCB_CANCELLING) == 0); + job->jobflags |= KAIOCB_CANCELLED; + + func = job->cancel_fn; + + /* + * If there is no cancel routine, just leave the job marked as + * cancelled. The job should be in active use by a caller who + * should complete it normally or when it fails to install a + * cancel routine. + */ + if (func == NULL) + return (0); + + /* + * Set the CANCELLING flag so that aio_complete() will defer + * completions of this job. This prevents the job from being + * freed out from under the cancel callback. After the + * callback any deferred completion (whether from the callback + * or any other source) will be completed. + */ + job->jobflags |= KAIOCB_CANCELLING; + AIO_UNLOCK(ki); + func(job); + AIO_LOCK(ki); + job->jobflags &= ~KAIOCB_CANCELLING; + if (job->jobflags & KAIOCB_FINISHED) { + cancelled = job->uaiocb._aiocb_private.error == ECANCELED; + TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); + aio_bio_done_notify(p, job); + } else { + /* + * The cancel callback might have scheduled an + * operation to cancel this request, but it is + * only counted as cancelled if the request is + * cancelled when the callback returns. + */ + cancelled = 0; + } + return (cancelled); +} + /* * Rundown the jobs for a given process. */ @@ -718,9 +671,6 @@ aio_proc_rundown(void *arg, struct proc *p) struct kaioinfo *ki; struct aioliojob *lj; struct kaiocb *job, *jobn; - struct file *fp; - struct socket *so; - int remove; KASSERT(curthread->td_proc == p, ("%s: called on non-curproc", __func__)); @@ -738,35 +688,11 @@ restart: * aio_cancel on all pending I/O requests. */ TAILQ_FOREACH_SAFE(job, &ki->kaio_jobqueue, plist, jobn) { - remove = 0; - mtx_lock(&aio_job_mtx); - if (job->jobstate == JOBST_JOBQGLOBAL) { - TAILQ_REMOVE(&aio_jobs, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSOCK) { - fp = job->fd_file; - MPASS(fp->f_type == DTYPE_SOCKET); - so = fp->f_data; - TAILQ_REMOVE(&so->so_aiojobq, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSYNC) { - TAILQ_REMOVE(&ki->kaio_syncqueue, job, list); - remove = 1; - } - mtx_unlock(&aio_job_mtx); - - if (remove) { - job->jobstate = JOBST_JOBFINISHED; - job->uaiocb._aiocb_private.status = -1; - job->uaiocb._aiocb_private.error = ECANCELED; - TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); - aio_bio_done_notify(p, job, DONE_QUEUE); - } + aio_cancel_job(p, ki, job); } /* Wait for all running I/O to be finished */ - if (TAILQ_FIRST(&ki->kaio_bufqueue) || - TAILQ_FIRST(&ki->kaio_jobqueue)) { + if (TAILQ_FIRST(&ki->kaio_jobqueue) || ki->kaio_active_count != 0) { ki->kaio_flags |= KAIO_WAKEUP; msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz); goto restart; @@ -791,6 +717,7 @@ restart: } AIO_UNLOCK(ki); taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_task); + taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_sync_task); mtx_destroy(&ki->kaio_mtx); uma_zfree(kaio_zone, ki); p->p_aioinfo = NULL; @@ -807,15 +734,18 @@ aio_selectjob(struct aioproc *aiop) struct proc *userp; mtx_assert(&aio_job_mtx, MA_OWNED); +restart: TAILQ_FOREACH(job, &aio_jobs, list) { userp = job->userproc; ki = userp->p_aioinfo; if (ki->kaio_active_count < ki->kaio_maxactive_count) { TAILQ_REMOVE(&aio_jobs, job, list); + if (!aio_clear_cancel_function(job)) + goto restart; + /* Account for currently active jobs. */ ki->kaio_active_count++; - job->jobstate = JOBST_JOBRUNNING; break; } } @@ -863,7 +793,6 @@ aio_process_rw(struct kaiocb *job) struct thread *td; struct aiocb *cb; struct file *fp; - struct socket *so; struct uio auio; struct iovec aiov; int cnt; @@ -875,6 +804,7 @@ aio_process_rw(struct kaiocb *job) job->uaiocb.aio_lio_opcode == LIO_WRITE, ("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode)); + aio_switch_vmspace(job); td = curthread; td_savedcred = td->td_ucred; td->td_ucred = job->cred; @@ -920,24 +850,15 @@ aio_process_rw(struct kaiocb *job) if (error == ERESTART || error == EINTR || error == EWOULDBLOCK) error = 0; if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) { - int sigpipe = 1; - if (fp->f_type == DTYPE_SOCKET) { - so = fp->f_data; - if (so->so_options & SO_NOSIGPIPE) - sigpipe = 0; - } - if (sigpipe) { - PROC_LOCK(job->userproc); - kern_psignal(job->userproc, SIGPIPE); - PROC_UNLOCK(job->userproc); - } + PROC_LOCK(job->userproc); + kern_psignal(job->userproc, SIGPIPE); + PROC_UNLOCK(job->userproc); } } cnt -= auio.uio_resid; - cb->_aiocb_private.error = error; - cb->_aiocb_private.status = cnt; td->td_ucred = td_savedcred; + aio_complete(job, cnt, error); } static void @@ -945,7 +866,6 @@ aio_process_sync(struct kaiocb *job) { struct thread *td = curthread; struct ucred *td_savedcred = td->td_ucred; - struct aiocb *cb = &job->uaiocb; struct file *fp = job->fd_file; int error = 0; @@ -955,9 +875,8 @@ aio_process_sync(struct kaiocb *job) td->td_ucred = job->cred; if (fp->f_vnode != NULL) error = aio_fsync_vnode(td, fp->f_vnode); - cb->_aiocb_private.error = error; - cb->_aiocb_private.status = 0; td->td_ucred = td_savedcred; + aio_complete(job, 0, error); } static void @@ -969,19 +888,20 @@ aio_process_mlock(struct kaiocb *job) KASSERT(job->uaiocb.aio_lio_opcode == LIO_MLOCK, ("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode)); + aio_switch_vmspace(job); error = vm_mlock(job->userproc, job->cred, __DEVOLATILE(void *, cb->aio_buf), cb->aio_nbytes); - cb->_aiocb_private.error = error; - cb->_aiocb_private.status = 0; + aio_complete(job, 0, error); } static void -aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type) +aio_bio_done_notify(struct proc *userp, struct kaiocb *job) { struct aioliojob *lj; struct kaioinfo *ki; struct kaiocb *sjob, *sjobn; int lj_done; + bool schedule_fsync; ki = userp->p_aioinfo; AIO_LOCK_ASSERT(ki, MA_OWNED); @@ -992,13 +912,8 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type) if (lj->lioj_count == lj->lioj_finished_count) lj_done = 1; } - if (type == DONE_QUEUE) { - job->jobflags |= KAIOCB_DONE; - } else { - job->jobflags |= KAIOCB_BUFDONE; - } TAILQ_INSERT_TAIL(&ki->kaio_done, job, plist); - job->jobstate = JOBST_JOBFINISHED; + MPASS(job->jobflags & KAIOCB_FINISHED); if (ki->kaio_flags & KAIO_RUNDOWN) goto notification_done; @@ -1025,20 +940,24 @@ aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type) notification_done: if (job->jobflags & KAIOCB_CHECKSYNC) { + schedule_fsync = false; TAILQ_FOREACH_SAFE(sjob, &ki->kaio_syncqueue, list, sjobn) { if (job->fd_file == sjob->fd_file && job->seqno < sjob->seqno) { if (--sjob->pending == 0) { - mtx_lock(&aio_job_mtx); - sjob->jobstate = JOBST_JOBQGLOBAL; TAILQ_REMOVE(&ki->kaio_syncqueue, sjob, list); - TAILQ_INSERT_TAIL(&aio_jobs, sjob, list); - aio_kick_nowait(userp); - mtx_unlock(&aio_job_mtx); + if (!aio_clear_cancel_function(sjob)) + continue; + TAILQ_INSERT_TAIL(&ki->kaio_syncready, + sjob, list); + schedule_fsync = true; } } } + if (schedule_fsync) + taskqueue_enqueue(taskqueue_aiod_kick, + &ki->kaio_sync_task); } if (ki->kaio_flags & KAIO_WAKEUP) { ki->kaio_flags &= ~KAIO_WAKEUP; @@ -1047,6 +966,103 @@ notification_done: } static void +aio_schedule_fsync(void *context, int pending) +{ + struct kaioinfo *ki; + struct kaiocb *job; + + ki = context; + AIO_LOCK(ki); + while (!TAILQ_EMPTY(&ki->kaio_syncready)) { + job = TAILQ_FIRST(&ki->kaio_syncready); + TAILQ_REMOVE(&ki->kaio_syncready, job, list); + AIO_UNLOCK(ki); + aio_schedule(job, aio_process_sync); + AIO_LOCK(ki); + } + AIO_UNLOCK(ki); +} + +bool +aio_cancel_cleared(struct kaiocb *job) +{ + struct kaioinfo *ki; + + /* + * The caller should hold the same queue lock held when + * aio_clear_cancel_function() was called and set this flag + * ensuring this check sees an up-to-date value. However, + * there is no way to assert that. + */ + ki = job->userproc->p_aioinfo; + return ((job->jobflags & KAIOCB_CLEARED) != 0); +} + +bool +aio_clear_cancel_function(struct kaiocb *job) +{ + struct kaioinfo *ki; + + ki = job->userproc->p_aioinfo; + AIO_LOCK(ki); + MPASS(job->cancel_fn != NULL); + if (job->jobflags & KAIOCB_CANCELLING) { + job->jobflags |= KAIOCB_CLEARED; + AIO_UNLOCK(ki); + return (false); + } + job->cancel_fn = NULL; + AIO_UNLOCK(ki); + return (true); +} + +bool +aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func) +{ + struct kaioinfo *ki; + + ki = job->userproc->p_aioinfo; + AIO_LOCK(ki); + if (job->jobflags & KAIOCB_CANCELLED) { + AIO_UNLOCK(ki); + return (false); + } + job->cancel_fn = func; + AIO_UNLOCK(ki); + return (true); +} + +void +aio_complete(struct kaiocb *job, long status, int error) +{ + struct kaioinfo *ki; + struct proc *userp; + + job->uaiocb._aiocb_private.error = error; + job->uaiocb._aiocb_private.status = status; + + userp = job->userproc; + ki = userp->p_aioinfo; + + AIO_LOCK(ki); + KASSERT(!(job->jobflags & KAIOCB_FINISHED), + ("duplicate aio_complete")); + job->jobflags |= KAIOCB_FINISHED; + if ((job->jobflags & (KAIOCB_QUEUEING | KAIOCB_CANCELLING)) == 0) { + TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); + aio_bio_done_notify(userp, job); + } + AIO_UNLOCK(ki); +} + +void +aio_cancel(struct kaiocb *job) +{ + + aio_complete(job, -1, ECANCELED); +} + +void aio_switch_vmspace(struct kaiocb *job) { @@ -1063,7 +1079,7 @@ aio_daemon(void *_id) struct kaiocb *job; struct aioproc *aiop; struct kaioinfo *ki; - struct proc *p, *userp; + struct proc *p; struct vmspace *myvm; struct thread *td = curthread; int id = (intptr_t)_id; @@ -1107,40 +1123,13 @@ aio_daemon(void *_id) */ while ((job = aio_selectjob(aiop)) != NULL) { mtx_unlock(&aio_job_mtx); - userp = job->userproc; - - /* - * Connect to process address space for user program. - */ - aio_switch_vmspace(job); - ki = userp->p_aioinfo; - - /* Do the I/O function. */ - switch(job->uaiocb.aio_lio_opcode) { - case LIO_READ: - case LIO_WRITE: - aio_process_rw(job); - break; - case LIO_SYNC: - aio_process_sync(job); - break; - case LIO_MLOCK: - aio_process_mlock(job); - break; - } + ki = job->userproc->p_aioinfo; + job->handle_fn(job); mtx_lock(&aio_job_mtx); /* Decrement the active job count. */ ki->kaio_active_count--; - mtx_unlock(&aio_job_mtx); - - AIO_LOCK(ki); - TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); - aio_bio_done_notify(userp, job, DONE_QUEUE); - AIO_UNLOCK(ki); - - mtx_lock(&aio_job_mtx); } /* @@ -1236,7 +1225,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job) struct cdevsw *csw; struct cdev *dev; struct kaioinfo *ki; - struct aioliojob *lj; int error, ref, unmap, poff; vm_prot_t prot; @@ -1293,16 +1281,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job) } AIO_LOCK(ki); - ki->kaio_count++; if (!unmap) ki->kaio_buffer_count++; - lj = job->lio; - if (lj) - lj->lioj_count++; - TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, job, plist); - TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); - job->jobstate = JOBST_JOBQBUF; - cb->_aiocb_private.status = cb->aio_nbytes; AIO_UNLOCK(ki); bp->bio_length = cb->aio_nbytes; @@ -1336,7 +1316,6 @@ aio_qphysio(struct proc *p, struct kaiocb *job) bp->bio_flags |= BIO_UNMAPPED; } - atomic_add_int(&num_queue_count, 1); if (!unmap) atomic_add_int(&num_buf_aio, 1); @@ -1347,14 +1326,8 @@ aio_qphysio(struct proc *p, struct kaiocb *job) doerror: AIO_LOCK(ki); - job->jobstate = JOBST_NULL; - TAILQ_REMOVE(&ki->kaio_bufqueue, job, plist); - TAILQ_REMOVE(&ki->kaio_all, job, allist); - ki->kaio_count--; if (!unmap) ki->kaio_buffer_count--; - if (lj) - lj->lioj_count--; AIO_UNLOCK(ki); if (pbuf) { relpbuf(pbuf, NULL); @@ -1367,40 +1340,6 @@ unref: return (error); } -/* - * Wake up aio requests that may be serviceable now. - */ -static void -aio_swake_cb(struct socket *so, struct sockbuf *sb) -{ - struct kaiocb *job, *jobn; - int opcode; - - SOCKBUF_LOCK_ASSERT(sb); - if (sb == &so->so_snd) - opcode = LIO_WRITE; - else - opcode = LIO_READ; - - sb->sb_flags &= ~SB_AIO; - mtx_lock(&aio_job_mtx); - TAILQ_FOREACH_SAFE(job, &so->so_aiojobq, list, jobn) { - if (opcode == job->uaiocb.aio_lio_opcode) { - if (job->jobstate != JOBST_JOBQSOCK) - panic("invalid queue value"); - /* XXX - * We don't have actual sockets backend yet, - * so we simply move the requests to the generic - * file I/O backend. - */ - TAILQ_REMOVE(&so->so_aiojobq, job, list); - TAILQ_INSERT_TAIL(&aio_jobs, job, list); - aio_kick_nowait(job->userproc); - } - } - mtx_unlock(&aio_job_mtx); -} - static int convert_old_sigevent(struct osigevent *osig, struct sigevent *nsig) { @@ -1521,11 +1460,9 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj, struct proc *p = td->td_proc; cap_rights_t rights; struct file *fp; - struct socket *so; - struct kaiocb *job, *job2; + struct kaiocb *job; struct kaioinfo *ki; struct kevent kev; - struct sockbuf *sb; int opcode; int error; int fd, kqfd; @@ -1668,86 +1605,128 @@ aio_aqueue(struct thread *td, struct aiocb *ujob, struct aioliojob *lj, kev.data = (intptr_t)job; kev.udata = job->uaiocb.aio_sigevent.sigev_value.sival_ptr; error = kqfd_register(kqfd, &kev, td, 1); -aqueue_fail: - if (error) { - if (fp) - fdrop(fp, td); - uma_zfree(aiocb_zone, job); - ops->store_error(ujob, error); - goto done; - } + if (error) + goto aqueue_fail; + no_kqueue: ops->store_error(ujob, EINPROGRESS); job->uaiocb._aiocb_private.error = EINPROGRESS; job->userproc = p; job->cred = crhold(td->td_ucred); - job->jobflags = 0; + job->jobflags = KAIOCB_QUEUEING; job->lio = lj; - if (opcode == LIO_SYNC) - goto queueit; + if (opcode == LIO_MLOCK) { + aio_schedule(job, aio_process_mlock); + error = 0; + } else if (fp->f_ops->fo_aio_queue == NULL) + error = aio_queue_file(fp, job); + else + error = fo_aio_queue(fp, job); + if (error) + goto aqueue_fail; - if (fp && fp->f_type == DTYPE_SOCKET) { + AIO_LOCK(ki); + job->jobflags &= ~KAIOCB_QUEUEING; + TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); + ki->kaio_count++; + if (lj) + lj->lioj_count++; + atomic_add_int(&num_queue_count, 1); + if (job->jobflags & KAIOCB_FINISHED) { /* - * Alternate queueing for socket ops: Reach down into the - * descriptor to get the socket data. Then check to see if the - * socket is ready to be read or written (based on the requested - * operation). - * - * If it is not ready for io, then queue the job on the - * socket, and set the flags so we get a call when sbnotify() - * happens. - * - * Note if opcode is neither LIO_WRITE nor LIO_READ we lock - * and unlock the snd sockbuf for no reason. + * The queue callback completed the request synchronously. + * The bulk of the completion is deferred in that case + * until this point. */ - so = fp->f_data; - sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd; - SOCKBUF_LOCK(sb); - if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode == - LIO_WRITE) && (!sowriteable(so)))) { - sb->sb_flags |= SB_AIO; + aio_bio_done_notify(p, job); + } else + TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist); + AIO_UNLOCK(ki); + return (0); - mtx_lock(&aio_job_mtx); - TAILQ_INSERT_TAIL(&so->so_aiojobq, job, list); - mtx_unlock(&aio_job_mtx); +aqueue_fail: + knlist_delete(&job->klist, curthread, 0); + if (fp) + fdrop(fp, td); + uma_zfree(aiocb_zone, job); + ops->store_error(ujob, error); + return (error); +} - AIO_LOCK(ki); - TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); - TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist); - job->jobstate = JOBST_JOBQSOCK; - ki->kaio_count++; - if (lj) - lj->lioj_count++; - AIO_UNLOCK(ki); - SOCKBUF_UNLOCK(sb); - atomic_add_int(&num_queue_count, 1); - error = 0; - goto done; - } - SOCKBUF_UNLOCK(sb); +static void +aio_cancel_daemon_job(struct kaiocb *job) +{ + + mtx_lock(&aio_job_mtx); + if (!aio_cancel_cleared(job)) + TAILQ_REMOVE(&aio_jobs, job, list); + mtx_unlock(&aio_job_mtx); + aio_cancel(job); +} + +void +aio_schedule(struct kaiocb *job, aio_handle_fn_t *func) +{ + + mtx_lock(&aio_job_mtx); + if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) { + mtx_unlock(&aio_job_mtx); + aio_cancel(job); + return; } + job->handle_fn = func; + TAILQ_INSERT_TAIL(&aio_jobs, job, list); + aio_kick_nowait(job->userproc); + mtx_unlock(&aio_job_mtx); +} + +static void +aio_cancel_sync(struct kaiocb *job) +{ + struct kaioinfo *ki; + + ki = job->userproc->p_aioinfo; + mtx_lock(&aio_job_mtx); + if (!aio_cancel_cleared(job)) + TAILQ_REMOVE(&ki->kaio_syncqueue, job, list); + mtx_unlock(&aio_job_mtx); + aio_cancel(job); +} + +int +aio_queue_file(struct file *fp, struct kaiocb *job) +{ + struct aioliojob *lj; + struct kaioinfo *ki; + struct kaiocb *job2; + int error, opcode; + + lj = job->lio; + ki = job->userproc->p_aioinfo; + opcode = job->uaiocb.aio_lio_opcode; + if (opcode == LIO_SYNC) + goto queueit; - if ((error = aio_qphysio(p, job)) == 0) + if ((error = aio_qphysio(job->userproc, job)) == 0) goto done; #if 0 - if (error > 0) { - job->uaiocb._aiocb_private.error = error; - ops->store_error(ujob, error); + /* + * XXX: This means qphysio() failed with EFAULT. The current + * behavior is to retry the operation via fo_read/fo_write. + * Wouldn't it be better to just complete the request with an + * error here? + */ + if (error > 0) goto done; - } #endif queueit: - atomic_add_int(&num_queue_count, 1); + if (!enable_aio_unsafe) + return (EOPNOTSUPP); - AIO_LOCK(ki); - ki->kaio_count++; - if (lj) - lj->lioj_count++; - TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist); - TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist); if (opcode == LIO_SYNC) { + AIO_LOCK(ki); TAILQ_FOREACH(job2, &ki->kaio_jobqueue, plist) { if (job2->fd_file == job->fd_file && job2->uaiocb.aio_lio_opcode != LIO_SYNC && @@ -1756,28 +1735,32 @@ queueit: job->pending++; } } - TAILQ_FOREACH(job2, &ki->kaio_bufqueue, plist) { - if (job2->fd_file == job->fd_file && - job2->uaiocb.aio_lio_opcode != LIO_SYNC && - job2->seqno < job->seqno) { - job2->jobflags |= KAIOCB_CHECKSYNC; - job->pending++; - } - } if (job->pending != 0) { + if (!aio_set_cancel_function(job, aio_cancel_sync)) { + AIO_UNLOCK(ki); + aio_cancel(job); + return (0); + } TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, job, list); - job->jobstate = JOBST_JOBQSYNC; AIO_UNLOCK(ki); - goto done; + return (0); } + AIO_UNLOCK(ki); + } + + switch (opcode) { + case LIO_READ: + case LIO_WRITE: + aio_schedule(job, aio_process_rw); + error = 0; + break; + case LIO_SYNC: + aio_schedule(job, aio_process_sync); + error = 0; + break; + default: + error = EINVAL; } - mtx_lock(&aio_job_mtx); - TAILQ_INSERT_TAIL(&aio_jobs, job, list); - job->jobstate = JOBST_JOBQGLOBAL; - aio_kick_nowait(p); - mtx_unlock(&aio_job_mtx); - AIO_UNLOCK(ki); - error = 0; done: return (error); } @@ -1864,7 +1847,7 @@ kern_aio_return(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops) break; } if (job != NULL) { - MPASS(job->jobstate == JOBST_JOBFINISHED); + MPASS(job->jobflags & KAIOCB_FINISHED); status = job->uaiocb._aiocb_private.status; error = job->uaiocb._aiocb_private.error; td->td_retval[0] = status; @@ -1933,7 +1916,7 @@ kern_aio_suspend(struct thread *td, int njoblist, struct aiocb **ujoblist, if (job->ujob == ujoblist[i]) { if (firstjob == NULL) firstjob = job; - if (job->jobstate == JOBST_JOBFINISHED) + if (job->jobflags & KAIOCB_FINISHED) goto RETURN; } } @@ -1992,10 +1975,8 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap) struct kaioinfo *ki; struct kaiocb *job, *jobn; struct file *fp; - struct socket *so; cap_rights_t rights; int error; - int remove; int cancelled = 0; int notcancelled = 0; struct vnode *vp; @@ -2023,28 +2004,7 @@ sys_aio_cancel(struct thread *td, struct aio_cancel_args *uap) if ((uap->fd == job->uaiocb.aio_fildes) && ((uap->aiocbp == NULL) || (uap->aiocbp == job->ujob))) { - remove = 0; - - mtx_lock(&aio_job_mtx); - if (job->jobstate == JOBST_JOBQGLOBAL) { - TAILQ_REMOVE(&aio_jobs, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSOCK) { - MPASS(fp->f_type == DTYPE_SOCKET); - so = fp->f_data; - TAILQ_REMOVE(&so->so_aiojobq, job, list); - remove = 1; - } else if (job->jobstate == JOBST_JOBQSYNC) { - TAILQ_REMOVE(&ki->kaio_syncqueue, job, list); - remove = 1; - } - mtx_unlock(&aio_job_mtx); - - if (remove) { - TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist); - job->uaiocb._aiocb_private.status = -1; - job->uaiocb._aiocb_private.error = ECANCELED; - aio_bio_done_notify(p, job, DONE_QUEUE); + if (aio_cancel_job(p, ki, job)) { cancelled++; } else { notcancelled++; @@ -2102,7 +2062,7 @@ kern_aio_error(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops) AIO_LOCK(ki); TAILQ_FOREACH(job, &ki->kaio_all, allist) { if (job->ujob == ujob) { - if (job->jobstate == JOBST_JOBFINISHED) + if (job->jobflags & KAIOCB_FINISHED) td->td_retval[0] = job->uaiocb._aiocb_private.error; else @@ -2382,35 +2342,36 @@ aio_physwakeup(struct bio *bp) struct kaiocb *job = (struct kaiocb *)bp->bio_caller1; struct proc *userp; struct kaioinfo *ki; - int nblks; + size_t nbytes; + int error, nblks; /* Release mapping into kernel space. */ + userp = job->userproc; + ki = userp->p_aioinfo; if (job->pbuf) { pmap_qremove((vm_offset_t)job->pbuf->b_data, job->npages); relpbuf(job->pbuf, NULL); job->pbuf = NULL; atomic_subtract_int(&num_buf_aio, 1); + AIO_LOCK(ki); + ki->kaio_buffer_count--; + AIO_UNLOCK(ki); } vm_page_unhold_pages(job->pages, job->npages); bp = job->bp; job->bp = NULL; - userp = job->userproc; - ki = userp->p_aioinfo; - AIO_LOCK(ki); - job->uaiocb._aiocb_private.status -= bp->bio_resid; - job->uaiocb._aiocb_private.error = 0; + nbytes = job->uaiocb.aio_nbytes - bp->bio_resid; + error = 0; if (bp->bio_flags & BIO_ERROR) - job->uaiocb._aiocb_private.error = bp->bio_error; - nblks = btodb(job->uaiocb.aio_nbytes); + error = bp->bio_error; + nblks = btodb(nbytes); if (job->uaiocb.aio_lio_opcode == LIO_WRITE) job->outputcharge += nblks; else job->inputcharge += nblks; - TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, job, plist); - ki->kaio_buffer_count--; - aio_bio_done_notify(userp, job, DONE_BUF); - AIO_UNLOCK(ki); + + aio_complete(job, nbytes, error); g_destroy_bio(bp); } @@ -2465,7 +2426,7 @@ kern_aio_waitcomplete(struct thread *td, struct aiocb **ujobp, } if (job != NULL) { - MPASS(job->jobstate == JOBST_JOBFINISHED); + MPASS(job->jobflags & KAIOCB_FINISHED); ujob = job->ujob; status = job->uaiocb._aiocb_private.status; error = job->uaiocb._aiocb_private.error; @@ -2570,7 +2531,7 @@ filt_aio(struct knote *kn, long hint) struct kaiocb *job = kn->kn_ptr.p_aio; kn->kn_data = job->uaiocb._aiocb_private.error; - if (job->jobstate != JOBST_JOBFINISHED) + if (!(job->jobflags & KAIOCB_FINISHED)) return (0); kn->kn_flags |= EV_EOF; return (1); diff --git a/sys/modules/Makefile b/sys/modules/Makefile index 76a12d3..f2cf93a 100644 --- a/sys/modules/Makefile +++ b/sys/modules/Makefile @@ -31,7 +31,6 @@ SUBDIR= \ ahci \ ${_aic} \ aic7xxx \ - aio \ alc \ ale \ alq \ diff --git a/sys/modules/aio/Makefile b/sys/modules/aio/Makefile deleted file mode 100644 index 95e6cb5..0000000 --- a/sys/modules/aio/Makefile +++ /dev/null @@ -1,10 +0,0 @@ -# $FreeBSD$ - -.PATH: ${.CURDIR}/../../kern - -KMOD= aio -SRCS= vfs_aio.c opt_vfs_aio.h vnode_if.h opt_compat.h - -EXPORT_SYMS= aio_init_aioinfo aio_aqueue - -.include <bsd.kmod.mk> diff --git a/sys/sys/aio.h b/sys/sys/aio.h index d6ef6aa..25ffb28 100644 --- a/sys/sys/aio.h +++ b/sys/sys/aio.h @@ -21,6 +21,11 @@ #include <sys/types.h> #include <sys/signal.h> +#ifdef _KERNEL +#include <sys/queue.h> +#include <sys/event.h> +#include <sys/signalvar.h> +#endif /* * Returned by aio_cancel: @@ -51,6 +56,24 @@ */ #define AIO_LISTIO_MAX 16 +#ifdef _KERNEL + +/* Default values of tunables for the AIO worker pool. */ + +#ifndef MAX_AIO_PROCS +#define MAX_AIO_PROCS 32 +#endif + +#ifndef TARGET_AIO_PROCS +#define TARGET_AIO_PROCS 4 +#endif + +#ifndef AIOD_LIFETIME_DEFAULT +#define AIOD_LIFETIME_DEFAULT (30 * hz) +#endif + +#endif + /* * Private members for aiocb -- don't access * directly. @@ -77,7 +100,91 @@ typedef struct aiocb { struct sigevent aio_sigevent; /* Signal to deliver */ } aiocb_t; -#ifndef _KERNEL +#ifdef _KERNEL + +typedef void aio_cancel_fn_t(struct kaiocb *); +typedef void aio_handle_fn_t(struct kaiocb *); + +/* + * Kernel version of an I/O control block. + * + * Locking key: + * * - need not protected + * a - locked by kaioinfo lock + * b - locked by backend lock + * c - locked by aio_job_mtx + */ +struct kaiocb { + TAILQ_ENTRY(kaiocb) list; /* (b) backend-specific list of jobs */ + TAILQ_ENTRY(kaiocb) plist; /* (a) lists of pending / done jobs */ + TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */ + int jobflags; /* (a) job flags */ + int inputcharge; /* (*) input blocks */ + int outputcharge; /* (*) output blocks */ + struct bio *bp; /* (*) BIO backend BIO pointer */ + struct buf *pbuf; /* (*) BIO backend buffer pointer */ + struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */ + int npages; /* BIO backend number of pages */ + struct proc *userproc; /* (*) user process */ + struct ucred *cred; /* (*) active credential when created */ + struct file *fd_file; /* (*) pointer to file structure */ + struct aioliojob *lio; /* (*) optional lio job */ + struct aiocb *ujob; /* (*) pointer in userspace of aiocb */ + struct knlist klist; /* (a) list of knotes */ + struct aiocb uaiocb; /* (*) copy of user I/O control block */ + ksiginfo_t ksi; /* (a) realtime signal info */ + uint64_t seqno; /* (*) job number */ + int pending; /* (a) number of pending I/O, aio_fsync only */ + aio_cancel_fn_t *cancel_fn; /* (a) backend cancel function */ + aio_handle_fn_t *handle_fn; /* (c) backend handle function */ +}; + +struct socket; +struct sockbuf; + +/* + * AIO backends should permit cancellation of queued requests waiting to + * be serviced by installing a cancel routine while the request is + * queued. The cancellation routine should dequeue the request if + * necessary and cancel it. Care must be used to handle races between + * queueing and dequeueing requests and cancellation. + * + * When queueing a request somewhere such that it can be cancelled, the + * caller should: + * + * 1) Acquire lock that protects the associated queue. + * 2) Call aio_set_cancel_function() to install the cancel routine. + * 3) If that fails, the request has a pending cancel and should be + * cancelled via aio_cancel(). + * 4) Queue the request. + * + * When dequeueing a request to service it or hand it off to somewhere else, + * the caller should: + * + * 1) Acquire the lock that protects the associated queue. + * 2) Dequeue the request. + * 3) Call aio_clear_cancel_function() to clear the cancel routine. + * 4) If that fails, the cancel routine is about to be called. The + * caller should ignore the request. + * + * The cancel routine should: + * + * 1) Acquire the lock that protects the associated queue. + * 2) Call aio_cancel_cleared() to determine if the request is already + * dequeued due to a race with dequeueing thread. + * 3) If that fails, dequeue the request. + * 4) Cancel the request via aio_cancel(). + */ + +bool aio_cancel_cleared(struct kaiocb *job); +void aio_cancel(struct kaiocb *job); +bool aio_clear_cancel_function(struct kaiocb *job); +void aio_complete(struct kaiocb *job, long status, int error); +void aio_schedule(struct kaiocb *job, aio_handle_fn_t *func); +bool aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func); +void aio_switch_vmspace(struct kaiocb *job); + +#else /* !_KERNEL */ struct timespec; @@ -137,14 +244,6 @@ int aio_waitcomplete(struct aiocb **, struct timespec *); int aio_fsync(int op, struct aiocb *aiocbp); __END_DECLS -#else - -/* Forward declarations for prototypes below. */ -struct socket; -struct sockbuf; - -extern void (*aio_swake)(struct socket *, struct sockbuf *); +#endif /* !_KERNEL */ -#endif - -#endif +#endif /* !_SYS_AIO_H_ */ diff --git a/sys/sys/file.h b/sys/sys/file.h index 68d33e0..524c1ce 100644 --- a/sys/sys/file.h +++ b/sys/sys/file.h @@ -73,6 +73,7 @@ struct socket; struct file; struct filecaps; +struct kaiocb; struct kinfo_file; struct ucred; @@ -119,6 +120,7 @@ typedef int fo_fill_kinfo_t(struct file *fp, struct kinfo_file *kif, typedef int fo_mmap_t(struct file *fp, vm_map_t map, vm_offset_t *addr, vm_size_t size, vm_prot_t prot, vm_prot_t cap_maxprot, int flags, vm_ooffset_t foff, struct thread *td); +typedef int fo_aio_queue_t(struct file *fp, struct kaiocb *job); typedef int fo_flags_t; struct fileops { @@ -136,6 +138,7 @@ struct fileops { fo_seek_t *fo_seek; fo_fill_kinfo_t *fo_fill_kinfo; fo_mmap_t *fo_mmap; + fo_aio_queue_t *fo_aio_queue; fo_flags_t fo_flags; /* DFLAG_* below */ }; @@ -406,6 +409,13 @@ fo_mmap(struct file *fp, vm_map_t map, vm_offset_t *addr, vm_size_t size, flags, foff, td)); } +static __inline int +fo_aio_queue(struct file *fp, struct kaiocb *job) +{ + + return ((*fp->f_ops->fo_aio_queue)(fp, job)); +} + #endif /* _KERNEL */ #endif /* !SYS_FILE_H */ diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h index 6631593..c0148b4 100644 --- a/sys/sys/sockbuf.h +++ b/sys/sys/sockbuf.h @@ -36,6 +36,7 @@ #include <sys/_lock.h> #include <sys/_mutex.h> #include <sys/_sx.h> +#include <sys/_task.h> #define SB_MAX (2*1024*1024) /* default for max chars in sockbuf */ @@ -53,6 +54,7 @@ #define SB_IN_TOE 0x400 /* socket buffer is in the middle of an operation */ #define SB_AUTOSIZE 0x800 /* automatically size socket buffer */ #define SB_STOP 0x1000 /* backpressure indicator */ +#define SB_AIO_RUNNING 0x2000 /* AIO operation running */ #define SBS_CANTSENDMORE 0x0010 /* can't send more data to peer */ #define SBS_CANTRCVMORE 0x0020 /* can't receive more data from peer */ @@ -107,6 +109,8 @@ struct sockbuf { short sb_flags; /* (a) flags, see below */ int (*sb_upcall)(struct socket *, void *, int); /* (a) */ void *sb_upcallarg; /* (a) */ + TAILQ_HEAD(, kaiocb) sb_aiojobq; /* (a) pending AIO ops */ + struct task sb_aiotask; /* AIO task */ }; #ifdef _KERNEL diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h index 870e5f9..176f16c 100644 --- a/sys/sys/socketvar.h +++ b/sys/sys/socketvar.h @@ -103,7 +103,6 @@ struct socket { struct sigio *so_sigio; /* [sg] information for async I/O or out of band data (SIGURG) */ u_long so_oobmark; /* (c) chars to oob mark */ - TAILQ_HEAD(, kaiocb) so_aiojobq; /* AIO ops waiting on socket */ struct sockbuf so_rcv, so_snd; @@ -342,6 +341,8 @@ int getsock_cap(struct thread *td, int fd, cap_rights_t *rightsp, struct file **fpp, u_int *fflagp); void soabort(struct socket *so); int soaccept(struct socket *so, struct sockaddr **nam); +void soaio_rcv(void *context, int pending); +void soaio_snd(void *context, int pending); int socheckuid(struct socket *so, uid_t uid); int sobind(struct socket *so, struct sockaddr *nam, struct thread *td); int sobindat(int fd, struct socket *so, struct sockaddr *nam, @@ -396,6 +397,7 @@ void soupcall_clear(struct socket *so, int which); void soupcall_set(struct socket *so, int which, int (*func)(struct socket *, void *, int), void *arg); void sowakeup(struct socket *so, struct sockbuf *sb); +void sowakeup_aio(struct socket *so, struct sockbuf *sb); int selsocket(struct socket *so, int events, struct timeval *tv, struct thread *td); diff --git a/tests/sys/aio/aio_kqueue_test.c b/tests/sys/aio/aio_kqueue_test.c index 97c2c38f..08d57f8 100644 --- a/tests/sys/aio/aio_kqueue_test.c +++ b/tests/sys/aio/aio_kqueue_test.c @@ -47,6 +47,7 @@ #include <unistd.h> #include "freebsd_test_suite/macros.h" +#include "local.h" #define PATH_TEMPLATE "aio.XXXXXXXXXX" @@ -70,6 +71,7 @@ main (int argc, char *argv[]) unsigned i, j; PLAIN_REQUIRE_KERNEL_MODULE("aio", 0); + PLAIN_REQUIRE_UNSAFE_AIO(0); kq = kqueue(); if (kq < 0) { diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c index 4134e78..607f46d 100644 --- a/tests/sys/aio/aio_test.c +++ b/tests/sys/aio/aio_test.c @@ -60,6 +60,7 @@ #include <atf-c.h> #include "freebsd_test_suite/macros.h" +#include "local.h" #define PATH_TEMPLATE "aio.XXXXXXXXXX" @@ -340,6 +341,7 @@ ATF_TC_BODY(aio_file_test, tc) int fd; ATF_REQUIRE_KERNEL_MODULE("aio"); + ATF_REQUIRE_UNSAFE_AIO(); strcpy(pathname, PATH_TEMPLATE); fd = mkstemp(pathname); @@ -386,6 +388,7 @@ ATF_TC_BODY(aio_fifo_test, tc) struct aio_context ac; ATF_REQUIRE_KERNEL_MODULE("aio"); + ATF_REQUIRE_UNSAFE_AIO(); /* * In theory, mkstemp() can return a name that is then collided with. @@ -497,6 +500,7 @@ ATF_TC_BODY(aio_pty_test, tc) int error; ATF_REQUIRE_KERNEL_MODULE("aio"); + ATF_REQUIRE_UNSAFE_AIO(); ATF_REQUIRE_MSG(openpty(&read_fd, &write_fd, NULL, NULL, NULL) == 0, "openpty failed: %s", strerror(errno)); @@ -544,6 +548,7 @@ ATF_TC_BODY(aio_pipe_test, tc) int pipes[2]; ATF_REQUIRE_KERNEL_MODULE("aio"); + ATF_REQUIRE_UNSAFE_AIO(); ATF_REQUIRE_MSG(pipe(pipes) != -1, "pipe failed: %s", strerror(errno)); diff --git a/tests/sys/aio/lio_kqueue_test.c b/tests/sys/aio/lio_kqueue_test.c index e69b9c9..5178a1d 100644 --- a/tests/sys/aio/lio_kqueue_test.c +++ b/tests/sys/aio/lio_kqueue_test.c @@ -50,6 +50,7 @@ #include <unistd.h> #include "freebsd_test_suite/macros.h" +#include "local.h" #define PATH_TEMPLATE "aio.XXXXXXXXXX" @@ -74,6 +75,7 @@ main(int argc, char *argv[]) int tmp_file = 0, failed = 0; PLAIN_REQUIRE_KERNEL_MODULE("aio", 0); + PLAIN_REQUIRE_UNSAFE_AIO(0); kq = kqueue(); if (kq < 0) diff --git a/tests/sys/aio/local.h b/tests/sys/aio/local.h new file mode 100644 index 0000000..308b700 --- /dev/null +++ b/tests/sys/aio/local.h @@ -0,0 +1,74 @@ +/*- + * Copyright (c) 2016 Chelsio Communications, Inc. + * All rights reserved. + * Written by: John Baldwin <jhb@FreeBSD.org> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * $FreeBSD$ + */ + +#ifndef _AIO_TEST_LOCAL_H_ +#define _AIO_TEST_LOCAL_H_ + +#include <sys/types.h> +#include <sys/sysctl.h> +#include <errno.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include <atf-c.h> + +#define ATF_REQUIRE_UNSAFE_AIO() do { \ + size_t _len; \ + int _unsafe; \ + \ + _len = sizeof(_unsafe); \ + if (sysctlbyname("vfs.aio.enable_unsafe", &_unsafe, &_len, NULL,\ + 0) < 0) { \ + if (errno != ENOENT) \ + atf_libc_error(errno, \ + "Failed to read vfs.aio.enable_unsafe"); \ + } else if (_unsafe == 0) \ + atf_tc_skip("Unsafe AIO is disabled"); \ +} while (0) + +#define PLAIN_REQUIRE_UNSAFE_AIO(_exit_code) do { \ + size_t _len; \ + int _unsafe; \ + \ + _len = sizeof(_unsafe); \ + if (sysctlbyname("vfs.aio.enable_unsafe", &_unsafe, &_len, NULL,\ + 0) < 0) { \ + if (errno != ENOENT) { \ + printf("Failed to read vfs.aio.enable_unsafe: %s\n",\ + strerror(errno)); \ + _exit(1); \ + } \ + } else if (_unsafe == 0) { \ + printf("Unsafe AIO is disabled"); \ + _exit(_exit_code); \ + } \ +} while (0) + +#endif /* !_AIO_TEST_LOCAL_H_ */ |