diff options
author | davidxu <davidxu@FreeBSD.org> | 2006-03-23 08:46:42 +0000 |
---|---|---|
committer | davidxu <davidxu@FreeBSD.org> | 2006-03-23 08:46:42 +0000 |
commit | fa4b9b6f236907499be7c18cd878eaa9a27a1ff6 (patch) | |
tree | d535c12cd2fa60a2405a7f10fcac2121d2326a92 /sys/kern/vfs_aio.c | |
parent | fca73f29089c2ca833ec6d519d336b86a89a2440 (diff) | |
download | FreeBSD-src-fa4b9b6f236907499be7c18cd878eaa9a27a1ff6.zip FreeBSD-src-fa4b9b6f236907499be7c18cd878eaa9a27a1ff6.tar.gz |
Implement aio_fsync() syscall.
Diffstat (limited to 'sys/kern/vfs_aio.c')
-rw-r--r-- | sys/kern/vfs_aio.c | 322 |
1 files changed, 244 insertions, 78 deletions
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c index f100175..8097744 100644 --- a/sys/kern/vfs_aio.c +++ b/sys/kern/vfs_aio.c @@ -53,6 +53,7 @@ __FBSDID("$FreeBSD$"); #include <sys/vnode.h> #include <sys/conf.h> #include <sys/event.h> +#include <sys/mount.h> #include <machine/atomic.h> @@ -61,6 +62,7 @@ __FBSDID("$FreeBSD$"); #include <vm/vm_extern.h> #include <vm/pmap.h> #include <vm/vm_map.h> +#include <vm/vm_object.h> #include <vm/uma.h> #include <sys/aio.h> @@ -68,16 +70,22 @@ __FBSDID("$FreeBSD$"); /* * Counter for allocating reference ids to new jobs. Wrapped to 1 on - * overflow. + * overflow. (XXX will be removed soon.) */ -static long jobrefid; +static u_long jobrefid; -#define JOBST_NULL 0x0 -#define JOBST_JOBQSOCK 0x1 -#define JOBST_JOBQGLOBAL 0x2 -#define JOBST_JOBRUNNING 0x3 -#define JOBST_JOBFINISHED 0x4 -#define JOBST_JOBQBUF 0x5 +/* + * Counter for aio_fsync. + */ +static uint64_t jobseqno; + +#define JOBST_NULL 0 +#define JOBST_JOBQSOCK 1 +#define JOBST_JOBQGLOBAL 2 +#define JOBST_JOBRUNNING 3 +#define JOBST_JOBFINISHED 4 +#define JOBST_JOBQBUF 5 +#define JOBST_JOBQSYNC 6 #ifndef MAX_AIO_PER_PROC #define MAX_AIO_PER_PROC 32 @@ -227,13 +235,16 @@ struct aiocblist { struct knlist klist; /* (a) list of knotes */ struct aiocb uaiocb; /* (*) kernel I/O control block */ ksiginfo_t ksi; /* (a) realtime signal info */ - struct task biotask; /* (*) private to BIO backend */ + struct task biotask; /* (*) private to BIO backend */ + uint64_t seqno; /* (*) job number */ + int pending; /* (a) number of pending I/O, aio_fsync only */ }; /* jobflags */ +#define AIOCBLIST_DONE 0x01 +#define AIOCBLIST_BUFDONE 0x02 #define AIOCBLIST_RUNDOWN 0x04 -#define AIOCBLIST_DONE 0x10 -#define AIOCBLIST_BUFDONE 0x20 +#define AIOCBLIST_CHECKSYNC 0x08 /* * AIO process info @@ -280,8 +291,10 @@ struct kaioinfo { TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* (a) job queue for process */ TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* (a) buffer job queue for process */ TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* (a) queue for aios waiting on sockets, - * not used yet. + * NOT USED YET. */ + TAILQ_HEAD(,aiocblist) kaio_syncqueue; /* (a) queue for aio_fsync */ + struct task kaio_task; /* (*) task to kick aio threads */ }; #define KAIO_RUNDOWN 0x1 /* process is being run down */ @@ -300,7 +313,7 @@ static int aio_free_entry(struct aiocblist *aiocbe); static void aio_process(struct aiocblist *aiocbe); static int aio_newproc(int *); static int aio_aqueue(struct thread *td, struct aiocb *job, - struct aioliojob *lio, int type, int osigev); + struct aioliojob *lio, int type, int osigev, uint64_t *jseqno); static void aio_physwakeup(struct buf *bp); static void aio_proc_rundown(void *arg, struct proc *p); static int aio_qphysio(struct proc *p, struct aiocblist *iocb); @@ -308,17 +321,19 @@ static void biohelper(void *, int); static void aio_daemon(void *param); static void aio_swake_cb(struct socket *, struct sockbuf *); static int aio_unload(void); +static void aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type); +#define DONE_BUF 1 +#define DONE_QUEUE 2 +static int do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev); +static void aio_kick(struct proc *userp); +static void aio_kick_nowait(struct proc *userp); +static void aio_kick_helper(void *context, int pending); static int filt_aioattach(struct knote *kn); static void filt_aiodetach(struct knote *kn); static int filt_aio(struct knote *kn, long hint); static int filt_lioattach(struct knote *kn); static void filt_liodetach(struct knote *kn); static int filt_lio(struct knote *kn, long hint); -#define DONE_BUF 1 -#define DONE_QUEUE 2 -static void aio_bio_done_notify( struct proc *userp, struct aiocblist *aiocbe, int type); -static int do_lio_listio(struct thread *td, struct lio_listio_args *uap, - int oldsigev); /* * Zones for: @@ -370,13 +385,14 @@ static moduledata_t aio_mod = { NULL }; -SYSCALL_MODULE_HELPER(aio_return); -SYSCALL_MODULE_HELPER(aio_suspend); SYSCALL_MODULE_HELPER(aio_cancel); SYSCALL_MODULE_HELPER(aio_error); +SYSCALL_MODULE_HELPER(aio_fsync); SYSCALL_MODULE_HELPER(aio_read); -SYSCALL_MODULE_HELPER(aio_write); +SYSCALL_MODULE_HELPER(aio_return); +SYSCALL_MODULE_HELPER(aio_suspend); SYSCALL_MODULE_HELPER(aio_waitcomplete); +SYSCALL_MODULE_HELPER(aio_write); SYSCALL_MODULE_HELPER(lio_listio); SYSCALL_MODULE_HELPER(oaio_read); SYSCALL_MODULE_HELPER(oaio_write); @@ -494,6 +510,8 @@ aio_init_aioinfo(struct proc *p) TAILQ_INIT(&ki->kaio_bufqueue); TAILQ_INIT(&ki->kaio_liojoblist); TAILQ_INIT(&ki->kaio_sockqueue); + TAILQ_INIT(&ki->kaio_syncqueue); + TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p); PROC_LOCK(p); if (p->p_aioinfo == NULL) { p->p_aioinfo = ki; @@ -566,7 +584,7 @@ aio_free_entry(struct aiocblist *aiocbe) knlist_delete(&aiocbe->klist, curthread, 1); sigqueue_take(&aiocbe->ksi); - MPASS(aiocbe->bp == NULL); + MPASS(aiocbe->bp == NULL); aiocbe->jobstate = JOBST_NULL; PROC_UNLOCK(p); @@ -638,6 +656,9 @@ restart: so = fp->f_data; TAILQ_REMOVE(&so->so_aiojobq, cbe, list); remove = 1; + } else if (cbe->jobstate == JOBST_JOBQSYNC) { + TAILQ_REMOVE(&ki->kaio_syncqueue, cbe, list); + remove = 1; } mtx_unlock(&aio_job_mtx); @@ -673,10 +694,10 @@ restart: lj->lioj_count, lj->lioj_finished_count); } } - + PROC_UNLOCK(p); + taskqueue_drain(taskqueue_aiod_bio, &ki->kaio_task); uma_zfree(kaio_zone, ki); p->p_aioinfo = NULL; - PROC_UNLOCK(p); } /* @@ -706,6 +727,35 @@ aio_selectjob(struct aiothreadlist *aiop) } /* + * Move all data to a permanent storage device, this code + * simulates fsync syscall. + */ +static int +aio_fsync_vnode(struct thread *td, struct vnode *vp) +{ + struct mount *mp; + int vfslocked; + int error; + + vfslocked = VFS_LOCK_GIANT(vp->v_mount); + if ((error = vn_start_write(vp, &mp, V_WAIT | PCATCH)) != 0) + goto drop; + vn_lock(vp, LK_EXCLUSIVE | LK_RETRY, td); + if (vp->v_object != NULL) { + VM_OBJECT_LOCK(vp->v_object); + vm_object_page_clean(vp->v_object, 0, 0, 0); + VM_OBJECT_UNLOCK(vp->v_object); + } + error = VOP_FSYNC(vp, MNT_WAIT, td); + + VOP_UNLOCK(vp, 0, td); + vn_finished_write(mp); +drop: + VFS_UNLOCK_GIANT(vfslocked); + return (error); +} + +/* * The AIO processing activity. This is the code that does the I/O request for * the non-physio version of the operations. The normal vn operations are used, * and this code should work in all instances for every type of file, including @@ -736,6 +786,17 @@ aio_process(struct aiocblist *aiocbe) cb = &aiocbe->uaiocb; fp = aiocbe->fd_file; + if (cb->aio_lio_opcode == LIO_SYNC) { + error = 0; + cnt = 0; + if (fp->f_vnode != NULL) + error = aio_fsync_vnode(td, fp->f_vnode); + cb->_aiocb_private.error = error; + cb->_aiocb_private.status = 0; + td->td_ucred = td_savedcred; + return; + } + aiov.iov_base = (void *)(uintptr_t)cb->aio_buf; aiov.iov_len = cb->aio_nbytes; @@ -797,6 +858,7 @@ aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type) { struct aioliojob *lj; struct kaioinfo *ki; + struct aiocblist *scb, *scbn; int lj_done; PROC_LOCK_ASSERT(userp, MA_OWNED); @@ -840,6 +902,22 @@ aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type) } notification_done: + if (aiocbe->jobflags & AIOCBLIST_CHECKSYNC) { + TAILQ_FOREACH_SAFE(scb, &ki->kaio_syncqueue, list, scbn) { + if (scb->pending != -1 && + aiocbe->fd_file == scb->fd_file && + aiocbe->seqno < scb->seqno) { + if (--scb->pending == 0) { + mtx_lock(&aio_job_mtx); + scb->jobstate = JOBST_JOBQGLOBAL; + TAILQ_REMOVE(&ki->kaio_syncqueue, scb, list); + TAILQ_INSERT_TAIL(&aio_jobs, scb, list); + aio_kick_nowait(userp); + mtx_unlock(&aio_job_mtx); + } + } + } + } if (ki->kaio_flags & KAIO_WAKEUP) { ki->kaio_flags &= ~KAIO_WAKEUP; wakeup(&userp->p_aioinfo); @@ -875,14 +953,7 @@ aio_daemon(void *_id) */ aiop = uma_zalloc(aiop_zone, M_WAITOK); aiop->aiothread = td; - aiop->aiothreadflags = AIOP_FREE; - - /* - * Place thread (lightweight process) onto the AIO free thread list. - */ - mtx_lock(&aio_job_mtx); - TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list); - mtx_unlock(&aio_job_mtx); + aiop->aiothreadflags = 0; /* * Get rid of our current filedescriptors. AIOD's don't need any @@ -1203,8 +1274,7 @@ static void aio_swake_cb(struct socket *so, struct sockbuf *sb) { struct aiocblist *cb, *cbn; - int opcode, wakecount = 0; - struct aiothreadlist *aiop; + int opcode; if (sb == &so->so_snd) opcode = LIO_WRITE; @@ -1225,21 +1295,11 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb) */ TAILQ_REMOVE(&so->so_aiojobq, cb, list); TAILQ_INSERT_TAIL(&aio_jobs, cb, list); - wakecount++; + aio_kick_nowait(cb->userproc); } } mtx_unlock(&aio_job_mtx); SOCKBUF_UNLOCK(sb); - - while (wakecount--) { - mtx_lock(&aio_job_mtx); - if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) { - TAILQ_REMOVE(&aio_freeproc, aiop, list); - aiop->aiothreadflags &= ~AIOP_FREE; - wakeup(aiop->aiothread); - } - mtx_unlock(&aio_job_mtx); - } } /* @@ -1248,13 +1308,12 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb) */ static int aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj, - int type, int oldsigev) + int type, int oldsigev, uint64_t *jseqno) { struct proc *p = td->td_proc; struct file *fp; struct socket *so; struct aiocblist *aiocbe; - struct aiothreadlist *aiop; struct kaioinfo *ki; struct kevent kev; struct kqueue *kq; @@ -1283,6 +1342,7 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj, aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO); aiocbe->inputcharge = 0; aiocbe->outputcharge = 0; + aiocbe->pending = -1; knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL); if (oldsigev) { @@ -1342,21 +1402,25 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj, suword(&job->_aiocb_private.error, error); return (error); } - aiocbe->fd_file = fp; + + if (opcode == LIO_SYNC && fp->f_vnode == NULL) { + error = EINVAL; + goto aqueue_fail; + } if (aiocbe->uaiocb.aio_offset == -1LL) { error = EINVAL; goto aqueue_fail; } + aiocbe->fd_file = fp; + mtx_lock(&aio_job_mtx); - jid = jobrefid; - if (jobrefid == LONG_MAX) - jobrefid = 1; - else - jobrefid++; + jid = jobrefid++; + aiocbe->seqno = jobseqno++; + if (jseqno) + *jseqno = aiocbe->seqno; mtx_unlock(&aio_job_mtx); - error = suword(&job->_aiocb_private.kernelinfo, jid); if (error) { error = EINVAL; @@ -1369,14 +1433,15 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj, uma_zfree(aiocb_zone, aiocbe); return (0); } - if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) { + if ((opcode != LIO_READ) && (opcode != LIO_WRITE) && + (opcode != LIO_SYNC)) { error = EINVAL; goto aqueue_fail; } - if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) { + if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue; - } else + else goto no_kqueue; error = fget(td, (u_int)kev.ident, &kq_fp); if (error) @@ -1410,6 +1475,9 @@ no_kqueue: aiocbe->jobflags = 0; aiocbe->lio = lj; + if (opcode == LIO_SYNC) + goto queueit; + if (fp->f_type == DTYPE_SOCKET) { /* * Alternate queueing for socket ops: Reach down into the @@ -1460,8 +1528,10 @@ no_kqueue: goto done; } #endif +queueit: /* No buffer for daemon I/O. */ aiocbe->bp = NULL; + atomic_add_int(&num_queue_count, 1); PROC_LOCK(p); ki->kaio_count++; @@ -1469,23 +1539,47 @@ no_kqueue: lj->lioj_count++; TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist); TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist); - - mtx_lock(&aio_job_mtx); - TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list); - aiocbe->jobstate = JOBST_JOBQGLOBAL; + if (opcode == LIO_SYNC) { + TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, aiocbe, list); + aiocbe->jobstate = JOBST_JOBQSYNC; + } else { + mtx_lock(&aio_job_mtx); + TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list); + aiocbe->jobstate = JOBST_JOBQGLOBAL; + aio_kick_nowait(p); + mtx_unlock(&aio_job_mtx); + } PROC_UNLOCK(p); + error = 0; +done: + return (error); +} - atomic_add_int(&num_queue_count, 1); +static void +aio_kick_nowait(struct proc *userp) +{ + struct kaioinfo *ki = userp->p_aioinfo; + struct aiothreadlist *aiop; - /* - * If we don't have a free AIO process, and we are below our quota, then - * start one. Otherwise, depend on the subsequent I/O completions to - * pick-up this job. If we don't sucessfully create the new process - * (thread) due to resource issues, we return an error for now (EAGAIN), - * which is likely not the correct thing to do. - */ + mtx_assert(&aio_job_mtx, MA_OWNED); + if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) { + TAILQ_REMOVE(&aio_freeproc, aiop, list); + aiop->aiothreadflags &= ~AIOP_FREE; + wakeup(aiop->aiothread); + } else { + taskqueue_enqueue(taskqueue_aiod_bio, &ki->kaio_task); + } +} + +static void +aio_kick(struct proc *userp) +{ + struct kaioinfo *ki = userp->p_aioinfo; + struct aiothreadlist *aiop; + int error; + + mtx_assert(&aio_job_mtx, MA_OWNED); retryproc: - error = 0; if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) { TAILQ_REMOVE(&aio_freeproc, aiop, list); aiop->aiothreadflags &= ~AIOP_FREE; @@ -1502,10 +1596,17 @@ retryproc: goto retryproc; } } - mtx_unlock(&aio_job_mtx); +} -done: - return (error); +static void +aio_kick_helper(void *context, int pending) +{ + struct proc *userp = context; + + mtx_lock(&aio_job_mtx); + while (--pending >= 0) + aio_kick(userp); + mtx_unlock(&aio_job_mtx); } /* @@ -1695,6 +1796,9 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap) so = fp->f_data; TAILQ_REMOVE(&so->so_aiojobq, cbe, list); remove = 1; + } else if (cbe->jobstate == JOBST_JOBQSYNC) { + TAILQ_REMOVE(&ki->kaio_syncqueue, cbe, list); + remove = 1; } mtx_unlock(&aio_job_mtx); @@ -1789,14 +1893,14 @@ int oaio_read(struct thread *td, struct oaio_read_args *uap) { - return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1); + return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1, NULL); } int aio_read(struct thread *td, struct aio_read_args *uap) { - return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0); + return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0, NULL); } /* syscall - asynchronous write to a file (REALTIME) */ @@ -1804,14 +1908,14 @@ int oaio_write(struct thread *td, struct oaio_write_args *uap) { - return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1); + return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1, NULL); } int aio_write(struct thread *td, struct aio_write_args *uap) { - return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0); + return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0, NULL); } /* syscall - list directed I/O (REALTIME) */ @@ -1935,7 +2039,7 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev) for (i = 0; i < uap->nent; i++) { iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]); if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) { - error = aio_aqueue(td, iocb, lj, LIO_NOP, oldsigev); + error = aio_aqueue(td, iocb, lj, LIO_NOP, oldsigev, NULL); if (error != 0) nerror++; } @@ -2105,6 +2209,68 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap) return (error); } +int +aio_fsync(struct thread *td, struct aio_fsync_args *uap) +{ + struct proc *p = td->td_proc; + struct aiocblist *cb, *scb; + struct kaioinfo *ki; + struct aiocb uaiocb; + int error; + uint64_t jseqno; + + if (uap->op != O_SYNC) /* XXX lack of O_DSYNC */ + return (EINVAL); + error = copyin(uap->aiocbp, &uaiocb, sizeof(struct aiocb)); + if (error) + return (error); + ki = p->p_aioinfo; + if (ki == NULL) + aio_init_aioinfo(p); + ki = p->p_aioinfo; + error = aio_aqueue(td, uap->aiocbp, NULL, LIO_SYNC, 0, &jseqno); + if (error) + return (error); + PROC_LOCK(p); + TAILQ_FOREACH(scb, &ki->kaio_syncqueue, plist) { + if (scb->seqno == jseqno) + break; + } + if (scb == NULL) { + PROC_UNLOCK(p); + return (0); + } + scb->pending = 0; + TAILQ_FOREACH(cb, &ki->kaio_jobqueue, plist) { + if (cb->fd_file == scb->fd_file && + cb->uaiocb.aio_lio_opcode != LIO_SYNC && + cb->seqno < scb->seqno) { + cb->jobflags |= AIOCBLIST_CHECKSYNC; + scb->pending++; + } + } + TAILQ_FOREACH(cb, &ki->kaio_bufqueue, plist) { + if (cb->fd_file == scb->fd_file && + cb->uaiocb.aio_lio_opcode != LIO_SYNC && + cb->seqno < scb->seqno) { + cb->jobflags |= AIOCBLIST_CHECKSYNC; + scb->pending++; + } + } + if (scb->pending == 0) { + mtx_lock(&aio_job_mtx); + if (scb->jobstate == JOBST_JOBQSYNC) { + scb->jobstate = JOBST_JOBQGLOBAL; + TAILQ_REMOVE(&ki->kaio_syncqueue, scb, list); + TAILQ_INSERT_TAIL(&aio_jobs, scb, list); + aio_kick_nowait(p); + } + mtx_unlock(&aio_job_mtx); + } + PROC_UNLOCK(p); + return (0); +} + /* kqueue attach function */ static int filt_aioattach(struct knote *kn) |