From e3a46811b797130bb11edec29755641528791d6b Mon Sep 17 00:00:00 2001 From: ambrisko Date: Wed, 12 Oct 2005 17:51:31 +0000 Subject: Add in kqueue support to LIO event notification and fix how it handled notifications when LIO operations completed. These were the problems with LIO event complete notification: - Move all LIO/AIO event notification into one general function so we don't have bugs in different data paths. This unification got rid of several notification bugs one of which if kqueue was used a SIGILL could get sent to the process. - Change the LIO event accounting to count all AIO request that could have been split across the fast path and daemon mode. The prior accounting only kept track of AIO op's in that mode and not the entire list of operations. This could cause a bogus LIO event complete notification to occur when all of the fast path AIO op's completed and not the AIO op's that ended up queued for the daemon. Suggestions from: alc --- sys/kern/kern_event.c | 8 +- sys/kern/vfs_aio.c | 319 +++++++++++++++++++++++++++++++------------------- sys/sys/event.h | 4 +- 3 files changed, 206 insertions(+), 125 deletions(-) diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index 3aa4550..3419101 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -247,6 +247,7 @@ static struct { { &timer_filtops }, /* EVFILT_TIMER */ { &file_filtops }, /* EVFILT_NETDEV */ { &fs_filtops }, /* EVFILT_FS */ + { &null_filtops }, /* EVFILT_LIO */ }; /* @@ -633,6 +634,8 @@ kern_kevent(struct thread *td, int fd, int nchanges, int nevents, changes = keva; for (i = 0; i < n; i++) { kevp = &changes[i]; + if (!kevp->filter) + continue; kevp->flags &= ~EV_SYSFLAGS; error = kqueue_register(kq, kevp, td, 1); if (error) { @@ -1828,7 +1831,7 @@ knote_attach(struct knote *kn, struct kqueue *kq) } /* - * knote must already have been detatched using the f_detach method. + * knote must already have been detached using the f_detach method. * no lock need to be held, it is assumed that the KN_INFLUX flag is set * to prevent other removal. */ @@ -1850,7 +1853,8 @@ knote_drop(struct knote *kn, struct thread *td) else list = &kq->kq_knhash[KN_HASH(kn->kn_id, kq->kq_knhashmask)]; - SLIST_REMOVE(list, kn, knote, kn_link); + if (!SLIST_EMPTY(list)) + SLIST_REMOVE(list, kn, knote, kn_link); if (kn->kn_status & KN_QUEUED) knote_dequeue(kn); KQ_UNLOCK_FLUX(kq); diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c index ac4877f..b8b798b 100644 --- a/sys/kern/vfs_aio.c +++ b/sys/kern/vfs_aio.c @@ -211,11 +211,14 @@ struct aio_liojob { int lioj_buffer_finished_count; int lioj_queue_count; int lioj_queue_finished_count; + int lioj_total_count; struct sigevent lioj_signal; /* signal on all I/O done */ TAILQ_ENTRY(aio_liojob) lioj_list; + struct knlist klist; /* list of knotes */ }; #define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */ #define LIOJ_SIGNAL_POSTED 0x2 /* signal has been posted */ +#define LIOJ_KEVENT_POSTED 0x4 /* kevent triggered */ /* * per process aio data structure @@ -262,6 +265,12 @@ static int aio_unload(void); static int filt_aioattach(struct knote *kn); static void filt_aiodetach(struct knote *kn); static int filt_aio(struct knote *kn, long hint); +static int filt_lioattach(struct knote *kn); +static void filt_liodetach(struct knote *kn); +static int filt_lio(struct knote *kn, long hint); +#define DONE_BUF 1 +#define DONE_QUEUE 2 +static void aio_bio_done_notify( struct proc *userp, struct aiocblist *aiocbe, int type); /* * Zones for: @@ -276,6 +285,8 @@ static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone; /* kqueue filters for aio */ static struct filterops aio_filtops = { 0, filt_aioattach, filt_aiodetach, filt_aio }; +static struct filterops lio_filtops = + { 0, filt_lioattach, filt_liodetach, filt_lio }; static eventhandler_tag exit_tag, exec_tag; @@ -336,6 +347,7 @@ aio_onceonly(void) exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL, EVENTHANDLER_PRI_ANY); kqueue_add_filteropts(EVFILT_AIO, &aio_filtops); + kqueue_add_filteropts(EVFILT_LIO, &lio_filtops); TAILQ_INIT(&aio_freeproc); mtx_init(&aio_freeproc_mtx, "aio_freeproc", NULL, MTX_DEF); TAILQ_INIT(&aio_jobs); @@ -486,6 +498,8 @@ aio_free_entry(struct aiocblist *aiocbe) * OWNING thread? (or maybe the running thread?) * There is a semantic problem here... */ + if (lj) + knlist_delete(&lj->klist, FIRST_THREAD_IN_PROC(p), 0); /* XXXKSE */ knlist_delete(&aiocbe->klist, FIRST_THREAD_IN_PROC(p), 0); /* XXXKSE */ if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags & KAIO_RUNDOWN) @@ -757,6 +771,67 @@ aio_process(struct aiocblist *aiocbe) td->td_ucred = td_savedcred; } +static void +aio_bio_done_notify( struct proc *userp, struct aiocblist *aiocbe, int type){ + int lj_done; + struct aio_liojob *lj; + struct kaioinfo *ki; + + ki = userp->p_aioinfo; + lj = aiocbe->lio; + lj_done = 0; + if (lj) { + if (type == DONE_QUEUE) + lj->lioj_queue_finished_count++; + else + lj->lioj_buffer_finished_count++; + if (lj->lioj_queue_finished_count + + lj->lioj_buffer_finished_count == + lj->lioj_total_count) + lj_done = 1; + } + + if (ki) { + if (type == DONE_QUEUE) { + ki->kaio_queue_finished_count++; + TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist); + TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist); + } else { + ki->kaio_buffer_finished_count++; + TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist); + TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist); + } + if (lj_done) { + if (!knlist_empty(&lj->klist) + && lj->lioj_signal.sigev_notify == + SIGEV_KEVENT) { + lj->lioj_flags |= LIOJ_KEVENT_POSTED; + KNOTE_UNLOCKED(&lj->klist, 0); + } + if ((lj->lioj_flags & + (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) + == LIOJ_SIGNAL + && lj->lioj_signal.sigev_notify == SIGEV_SIGNAL) { + PROC_LOCK(userp); + psignal(userp, lj->lioj_signal.sigev_signo); + PROC_UNLOCK(userp); + lj->lioj_flags |= LIOJ_SIGNAL_POSTED; + } + } + KNOTE_UNLOCKED(&aiocbe->klist, 0); + + if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) { + ki->kaio_flags &= ~KAIO_WAKEUP; + wakeup(userp); + } + } + + if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) { + PROC_LOCK(userp); + psignal(userp, aiocbe->uaiocb.aio_sigevent.sigev_signo); + PROC_UNLOCK(userp); + } +} /* * The AIO daemon, most of the actual work is done in aio_process, * but the setup (and address space mgmt) is done in this routine. @@ -765,7 +840,6 @@ static void aio_daemon(void *uproc) { int s; - struct aio_liojob *lj; struct aiocb *cb; struct aiocblist *aiocbe; struct aiothreadlist *aiop; @@ -883,7 +957,6 @@ aio_daemon(void *uproc) } ki = userp->p_aioinfo; - lj = aiocbe->lio; /* Account for currently active jobs. */ ki->kaio_active_count++; @@ -891,57 +964,17 @@ aio_daemon(void *uproc) /* Do the I/O function. */ aio_process(aiocbe); + s = splbio(); /* Decrement the active job count. */ ki->kaio_active_count--; - /* - * Increment the completion count for wakeup/signal - * comparisons. - */ aiocbe->jobflags |= AIOCBLIST_DONE; - ki->kaio_queue_finished_count++; - if (lj) - lj->lioj_queue_finished_count++; - if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags - & KAIO_RUNDOWN) && (ki->kaio_active_count == 0))) { - ki->kaio_flags &= ~KAIO_WAKEUP; - wakeup(userp); - } - - s = splbio(); - if (lj && (lj->lioj_flags & - (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) == LIOJ_SIGNAL) { - if ((lj->lioj_queue_finished_count == - lj->lioj_queue_count) && - (lj->lioj_buffer_finished_count == - lj->lioj_buffer_count)) { - PROC_LOCK(userp); - psignal(userp, - lj->lioj_signal.sigev_signo); - PROC_UNLOCK(userp); - lj->lioj_flags |= LIOJ_SIGNAL_POSTED; - } - } - splx(s); - aiocbe->jobstate = JOBST_JOBFINISHED; - - s = splnet(); - TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist); - splx(s); - KNOTE_UNLOCKED(&aiocbe->klist, 0); - + aio_bio_done_notify(userp, aiocbe, DONE_QUEUE); if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) { wakeup(aiocbe); aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN; } - - if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { - PROC_LOCK(userp); - psignal(userp, cb->aio_sigevent.sigev_signo); - PROC_UNLOCK(userp); - } } /* @@ -1049,7 +1082,7 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe) struct vnode *vp; struct kaioinfo *ki; struct aio_liojob *lj; - int s; + int s, lj_done = 0; int notify; cb = &aiocbe->uaiocb; @@ -1075,6 +1108,9 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe) if (cb->aio_nbytes % vp->v_bufobj.bo_bsize) return (-1); + if (cb->aio_nbytes > vp->v_rdev->si_iosize_max) + return (-1); + if (cb->aio_nbytes > MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK)) return (-1); @@ -1148,6 +1184,14 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe) aiocbe->uaiocb._aiocb_private.error = bp->b_error; suword(&job->_aiocb_private.error, bp->b_error); + if (lj) { + lj->lioj_buffer_finished_count++; + if (lj->lioj_queue_finished_count + + lj->lioj_buffer_finished_count == + lj->lioj_total_count) + lj_done = 1; + } + ki->kaio_buffer_finished_count++; if (aiocbe->jobstate != JOBST_JOBBFINISHED) { @@ -1159,8 +1203,19 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe) } } splx(s); - if (notify) + if (notify) { + if (lj && !knlist_empty(&lj->klist)) { + lj->lioj_flags |= LIOJ_KEVENT_POSTED; + KNOTE_UNLOCKED(&lj->klist, 0); + } KNOTE_UNLOCKED(&aiocbe->klist, 0); + + } + if (cb->aio_lio_opcode == LIO_WRITE) { + aiocbe->outputcharge += btodb(cb->aio_nbytes); + } else if (cb->aio_lio_opcode == LIO_READ) { + aiocbe->inputcharge += btodb(cb->aio_nbytes); + } return (0); doerror: @@ -1546,19 +1601,12 @@ aio_return(struct thread *td, struct aio_return_args *uap) PROC_LOCK(p); TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) { if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) == - jobref) { - if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) { - p->p_stats->p_ru.ru_oublock += - cb->outputcharge; - cb->outputcharge = 0; - } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) { - p->p_stats->p_ru.ru_inblock += cb->inputcharge; - cb->inputcharge = 0; - } + jobref) goto done; - } } + s = splbio(); + /* aio_physwakeup */ for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb = ncb) { ncb = TAILQ_NEXT(cb, plist); if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) @@ -1575,6 +1623,14 @@ aio_return(struct thread *td, struct aio_return_args *uap) cb->uaiocb._aiocb_private.status; } else td->td_retval[0] = EFAULT; + if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) { + p->p_stats->p_ru.ru_oublock += + cb->outputcharge; + cb->outputcharge = 0; + } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) { + p->p_stats->p_ru.ru_inblock += cb->inputcharge; + cb->inputcharge = 0; + } aio_free_entry(cb); return (0); } @@ -1781,21 +1837,11 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap) if (cbe->jobstate == JOBST_JOBQGLOBAL) { TAILQ_REMOVE(&aio_jobs, cbe, list); - TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe, - plist); - cancelled++; - ki->kaio_queue_finished_count++; cbe->jobstate = JOBST_JOBFINISHED; + cancelled++; cbe->uaiocb._aiocb_private.status = -1; cbe->uaiocb._aiocb_private.error = ECANCELED; -/* XXX cancelled, knote? */ - if (cbe->uaiocb.aio_sigevent.sigev_notify == - SIGEV_SIGNAL) { - PROC_LOCK(cbe->userproc); - psignal(cbe->userproc, cbe->uaiocb.aio_sigevent.sigev_signo); - PROC_UNLOCK(cbe->userproc); - } + aio_bio_done_notify(cbe->userproc, cbe, DONE_QUEUE); } else { notcancelled++; } @@ -1935,6 +1981,9 @@ lio_listio(struct thread *td, struct lio_listio_args *uap) struct aiocblist *cb; struct kaioinfo *ki; struct aio_liojob *lj; + struct kevent kev; + struct kqueue * kq; + struct file *kq_fp; int error, runningcode; int nerror; int i; @@ -1966,6 +2015,10 @@ lio_listio(struct thread *td, struct lio_listio_args *uap) lj->lioj_buffer_finished_count = 0; lj->lioj_queue_count = 0; lj->lioj_queue_finished_count = 0; + lj->lioj_total_count = nent; + knlist_init(&lj->klist, NULL, NULL, NULL, NULL); + + kev.ident = 0; /* * Setup signal. @@ -1977,12 +2030,41 @@ lio_listio(struct thread *td, struct lio_listio_args *uap) uma_zfree(aiolio_zone, lj); return (error); } - if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) { + + if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) { + /* Assume only new style KEVENT */ + kev.ident = lj->lioj_signal.sigev_notify_kqueue; + kev.udata = lj->lioj_signal.sigev_value.sigval_ptr; + + if ((u_int)kev.ident >= p->p_fd->fd_nfiles || + (kq_fp = p->p_fd->fd_ofiles[kev.ident]) == NULL || + (kq_fp->f_type != DTYPE_KQUEUE)) { + uma_zfree(aiolio_zone, lj); + splx(s); + return (EBADF); + } + kq = (struct kqueue *)kq_fp->f_data; + kev.filter = EVFILT_LIO; + kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1; + kev.ident = (uintptr_t)lj; /* something unique */ + kev.data = (intptr_t)lj; + error = kqueue_register(kq, &kev, td, 1); + if (error) { + uma_zfree(aiolio_zone, lj); + splx(s); + return (error); + } + } else if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) { uma_zfree(aiolio_zone, lj); - return (EINVAL); + splx(s); + return EINVAL; + } else { + lj->lioj_flags |= LIOJ_SIGNAL; + lj->lioj_flags &= ~LIOJ_SIGNAL_POSTED; } - lj->lioj_flags |= LIOJ_SIGNAL; - } + } else + lj->lioj_flags &= ~LIOJ_SIGNAL; + TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list); /* * Get pointers to the list of I/O requests. @@ -2102,9 +2184,7 @@ static void aio_physwakeup(struct buf *bp) { struct aiocblist *aiocbe; - struct proc *p; - struct kaioinfo *ki; - struct aio_liojob *lj; + struct proc *userp; mtx_lock(&Giant); bp->b_flags |= B_DONE; @@ -2112,7 +2192,7 @@ aio_physwakeup(struct buf *bp) aiocbe = (struct aiocblist *)bp->b_caller1; if (aiocbe) { - p = aiocbe->userproc; + userp = aiocbe->userproc; aiocbe->jobstate = JOBST_JOBBFINISHED; aiocbe->uaiocb._aiocb_private.status -= bp->b_resid; @@ -2122,50 +2202,7 @@ aio_physwakeup(struct buf *bp) if (bp->b_ioflags & BIO_ERROR) aiocbe->uaiocb._aiocb_private.error = bp->b_error; - lj = aiocbe->lio; - if (lj) { - lj->lioj_buffer_finished_count++; - - /* - * wakeup/signal if all of the interrupt jobs are done. - */ - if (lj->lioj_buffer_finished_count == - lj->lioj_buffer_count && - lj->lioj_queue_finished_count == - lj->lioj_queue_count) { - /* - * Post a signal if it is called for. - */ - if ((lj->lioj_flags & - (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) == - LIOJ_SIGNAL) { - PROC_LOCK(p); - psignal(p, lj->lioj_signal.sigev_signo); - PROC_UNLOCK(p); - lj->lioj_flags |= LIOJ_SIGNAL_POSTED; - } - } - } - - ki = p->p_aioinfo; - if (ki) { - ki->kaio_buffer_finished_count++; - TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist); - TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist); - - KNOTE_UNLOCKED(&aiocbe->klist, 0); - /* Do the wakeup. */ - if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) { - ki->kaio_flags &= ~KAIO_WAKEUP; - wakeup(p); - } - } - - if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) { - PROC_LOCK(p); - psignal(p, aiocbe->uaiocb.aio_sigevent.sigev_signo); - PROC_UNLOCK(p); - } + aio_bio_done_notify(userp, aiocbe, DONE_BUF); } mtx_unlock(&Giant); } @@ -2275,7 +2312,8 @@ filt_aiodetach(struct knote *kn) { struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata; - knlist_remove(&aiocbe->klist, kn, 0); + if (!knlist_empty(&aiocbe->klist)) + knlist_remove(&aiocbe->klist, kn, 0); } /* kqueue filter function */ @@ -2292,3 +2330,42 @@ filt_aio(struct knote *kn, long hint) kn->kn_flags |= EV_EOF; return (1); } + +/* kqueue attach function */ +static int +filt_lioattach(struct knote *kn) +{ + struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata; + + /* + * The aio_liojob pointer must be validated before using it, so + * registration is restricted to the kernel; the user cannot + * set EV_FLAG1. + */ + if ((kn->kn_flags & EV_FLAG1) == 0) + return (EPERM); + kn->kn_flags &= ~EV_FLAG1; + + knlist_add(&lj->klist, kn, 0); + + return (0); +} + +/* kqueue detach function */ +static void +filt_liodetach(struct knote *kn) +{ + struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata; + + if (!knlist_empty(&lj->klist)) + knlist_remove(&lj->klist, kn, 0); +} + +/* kqueue filter function */ +/*ARGSUSED*/ +static int +filt_lio(struct knote *kn, long hint) +{ + struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata; + return (lj->lioj_flags & LIOJ_KEVENT_POSTED); +} diff --git a/sys/sys/event.h b/sys/sys/event.h index c2921c4..ed4a613 100644 --- a/sys/sys/event.h +++ b/sys/sys/event.h @@ -38,8 +38,8 @@ #define EVFILT_TIMER (-7) /* timers */ #define EVFILT_NETDEV (-8) /* network devices */ #define EVFILT_FS (-9) /* filesystem events */ - -#define EVFILT_SYSCOUNT 9 +#define EVFILT_LIO (-10) /* timers */ +#define EVFILT_SYSCOUNT 10 #define EV_SET(kevp_, a, b, c, d, e, f) do { \ struct kevent *kevp = (kevp_); \ -- cgit v1.1