diff options
author | davidxu <davidxu@FreeBSD.org> | 2006-01-24 07:24:24 +0000 |
---|---|---|
committer | davidxu <davidxu@FreeBSD.org> | 2006-01-24 07:24:24 +0000 |
commit | 1f5105d8a7b8d380fb0dc8ca4418eba706d5cb16 (patch) | |
tree | bedf52c1584a15c55361c7f2bea8fd9354017eee /sys/kern/vfs_aio.c | |
parent | 0ba6836a1ee728cb7194558e58365c057278e60f (diff) | |
download | FreeBSD-src-1f5105d8a7b8d380fb0dc8ca4418eba706d5cb16.zip FreeBSD-src-1f5105d8a7b8d380fb0dc8ca4418eba706d5cb16.tar.gz |
Add locking annotation and comments about socket, pipe, fifo problem.
Temporarily fix a locking problem for socket I/O.
Diffstat (limited to 'sys/kern/vfs_aio.c')
-rw-r--r-- | sys/kern/vfs_aio.c | 251 |
1 files changed, 126 insertions, 125 deletions
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c index 25da746..37acf94 100644 --- a/sys/kern/vfs_aio.c +++ b/sys/kern/vfs_aio.c @@ -184,24 +184,50 @@ typedef struct oaiocb { struct __aiocb_private _aiocb_private; } oaiocb_t; +/* + * Below is a key of locks used to protect each member of struct aiocblist + * aioliojob and kaioinfo and any backends. + * + * * - need not protected + * a - locked by proc mtx + * b - locked by backend lock, the backend lock can be null in some cases, + * for example, BIO belongs to this type, in this case, proc lock is + * reused. + * c - locked by aio_job_mtx, the lock for the generic file I/O backend. + */ + +/* + * Current, there is only two backends: BIO and generic file I/O. + * socket I/O is served by generic file I/O, this is not a good idea, since + * disk file I/O and any other types without O_NONBLOCK flag can block daemon + * threads, if there is no thread to serve socket I/O, the socket I/O will be + * delayed too long or starved, we should create some threads dedicated to + * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O + * systems we really need non-blocking interface, fiddling O_NONBLOCK in file + * structure is not safe because there is race between userland and aio + * daemons. + */ + struct aiocblist { - TAILQ_ENTRY(aiocblist) list; /* List of jobs */ - TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */ - TAILQ_ENTRY(aiocblist) allist; - int jobflags; - int jobstate; - int inputcharge; - int outputcharge; - struct buf *bp; /* Buffer pointer */ - struct proc *userproc; /* User process */ - struct ucred *cred; /* Active credential when created */ - struct file *fd_file; /* Pointer to file structure */ - struct aioliojob *lio; /* Optional lio job */ - struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */ - struct knlist klist; /* list of knotes */ - struct aiocb uaiocb; /* Kernel I/O control block */ - ksiginfo_t ksi; /* Realtime signal info */ - struct task biotask; + TAILQ_ENTRY(aiocblist) list; /* (b) internal list of for backend */ + TAILQ_ENTRY(aiocblist) plist; /* (a) list of jobs for each backend */ + TAILQ_ENTRY(aiocblist) allist; /* (a) list of all jobs in proc */ + int jobflags; /* (a) job flags */ + int jobstate; /* (b) job state */ + int inputcharge; /* (*) input blockes */ + int outputcharge; /* (*) output blockes */ + struct buf *bp; /* (*) private to BIO backend, + * buffer pointer + */ + struct proc *userproc; /* (*) user process */ + struct ucred *cred; /* (*) active credential when created */ + struct file *fd_file; /* (*) pointer to file structure */ + struct aioliojob *lio; /* (*) optional lio job */ + struct aiocb *uuaiocb; /* (*) pointer in userspace of aiocb */ + struct knlist klist; /* (a) list of knotes */ + struct aiocb uaiocb; /* (*) kernel I/O control block */ + ksiginfo_t ksi; /* (a) realtime signal info */ + struct task biotask; /* (*) private to BIO backend */ }; /* jobflags */ @@ -215,22 +241,22 @@ struct aiocblist { #define AIOP_FREE 0x1 /* proc on free queue */ struct aiothreadlist { - int aiothreadflags; /* AIO proc flags */ - TAILQ_ENTRY(aiothreadlist) list; /* List of processes */ - struct thread *aiothread; /* The AIO thread */ + int aiothreadflags; /* (c) AIO proc flags */ + TAILQ_ENTRY(aiothreadlist) list; /* (c) list of processes */ + struct thread *aiothread; /* (*) the AIO thread */ }; /* * data-structure for lio signal management */ struct aioliojob { - int lioj_flags; - int lioj_count; - int lioj_finished_count; - struct sigevent lioj_signal; /* signal on all I/O done */ - TAILQ_ENTRY(aioliojob) lioj_list; - struct knlist klist; /* list of knotes */ - ksiginfo_t lioj_ksi; /* Realtime signal info */ + int lioj_flags; /* (a) listio flags */ + int lioj_count; /* (a) listio flags */ + int lioj_finished_count; /* (a) listio flags */ + struct sigevent lioj_signal; /* (a) signal on all I/O done */ + TAILQ_ENTRY(aioliojob) lioj_list; /* (a) lio list */ + struct knlist klist; /* (a) list of knotes */ + ksiginfo_t lioj_ksi; /* (a) Realtime signal info */ }; #define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */ @@ -241,29 +267,31 @@ struct aioliojob { * per process aio data structure */ struct kaioinfo { - int kaio_flags; /* per process kaio flags */ - int kaio_maxactive_count; /* maximum number of AIOs */ - int kaio_active_count; /* number of currently used AIOs */ - int kaio_qallowed_count; /* maxiumu size of AIO queue */ - int kaio_count; /* size of AIO queue */ - int kaio_ballowed_count; /* maximum number of buffers */ - int kaio_buffer_count; /* number of physio buffers */ - TAILQ_HEAD(,aiocblist) kaio_all; /* all AIOs in the process */ - TAILQ_HEAD(,aiocblist) kaio_done; /* done queue for process */ - TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */ - TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */ - TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */ - TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */ + int kaio_flags; /* (a) per process kaio flags */ + int kaio_maxactive_count; /* (*) maximum number of AIOs */ + int kaio_active_count; /* (c) number of currently used AIOs */ + int kaio_qallowed_count; /* (*) maxiumu size of AIO queue */ + int kaio_count; /* (a) size of AIO queue */ + int kaio_ballowed_count; /* (*) maximum number of buffers */ + int kaio_buffer_count; /* (a) number of physio buffers */ + TAILQ_HEAD(,aiocblist) kaio_all; /* (a) all AIOs in the process */ + TAILQ_HEAD(,aiocblist) kaio_done; /* (a) done queue for process */ + TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */ + TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* (a) job queue for process */ + TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* (a) buffer job queue for process */ + TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* (a) queue for aios waiting on sockets, + * not used yet. + */ }; #define KAIO_RUNDOWN 0x1 /* process is being run down */ #define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */ -static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */ +static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* (c) Idle daemons */ static struct sema aio_newproc_sem; static struct mtx aio_job_mtx; static struct mtx aio_sock_mtx; -static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */ +static TAILQ_HEAD(,aiocblist) aio_jobs; /* (c) Async job list */ static struct unrhdr *aiod_unr; static void aio_init_aioinfo(struct proc *p); @@ -587,6 +615,7 @@ aio_proc_rundown(void *arg, struct proc *p) struct aiocblist *cbe, *cbn; struct file *fp; struct socket *so; + int remove; KASSERT(curthread->td_proc == p, ("%s: called on non-curproc", __func__)); @@ -603,35 +632,30 @@ restart: * Try to cancel all pending requests. This code simulates * aio_cancel on all pending I/O requests. */ - while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) { - fp = cbe->fd_file; - so = fp->f_data; - mtx_lock(&aio_sock_mtx); - TAILQ_REMOVE(&so->so_aiojobq, cbe, list); - mtx_unlock(&aio_sock_mtx); - TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist); - TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist); - cbe->jobstate = JOBST_JOBQGLOBAL; - } - TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) { + remove = 0; mtx_lock(&aio_job_mtx); if (cbe->jobstate == JOBST_JOBQGLOBAL) { TAILQ_REMOVE(&aio_jobs, cbe, list); - mtx_unlock(&aio_job_mtx); + remove = 1; + } else if (cbe->jobstate == JOBST_JOBQSOCK) { + fp = cbe->fd_file; + MPASS(fp->f_type == DTYPE_SOCKET); + so = fp->f_data; + TAILQ_REMOVE(&so->so_aiojobq, cbe, list); + remove = 1; + } + mtx_unlock(&aio_job_mtx); + + if (remove) { cbe->jobstate = JOBST_JOBFINISHED; cbe->uaiocb._aiocb_private.status = -1; cbe->uaiocb._aiocb_private.error = ECANCELED; TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist); aio_bio_done_notify(p, cbe, DONE_QUEUE); - } else { - mtx_unlock(&aio_job_mtx); } } - if (TAILQ_FIRST(&ki->kaio_sockqueue)) - goto restart; - /* Wait for all running I/O to be finished */ if (TAILQ_FIRST(&ki->kaio_bufqueue) || TAILQ_FIRST(&ki->kaio_jobqueue)) { @@ -693,14 +717,7 @@ aio_selectjob(struct aiothreadlist *aiop) * and this code should work in all instances for every type of file, including * pipes, sockets, fifos, and regular files. * - * XXX I don't think these code work well with pipes, sockets and fifo, the - * problem is the aiod threads can be blocked if there is not data or no - * buffer space, and file was not opened with O_NONBLOCK, all aiod threads - * will be blocked if there is couple of such processes. We need a FOF_OFFSET - * like flag to override f_flag to tell low level system to do non-blocking - * I/O, we can not muck O_NONBLOCK because there is full of race between - * userland and aiod threads, although there is a trigger mechanism for socket, - * but it also does not work well if userland is misbehaviored. + * XXX I don't think it works well for socket, pipe, and fifo. */ static void aio_process(struct aiocblist *aiocbe) @@ -1186,47 +1203,33 @@ static void aio_swake_cb(struct socket *so, struct sockbuf *sb) { struct aiocblist *cb, *cbn; - struct proc *p; - struct kaioinfo *ki = NULL; int opcode, wakecount = 0; struct aiothreadlist *aiop; - if (sb == &so->so_snd) { + if (sb == &so->so_snd) opcode = LIO_WRITE; - SOCKBUF_LOCK(&so->so_snd); - so->so_snd.sb_flags &= ~SB_AIO; - SOCKBUF_UNLOCK(&so->so_snd); - } else { + else opcode = LIO_READ; - SOCKBUF_LOCK(&so->so_rcv); - so->so_rcv.sb_flags &= ~SB_AIO; - SOCKBUF_UNLOCK(&so->so_rcv); - } - mtx_lock(&aio_sock_mtx); + SOCKBUF_LOCK(sb); + sb->sb_flags &= ~SB_AIO; + mtx_lock(&aio_job_mtx); TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) { if (opcode == cb->uaiocb.aio_lio_opcode) { if (cb->jobstate != JOBST_JOBQSOCK) panic("invalid queue value"); - p = cb->userproc; - ki = p->p_aioinfo; - TAILQ_REMOVE(&so->so_aiojobq, cb, list); - PROC_LOCK(p); - TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist); - /* - * XXX check AIO_RUNDOWN, and don't put on - * jobqueue if it was set. + /* XXX + * We don't have actual sockets backend yet, + * so we simply move the requests to the generic + * file I/O backend. */ - TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist); - cb->jobstate = JOBST_JOBQGLOBAL; - mtx_lock(&aio_job_mtx); + TAILQ_REMOVE(&so->so_aiojobq, cb, list); TAILQ_INSERT_TAIL(&aio_jobs, cb, list); - mtx_unlock(&aio_job_mtx); - PROC_UNLOCK(p); wakecount++; } } - mtx_unlock(&aio_sock_mtx); + mtx_unlock(&aio_job_mtx); + SOCKBUF_UNLOCK(sb); while (wakecount--) { mtx_lock(&aio_job_mtx); @@ -1426,14 +1429,15 @@ no_kqueue: SOCKBUF_LOCK(sb); if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode == LIO_WRITE) && (!sowriteable(so)))) { - mtx_lock(&aio_sock_mtx); + sb->sb_flags |= SB_AIO; + + mtx_lock(&aio_job_mtx); TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list); - mtx_unlock(&aio_sock_mtx); + mtx_unlock(&aio_job_mtx); - sb->sb_flags |= SB_AIO; PROC_LOCK(p); - TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist); TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist); + TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist); aiocbe->jobstate = JOBST_JOBQSOCK; ki->kaio_count++; if (lj) @@ -1651,6 +1655,7 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap) struct file *fp; struct socket *so; int error; + int remove; int cancelled = 0; int notcancelled = 0; struct vnode *vp; @@ -1671,32 +1676,6 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap) td->td_retval[0] = AIO_NOTCANCELED; return (0); } - } else if (fp->f_type == DTYPE_SOCKET) { - so = fp->f_data; - mtx_lock(&aio_sock_mtx); - TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) { - if (cbe->userproc == p && - (uap->aiocbp == NULL || - uap->aiocbp == cbe->uuaiocb)) { - TAILQ_REMOVE(&so->so_aiojobq, cbe, list); - PROC_LOCK(p); - TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist); - cbe->jobstate = JOBST_JOBRUNNING; - cbe->uaiocb._aiocb_private.status = -1; - cbe->uaiocb._aiocb_private.error = ECANCELED; - aio_bio_done_notify(p, cbe, DONE_QUEUE); - PROC_UNLOCK(p); - cancelled++; - if (uap->aiocbp != NULL) - break; - } - } - mtx_unlock(&aio_sock_mtx); - if (cancelled && uap->aiocbp != NULL) { - fdrop(fp, td); - td->td_retval[0] = AIO_CANCELED; - return (0); - } } PROC_LOCK(p); @@ -1704,33 +1683,55 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap) if ((uap->fd == cbe->uaiocb.aio_fildes) && ((uap->aiocbp == NULL) || (uap->aiocbp == cbe->uuaiocb))) { + remove = 0; + mtx_lock(&aio_job_mtx); if (cbe->jobstate == JOBST_JOBQGLOBAL) { TAILQ_REMOVE(&aio_jobs, cbe, list); - mtx_unlock(&aio_job_mtx); + remove = 1; + } else if (cbe->jobstate == JOBST_JOBQSOCK) { + MPASS(fp->f_type == DTYPE_SOCKET); + so = fp->f_data; + TAILQ_REMOVE(&so->so_aiojobq, cbe, list); + remove = 1; + } + mtx_unlock(&aio_job_mtx); + + if (remove) { TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist); cbe->uaiocb._aiocb_private.status = -1; cbe->uaiocb._aiocb_private.error = ECANCELED; aio_bio_done_notify(p, cbe, DONE_QUEUE); cancelled++; } else { - mtx_unlock(&aio_job_mtx); notcancelled++; } + if (uap->aiocbp != NULL) + break; } } PROC_UNLOCK(p); done: fdrop(fp, td); + + if (uap->aiocbp != NULL) { + if (cancelled) { + td->td_retval[0] = AIO_CANCELED; + return (0); + } + } + if (notcancelled) { td->td_retval[0] = AIO_NOTCANCELED; return (0); } + if (cancelled) { td->td_retval[0] = AIO_CANCELED; return (0); } + td->td_retval[0] = AIO_ALLDONE; return (0); |