diff options
author | davidxu <davidxu@FreeBSD.org> | 2006-01-22 05:59:27 +0000 |
---|---|---|
committer | davidxu <davidxu@FreeBSD.org> | 2006-01-22 05:59:27 +0000 |
commit | 72c8645faa631d2798bf531a57784fb9f81c9952 (patch) | |
tree | b753e6f8c06e1f7d68d98aef60c6a0f0ad839b0e | |
parent | af24439a4a62065bd0824fe04d44ebc883288dba (diff) | |
download | FreeBSD-src-72c8645faa631d2798bf531a57784fb9f81c9952.zip FreeBSD-src-72c8645faa631d2798bf531a57784fb9f81c9952.tar.gz |
Make aio code MP safe.
-rw-r--r-- | sys/kern/syscalls.master | 22 | ||||
-rw-r--r-- | sys/kern/vfs_aio.c | 1417 |
2 files changed, 585 insertions, 854 deletions
diff --git a/sys/kern/syscalls.master b/sys/kern/syscalls.master index a11bbda..6d80e93 100644 --- a/sys/kern/syscalls.master +++ b/sys/kern/syscalls.master @@ -467,9 +467,9 @@ u_int nfds, int timeout); } 253 AUE_NULL MSTD { int issetugid(void); } 254 AUE_NULL MSTD { int lchown(char *path, int uid, int gid); } -255 AUE_NULL NOSTD { int aio_read(struct aiocb *aiocbp); } -256 AUE_NULL NOSTD { int aio_write(struct aiocb *aiocbp); } -257 AUE_NULL NOSTD { int lio_listio(int mode, \ +255 AUE_NULL MNOSTD { int aio_read(struct aiocb *aiocbp); } +256 AUE_NULL MNOSTD { int aio_write(struct aiocb *aiocbp); } +257 AUE_NULL MNOSTD { int lio_listio(int mode, \ struct aiocb * const *acb_list, \ int nent, struct sigevent *sig); } 258 AUE_NULL UNIMPL nosys @@ -546,16 +546,16 @@ 312 AUE_NULL MSTD { int setresgid(gid_t rgid, gid_t egid, \ gid_t sgid); } 313 AUE_NULL OBSOL signanosleep -314 AUE_NULL NOSTD { int aio_return(struct aiocb *aiocbp); } -315 AUE_NULL NOSTD { int aio_suspend( \ +314 AUE_NULL MNOSTD { int aio_return(struct aiocb *aiocbp); } +315 AUE_NULL MNOSTD { int aio_suspend( \ struct aiocb * const * aiocbp, int nent, \ const struct timespec *timeout); } -316 AUE_NULL NOSTD { int aio_cancel(int fd, \ +316 AUE_NULL MNOSTD { int aio_cancel(int fd, \ struct aiocb *aiocbp); } -317 AUE_NULL NOSTD { int aio_error(struct aiocb *aiocbp); } -318 AUE_NULL NOSTD { int oaio_read(struct oaiocb *aiocbp); } -319 AUE_NULL NOSTD { int oaio_write(struct oaiocb *aiocbp); } -320 AUE_NULL NOSTD { int olio_listio(int mode, \ +317 AUE_NULL MNOSTD { int aio_error(struct aiocb *aiocbp); } +318 AUE_NULL MNOSTD { int oaio_read(struct oaiocb *aiocbp); } +319 AUE_NULL MNOSTD { int oaio_write(struct oaiocb *aiocbp); } +320 AUE_NULL MNOSTD { int olio_listio(int mode, \ struct oaiocb * const *acb_list, \ int nent, struct osigevent *sig); } 321 AUE_NULL MSTD { int yield(void); } @@ -631,7 +631,7 @@ 358 AUE_NULL MSTD { int extattr_delete_file(const char *path, \ int attrnamespace, \ const char *attrname); } -359 AUE_NULL NOSTD { int aio_waitcomplete( \ +359 AUE_NULL MNOSTD { int aio_waitcomplete( \ struct aiocb **aiocbp, \ struct timespec *timeout); } 360 AUE_NULL MSTD { int getresuid(uid_t *ruid, uid_t *euid, \ diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c index 22a3bb7..b1b3288 100644 --- a/sys/kern/vfs_aio.c +++ b/sys/kern/vfs_aio.c @@ -42,15 +42,20 @@ __FBSDID("$FreeBSD$"); #include <sys/resourcevar.h> #include <sys/signalvar.h> #include <sys/protosw.h> +#include <sys/sema.h> +#include <sys/socket.h> #include <sys/socketvar.h> #include <sys/syscall.h> #include <sys/sysent.h> #include <sys/sysctl.h> #include <sys/sx.h> +#include <sys/taskqueue.h> #include <sys/vnode.h> #include <sys/conf.h> #include <sys/event.h> +#include <machine/atomic.h> + #include <posix4/posix4.h> #include <vm/vm.h> #include <vm/vm_extern.h> @@ -61,8 +66,6 @@ __FBSDID("$FreeBSD$"); #include "opt_vfs_aio.h" -NET_NEEDS_GIANT("aio"); - /* * Counter for allocating reference ids to new jobs. Wrapped to 1 on * overflow. @@ -70,11 +73,11 @@ NET_NEEDS_GIANT("aio"); static long jobrefid; #define JOBST_NULL 0x0 +#define JOBST_JOBQSOCK 0x1 #define JOBST_JOBQGLOBAL 0x2 #define JOBST_JOBRUNNING 0x3 #define JOBST_JOBFINISHED 0x4 #define JOBST_JOBQBUF 0x5 -#define JOBST_JOBBFINISHED 0x6 #ifndef MAX_AIO_PER_PROC #define MAX_AIO_PER_PROC 32 @@ -184,6 +187,7 @@ typedef struct oaiocb { struct aiocblist { TAILQ_ENTRY(aiocblist) list; /* List of jobs */ TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */ + TAILQ_ENTRY(aiocblist) allist; int jobflags; int jobstate; int inputcharge; @@ -192,16 +196,18 @@ struct aiocblist { struct proc *userproc; /* User process */ struct ucred *cred; /* Active credential when created */ struct file *fd_file; /* Pointer to file structure */ - struct aio_liojob *lio; /* Optional lio job */ + struct aioliojob *lio; /* Optional lio job */ struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */ struct knlist klist; /* list of knotes */ struct aiocb uaiocb; /* Kernel I/O control block */ ksiginfo_t ksi; /* Realtime signal info */ + struct task biotask; }; /* jobflags */ -#define AIOCBLIST_RUNDOWN 0x4 +#define AIOCBLIST_RUNDOWN 0x04 #define AIOCBLIST_DONE 0x10 +#define AIOCBLIST_BUFDONE 0x20 /* * AIO process info @@ -217,18 +223,17 @@ struct aiothreadlist { /* * data-structure for lio signal management */ -struct aio_liojob { +struct aioliojob { int lioj_flags; - int lioj_buffer_count; - int lioj_buffer_finished_count; - int lioj_queue_count; - int lioj_queue_finished_count; - int lioj_total_count; + int lioj_count; + int lioj_finished_count; + int lioj_ref_count; struct sigevent lioj_signal; /* signal on all I/O done */ - TAILQ_ENTRY(aio_liojob) lioj_list; + TAILQ_ENTRY(aioliojob) lioj_list; struct knlist klist; /* list of knotes */ ksiginfo_t lioj_ksi; /* Realtime signal info */ }; + #define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */ #define LIOJ_SIGNAL_POSTED 0x2 /* signal has been posted */ #define LIOJ_KEVENT_POSTED 0x4 /* kevent triggered */ @@ -241,16 +246,14 @@ struct kaioinfo { int kaio_maxactive_count; /* maximum number of AIOs */ int kaio_active_count; /* number of currently used AIOs */ int kaio_qallowed_count; /* maxiumu size of AIO queue */ - int kaio_queue_count; /* size of AIO queue */ + int kaio_count; /* size of AIO queue */ int kaio_ballowed_count; /* maximum number of buffers */ - int kaio_queue_finished_count; /* number of daemon jobs finished */ int kaio_buffer_count; /* number of physio buffers */ - int kaio_buffer_finished_count; /* count of I/O done */ - TAILQ_HEAD(,aio_liojob) kaio_liojoblist; /* list of lio jobs */ + TAILQ_HEAD(,aiocblist) kaio_all; /* all AIOs in the process */ + TAILQ_HEAD(,aiocblist) kaio_done; /* done queue for process */ + TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */ TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */ - TAILQ_HEAD(,aiocblist) kaio_jobdone; /* done queue for process */ TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */ - TAILQ_HEAD(,aiocblist) kaio_bufdone; /* buffer done queue for process */ TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */ }; @@ -258,22 +261,24 @@ struct kaioinfo { #define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */ static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */ -static struct mtx aio_freeproc_mtx; - +static struct sema aio_newproc_sem; +static struct mtx aio_job_mtx; +static struct mtx aio_sock_mtx; static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */ +static struct unrhdr *aiod_unr; static void aio_init_aioinfo(struct proc *p); static void aio_onceonly(void); static int aio_free_entry(struct aiocblist *aiocbe); static void aio_process(struct aiocblist *aiocbe); -static int aio_newproc(void); +static int aio_newproc(int *); static int aio_aqueue(struct thread *td, struct aiocb *job, int type, int osigev); static void aio_physwakeup(struct buf *bp); static void aio_proc_rundown(void *arg, struct proc *p); -static int aio_fphysio(struct aiocblist *aiocbe); static int aio_qphysio(struct proc *p, struct aiocblist *iocb); -static void aio_daemon(void *uproc); +static void biohelper(void *, int); +static void aio_daemon(void *param); static void aio_swake_cb(struct socket *, struct sockbuf *); static int aio_unload(void); static int filt_aioattach(struct knote *kn); @@ -306,6 +311,8 @@ static struct filterops lio_filtops = static eventhandler_tag exit_tag, exec_tag; +TASKQUEUE_DEFINE_THREAD(aiod_bio); + /* * Main operations function for use as a kernel module. */ @@ -368,8 +375,11 @@ aio_onceonly(void) kqueue_add_filteropts(EVFILT_AIO, &aio_filtops); kqueue_add_filteropts(EVFILT_LIO, &lio_filtops); TAILQ_INIT(&aio_freeproc); - mtx_init(&aio_freeproc_mtx, "aio_freeproc", NULL, MTX_DEF); + sema_init(&aio_newproc_sem, 0, "aio_new_proc"); + mtx_init(&aio_job_mtx, "aio_job", NULL, MTX_DEF); + mtx_init(&aio_sock_mtx, "aio_sock", NULL, MTX_DEF); TAILQ_INIT(&aio_jobs); + aiod_unr = new_unrhdr(1, INT_MAX, NULL); kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE); aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL, @@ -378,7 +388,7 @@ aio_onceonly(void) NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE); aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX*sizeof(intptr_t) , NULL, NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE); - aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aio_liojob), NULL, + aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aioliojob), NULL, NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE); aiod_timeout = AIOD_TIMEOUT_DEFAULT; aiod_lifetime = AIOD_LIFETIME_DEFAULT; @@ -411,12 +421,15 @@ aio_unload(void) error = kqueue_del_filteropts(EVFILT_AIO); if (error) return error; - async_io_version = 0; aio_swake = NULL; + taskqueue_free(taskqueue_aiod_bio); + delete_unrhdr(aiod_unr); EVENTHANDLER_DEREGISTER(process_exit, exit_tag); EVENTHANDLER_DEREGISTER(process_exec, exec_tag); - mtx_destroy(&aio_freeproc_mtx); + mtx_destroy(&aio_job_mtx); + mtx_destroy(&aio_sock_mtx); + sema_destroy(&aio_newproc_sem); p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1); p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1); p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1); @@ -437,13 +450,12 @@ aio_init_aioinfo(struct proc *p) ki->kaio_maxactive_count = max_aio_per_proc; ki->kaio_active_count = 0; ki->kaio_qallowed_count = max_aio_queue_per_proc; - ki->kaio_queue_count = 0; + ki->kaio_count = 0; ki->kaio_ballowed_count = max_buf_aio; ki->kaio_buffer_count = 0; - ki->kaio_buffer_finished_count = 0; - TAILQ_INIT(&ki->kaio_jobdone); + TAILQ_INIT(&ki->kaio_all); + TAILQ_INIT(&ki->kaio_done); TAILQ_INIT(&ki->kaio_jobqueue); - TAILQ_INIT(&ki->kaio_bufdone); TAILQ_INIT(&ki->kaio_bufqueue); TAILQ_INIT(&ki->kaio_liojoblist); TAILQ_INIT(&ki->kaio_sockqueue); @@ -457,7 +469,7 @@ aio_init_aioinfo(struct proc *p) } while (num_aio_procs < target_aio_procs) - aio_newproc(); + aio_newproc(NULL); } static int @@ -481,54 +493,53 @@ static int aio_free_entry(struct aiocblist *aiocbe) { struct kaioinfo *ki; - struct aio_liojob *lj; + struct aioliojob *lj; struct proc *p; - int error; - int s; - - if (aiocbe->jobstate == JOBST_NULL) - panic("aio_free_entry: freeing already free job"); p = aiocbe->userproc; - KASSERT(curthread->td_proc == p, - ("%s: called for non-curproc", __func__)); + + PROC_LOCK_ASSERT(p, MA_OWNED); + MPASS(curproc == p); + MPASS(aiocbe->jobstate == JOBST_JOBFINISHED); + ki = p->p_aioinfo; + MPASS(ki != NULL); + + atomic_subtract_int(&num_queue_count, 1); + + ki->kaio_count--; + MPASS(ki->kaio_count >= 0); + lj = aiocbe->lio; - if (ki == NULL) - panic("aio_free_entry: missing p->p_aioinfo"); + if (lj) { + lj->lioj_count--; + lj->lioj_finished_count--; - while (aiocbe->jobstate == JOBST_JOBRUNNING) { - aiocbe->jobflags |= AIOCBLIST_RUNDOWN; - tsleep(aiocbe, PRIBIO, "jobwai", 0); - } - if (aiocbe->bp == NULL) { - if (ki->kaio_queue_count <= 0) - panic("aio_free_entry: process queue size <= 0"); - if (num_queue_count <= 0) - panic("aio_free_entry: system wide queue size <= 0"); - - if (lj) { - lj->lioj_queue_count--; - if (aiocbe->jobflags & AIOCBLIST_DONE) - lj->lioj_queue_finished_count--; - } - ki->kaio_queue_count--; - if (aiocbe->jobflags & AIOCBLIST_DONE) - ki->kaio_queue_finished_count--; - num_queue_count--; - } else { - if (lj) { - lj->lioj_buffer_count--; - if (aiocbe->jobflags & AIOCBLIST_DONE) - lj->lioj_buffer_finished_count--; + if (lj->lioj_count == 0 && lj->lioj_ref_count == 0) { + TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list); + /* lio is going away, we need to destroy any knotes */ + knlist_delete(&lj->klist, curthread, 1); + sigqueue_take(&lj->lioj_ksi); + uma_zfree(aiolio_zone, lj); } - if (aiocbe->jobflags & AIOCBLIST_DONE) - ki->kaio_buffer_finished_count--; - ki->kaio_buffer_count--; - num_buf_aio--; } + TAILQ_REMOVE(&ki->kaio_done, aiocbe, plist); + TAILQ_REMOVE(&ki->kaio_all, aiocbe, allist); + /* aiocbe is going away, we need to destroy any knotes */ + knlist_delete(&aiocbe->klist, curthread, 1); + sigqueue_take(&aiocbe->ksi); + + MPASS(aiocbe->bp == NULL); + aiocbe->jobstate = JOBST_NULL; + + /* Wake up anyone who has interest to do cleanup work. */ + if (ki->kaio_flags & (KAIO_WAKEUP | KAIO_RUNDOWN)) { + ki->kaio_flags &= ~KAIO_WAKEUP; + wakeup(&p->p_aioinfo); + } + PROC_UNLOCK(p); /* * The thread argument here is used to find the owning process @@ -550,55 +561,11 @@ aio_free_entry(struct aiocblist *aiocbe) * at open time, but this is already true of file descriptors in * a multithreaded process. */ - if (lj) - knlist_delete(&lj->klist, curthread, 0); - knlist_delete(&aiocbe->klist, curthread, 0); - - if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags & KAIO_RUNDOWN) - && ((ki->kaio_buffer_count == 0) && (ki->kaio_queue_count == 0)))) { - ki->kaio_flags &= ~KAIO_WAKEUP; - wakeup(p); - } - - if (aiocbe->jobstate == JOBST_JOBQBUF) { - if ((error = aio_fphysio(aiocbe)) != 0) - return (error); - if (aiocbe->jobstate != JOBST_JOBBFINISHED) - panic("aio_free_entry: invalid physio finish-up state"); - s = splbio(); - TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist); - splx(s); - } else if (aiocbe->jobstate == JOBST_JOBQGLOBAL) { - s = splnet(); - TAILQ_REMOVE(&aio_jobs, aiocbe, list); - TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist); - splx(s); - } else if (aiocbe->jobstate == JOBST_JOBFINISHED) - TAILQ_REMOVE(&ki->kaio_jobdone, aiocbe, plist); - else if (aiocbe->jobstate == JOBST_JOBBFINISHED) { - s = splbio(); - TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist); - splx(s); - if (aiocbe->bp) { - vunmapbuf(aiocbe->bp); - relpbuf(aiocbe->bp, NULL); - aiocbe->bp = NULL; - } - } - if (lj && (lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == 0)) { - TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list); - PROC_LOCK(p); - sigqueue_take(&lj->lioj_ksi); - PROC_UNLOCK(p); - uma_zfree(aiolio_zone, lj); - } - aiocbe->jobstate = JOBST_NULL; fdrop(aiocbe->fd_file, curthread); crfree(aiocbe->cred); - PROC_LOCK(p); - sigqueue_take(&aiocbe->ksi); - PROC_UNLOCK(p); uma_zfree(aiocb_zone, aiocbe); + PROC_LOCK(p); + return (0); } @@ -608,10 +575,9 @@ aio_free_entry(struct aiocblist *aiocbe) static void aio_proc_rundown(void *arg, struct proc *p) { - int s; struct kaioinfo *ki; - struct aio_liojob *lj, *ljn; - struct aiocblist *aiocbe, *aiocbn; + struct aioliojob *lj; + struct aiocblist *cbe, *cbn; struct file *fp; struct socket *so; @@ -621,105 +587,72 @@ aio_proc_rundown(void *arg, struct proc *p) if (ki == NULL) return; - mtx_lock(&Giant); - ki->kaio_flags |= LIOJ_SIGNAL_POSTED; - while ((ki->kaio_active_count > 0) || (ki->kaio_buffer_count > - ki->kaio_buffer_finished_count)) { - ki->kaio_flags |= KAIO_RUNDOWN; - if (tsleep(p, PRIBIO, "kaiowt", aiod_timeout)) - break; - } + PROC_LOCK(p); + +restart: + ki->kaio_flags |= KAIO_RUNDOWN; /* - * Move any aio ops that are waiting on socket I/O to the normal job - * queues so they are cleaned up with any others. + * Try to cancel all pending requests. This code simulates + * aio_cancel on all pending I/O requests. */ - s = splnet(); - TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_sockqueue, plist, aiocbn) { - fp = aiocbe->fd_file; - if (fp != NULL) { - so = fp->f_data; - TAILQ_REMOVE(&so->so_aiojobq, aiocbe, list); - if (TAILQ_EMPTY(&so->so_aiojobq)) { - SOCKBUF_LOCK(&so->so_snd); - so->so_snd.sb_flags &= ~SB_AIO; - SOCKBUF_UNLOCK(&so->so_snd); - SOCKBUF_LOCK(&so->so_rcv); - so->so_rcv.sb_flags &= ~SB_AIO; - SOCKBUF_UNLOCK(&so->so_rcv); - } - } - TAILQ_REMOVE(&ki->kaio_sockqueue, aiocbe, plist); - TAILQ_INSERT_HEAD(&aio_jobs, aiocbe, list); - TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, aiocbe, plist); + while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) { + fp = cbe->fd_file; + so = fp->f_data; + mtx_lock(&aio_sock_mtx); + TAILQ_REMOVE(&so->so_aiojobq, cbe, list); + mtx_unlock(&aio_sock_mtx); + TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist); + TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist); + cbe->jobstate = JOBST_JOBQGLOBAL; } - splx(s); -restart1: - TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_jobdone, plist, aiocbn) { - if (aio_free_entry(aiocbe)) - goto restart1; + TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) { + mtx_lock(&aio_job_mtx); + if (cbe->jobstate == JOBST_JOBQGLOBAL) { + TAILQ_REMOVE(&aio_jobs, cbe, list); + mtx_unlock(&aio_job_mtx); + cbe->jobstate = JOBST_JOBFINISHED; + cbe->uaiocb._aiocb_private.status = -1; + cbe->uaiocb._aiocb_private.error = ECANCELED; + TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist); + aio_bio_done_notify(p, cbe, DONE_QUEUE); + } else { + mtx_unlock(&aio_job_mtx); + } } -restart2: - TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_jobqueue, plist, aiocbn) { - if (aio_free_entry(aiocbe)) - goto restart2; - } + if (TAILQ_FIRST(&ki->kaio_sockqueue)) + goto restart; -/* - * Note the use of lots of splbio here, trying to avoid splbio for long chains - * of I/O. Probably unnecessary. - */ -restart3: - s = splbio(); - while (TAILQ_FIRST(&ki->kaio_bufqueue)) { + /* Wait for all running I/O to be finished */ + if (TAILQ_FIRST(&ki->kaio_bufqueue) || + TAILQ_FIRST(&ki->kaio_jobqueue)) { ki->kaio_flags |= KAIO_WAKEUP; - tsleep(p, PRIBIO, "aioprn", 0); - splx(s); - goto restart3; + msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO, "aioprn", hz); + goto restart; } - splx(s); - -restart4: - s = splbio(); - TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_bufdone, plist, aiocbn) { - if (aio_free_entry(aiocbe)) { - splx(s); - goto restart4; - } - } - splx(s); - /* - * If we've slept, jobs might have moved from one queue to another. - * Retry rundown if we didn't manage to empty the queues. - */ - if (TAILQ_FIRST(&ki->kaio_jobdone) != NULL || - TAILQ_FIRST(&ki->kaio_jobqueue) != NULL || - TAILQ_FIRST(&ki->kaio_bufqueue) != NULL || - TAILQ_FIRST(&ki->kaio_bufdone) != NULL) - goto restart1; - - TAILQ_FOREACH_SAFE(lj, &ki->kaio_liojoblist, lioj_list, ljn) { - if ((lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == - 0)) { + /* Free all completed I/O requests. */ + while ((cbe = TAILQ_FIRST(&ki->kaio_done)) != NULL) + aio_free_entry(cbe); + + while ((lj = TAILQ_FIRST(&ki->kaio_liojoblist)) != NULL) { + if (lj->lioj_count == 0 && lj->lioj_ref_count == 0) { TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list); + knlist_delete(&lj->klist, curthread, 1); + sigqueue_take(&lj->lioj_ksi); uma_zfree(aiolio_zone, lj); } else { -#ifdef DIAGNOSTIC - printf("LIO job not cleaned up: B:%d, BF:%d, Q:%d, " - "QF:%d\n", lj->lioj_buffer_count, - lj->lioj_buffer_finished_count, - lj->lioj_queue_count, - lj->lioj_queue_finished_count); -#endif + panic("LIO job not cleaned up: C:%d, FC:%d, RC:%d\n", + lj->lioj_count, lj->lioj_finished_count, + lj->lioj_ref_count); } } uma_zfree(kaio_zone, ki); p->p_aioinfo = NULL; - mtx_unlock(&Giant); + PROC_UNLOCK(p); } /* @@ -728,25 +661,24 @@ restart4: static struct aiocblist * aio_selectjob(struct aiothreadlist *aiop) { - int s; struct aiocblist *aiocbe; struct kaioinfo *ki; struct proc *userp; - s = splnet(); + mtx_assert(&aio_job_mtx, MA_OWNED); TAILQ_FOREACH(aiocbe, &aio_jobs, list) { userp = aiocbe->userproc; ki = userp->p_aioinfo; if (ki->kaio_active_count < ki->kaio_maxactive_count) { TAILQ_REMOVE(&aio_jobs, aiocbe, list); - splx(s); - return (aiocbe); + /* Account for currently active jobs. */ + ki->kaio_active_count++; + aiocbe->jobstate = JOBST_JOBRUNNING; + break; } } - splx(s); - - return (NULL); + return (aiocbe); } /* @@ -754,6 +686,15 @@ aio_selectjob(struct aiothreadlist *aiop) * the non-physio version of the operations. The normal vn operations are used, * and this code should work in all instances for every type of file, including * pipes, sockets, fifos, and regular files. + * + * XXX I don't think these code work well with pipes, sockets and fifo, the + * problem is the aiod threads can be blocked if there is not data or no + * buffer space, and file was not opened with O_NONBLOCK, all aiod threads + * will be blocked if there is couple of such processes. We need a FOF_OFFSET + * like flag to override f_flag to tell low level system to do non-blocking + * I/O, we can not muck O_NONBLOCK because there is full of race between + * userland and aiod threads, although there is a trigger mechanism for socket, + * but it also does not work well if userland is misbehaviored. */ static void aio_process(struct aiocblist *aiocbe) @@ -763,6 +704,7 @@ aio_process(struct aiocblist *aiocbe) struct proc *mycp; struct aiocb *cb; struct file *fp; + struct socket *so; struct uio auio; struct iovec aiov; int cnt; @@ -811,9 +753,17 @@ aio_process(struct aiocblist *aiocbe) if (error == ERESTART || error == EINTR || error == EWOULDBLOCK) error = 0; if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) { - PROC_LOCK(aiocbe->userproc); - psignal(aiocbe->userproc, SIGPIPE); - PROC_UNLOCK(aiocbe->userproc); + int sigpipe = 1; + if (fp->f_type == DTYPE_SOCKET) { + so = fp->f_data; + if (so->so_options & SO_NOSIGPIPE) + sigpipe = 0; + } + if (sigpipe) { + PROC_LOCK(aiocbe->userproc); + psignal(aiocbe->userproc, SIGPIPE); + PROC_UNLOCK(aiocbe->userproc); + } } } @@ -824,77 +774,61 @@ aio_process(struct aiocblist *aiocbe) } static void -aio_bio_done_notify( struct proc *userp, struct aiocblist *aiocbe, int type){ - int lj_done; - struct aio_liojob *lj; +aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type) +{ + struct aioliojob *lj; struct kaioinfo *ki; + int lj_done; + PROC_LOCK_ASSERT(userp, MA_OWNED); ki = userp->p_aioinfo; lj = aiocbe->lio; lj_done = 0; if (lj) { - if (type == DONE_QUEUE) - lj->lioj_queue_finished_count++; - else - lj->lioj_buffer_finished_count++; - if (lj->lioj_queue_finished_count + - lj->lioj_buffer_finished_count == - lj->lioj_total_count) + lj->lioj_finished_count++; + if (lj->lioj_count == lj->lioj_finished_count) lj_done = 1; } + if (type == DONE_QUEUE) { + aiocbe->jobflags |= AIOCBLIST_DONE; + } else { + aiocbe->jobflags |= AIOCBLIST_BUFDONE; + ki->kaio_buffer_count--; + } + TAILQ_INSERT_TAIL(&ki->kaio_done, aiocbe, plist); + aiocbe->jobstate = JOBST_JOBFINISHED; + if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL || + aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID) + aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi); - if (ki) { - if (type == DONE_QUEUE) { - ki->kaio_queue_finished_count++; - TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist); - } else { - ki->kaio_buffer_finished_count++; - TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist); - } - if (lj_done) { - if (!knlist_empty(&lj->klist) - && lj->lioj_signal.sigev_notify == - SIGEV_KEVENT) { - lj->lioj_flags |= LIOJ_KEVENT_POSTED; - KNOTE_UNLOCKED(&lj->klist, 0); - } - if ((lj->lioj_flags & - (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) - == LIOJ_SIGNAL - && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL || - lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) { - PROC_LOCK(userp); - aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi); - PROC_UNLOCK(userp); - lj->lioj_flags |= LIOJ_SIGNAL_POSTED; - } - } - KNOTE_UNLOCKED(&aiocbe->klist, 0); + KNOTE_LOCKED(&aiocbe->klist, 1); - if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) { - ki->kaio_flags &= ~KAIO_WAKEUP; - wakeup(userp); + if (lj_done) { + if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) { + lj->lioj_flags |= LIOJ_KEVENT_POSTED; + KNOTE_LOCKED(&lj->klist, 1); + } + if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) + == LIOJ_SIGNAL + && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL || + lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) { + aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi); + lj->lioj_flags |= LIOJ_SIGNAL_POSTED; } } - - if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL || - aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID) { - PROC_LOCK(userp); - aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi); - PROC_UNLOCK(userp); + if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) { + ki->kaio_flags &= ~KAIO_WAKEUP; + wakeup(&userp->p_aioinfo); } } + /* * The AIO daemon, most of the actual work is done in aio_process, * but the setup (and address space mgmt) is done in this routine. */ static void -aio_daemon(void *uproc) +aio_daemon(void *_id) { - int s; - struct aiocb *cb; struct aiocblist *aiocbe; struct aiothreadlist *aiop; struct kaioinfo *ki; @@ -903,6 +837,7 @@ aio_daemon(void *uproc) struct thread *td = curthread; struct pgrp *newpgrp; struct session *newsess; + int id = (intptr_t)_id; /* * Local copies of curproc (cp) and vmspace (myvm) @@ -923,18 +858,16 @@ aio_daemon(void *uproc) /* * Place thread (lightweight process) onto the AIO free thread list. */ - mtx_lock(&aio_freeproc_mtx); + mtx_lock(&aio_job_mtx); TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list); - mtx_unlock(&aio_freeproc_mtx); + mtx_unlock(&aio_job_mtx); /* * Get rid of our current filedescriptors. AIOD's don't need any * filedescriptors, except as temporarily inherited from the client. */ - mtx_lock(&Giant); fdfree(td); - mtx_unlock(&Giant); /* The daemon resides in its own pgrp. */ MALLOC(newpgrp, struct pgrp *, sizeof(struct pgrp), M_PGRP, M_WAITOK | M_ZERO); @@ -944,14 +877,14 @@ aio_daemon(void *uproc) sx_xlock(&proctree_lock); enterpgrp(mycp, mycp->p_pid, newpgrp, newsess); sx_xunlock(&proctree_lock); - mtx_lock(&Giant); /* * Wakeup parent process. (Parent sleeps to keep from blasting away * and creating too many daemons.) */ - wakeup(mycp); + sema_post(&aio_newproc_sem); + mtx_lock(&aio_job_mtx); for (;;) { /* * curcp is the current daemon process context. @@ -962,22 +895,18 @@ aio_daemon(void *uproc) /* * Take daemon off of free queue */ - mtx_lock(&aio_freeproc_mtx); if (aiop->aiothreadflags & AIOP_FREE) { TAILQ_REMOVE(&aio_freeproc, aiop, list); aiop->aiothreadflags &= ~AIOP_FREE; } - mtx_unlock(&aio_freeproc_mtx); /* * Check for jobs. */ while ((aiocbe = aio_selectjob(aiop)) != NULL) { - cb = &aiocbe->uaiocb; + mtx_unlock(&aio_job_mtx); userp = aiocbe->userproc; - aiocbe->jobstate = JOBST_JOBRUNNING; - /* * Connect to process address space for user program. */ @@ -1012,29 +941,30 @@ aio_daemon(void *uproc) ki = userp->p_aioinfo; - /* Account for currently active jobs. */ - ki->kaio_active_count++; - /* Do the I/O function. */ aio_process(aiocbe); - s = splbio(); - /* Decrement the active job count. */ - ki->kaio_active_count--; - - aiocbe->jobflags |= AIOCBLIST_DONE; - aiocbe->jobstate = JOBST_JOBFINISHED; + PROC_LOCK(userp); + TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist); aio_bio_done_notify(userp, aiocbe, DONE_QUEUE); if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) { wakeup(aiocbe); aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN; } + PROC_UNLOCK(userp); + + mtx_lock(&aio_job_mtx); + /* Decrement the active job count. */ + ki->kaio_active_count--; } /* * Disconnect from user address space. */ if (curcp != mycp) { + + mtx_unlock(&aio_job_mtx); + /* Get the user address space to disconnect from. */ tmpvm = mycp->p_vmspace; @@ -1053,9 +983,18 @@ aio_daemon(void *uproc) vmspace_free(tmpvm); curcp = mycp; + + mtx_lock(&aio_job_mtx); + /* + * We have to restart to avoid race, we only sleep if + * no job can be selected, that should be + * curcp == mycp. + */ + continue; } - mtx_lock(&aio_freeproc_mtx); + mtx_assert(&aio_job_mtx, MA_OWNED); + TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list); aiop->aiothreadflags |= AIOP_FREE; @@ -1063,18 +1002,16 @@ aio_daemon(void *uproc) * If daemon is inactive for a long time, allow it to exit, * thereby freeing resources. */ - if (msleep(aiop->aiothread, &aio_freeproc_mtx, PDROP | PRIBIO, - "aiordy", aiod_lifetime)) { - s = splnet(); + if (msleep(aiop->aiothread, &aio_job_mtx, PRIBIO, "aiordy", + aiod_lifetime)) { if (TAILQ_EMPTY(&aio_jobs)) { - mtx_lock(&aio_freeproc_mtx); if ((aiop->aiothreadflags & AIOP_FREE) && (num_aio_procs > target_aio_procs)) { TAILQ_REMOVE(&aio_freeproc, aiop, list); - mtx_unlock(&aio_freeproc_mtx); - splx(s); - uma_zfree(aiop_zone, aiop); num_aio_procs--; + mtx_unlock(&aio_job_mtx); + uma_zfree(aiop_zone, aiop); + free_unr(aiod_unr, id); #ifdef DIAGNOSTIC if (mycp->p_vmspace->vm_refcnt <= 1) { printf("AIOD: bad vm refcnt for" @@ -1084,36 +1021,40 @@ aio_daemon(void *uproc) #endif kthread_exit(0); } - mtx_unlock(&aio_freeproc_mtx); } - splx(s); } } + mtx_unlock(&aio_job_mtx); + panic("shouldn't be here\n"); } /* - * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The + * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The * AIO daemon modifies its environment itself. */ static int -aio_newproc(void) +aio_newproc(int *start) { int error; struct proc *p; + int id; - error = kthread_create(aio_daemon, curproc, &p, RFNOWAIT, 0, "aiod%d", - num_aio_procs); - if (error) - return (error); - - /* - * Wait until daemon is started, but continue on just in case to - * handle error conditions. - */ - error = tsleep(p, PZERO, "aiosta", aiod_timeout); - - num_aio_procs++; - + id = alloc_unr(aiod_unr); + error = kthread_create(aio_daemon, (void *)(intptr_t)id, &p, + RFNOWAIT, 0, "aiod%d", id); + if (error == 0) { + /* + * Wait until daemon is started. + */ + sema_wait(&aio_newproc_sem); + mtx_lock(&aio_job_mtx); + num_aio_procs++; + if (start != NULL) + *start--; + mtx_unlock(&aio_job_mtx); + } else { + free_unr(aiod_unr, id); + } return (error); } @@ -1129,15 +1070,13 @@ aio_newproc(void) static int aio_qphysio(struct proc *p, struct aiocblist *aiocbe) { - int error; struct aiocb *cb; struct file *fp; struct buf *bp; struct vnode *vp; struct kaioinfo *ki; - struct aio_liojob *lj; - int s, lj_done = 0; - int notify; + struct aioliojob *lj; + int error; cb = &aiocbe->uaiocb; fp = aiocbe->fd_file; @@ -1173,16 +1112,18 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe) if (ki->kaio_buffer_count >= ki->kaio_ballowed_count) return (-1); - ki->kaio_buffer_count++; - - lj = aiocbe->lio; - if (lj) - lj->lioj_buffer_count++; - /* Create and build a buffer header for a transfer. */ bp = (struct buf *)getpbuf(NULL); BUF_KERNPROC(bp); + PROC_LOCK(p); + ki->kaio_count++; + ki->kaio_buffer_count++; + lj = aiocbe->lio; + if (lj) + lj->lioj_count++; + PROC_UNLOCK(p); + /* * Get a copy of the kva from the physical buffer. */ @@ -1206,115 +1147,34 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe) goto doerror; } - s = splbio(); + PROC_LOCK(p); aiocbe->bp = bp; bp->b_caller1 = (void *)aiocbe; TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist); + TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist); aiocbe->jobstate = JOBST_JOBQBUF; cb->_aiocb_private.status = cb->aio_nbytes; - num_buf_aio++; + PROC_UNLOCK(p); + + atomic_add_int(&num_queue_count, 1); + atomic_add_int(&num_buf_aio, 1); + bp->b_error = 0; - splx(s); + TASK_INIT(&aiocbe->biotask, 0, biohelper, aiocbe); /* Perform transfer. */ dev_strategy(vp->v_rdev, bp); - - notify = 0; - s = splbio(); - - /* - * If we had an error invoking the request, or an error in processing - * the request before we have returned, we process it as an error in - * transfer. Note that such an I/O error is not indicated immediately, - * but is returned using the aio_error mechanism. In this case, - * aio_suspend will return immediately. - */ - if (bp->b_error || (bp->b_ioflags & BIO_ERROR)) { - struct aiocb *job = aiocbe->uuaiocb; - - aiocbe->uaiocb._aiocb_private.status = 0; - suword(&job->_aiocb_private.status, 0); - aiocbe->uaiocb._aiocb_private.error = bp->b_error; - suword(&job->_aiocb_private.error, bp->b_error); - - if (lj) { - lj->lioj_buffer_finished_count++; - if (lj->lioj_queue_finished_count + - lj->lioj_buffer_finished_count == - lj->lioj_total_count) - lj_done = 1; - } - - ki->kaio_buffer_finished_count++; - - if (aiocbe->jobstate != JOBST_JOBBFINISHED) { - aiocbe->jobstate = JOBST_JOBBFINISHED; - aiocbe->jobflags |= AIOCBLIST_DONE; - TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist); - notify = 1; - } - } - splx(s); - if (notify) { - if (lj && !knlist_empty(&lj->klist)) { - lj->lioj_flags |= LIOJ_KEVENT_POSTED; - KNOTE_UNLOCKED(&lj->klist, 0); - } - KNOTE_UNLOCKED(&aiocbe->klist, 0); - - } - if (cb->aio_lio_opcode == LIO_WRITE) { - aiocbe->outputcharge += btodb(cb->aio_nbytes); - } else if (cb->aio_lio_opcode == LIO_READ) { - aiocbe->inputcharge += btodb(cb->aio_nbytes); - } return (0); doerror: + PROC_LOCK(p); + ki->kaio_count--; ki->kaio_buffer_count--; if (lj) - lj->lioj_buffer_count--; + lj->lioj_count--; aiocbe->bp = NULL; - relpbuf(bp, NULL); - return (error); -} - -/* - * This waits/tests physio completion. - */ -static int -aio_fphysio(struct aiocblist *iocb) -{ - int s; - struct buf *bp; - int error; - - bp = iocb->bp; - - s = splbio(); - while ((bp->b_flags & B_DONE) == 0) { - if (tsleep(bp, PRIBIO, "physstr", aiod_timeout)) { - if ((bp->b_flags & B_DONE) == 0) { - splx(s); - return (EINPROGRESS); - } else - break; - } - } - splx(s); - - /* Release mapping into kernel space. */ - vunmapbuf(bp); - iocb->bp = 0; - - error = 0; - - /* Check for an error. */ - if (bp->b_ioflags & BIO_ERROR) - error = bp->b_error; - + PROC_UNLOCK(p); relpbuf(bp, NULL); return (error); } @@ -1325,7 +1185,7 @@ aio_fphysio(struct aiocblist *iocb) static void aio_swake_cb(struct socket *so, struct sockbuf *sb) { - struct aiocblist *cb,*cbn; + struct aiocblist *cb, *cbn; struct proc *p; struct kaioinfo *ki = NULL; int opcode, wakecount = 0; @@ -1343,28 +1203,39 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb) SOCKBUF_UNLOCK(&so->so_rcv); } + mtx_lock(&aio_sock_mtx); TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) { if (opcode == cb->uaiocb.aio_lio_opcode) { + if (cb->jobstate != JOBST_JOBQGLOBAL) + panic("invalid queue value"); p = cb->userproc; ki = p->p_aioinfo; TAILQ_REMOVE(&so->so_aiojobq, cb, list); + PROC_LOCK(p); TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist); - TAILQ_INSERT_TAIL(&aio_jobs, cb, list); + /* + * XXX check AIO_RUNDOWN, and don't put on + * jobqueue if it was set. + */ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist); + cb->jobstate = JOBST_JOBQGLOBAL; + mtx_lock(&aio_job_mtx); + TAILQ_INSERT_TAIL(&aio_jobs, cb, list); + mtx_unlock(&aio_job_mtx); + PROC_UNLOCK(p); wakecount++; - if (cb->jobstate != JOBST_JOBQGLOBAL) - panic("invalid queue value"); } } + mtx_unlock(&aio_sock_mtx); while (wakecount--) { - mtx_lock(&aio_freeproc_mtx); + mtx_lock(&aio_job_mtx); if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) { TAILQ_REMOVE(&aio_freeproc, aiop, list); aiop->aiothreadflags &= ~AIOP_FREE; wakeup(aiop->aiothread); } - mtx_unlock(&aio_freeproc_mtx); + mtx_unlock(&aio_job_mtx); } } @@ -1373,16 +1244,12 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb) * technique is done in this code. */ static int -_aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, +_aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj, int type, int oldsigev) { struct proc *p = td->td_proc; struct file *fp; - unsigned int fd; struct socket *so; - int s; - int error; - int opcode; struct aiocblist *aiocbe; struct aiothreadlist *aiop; struct kaioinfo *ki; @@ -1390,12 +1257,17 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, struct kqueue *kq; struct file *kq_fp; struct sockbuf *sb; + int opcode; + int error; + int fd; + int jid; + + ki = p->p_aioinfo; - aiocbe = uma_zalloc(aiocb_zone, M_WAITOK); + aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO); aiocbe->inputcharge = 0; aiocbe->outputcharge = 0; - /* XXX - need a lock */ - knlist_init(&aiocbe->klist, NULL, NULL, NULL, NULL); + knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL); suword(&job->_aiocb_private.status, -1); suword(&job->_aiocb_private.error, 0); @@ -1445,8 +1317,7 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, } if (error) { uma_zfree(aiocb_zone, aiocbe); - if (type == 0) - suword(&job->_aiocb_private.error, EBADF); + suword(&job->_aiocb_private.error, EBADF); return (error); } aiocbe->fd_file = fp; @@ -1455,37 +1326,34 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, error = EINVAL; goto aqueue_fail; } - error = suword(&job->_aiocb_private.kernelinfo, jobrefid); - if (error) { - error = EINVAL; - goto aqueue_fail; - } - aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jobrefid; + + mtx_lock(&aio_job_mtx); + jid = jobrefid; if (jobrefid == LONG_MAX) jobrefid = 1; else jobrefid++; + mtx_unlock(&aio_job_mtx); + + error = suword(&job->_aiocb_private.kernelinfo, jid); + if (error) { + error = EINVAL; + goto aqueue_fail; + } + aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jid; if (opcode == LIO_NOP) { fdrop(fp, td); uma_zfree(aiocb_zone, aiocbe); - if (type == 0) { - suword(&job->_aiocb_private.error, 0); - suword(&job->_aiocb_private.status, 0); - suword(&job->_aiocb_private.kernelinfo, 0); - } return (0); } if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) { - if (type == 0) - suword(&job->_aiocb_private.status, 0); error = EINVAL; goto aqueue_fail; } if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) { kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue; - kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr; } else goto no_kqueue; error = fget(td, (u_int)kev.ident, &kq_fp); @@ -1501,14 +1369,14 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, kev.filter = EVFILT_AIO; kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1; kev.data = (intptr_t)aiocbe; + kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr; error = kqueue_register(kq, &kev, td, 1); fdrop(kq_fp, td); aqueue_fail: if (error) { fdrop(fp, td); uma_zfree(aiocb_zone, aiocbe); - if (type == 0) - suword(&job->_aiocb_private.error, error); + suword(&job->_aiocb_private.error, error); goto done; } no_kqueue: @@ -1519,7 +1387,6 @@ no_kqueue: aiocbe->cred = crhold(td->td_ucred); aiocbe->jobflags = 0; aiocbe->lio = lj; - ki = p->p_aioinfo; if (fp->f_type == DTYPE_SOCKET) { /* @@ -1538,47 +1405,54 @@ no_kqueue: so = fp->f_data; sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd; SOCKBUF_LOCK(sb); - s = splnet(); if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode == LIO_WRITE) && (!sowriteable(so)))) { + mtx_lock(&aio_sock_mtx); TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list); - TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist); + mtx_unlock(&aio_sock_mtx); + sb->sb_flags |= SB_AIO; - aiocbe->jobstate = JOBST_JOBQGLOBAL; /* XXX */ - ki->kaio_queue_count++; - num_queue_count++; + PROC_LOCK(p); + TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist); + TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist); + aiocbe->jobstate = JOBST_JOBQSOCK; + ki->kaio_count++; + if (lj) + lj->lioj_count++; + PROC_UNLOCK(p); SOCKBUF_UNLOCK(sb); - splx(s); + atomic_add_int(&num_queue_count, 1); error = 0; goto done; } SOCKBUF_UNLOCK(sb); - splx(s); } if ((error = aio_qphysio(p, aiocbe)) == 0) goto done; +#if 0 if (error > 0) { - suword(&job->_aiocb_private.status, 0); aiocbe->uaiocb._aiocb_private.error = error; suword(&job->_aiocb_private.error, error); goto done; } - +#endif /* No buffer for daemon I/O. */ aiocbe->bp = NULL; - ki->kaio_queue_count++; + PROC_LOCK(p); + ki->kaio_count++; if (lj) - lj->lioj_queue_count++; - s = splnet(); + lj->lioj_count++; TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist); + TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist); + + mtx_lock(&aio_job_mtx); TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list); - splx(s); aiocbe->jobstate = JOBST_JOBQGLOBAL; + PROC_UNLOCK(p); - num_queue_count++; - error = 0; + atomic_add_int(&num_queue_count, 1); /* * If we don't have a free AIO process, and we are below our quota, then @@ -1587,8 +1461,8 @@ no_kqueue: * (thread) due to resource issues, we return an error for now (EAGAIN), * which is likely not the correct thing to do. */ - mtx_lock(&aio_freeproc_mtx); retryproc: + error = 0; if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) { TAILQ_REMOVE(&aio_freeproc, aiop, list); aiop->aiothreadflags &= ~AIOP_FREE; @@ -1597,14 +1471,16 @@ retryproc: ((ki->kaio_active_count + num_aio_resv_start) < ki->kaio_maxactive_count)) { num_aio_resv_start++; - mtx_unlock(&aio_freeproc_mtx); - error = aio_newproc(); - mtx_lock(&aio_freeproc_mtx); - num_aio_resv_start--; - if (error) + mtx_unlock(&aio_job_mtx); + error = aio_newproc(&num_aio_resv_start); + mtx_lock(&aio_job_mtx); + if (error) { + num_aio_resv_start--; goto retryproc; + } } - mtx_unlock(&aio_freeproc_mtx); + mtx_unlock(&aio_job_mtx); + done: return (error); } @@ -1625,7 +1501,7 @@ aio_aqueue(struct thread *td, struct aiocb *job, int type, int oldsigev) return (EAGAIN); ki = p->p_aioinfo; - if (ki->kaio_queue_count >= ki->kaio_qallowed_count) + if (ki->kaio_count >= ki->kaio_qallowed_count) return (EAGAIN); return _aio_aqueue(td, job, NULL, type, oldsigev); @@ -1639,44 +1515,25 @@ int aio_return(struct thread *td, struct aio_return_args *uap) { struct proc *p = td->td_proc; - int s; - long jobref; - struct aiocblist *cb, *ncb; - struct aiocb *ujob; + struct aiocblist *cb; + struct aiocb *uaiocb; struct kaioinfo *ki; - - ujob = uap->aiocbp; - jobref = fuword(&ujob->_aiocb_private.kernelinfo); - if (jobref == -1 || jobref == 0) - return (EINVAL); + int status, error; ki = p->p_aioinfo; if (ki == NULL) return (EINVAL); + uaiocb = uap->aiocbp; PROC_LOCK(p); - TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) { - if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) == - jobref) - goto done; - } - - s = splbio(); - /* aio_physwakeup */ - TAILQ_FOREACH_SAFE(cb, &ki->kaio_bufdone, plist, ncb) { - if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) - == jobref) { + TAILQ_FOREACH(cb, &ki->kaio_done, plist) { + if (cb->uuaiocb == uaiocb) break; - } } - splx(s); - done: - PROC_UNLOCK(p); if (cb != NULL) { - if (ujob == cb->uuaiocb) { - td->td_retval[0] = - cb->uaiocb._aiocb_private.status; - } else - td->td_retval[0] = EFAULT; + MPASS(cb->jobstate == JOBST_JOBFINISHED); + status = cb->uaiocb._aiocb_private.status; + error = cb->uaiocb._aiocb_private.error; + td->td_retval[0] = status; if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) { p->p_stats->p_ru.ru_oublock += cb->outputcharge; @@ -1686,9 +1543,13 @@ aio_return(struct thread *td, struct aio_return_args *uap) cb->inputcharge = 0; } aio_free_entry(cb); - return (0); - } - return (EINVAL); + suword(&uaiocb->_aiocb_private.error, error); + suword(&uaiocb->_aiocb_private.status, status); + error = 0; + } else + error = EINVAL; + PROC_UNLOCK(p); + return (error); } /* @@ -1702,12 +1563,12 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap) struct timespec ts; struct aiocb *const *cbptr, *cbp; struct kaioinfo *ki; - struct aiocblist *cb; - int i; - int njoblist; - int error, s, timo; - long *ijoblist; + struct aiocblist *cb, *cbfirst; struct aiocb **ujoblist; + int njoblist; + int error; + int timo; + int i; if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX) return (EINVAL); @@ -1732,7 +1593,6 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap) return (EAGAIN); njoblist = 0; - ijoblist = uma_zalloc(aiol_zone, M_WAITOK); ujoblist = uma_zalloc(aiol_zone, M_WAITOK); cbptr = uap->aiocbp; @@ -1741,69 +1601,44 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap) if (cbp == 0) continue; ujoblist[njoblist] = cbp; - ijoblist[njoblist] = fuword(&cbp->_aiocb_private.kernelinfo); njoblist++; } if (njoblist == 0) { - uma_zfree(aiol_zone, ijoblist); uma_zfree(aiol_zone, ujoblist); return (0); } - error = 0; + PROC_LOCK(p); for (;;) { - PROC_LOCK(p); - TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) { - for (i = 0; i < njoblist; i++) { - if (((intptr_t) - cb->uaiocb._aiocb_private.kernelinfo) == - ijoblist[i]) { - PROC_UNLOCK(p); - if (ujoblist[i] != cb->uuaiocb) - error = EINVAL; - uma_zfree(aiol_zone, ijoblist); - uma_zfree(aiol_zone, ujoblist); - return (error); - } - } - } - - s = splbio(); - TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) { + cbfirst = NULL; + error = 0; + TAILQ_FOREACH(cb, &ki->kaio_all, allist) { for (i = 0; i < njoblist; i++) { - if (((intptr_t) - cb->uaiocb._aiocb_private.kernelinfo) == - ijoblist[i]) { - PROC_UNLOCK(p); - splx(s); - if (ujoblist[i] != cb->uuaiocb) - error = EINVAL; - uma_zfree(aiol_zone, ijoblist); - uma_zfree(aiol_zone, ujoblist); - return (error); + if (cb->uuaiocb == ujoblist[i]) { + if (cbfirst == NULL) + cbfirst = cb; + if (cb->jobstate == JOBST_JOBFINISHED) + goto RETURN; } } } + /* All tasks were finished. */ + if (cbfirst == NULL) + break; ki->kaio_flags |= KAIO_WAKEUP; - error = msleep(p, &p->p_mtx, PDROP | PRIBIO | PCATCH, "aiospn", - timo); - splx(s); - - if (error == ERESTART || error == EINTR) { - uma_zfree(aiol_zone, ijoblist); - uma_zfree(aiol_zone, ujoblist); - return (EINTR); - } else if (error == EWOULDBLOCK) { - uma_zfree(aiol_zone, ijoblist); - uma_zfree(aiol_zone, ujoblist); - return (EAGAIN); - } + error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH, + "aiospn", timo); + if (error == ERESTART) + error = EINTR; + if (error) + break; } - -/* NOTREACHED */ - return (EINVAL); +RETURN: + PROC_UNLOCK(p); + uma_zfree(aiol_zone, ujoblist); + return (error); } /* @@ -1818,92 +1653,77 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap) struct aiocblist *cbe, *cbn; struct file *fp; struct socket *so; - struct proc *po; - int s,error; - int cancelled=0; - int notcancelled=0; + int error; + int cancelled = 0; + int notcancelled = 0; struct vnode *vp; /* Lookup file object. */ - error = fget(td, (u_int)uap->fd, &fp); + error = fget(td, uap->fd, &fp); if (error) return (error); + ki = p->p_aioinfo; + if (ki == NULL) + goto done; + if (fp->f_type == DTYPE_VNODE) { vp = fp->f_vnode; - - if (vn_isdisk(vp,&error)) { + if (vn_isdisk(vp, &error)) { fdrop(fp, td); td->td_retval[0] = AIO_NOTCANCELED; return (0); } } else if (fp->f_type == DTYPE_SOCKET) { so = fp->f_data; - - s = splnet(); - + mtx_lock(&aio_sock_mtx); TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) { - if ((uap->aiocbp == NULL) || - (uap->aiocbp == cbe->uuaiocb) ) { - po = cbe->userproc; - ki = po->p_aioinfo; + if (cbe->userproc == p && + (uap->aiocbp == NULL || + uap->aiocbp == cbe->uuaiocb)) { TAILQ_REMOVE(&so->so_aiojobq, cbe, list); + PROC_LOCK(p); TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe, plist); - if (ki->kaio_flags & KAIO_WAKEUP) { - wakeup(po); - } - cbe->jobstate = JOBST_JOBFINISHED; - cbe->uaiocb._aiocb_private.status=-1; - cbe->uaiocb._aiocb_private.error=ECANCELED; + cbe->jobstate = JOBST_JOBRUNNING; + cbe->uaiocb._aiocb_private.status = -1; + cbe->uaiocb._aiocb_private.error = ECANCELED; + aio_bio_done_notify(p, cbe, DONE_QUEUE); + PROC_UNLOCK(p); cancelled++; -/* XXX cancelled, knote? */ - if (cbe->uaiocb.aio_sigevent.sigev_notify == - SIGEV_SIGNAL || - cbe->uaiocb.aio_sigevent.sigev_notify == - SIGEV_THREAD_ID) { - PROC_LOCK(cbe->userproc); - aio_sendsig(cbe->userproc, - &cbe->uaiocb.aio_sigevent, - &cbe->ksi); - PROC_UNLOCK(cbe->userproc); - } - if (uap->aiocbp) + if (uap->aiocbp != NULL) break; } } - splx(s); - - if ((cancelled) && (uap->aiocbp)) { + mtx_unlock(&aio_sock_mtx); + if (cancelled && uap->aiocbp != NULL) { fdrop(fp, td); td->td_retval[0] = AIO_CANCELED; return (0); } } - ki = p->p_aioinfo; - if (ki == NULL) - goto done; - s = splnet(); + PROC_LOCK(p); TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) { - if ((uap->fd == cbe->uaiocb.aio_fildes) && - ((uap->aiocbp == NULL ) || + ((uap->aiocbp == NULL) || (uap->aiocbp == cbe->uuaiocb))) { - + mtx_lock(&aio_job_mtx); if (cbe->jobstate == JOBST_JOBQGLOBAL) { TAILQ_REMOVE(&aio_jobs, cbe, list); - cbe->jobstate = JOBST_JOBFINISHED; - cancelled++; + mtx_unlock(&aio_job_mtx); + TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist); cbe->uaiocb._aiocb_private.status = -1; cbe->uaiocb._aiocb_private.error = ECANCELED; - aio_bio_done_notify(cbe->userproc, cbe, DONE_QUEUE); + aio_bio_done_notify(p, cbe, DONE_QUEUE); + cancelled++; } else { + mtx_unlock(&aio_job_mtx); notcancelled++; } } } - splx(s); + PROC_UNLOCK(p); + done: fdrop(fp, td); if (notcancelled) { @@ -1928,84 +1748,41 @@ int aio_error(struct thread *td, struct aio_error_args *uap) { struct proc *p = td->td_proc; - int s; struct aiocblist *cb; struct kaioinfo *ki; - long jobref; + int status; ki = p->p_aioinfo; - if (ki == NULL) - return (EINVAL); - - jobref = fuword(&uap->aiocbp->_aiocb_private.kernelinfo); - if ((jobref == -1) || (jobref == 0)) - return (EINVAL); - - PROC_LOCK(p); - TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) == - jobref) { - PROC_UNLOCK(p); - td->td_retval[0] = cb->uaiocb._aiocb_private.error; - return (0); - } - } - - s = splnet(); - - TAILQ_FOREACH(cb, &ki->kaio_jobqueue, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) == - jobref) { - PROC_UNLOCK(p); - td->td_retval[0] = EINPROGRESS; - splx(s); - return (0); - } - } - - TAILQ_FOREACH(cb, &ki->kaio_sockqueue, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) == - jobref) { - PROC_UNLOCK(p); - td->td_retval[0] = EINPROGRESS; - splx(s); - return (0); - } - } - splx(s); - - s = splbio(); - TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) == - jobref) { - PROC_UNLOCK(p); - td->td_retval[0] = cb->uaiocb._aiocb_private.error; - splx(s); - return (0); - } + if (ki == NULL) { + td->td_retval[0] = EINVAL; + return (0); } - TAILQ_FOREACH(cb, &ki->kaio_bufqueue, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) == - jobref) { + PROC_LOCK(p); + TAILQ_FOREACH(cb, &ki->kaio_all, allist) { + if (cb->uuaiocb == uap->aiocbp) { + if (cb->jobstate == JOBST_JOBFINISHED) + td->td_retval[0] = + cb->uaiocb._aiocb_private.error; + else + td->td_retval[0] = EINPROGRESS; PROC_UNLOCK(p); - td->td_retval[0] = EINPROGRESS; - splx(s); return (0); } } - splx(s); PROC_UNLOCK(p); -#if (0) /* - * Hack for lio. + * Hack for failure of _aio_aqueue. */ status = fuword(&uap->aiocbp->_aiocb_private.status); - if (status == -1) - return fuword(&uap->aiocbp->_aiocb_private.error); -#endif - return (EINVAL); + if (status == -1) { + td->td_retval[0] = fuword(&uap->aiocbp->_aiocb_private.error); + return (0); + } + + td->td_retval[0] = EINVAL; + return (0); } /* syscall - asynchronous read from a file (REALTIME) */ @@ -2056,15 +1833,14 @@ static int do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev) { struct proc *p = td->td_proc; - int nent, nentqueued; struct aiocb *iocb, * const *cbptr; - struct aiocblist *cb; struct kaioinfo *ki; - struct aio_liojob *lj; + struct aioliojob *lj; struct kevent kev; struct kqueue * kq; struct file *kq_fp; - int error, runningcode; + int nent; + int error; int nerror; int i; @@ -2078,28 +1854,16 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev) if (p->p_aioinfo == NULL) aio_init_aioinfo(p); - if ((nent + num_queue_count) > max_queue_count) - return (EAGAIN); - ki = p->p_aioinfo; - if ((nent + ki->kaio_queue_count) > ki->kaio_qallowed_count) - return (EAGAIN); lj = uma_zalloc(aiolio_zone, M_WAITOK); - if (!lj) - return (EAGAIN); - lj->lioj_flags = 0; - lj->lioj_buffer_count = 0; - lj->lioj_buffer_finished_count = 0; - lj->lioj_queue_count = 0; - lj->lioj_queue_finished_count = 0; - lj->lioj_total_count = nent; - knlist_init(&lj->klist, NULL, NULL, NULL, NULL); + lj->lioj_count = 0; + lj->lioj_finished_count = 0; + lj->lioj_ref_count = 0; + knlist_init(&lj->klist, &p->p_mtx, NULL, NULL, NULL); ksiginfo_init(&lj->lioj_ksi); - kev.ident = 0; - /* * Setup signal. */ @@ -2115,10 +1879,8 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev) if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) { /* Assume only new style KEVENT */ - kev.ident = lj->lioj_signal.sigev_notify_kqueue; - kev.udata = lj->lioj_signal.sigev_value.sival_ptr; - - error = fget(td, (u_int)kev.ident, &kq_fp); + error = fget(td, lj->lioj_signal.sigev_notify_kqueue, + &kq_fp); if (error) { uma_zfree(aiolio_zone, lj); return (error); @@ -2133,160 +1895,137 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev) kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1; kev.ident = (uintptr_t)lj; /* something unique */ kev.data = (intptr_t)lj; + /* pass user defined sigval data */ + kev.udata = lj->lioj_signal.sigev_value.sival_ptr; error = kqueue_register(kq, &kev, td, 1); fdrop(kq_fp, td); if (error) { uma_zfree(aiolio_zone, lj); return (error); } - } else if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) { + } else if (lj->lioj_signal.sigev_notify == SIGEV_NONE) { + ; + } else if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) { uma_zfree(aiolio_zone, lj); return EINVAL; } else { lj->lioj_flags |= LIOJ_SIGNAL; - lj->lioj_flags &= ~LIOJ_SIGNAL_POSTED; } - } else - lj->lioj_flags &= ~LIOJ_SIGNAL; + } + PROC_LOCK(p); TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list); /* + * Add extra aiocb count to avoid the lio to be freed + * by other threads doing aio_waitcomplete or aio_return, + * and prevent event from being sent until we have queued + * all tasks. + */ + lj->lioj_count = 1; + PROC_UNLOCK(p); + + /* * Get pointers to the list of I/O requests. */ nerror = 0; - nentqueued = 0; cbptr = uap->acb_list; for (i = 0; i < uap->nent; i++) { iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]); if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) { error = _aio_aqueue(td, iocb, lj, 0, oldsigev); - if (error == 0) - nentqueued++; - else + if (error != 0) nerror++; } } - /* - * If we haven't queued any, then just return error. - */ - if (nentqueued == 0) - return (0); - - /* - * Calculate the appropriate error return. - */ - runningcode = 0; - if (nerror) - runningcode = EIO; - + error = 0; + PROC_LOCK(p); if (uap->mode == LIO_WAIT) { - int command, found; - long jobref; - - for (;;) { - found = 0; - for (i = 0; i < uap->nent; i++) { - /* - * Fetch address of the control buf pointer in - * user space. - */ - iocb = (struct aiocb *) - (intptr_t)fuword(&cbptr[i]); - if (((intptr_t)iocb == -1) || ((intptr_t)iocb - == 0)) - continue; - - /* - * Fetch the associated command from user space. - */ - command = fuword(&iocb->aio_lio_opcode); - if (command == LIO_NOP) { - found++; - continue; - } - - jobref = - fuword(&iocb->_aiocb_private.kernelinfo); - - TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) - == jobref) { - if (cb->uaiocb.aio_lio_opcode - == LIO_WRITE) { - p->p_stats->p_ru.ru_oublock - += - cb->outputcharge; - cb->outputcharge = 0; - } else if (cb->uaiocb.aio_lio_opcode - == LIO_READ) { - p->p_stats->p_ru.ru_inblock - += cb->inputcharge; - cb->inputcharge = 0; - } - found++; - break; - } - } - - TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) { - if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) - == jobref) { - found++; - break; - } - } - } - - /* - * If all I/Os have been disposed of, then we can - * return. - */ - if (found == nentqueued) - return (runningcode); - + while (lj->lioj_count - 1 != lj->lioj_finished_count) { ki->kaio_flags |= KAIO_WAKEUP; - error = tsleep(p, PRIBIO | PCATCH, "aiospn", 0); - - if (error == EINTR) - return (EINTR); - else if (error == EWOULDBLOCK) - return (EAGAIN); + error = msleep(&p->p_aioinfo, &p->p_mtx, + PRIBIO | PCATCH, "aiospn", 0); + if (error == ERESTART) + error = EINTR; + if (error) + break; + } + } else { + if (lj->lioj_count - 1 == lj->lioj_finished_count) { + if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) { + lj->lioj_flags |= LIOJ_KEVENT_POSTED; + KNOTE_LOCKED(&lj->klist, 1); + } + if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) + == LIOJ_SIGNAL + && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL || + lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) { + aio_sendsig(p, &lj->lioj_signal, + &lj->lioj_ksi); + lj->lioj_flags |= LIOJ_SIGNAL_POSTED; + } } } + lj->lioj_count--; + if (lj->lioj_count == 0) { + TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list); + knlist_delete(&lj->klist, curthread, 1); + sigqueue_take(&lj->lioj_ksi); + PROC_UNLOCK(p); + uma_zfree(aiolio_zone, lj); + } else + PROC_UNLOCK(p); - return (runningcode); + if (nerror) + return (EIO); + return (error); } /* - * Interrupt handler for physio, performs the necessary process wakeups, and - * signals. + * Called from interrupt thread for physio, we should return as fast + * as possible, so we schedule a biohelper task. */ static void aio_physwakeup(struct buf *bp) { struct aiocblist *aiocbe; - struct proc *userp; - - mtx_lock(&Giant); - bp->b_flags |= B_DONE; - wakeup(bp); aiocbe = (struct aiocblist *)bp->b_caller1; - if (aiocbe) { - userp = aiocbe->userproc; + taskqueue_enqueue(taskqueue_aiod_bio, &aiocbe->biotask); +} - aiocbe->jobstate = JOBST_JOBBFINISHED; - aiocbe->uaiocb._aiocb_private.status -= bp->b_resid; - aiocbe->uaiocb._aiocb_private.error = 0; - aiocbe->jobflags |= AIOCBLIST_DONE; +/* + * Task routine to perform heavy tasks, process wakeup, and signals. + */ +static void +biohelper(void *context, int pending) +{ + struct aiocblist *aiocbe = context; + struct buf *bp; + struct proc *userp; + int nblks; - if (bp->b_ioflags & BIO_ERROR) - aiocbe->uaiocb._aiocb_private.error = bp->b_error; + bp = aiocbe->bp; + userp = aiocbe->userproc; + PROC_LOCK(userp); + aiocbe->uaiocb._aiocb_private.status -= bp->b_resid; + aiocbe->uaiocb._aiocb_private.error = 0; + if (bp->b_ioflags & BIO_ERROR) + aiocbe->uaiocb._aiocb_private.error = bp->b_error; + nblks = btodb(aiocbe->uaiocb.aio_nbytes); + if (aiocbe->uaiocb.aio_lio_opcode == LIO_WRITE) + aiocbe->outputcharge += nblks; + else + aiocbe->inputcharge += nblks; + aiocbe->bp = NULL; + TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, aiocbe, plist); + aio_bio_done_notify(userp, aiocbe, DONE_BUF); + PROC_UNLOCK(userp); - aio_bio_done_notify(userp, aiocbe, DONE_BUF); - } - mtx_unlock(&Giant); + /* Release mapping into kernel space. */ + vunmapbuf(bp); + relpbuf(bp, NULL); + atomic_subtract_int(&num_buf_aio, 1); } /* syscall - wait for the next completion of an aio request */ @@ -2297,10 +2036,11 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap) struct timeval atv; struct timespec ts; struct kaioinfo *ki; - struct aiocblist *cb = NULL; - int error, s, timo; + struct aiocblist *cb; + struct aiocb *uuaiocb; + int error, status, timo; - suword(uap->aiocbp, (int)NULL); + suword(uap->aiocbp, (long)NULL); timo = 0; if (uap->timeout) { @@ -2322,50 +2062,41 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap) aio_init_aioinfo(p); ki = p->p_aioinfo; - for (;;) { - PROC_LOCK(p); - if ((cb = TAILQ_FIRST(&ki->kaio_jobdone)) != 0) { - PROC_UNLOCK(p); - suword(uap->aiocbp, (uintptr_t)cb->uuaiocb); - td->td_retval[0] = cb->uaiocb._aiocb_private.status; - if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) { - p->p_stats->p_ru.ru_oublock += - cb->outputcharge; - cb->outputcharge = 0; - } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) { - p->p_stats->p_ru.ru_inblock += cb->inputcharge; - cb->inputcharge = 0; - } - error = cb->uaiocb._aiocb_private.error; - aio_free_entry(cb); - return (error); - } - - s = splbio(); - if ((cb = TAILQ_FIRST(&ki->kaio_bufdone)) != 0 ) { - PROC_UNLOCK(p); - splx(s); - suword(uap->aiocbp, (uintptr_t)cb->uuaiocb); - error = cb->uaiocb._aiocb_private.error; - td->td_retval[0] = cb->uaiocb._aiocb_private.status; - aio_free_entry(cb); - return (error); - } - + error = 0; + cb = NULL; + PROC_LOCK(p); + while ((cb = TAILQ_FIRST(&ki->kaio_done)) == NULL) { ki->kaio_flags |= KAIO_WAKEUP; - error = msleep(p, &p->p_mtx, PDROP | PRIBIO | PCATCH, "aiowc", - timo); - splx(s); - + error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH, + "aiowc", timo); if (error == ERESTART) - return (EINTR); - else if (error < 0) - return (error); - else if (error == EINTR) - return (EINTR); - else if (error == EWOULDBLOCK) - return (EAGAIN); + error = EINTR; + if (error) + break; } + + if (cb != NULL) { + MPASS(cb->jobstate == JOBST_JOBFINISHED); + uuaiocb = cb->uuaiocb; + status = cb->uaiocb._aiocb_private.status; + error = cb->uaiocb._aiocb_private.error; + td->td_retval[0] = status; + if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) { + p->p_stats->p_ru.ru_oublock += cb->outputcharge; + cb->outputcharge = 0; + } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) { + p->p_stats->p_ru.ru_inblock += cb->inputcharge; + cb->inputcharge = 0; + } + aio_free_entry(cb); + PROC_UNLOCK(p); + suword(uap->aiocbp, (long)uuaiocb); + suword(&uuaiocb->_aiocb_private.error, error); + suword(&uuaiocb->_aiocb_private.status, status); + } else + PROC_UNLOCK(p); + + return (error); } /* kqueue attach function */ @@ -2406,8 +2137,7 @@ filt_aio(struct knote *kn, long hint) struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata; kn->kn_data = aiocbe->uaiocb._aiocb_private.error; - if (aiocbe->jobstate != JOBST_JOBFINISHED && - aiocbe->jobstate != JOBST_JOBBFINISHED) + if (aiocbe->jobstate != JOBST_JOBFINISHED) return (0); kn->kn_flags |= EV_EOF; return (1); @@ -2417,10 +2147,10 @@ filt_aio(struct knote *kn, long hint) static int filt_lioattach(struct knote *kn) { - struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata; + struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata; /* - * The aio_liojob pointer must be validated before using it, so + * The aioliojob pointer must be validated before using it, so * registration is restricted to the kernel; the user cannot * set EV_FLAG1. */ @@ -2437,7 +2167,7 @@ filt_lioattach(struct knote *kn) static void filt_liodetach(struct knote *kn) { - struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata; + struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata; if (!knlist_empty(&lj->klist)) knlist_remove(&lj->klist, kn, 0); @@ -2448,6 +2178,7 @@ filt_liodetach(struct knote *kn) static int filt_lio(struct knote *kn, long hint) { - struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata; + struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata; + return (lj->lioj_flags & LIOJ_KEVENT_POSTED); } |