path: root/sys/kern/vfs_aio.c
author		dyson <dyson@FreeBSD.org>	1997-07-06 02:40:43 +0000
committer	dyson <dyson@FreeBSD.org>	1997-07-06 02:40:43 +0000
commit		532dea9042abf44b0a21290e3e3e16fba10a32f7 (patch)
tree		146db6ef5fca639767cd3930dc626eb15ba19366 /sys/kern/vfs_aio.c
parent		31a3731a3f467232acb34d2bb58c8a652018afd6 (diff)
This is an upgrade so that the kernel supports the AIO calls from
POSIX.4.  Additionally, there is some initial code that supports LIO.

This code supports AIO/LIO for all types of file descriptors, with few
if any restrictions.  There will be a follow-up very soon that supports
significantly more efficient operation for VCHR type files (raw
devices).

This code also depends on some kernel features that don't work under
SMP yet.  After I commit the changes to the kernel to support proper
address space sharing on SMP, this code will work under SMP as well.
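
For context, here is a minimal userland sketch of the POSIX.4 interface
this commit implements.  It is not part of the commit; it assumes the
standard <aio.h> declarations and simply polls for completion with
aio_error()/aio_return():

	#include <aio.h>
	#include <errno.h>
	#include <fcntl.h>
	#include <stdio.h>
	#include <string.h>

	int
	main(void)
	{
		static char buf[4096];
		struct aiocb cb;
		ssize_t nread;
		int fd = open("/etc/motd", O_RDONLY);

		/* Describe the request: read sizeof buf bytes at offset 0. */
		memset(&cb, 0, sizeof cb);
		cb.aio_fildes = fd;
		cb.aio_buf = buf;
		cb.aio_nbytes = sizeof buf;
		cb.aio_offset = 0;

		/* Queue the read; the kernel completes it asynchronously. */
		if (aio_read(&cb) != 0) {
			perror("aio_read");
			return 1;
		}

		/* Busy-poll; aio_suspend() could be used to sleep instead. */
		while (aio_error(&cb) == EINPROGRESS)
			;

		/* Reap the result exactly once. */
		nread = aio_return(&cb);
		printf("read %ld bytes\n", (long)nread);
		return 0;
	}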
Diffstat (limited to 'sys/kern/vfs_aio.c')
-rw-r--r--	sys/kern/vfs_aio.c	1041
1 file changed, 977 insertions, 64 deletions
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
index 282eee4..64aa1bb 100644
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -1,4 +1,3 @@
-
/*
* Copyright (c) 1997 John S. Dyson. All rights reserved.
*
@@ -14,7 +13,7 @@
* bad that happens because of using this software isn't the responsibility
* of the author. This software is distributed AS-IS.
*
- * $Id$
+ * $Id: vfs_aio.c,v 1.1 1997/06/16 00:27:26 dyson Exp $
*/
/*
@@ -48,14 +47,830 @@
#include <sys/malloc.h>
#include <sys/dirent.h>
#include <sys/signalvar.h>
+#include <sys/queue.h>
#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_extern.h>
+#include <vm/pmap.h>
+#include <vm/vm_map.h>
#include <sys/sysctl.h>
#include <sys/aio.h>
+#define AIOCBLIST_CANCELLED 0x1
+#define AIOCBLIST_RUNDOWN 0x4
+#define AIOCBLIST_ASYNCFREE 0x8
+#define AIOCBLIST_SUSPEND 0x10
+
+#if 0
+#define DEBUGAIO
+#define DIAGNOSTIC
+#endif
+
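+/* Monotonically increasing reference id assigned to each queued request. */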
+static int jobrefid;
+
+#define JOBST_NULL 0x0
+#define JOBST_JOBQPROC 0x1
+#define JOBST_JOBQGLOBAL 0x2
+#define JOBST_JOBRUNNING 0x3
+#define JOBST_JOBFINISHED 0x4
+
+#define MAX_AIO_PER_PROC 32
+#define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */
+#define MAX_AIO_PROCS 128
+#define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */
+#define TARGET_AIO_PROCS 64
+
+/*
+ * Job queue item
+ */
+struct aiocblist {
+ TAILQ_ENTRY (aiocblist) list; /* List of jobs */
+ TAILQ_ENTRY (aiocblist) plist; /* List of jobs for proc */
+ int jobflags;
+ int jobstate;
+ struct proc *userproc; /* User process */
+ struct aioproclist *jobaioproc; /* AIO process descriptor */
+ struct aiocb uaiocb; /* Kernel I/O control block */
+};
+
+#define AIOP_FREE 0x1 /* proc on free queue */
+/*
+ * AIO process info
+ */
+struct aioproclist {
+ int aioprocflags; /* AIO proc flags */
+ TAILQ_ENTRY(aioproclist) list; /* List of processes */
+ struct proc *aioproc; /* The AIO thread */
+ TAILQ_HEAD (,aiocblist) jobtorun; /* suggested job to run */
+};
+
+struct kaioinfo {
+ int kaio_maxactive_count; /* maximum number of AIOs */
+ int kaio_active_count; /* number of currently used AIOs */
+	int	kaio_qallowed_count;	/* maximum size of AIO queue */
+ int kaio_queue_count; /* size of AIO queue */
+ TAILQ_HEAD (,aiocblist) kaio_jobqueue; /* job queue for process */
+ TAILQ_HEAD (,aiocblist) kaio_jobdone; /* done queue for process */
+};
+
+TAILQ_HEAD (,aioproclist) aio_freeproc, aio_activeproc;
+TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */
+TAILQ_HEAD(,aiocblist) aio_freejobs;
+
+int max_aio_procs = MAX_AIO_PROCS;
+int num_aio_procs = 0;
+int target_aio_procs = TARGET_AIO_PROCS;
+
+int max_queue_count = MAX_AIO_QUEUE;
+int num_queue_count = 0;
+
+void aio_init_aioinfo(struct proc *p);
+void aio_onceonly(void);
+void aio_proc_rundown(struct proc *p);
+int aio_free_entry(struct aiocblist *aiocbe);
+void aio_cancel_internal(struct aiocblist *aiocbe);
+void aio_process(struct aiocblist *aiocbe);
+void pmap_newvmspace(struct vmspace *);
+static int aio_newproc(void);
+static int aio_aqueue(struct proc *p, struct aiocb *job, int type);
+static void aio_marksuspend(struct proc *p, int njobs, int *joblist, int set);
+
+SYSINIT(aio, SI_SUB_VFS, SI_ORDER_ANY, aio_onceonly, NULL);
+
+/*
+ * Startup initialization
+ */
+void
+aio_onceonly() {
+ TAILQ_INIT(&aio_freeproc);
+ TAILQ_INIT(&aio_activeproc);
+ TAILQ_INIT(&aio_jobs);
+ TAILQ_INIT(&aio_freejobs);
+}
+
+/*
+ * Init the per-process aioinfo structure.
+ */
+void
+aio_init_aioinfo(struct proc *p) {
+ struct kaioinfo *ki;
+ if (p->p_aioinfo == NULL) {
+ ki = malloc(sizeof (struct kaioinfo), M_AIO, M_WAITOK);
+ p->p_aioinfo = ki;
+ ki->kaio_maxactive_count = MAX_AIO_PER_PROC;
+ ki->kaio_active_count = 0;
+ ki->kaio_qallowed_count = MAX_AIO_QUEUE_PER_PROC;
+ ki->kaio_queue_count = 0;
+ TAILQ_INIT(&ki->kaio_jobdone);
+ TAILQ_INIT(&ki->kaio_jobqueue);
+ }
+}
+
+/*
+ * Free a job entry. Wait for completion if it is currently
+ * active, but don't delay forever. If we delay, we return
+ * a flag that says that we have to restart the queue scan.
+ */
+int
+aio_free_entry(struct aiocblist *aiocbe) {
+ struct kaioinfo *ki;
+ struct aioproclist *aiop;
+ struct proc *p;
+
+ if (aiocbe->jobstate == JOBST_NULL)
+ panic("aio_free_entry: freeing already free job");
+
+ p = aiocbe->userproc;
+ ki = p->p_aioinfo;
+ if (ki == NULL)
+ panic("aio_free_entry: missing p->p_aioinfo");
+
+ if (aiocbe->jobstate == JOBST_JOBRUNNING) {
+ if (aiocbe->jobflags & AIOCBLIST_ASYNCFREE)
+ return 0;
+ aiocbe->jobflags |= AIOCBLIST_RUNDOWN;
+ if (tsleep(aiocbe, PRIBIO|PCATCH, "jobwai", hz*5)) {
+ aiocbe->jobflags |= AIOCBLIST_ASYNCFREE;
+ aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
+ return 1;
+ }
+ aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
+ }
+ aiocbe->jobflags &= ~AIOCBLIST_ASYNCFREE;
+
+ if (ki->kaio_queue_count <= 0)
+ panic("aio_free_entry: process queue size <= 0");
+ if (num_queue_count <= 0)
+ panic("aio_free_entry: system wide queue size <= 0");
+
+ --ki->kaio_queue_count;
+ --num_queue_count;
+
+ if ( aiocbe->jobstate == JOBST_JOBQPROC) {
+ aiop = aiocbe->jobaioproc;
+ TAILQ_REMOVE(&aiop->jobtorun, aiocbe, list);
+ } else if ( aiocbe->jobstate == JOBST_JOBQGLOBAL) {
+ TAILQ_REMOVE(&aio_jobs, aiocbe, list);
+ } else if ( aiocbe->jobstate == JOBST_JOBFINISHED) {
+ ki = p->p_aioinfo;
+ TAILQ_REMOVE(&ki->kaio_jobdone, aiocbe, plist);
+ }
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ aiocbe->jobstate = JOBST_NULL;
+ return 0;
+}
+
+/*
+ * Rundown the jobs for a given process.
+ */
+void
+aio_proc_rundown(struct proc *p) {
+ struct kaioinfo *ki;
+ struct aiocblist *aiocbe, *aiocbn;
+
+ ki = p->p_aioinfo;
+ if (ki == NULL)
+ return;
+
+restart1:
+ for ( aiocbe = TAILQ_FIRST(&ki->kaio_jobdone);
+ aiocbe;
+ aiocbe = aiocbn) {
+ aiocbn = TAILQ_NEXT(aiocbe, plist);
+ if (aio_free_entry(aiocbe))
+ goto restart1;
+ }
+
+restart2:
+ for ( aiocbe = TAILQ_FIRST(&ki->kaio_jobqueue);
+ aiocbe;
+ aiocbe = aiocbn) {
+ aiocbn = TAILQ_NEXT(aiocbe, plist);
+ if (aio_free_entry(aiocbe))
+ goto restart2;
+ }
+ free(ki, M_AIO);
+}
+
+/*
+ * Select a job to run (called by an AIO daemon)
+ */
+static struct aiocblist *
+aio_selectjob(struct aioproclist *aiop) {
+
+ struct aiocblist *aiocbe;
+
+ aiocbe = TAILQ_FIRST(&aiop->jobtorun);
+ if (aiocbe) {
+ TAILQ_REMOVE(&aiop->jobtorun, aiocbe, list);
+ return aiocbe;
+ }
+
+ for (aiocbe = TAILQ_FIRST(&aio_jobs);
+ aiocbe;
+ aiocbe = TAILQ_NEXT(aiocbe, list)) {
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ userp = aiocbe->userproc;
+ ki = userp->p_aioinfo;
+
+ if (ki->kaio_active_count < ki->kaio_maxactive_count) {
+ TAILQ_REMOVE(&aio_jobs, aiocbe, list);
+ return aiocbe;
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * The AIO activity proper.
+ */
+void
+aio_process(struct aiocblist *aiocbe) {
+ struct filedesc *fdp;
+ struct proc *userp;
+ struct aiocb *cb;
+ struct file *fp;
+ struct uio auio;
+ struct iovec aiov;
+ unsigned int fd;
+ int cnt;
+ int error;
+
+ userp = aiocbe->userproc;
+ cb = &aiocbe->uaiocb;
+
+#ifdef DEBUGAIO
+ printf("fd: %d, offset: 0x%x, address: 0x%x, size: %d\n",
+ cb->aio_fildes, (int) cb->aio_offset,
+ cb->aio_buf, cb->aio_nbytes);
+ tsleep(curproc, PVM, "aioprc", hz);
+#endif
+ fdp = curproc->p_fd;
+	/*
+	 * The file descriptor was validated when the request was queued.
+	 */
+ fd = cb->aio_fildes;
+ fp = fdp->fd_ofiles[fd];
+
+ aiov.iov_base = cb->aio_buf;
+ aiov.iov_len = cb->aio_nbytes;
+
+ auio.uio_iov = &aiov;
+ auio.uio_iovcnt = 1;
+ auio.uio_offset = cb->aio_offset;
+ auio.uio_resid = cb->aio_nbytes;
+ cnt = cb->aio_nbytes;
+ auio.uio_segflg = UIO_USERSPACE;
+ auio.uio_procp = curproc;
+
+ if (cb->aio_lio_opcode == LIO_READ) {
+ auio.uio_rw = UIO_READ;
+ error = (*fp->f_ops->fo_read)(fp, &auio, fp->f_cred);
+ } else {
+ auio.uio_rw = UIO_WRITE;
+ error = (*fp->f_ops->fo_write)(fp, &auio, fp->f_cred);
+ }
+
+ if (error) {
+ if (auio.uio_resid != cnt) {
+ if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
+ error = 0;
+ if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE))
+ psignal(userp, SIGPIPE);
+ }
+ }
+
+ cnt -= auio.uio_resid;
+ cb->_aiocb_private.error = error;
+ cb->_aiocb_private.status = cnt;
+}
+
+/*
+ * The AIO daemon.
+ */
+static void
+aio_startproc(void *uproc)
+{
+ struct aioproclist *aiop;
+
+ /*
+ * Allocate and ready the aio control info
+ */
+ aiop = malloc(sizeof *aiop, M_AIO, M_WAITOK);
+ aiop->aioproc = curproc;
+ aiop->aioprocflags |= AIOP_FREE;
+ TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
+ TAILQ_INIT(&aiop->jobtorun);
+
+ /*
+ * Get rid of current address space
+ */
+ if (curproc->p_vmspace->vm_refcnt == 1) {
+ if (curproc->p_vmspace->vm_shm)
+ shmexit(curproc);
+ pmap_remove_pages(&curproc->p_vmspace->vm_pmap, 0, USRSTACK);
+ vm_map_remove(&curproc->p_vmspace->vm_map, 0, USRSTACK);
+ } else {
+ vmspace_exec(curproc);
+ }
+
+ /*
+ * Make up a name for the daemon
+ */
+ strcpy(curproc->p_comm, "aiodaemon");
+
+ /*
+ * Get rid of our current filedescriptors
+ */
+ fdfree(curproc);
+ curproc->p_fd = NULL;
+ curproc->p_ucred = crcopy(curproc->p_ucred);
+ curproc->p_ucred->cr_uid = 0;
+ curproc->p_ucred->cr_groups[0] = 1;
+ curproc->p_flag |= P_SYSTEM;
+
+#ifdef DEBUGAIO
+ printf("Started new process: %d\n", curproc->p_pid);
+#endif
+ wakeup(uproc);
+
+ while(1) {
+ struct vmspace *myvm, *tmpvm;
+ struct proc *cp = curproc;
+ struct proc *up = NULL;
+ struct aiocblist *aiocbe;
+
+ if ((aiop->aioprocflags & AIOP_FREE) == 0) {
+ TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
+ aiop->aioprocflags |= AIOP_FREE;
+ }
+ tsleep(curproc, PZERO, "aiordy", 0);
+ if (aiop->aioprocflags & AIOP_FREE) {
+ TAILQ_REMOVE(&aio_freeproc, aiop, list);
+ TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
+ aiop->aioprocflags &= ~AIOP_FREE;
+ }
+
+ myvm = curproc->p_vmspace;
+
+ while ( aiocbe = aio_selectjob(aiop)) {
+ struct aiocb *cb;
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ cb = &aiocbe->uaiocb;
+ userp = aiocbe->userproc;
+ ki = userp->p_aioinfo;
+
+ aiocbe->jobstate = JOBST_JOBRUNNING;
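+			/*
+			 * Attach to the user process's address space and
+			 * file descriptor table so that the transfer is
+			 * done directly on the user's buffers.
+			 */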
+ if (userp != cp) {
+ tmpvm = curproc->p_vmspace;
+ curproc->p_vmspace = userp->p_vmspace;
+ ++curproc->p_vmspace->vm_refcnt;
+ pmap_activate(curproc);
+ if (tmpvm != myvm) {
+ vmspace_free(tmpvm);
+ }
+ if (curproc->p_fd)
+ fdfree(curproc);
+ curproc->p_fd = fdshare(userp);
+ cp = userp;
+ }
+
+ ki->kaio_active_count++;
+ aiocbe->jobaioproc = aiop;
+ aio_process(aiocbe);
+ --ki->kaio_active_count;
+
+ aiocbe->jobstate = JOBST_JOBFINISHED;
+
+ if (aiocbe->jobflags & AIOCBLIST_ASYNCFREE) {
+ aiocbe->jobflags &= ~AIOCBLIST_ASYNCFREE;
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ } else {
+ TAILQ_REMOVE(&ki->kaio_jobqueue,
+ aiocbe, plist);
+ TAILQ_INSERT_TAIL(&ki->kaio_jobdone,
+ aiocbe, plist);
+ }
+
+ if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
+ wakeup(aiocbe);
+ aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
+ }
+
+ if (aiocbe->jobflags & AIOCBLIST_SUSPEND) {
+ wakeup(userp);
+ aiocbe->jobflags &= ~AIOCBLIST_SUSPEND;
+ }
+
+ if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
+ psignal(userp, cb->aio_sigevent.sigev_signo);
+ }
+ }
+
+ if (cp != curproc) {
+ tmpvm = curproc->p_vmspace;
+ curproc->p_vmspace = myvm;
+ pmap_activate(curproc);
+ vmspace_free(tmpvm);
+ if (curproc->p_fd)
+ fdfree(curproc);
+ curproc->p_fd = NULL;
+ cp = curproc;
+ }
+ }
+}
+
+/*
+ * Create a new AIO daemon.
+ */
+static int
+aio_newproc() {
+ int error;
+ int rval[2];
+ struct rfork_args rfa;
+ struct proc *p;
+
+ rfa.flags = RFMEM | RFPROC | RFCFDG;
+
+ if (error = rfork(curproc, &rfa, &rval[0]))
+ return error;
+
+ cpu_set_fork_handler(p = pfind(rval[0]), aio_startproc, curproc);
+
+#ifdef DEBUGAIO
+ printf("Waiting for new process: %d, count: %d\n",
+ curproc->p_pid, num_aio_procs);
+#endif
+
+ error = tsleep(curproc, PZERO, "aiosta", 5*hz);
+ ++num_aio_procs;
+
+	return error;
+}
+
+/*
+ * Queue a new AIO request.
+ */
+static int
+_aio_aqueue(struct proc *p, struct aiocb *job, int type) {
+ struct filedesc *fdp;
+ struct file *fp;
+ unsigned int fd;
+
+ int error;
+ int opcode;
+ struct aiocblist *aiocbe;
+ struct aioproclist *aiop;
+ struct kaioinfo *ki;
+
+ if (aiocbe = TAILQ_FIRST(&aio_freejobs)) {
+ TAILQ_REMOVE(&aio_freejobs, aiocbe, list);
+ } else {
+ aiocbe = malloc (sizeof *aiocbe, M_AIO, M_WAITOK);
+ }
+
+ error = copyin((caddr_t)job,
+ (caddr_t) &aiocbe->uaiocb, sizeof aiocbe->uaiocb);
+ if (error) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ return error;
+ }
+
+ /*
+ * Get the fd info for process
+ */
+ fdp = p->p_fd;
+
+ /*
+ * Range check file descriptor
+ */
+ fd = aiocbe->uaiocb.aio_fildes;
+ if (fd >= fdp->fd_nfiles) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ if (type == 0) {
+ suword(&job->_aiocb_private.status, -1);
+ suword(&job->_aiocb_private.error, EBADF);
+ }
+ return EBADF;
+ }
+
+ fp = fdp->fd_ofiles[fd];
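+	/*
+	 * Note: this requires FWRITE even for read requests; the
+	 * opcode is not yet known at this point.
+	 */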
+ if ((fp == NULL) || ((fp->f_flag & FWRITE) == 0)) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ if (type == 0) {
+ suword(&job->_aiocb_private.status, -1);
+ suword(&job->_aiocb_private.error, EBADF);
+ }
+ return EBADF;
+ }
+
+ if (aiocbe->uaiocb.aio_offset == -1LL) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ if (type == 0) {
+ suword(&job->_aiocb_private.status, -1);
+ suword(&job->_aiocb_private.error, EINVAL);
+ }
+ return EINVAL;
+ }
+
+#ifdef DEBUGAIO
+ printf("job addr: 0x%x, 0x%x, %d\n", job, &job->_aiocb_private.kernelinfo, jobrefid);
+#endif
+
+ error = suword(&job->_aiocb_private.kernelinfo, jobrefid);
+ if (error) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ if (type == 0) {
+ suword(&job->_aiocb_private.status, -1);
+ suword(&job->_aiocb_private.error, EINVAL);
+ }
+ return error;
+ }
+
+ aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)jobrefid;
+#ifdef DEBUGAIO
+ printf("aio_aqueue: New job: %d... ", jobrefid);
+#endif
+ ++jobrefid;
+
+ if (type != LIO_NOP) {
+ aiocbe->uaiocb.aio_lio_opcode = type;
+ }
+
+ opcode = aiocbe->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_NOP) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ if (type == 0) {
+ suword(&job->_aiocb_private.status, -1);
+ suword(&job->_aiocb_private.error, 0);
+ }
+ return 0;
+ }
+
+ if ((opcode != LIO_NOP) &&
+ (opcode != LIO_READ) && (opcode != LIO_WRITE)) {
+ TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
+ if (type == 0) {
+ suword(&job->_aiocb_private.status, -1);
+ suword(&job->_aiocb_private.error, EINVAL);
+ }
+ return EINVAL;
+ }
+
+ suword(&job->_aiocb_private.error, 0);
+ suword(&job->_aiocb_private.status, 0);
+ aiocbe->userproc = p;
+ aiocbe->jobflags = 0;
+ ki = p->p_aioinfo;
+ ++num_queue_count;
+ ++ki->kaio_queue_count;
+
+retryproc:
+ if (aiop = TAILQ_FIRST(&aio_freeproc)) {
+#ifdef DEBUGAIO
+ printf("found a free AIO process\n");
+#endif
+ TAILQ_REMOVE(&aio_freeproc, aiop, list);
+ TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
+ aiop->aioprocflags &= ~AIOP_FREE;
+ TAILQ_INSERT_TAIL(&aiop->jobtorun, aiocbe, list);
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
+ aiocbe->jobstate = JOBST_JOBQPROC;
+ aiocbe->jobaioproc = aiop;
+ wakeup(aiop->aioproc);
+ } else if ((num_aio_procs < max_aio_procs) &&
+ (ki->kaio_active_count < ki->kaio_maxactive_count)) {
+ if (error = aio_newproc()) {
+#ifdef DEBUGAIO
+ printf("aio_aqueue: problem sleeping for starting proc: %d\n",
+ error);
+#endif
+ }
+ goto retryproc;
+ } else {
+#ifdef DEBUGAIO
+ printf("queuing to global queue\n");
+#endif
+ TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
+ aiocbe->jobstate = JOBST_JOBQGLOBAL;
+ }
+
+ return 0;
+}
+
+static int
+aio_aqueue(struct proc *p, struct aiocb *job, int type) {
+ struct kaioinfo *ki;
+
+ if (p->p_aioinfo == NULL) {
+ aio_init_aioinfo(p);
+ }
+
+ if (num_queue_count >= max_queue_count)
+ return EAGAIN;
+
+ ki = p->p_aioinfo;
+ if (ki->kaio_queue_count >= ki->kaio_qallowed_count)
+ return EAGAIN;
+
+ return _aio_aqueue(p, job, type);
+}
+
+/*
+ * Support the aio_return system call
+ */
+int
+aio_return(struct proc *p, struct aio_return_args *uap, int *retval) {
+ int jobref, status;
+ struct aiocblist *cb;
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ ki = p->p_aioinfo;
+ if (ki == NULL) {
+ return EINVAL;
+ }
+
+ jobref = fuword(&uap->aiocbp->_aiocb_private.kernelinfo);
+ if (jobref == -1)
+ return EINVAL;
+
+ for (cb = TAILQ_FIRST(&ki->kaio_jobdone);
+ cb;
+ cb = TAILQ_NEXT(cb, plist)) {
+ if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) {
+ retval[0] = cb->uaiocb._aiocb_private.status;
+ aio_free_entry(cb);
+ return 0;
+ }
+ }
+
+ status = fuword(&uap->aiocbp->_aiocb_private.status);
+ if (status == -1)
+ return 0;
+
+ return (EINVAL);
+}
+
+/*
+ * Mark (or clear) the suspend flag on the queued jobs of a process.
+ */
+void
+aio_marksuspend(struct proc *p, int njobs, int *joblist, int set) {
+ struct aiocblist *aiocbe;
+ struct kaioinfo *ki;
+
+ ki = p->p_aioinfo;
+ if (ki == NULL)
+ return;
+
+ for (aiocbe = TAILQ_FIRST(&ki->kaio_jobqueue);
+ aiocbe;
+ aiocbe = TAILQ_NEXT(aiocbe, plist)) {
+
+ if (njobs) {
+
+ int i;
+
+ for(i = 0; i < njobs; i++) {
+ if (((int) aiocbe->uaiocb._aiocb_private.kernelinfo) == joblist[i])
+ break;
+ }
+
+ if (i == njobs)
+ continue;
+ }
+
+ if (set)
+ aiocbe->jobflags |= AIOCBLIST_SUSPEND;
+ else
+ aiocbe->jobflags &= ~AIOCBLIST_SUSPEND;
+ }
+}
+
+/*
+ * Allow a process to wake up when any of its outstanding I/O
+ * requests has completed.
+ */
+int
+aio_suspend(struct proc *p, struct aio_suspend_args *uap, int *retval) {
+ struct timeval atv, utv;
+ struct timespec ts;
+ struct aiocb *const *cbptr, *cbp;
+ struct kaioinfo *ki;
+ struct aiocblist *cb;
+ int i;
+ int error, s, timo;
+ int *joblist;
+
+ timo = 0;
+ if (uap->timeout) {
+ /*
+ * Get timespec struct
+ */
+ if (error = copyin((caddr_t) uap->timeout, (caddr_t) &ts, sizeof ts)) {
+ return error;
+ }
+
+ if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000)
+ return (EINVAL);
+
+		TIMESPEC_TO_TIMEVAL(&atv, &ts);
+ if (itimerfix(&atv))
+ return (EINVAL);
+ /*
+ * XXX this is not as careful as settimeofday() about minimising
+ * interrupt latency. The hzto() interface is inconvenient as usual.
+ */
+ s = splclock();
+ timevaladd(&atv, &time);
+ timo = hzto(&atv);
+ splx(s);
+ if (timo == 0)
+ timo = 1;
+ }
+
+ ki = p->p_aioinfo;
+ if (ki == NULL)
+ return EAGAIN;
+
+ joblist = malloc(uap->nent * sizeof(int), M_TEMP, M_WAITOK);
+ cbptr = uap->aiocbp;
+
+ for(i=0;i<uap->nent;i++) {
+ cbp = (struct aiocb *) fuword((caddr_t) &cbptr[i]);
+#ifdef DEBUGAIO
+ printf("cbp: %x\n", cbp);
+#endif
+ joblist[i] = fuword(&cbp->_aiocb_private.kernelinfo);
+ cbptr++;
+ }
+
+#ifdef DEBUGAIO
+ printf("Suspend, timeout: %d clocks, jobs:", timo);
+ for(i=0;i<uap->nent;i++)
+ printf(" %d", joblist[i]);
+ printf("\n");
+#endif
+
+ while (1) {
+ for (cb = TAILQ_FIRST(&ki->kaio_jobdone);
+ cb;
+ cb = TAILQ_NEXT(cb, plist)) {
+ for(i=0;i<uap->nent;i++) {
+ if (((int) cb->uaiocb._aiocb_private.kernelinfo) == joblist[i]) {
+ free(joblist, M_TEMP);
+ return 0;
+ }
+ }
+ }
+
+ aio_marksuspend(p, uap->nent, joblist, 1);
+#ifdef DEBUGAIO
+ printf("Suspending -- waiting for all I/O's to complete: ");
+ for(i=0;i<uap->nent;i++)
+ printf(" %d", joblist[i]);
+ printf("\n");
+#endif
+ error = tsleep(p, PRIBIO|PCATCH, "aiospn", timo);
+ aio_marksuspend(p, uap->nent, joblist, 0);
+
+ if (error == EINTR) {
+#ifdef DEBUGAIO
+ printf(" signal\n");
+#endif
+ free(joblist, M_TEMP);
+ return EINTR;
+ } else if (error == EWOULDBLOCK) {
+#ifdef DEBUGAIO
+ printf(" timeout\n");
+#endif
+ free(joblist, M_TEMP);
+ return EAGAIN;
+ }
+#ifdef DEBUGAIO
+ printf("\n");
+#endif
+ }
+
+/* NOTREACHED */
+ return EINVAL;
+}
/*
* aio_cancel at the kernel level is a NOOP right now. It
@@ -67,7 +882,6 @@ aio_cancel(struct proc *p, struct aio_cancel_args *uap, int *retval) {
return AIO_NOTCANCELLED;
}
-
/*
* aio_error is implemented in the kernel level for compatibility
* purposes only. For a user mode async implementation, it would be
@@ -76,24 +890,47 @@ aio_cancel(struct proc *p, struct aio_cancel_args *uap, int *retval) {
int
aio_error(struct proc *p, struct aio_error_args *uap, int *retval) {
int activeflag, errorcode;
- struct aiocb iocb;
- int error;
+ struct aiocblist *cb;
+ struct kaioinfo *ki;
+ int jobref;
+ int error, status;
- /*
- * Get control block
- */
- if (error = copyin((caddr_t) uap->aiocbp, (caddr_t) &iocb, sizeof iocb))
- return error;
- if (iocb._aiocb_private.active == -1)
+ ki = p->p_aioinfo;
+ if (ki == NULL)
+ return EINVAL;
+
+ jobref = fuword(&uap->aiocbp->_aiocb_private.kernelinfo);
+ if (jobref == -1)
return EFAULT;
- if (iocb._aiocb_private.active != AIO_PMODE_ACTIVE) {
- retval[0] = EINVAL;
- return(0);
+ for (cb = TAILQ_FIRST(&ki->kaio_jobdone);
+ cb;
+ cb = TAILQ_NEXT(cb, plist)) {
+
+ if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) {
+ retval[0] = cb->uaiocb._aiocb_private.error;
+ return 0;
+ }
+ }
+
+ for (cb = TAILQ_FIRST(&ki->kaio_jobqueue);
+ cb;
+ cb = TAILQ_NEXT(cb, plist)) {
+
+ if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) {
+ retval[0] = EINPROGRESS;
+ return 0;
+ }
}
- retval[0] = iocb._aiocb_private.error;
- return(0);
+ /*
+ * Hack for lio
+ */
+ status = fuword(&uap->aiocbp->_aiocb_private.status);
+ if (status == -1) {
+ return fuword(&uap->aiocbp->_aiocb_private.error);
+ }
+ return EINVAL;
}
int
@@ -105,8 +942,12 @@ aio_read(struct proc *p, struct aio_read_args *uap, int *retval) {
unsigned int fd;
int cnt;
struct aiocb iocb;
- int error;
+ int error, pmodes;
+ pmodes = fuword(&uap->aiocbp->_aiocb_private.privatemodes);
+ if ((pmodes & AIO_PMODE_SYNC) == 0) {
+ return aio_aqueue(p, (struct aiocb *) uap->aiocbp, LIO_READ);
+ }
/*
* Get control block
@@ -115,12 +956,6 @@ aio_read(struct proc *p, struct aio_read_args *uap, int *retval) {
return error;
/*
- * We support sync only for now.
- */
- if ((iocb._aiocb_private.privatemodes & AIO_PMODE_SYNC) == 0)
- return ENOSYS;
-
- /*
* Get the fd info for process
*/
fdp = p->p_fd;
@@ -134,19 +969,26 @@ aio_read(struct proc *p, struct aio_read_args *uap, int *retval) {
fp = fdp->fd_ofiles[fd];
if ((fp == NULL) || ((fp->f_flag & FREAD) == 0))
return EBADF;
- if (((int) iocb.aio_offset) == -1)
+ if (iocb.aio_offset == -1LL)
return EINVAL;
+ auio.uio_resid = iocb.aio_nbytes;
+ if (auio.uio_resid < 0)
+ return (EINVAL);
+
+	/*
+	 * If this is not a synchronous request, queue it as an async
+	 * request; only sync requests are processed inline below.
+	 */
+ if ((iocb._aiocb_private.privatemodes & AIO_PMODE_SYNC) == 0) {
+ return aio_aqueue(p, (struct aiocb *) uap->aiocbp, LIO_READ);
+ }
+
aiov.iov_base = iocb.aio_buf;
aiov.iov_len = iocb.aio_nbytes;
+
auio.uio_iov = &aiov;
auio.uio_iovcnt = 1;
auio.uio_offset = iocb.aio_offset;
-
- auio.uio_resid = iocb.aio_nbytes;
- if (auio.uio_resid < 0)
- return (EINVAL);
-
auio.uio_rw = UIO_READ;
auio.uio_segflg = UIO_USERSPACE;
auio.uio_procp = p;
@@ -162,20 +1004,6 @@ aio_read(struct proc *p, struct aio_read_args *uap, int *retval) {
return error;
}
-
-/*
- * Return and suspend aren't supported (yet).
- */
-int
-aio_return(struct proc *p, struct aio_return_args *uap, int *retval) {
- return (0);
-}
-
-int
-aio_suspend(struct proc *p, struct aio_suspend_args *uap, int *retval) {
- return (0);
-}
-
int
aio_write(struct proc *p, struct aio_write_args *uap, int *retval) {
struct filedesc *fdp;
@@ -186,15 +1014,18 @@ aio_write(struct proc *p, struct aio_write_args *uap, int *retval) {
int cnt;
struct aiocb iocb;
int error;
-
- if (error = copyin((caddr_t) uap->aiocbp, (caddr_t) &iocb, sizeof iocb))
- return error;
+ int pmodes;
/*
- * We support sync only for now.
+	 * If this is not a synchronous request, queue it as an async request.
*/
- if ((iocb._aiocb_private.privatemodes & AIO_PMODE_SYNC) == 0)
- return ENOSYS;
+ pmodes = fuword(&uap->aiocbp->_aiocb_private.privatemodes);
+ if ((pmodes & AIO_PMODE_SYNC) == 0) {
+ return aio_aqueue(p, (struct aiocb *) uap->aiocbp, LIO_WRITE);
+ }
+
+ if (error = copyin((caddr_t) uap->aiocbp, (caddr_t) &iocb, sizeof iocb))
+ return error;
/*
* Get the fd info for process
@@ -210,7 +1041,7 @@ aio_write(struct proc *p, struct aio_write_args *uap, int *retval) {
fp = fdp->fd_ofiles[fd];
if ((fp == NULL) || ((fp->f_flag & FWRITE) == 0))
return EBADF;
- if (((int) iocb.aio_offset) == -1)
+ if (iocb.aio_offset == -1LL)
return EINVAL;
aiov.iov_base = iocb.aio_buf;
@@ -244,21 +1075,103 @@ aio_write(struct proc *p, struct aio_write_args *uap, int *retval) {
int
lio_listio(struct proc *p, struct lio_listio_args *uap, int *retval) {
- struct filedesc *fdp;
- struct file *fp;
- struct uio auio;
- struct iovec aiov;
- unsigned int fd;
- int cnt;
- unsigned int iocblen, iocbcnt;
- struct aiocb *iocb;
- int error;
+ int cnt, nent, nentqueued;
+ struct aiocb *iocb, * const *cbptr;
+ struct aiocblist *cb;
+ struct kaioinfo *ki;
+ int error, runningcode;
int i;
- if (uap->mode == LIO_NOWAIT)
- return ENOSYS;
- iocbcnt = uap->nent;
- if (iocbcnt > AIO_LISTIO_MAX)
+ if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT))
return EINVAL;
- return ENOSYS;
+
+ nent = uap->nent;
+ if (nent > AIO_LISTIO_MAX)
+ return EINVAL;
+
+ if (p->p_aioinfo == NULL) {
+ aio_init_aioinfo(p);
+ }
+
+ if ((nent + num_queue_count) > max_queue_count)
+ return EAGAIN;
+
+ ki = p->p_aioinfo;
+ if ((nent + ki->kaio_queue_count) > ki->kaio_qallowed_count)
+ return EAGAIN;
+
+/*
+ * Reserve resources; remember that we may have to unwind some of
+ * them if part of the submission fails.
+ */
+ num_queue_count += nent;
+ ki->kaio_queue_count += nent;
+ nentqueued = 0;
+
+/*
+ * Get pointers to the list of I/O requests.
+ */
+
+ cbptr = uap->acb_list;
+ for(i = 0; i < uap->nent; i++) {
+ iocb = (struct aiocb *) fuword((caddr_t) &cbptr[i]);
+ error = aio_aqueue(p, iocb, 0);
+ if (error == 0)
+ nentqueued++;
+ }
+
+ if (nentqueued == 0)
+ return EIO;
+
+ runningcode = 0;
+ if (nentqueued != nent)
+ runningcode = EIO;
+
+ if (uap->mode == LIO_WAIT) {
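+		/*
+		 * Poll the per-process done queue for each submitted
+		 * request; sleep and rescan until all have completed.
+		 */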
+ while (1) {
+ for(i = 0; i < uap->nent; i++) {
+ int found;
+ int jobref, command, status;
+
+ iocb = (struct aiocb *) fuword((caddr_t) &cbptr[i]);
+ command = fuword(&iocb->aio_lio_opcode);
+ if (command == LIO_NOP)
+ continue;
+
+ status = fuword(&iocb->_aiocb_private.status);
+ if (status == -1)
+ continue;
+ jobref = fuword(&iocb->_aiocb_private.kernelinfo);
+
+ found = 0;
+ for (cb = TAILQ_FIRST(&ki->kaio_jobdone);
+ cb;
+ cb = TAILQ_NEXT(cb, plist)) {
+ if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) {
+ found++;
+ break;
+ }
+ }
+ if (found == 0)
+ break;
+ }
+
+ if (i == uap->nent) {
+ return runningcode;
+ }
+
+ aio_marksuspend(p, 0, 0, 1);
+ error = tsleep(p, PRIBIO|PCATCH, "aiospn", 0);
+ aio_marksuspend(p, 0, 0, 0);
+
+ if (error == EINTR) {
+ return EINTR;
+ } else if (error == EWOULDBLOCK) {
+ return EAGAIN;
+ }
+
+ }
+ }
+
+ return runningcode;
}