summaryrefslogtreecommitdiffstats
path: root/sys/kern/vfs_aio.c
diff options
context:
space:
mode:
authordavidxu <davidxu@FreeBSD.org>2006-01-24 07:24:24 +0000
committerdavidxu <davidxu@FreeBSD.org>2006-01-24 07:24:24 +0000
commit1f5105d8a7b8d380fb0dc8ca4418eba706d5cb16 (patch)
treebedf52c1584a15c55361c7f2bea8fd9354017eee /sys/kern/vfs_aio.c
parent0ba6836a1ee728cb7194558e58365c057278e60f (diff)
downloadFreeBSD-src-1f5105d8a7b8d380fb0dc8ca4418eba706d5cb16.zip
FreeBSD-src-1f5105d8a7b8d380fb0dc8ca4418eba706d5cb16.tar.gz
Add locking annotation and comments about socket, pipe, fifo problem.
Temporarily fix a locking problem for socket I/O.
Diffstat (limited to 'sys/kern/vfs_aio.c')
-rw-r--r--sys/kern/vfs_aio.c251
1 files changed, 126 insertions, 125 deletions
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
index 25da746..37acf94 100644
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -184,24 +184,50 @@ typedef struct oaiocb {
struct __aiocb_private _aiocb_private;
} oaiocb_t;
+/*
+ * Below is a key of the locks used to protect each member of struct
+ * aiocblist, aioliojob, and kaioinfo, and of any backends.
+ *
+ * * - need not be protected
+ * a - locked by proc mtx
+ * b - locked by backend lock; the backend lock can be null in some cases
+ * (for example, the BIO backend), in which case the proc lock is
+ * reused.
+ * c - locked by aio_job_mtx, the lock for the generic file I/O backend.
+ */
+
+/*
+ * Currently, there are only two backends: BIO and generic file I/O.
+ * Socket I/O is served by generic file I/O.  This is not a good idea, since
+ * disk file I/O and any other types without the O_NONBLOCK flag can block
+ * daemon threads; if there is no thread to serve socket I/O, the socket I/O
+ * will be delayed too long or starved.  We should create some threads
+ * dedicated to sockets to do non-blocking I/O, and the same for pipes and
+ * fifos.  For these I/O systems we really need a non-blocking interface;
+ * fiddling with O_NONBLOCK in the file structure is not safe because there
+ * is a race between userland and the aio daemons.
+ */
+
struct aiocblist {
- TAILQ_ENTRY(aiocblist) list; /* List of jobs */
- TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */
- TAILQ_ENTRY(aiocblist) allist;
- int jobflags;
- int jobstate;
- int inputcharge;
- int outputcharge;
- struct buf *bp; /* Buffer pointer */
- struct proc *userproc; /* User process */
- struct ucred *cred; /* Active credential when created */
- struct file *fd_file; /* Pointer to file structure */
- struct aioliojob *lio; /* Optional lio job */
- struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */
- struct knlist klist; /* list of knotes */
- struct aiocb uaiocb; /* Kernel I/O control block */
- ksiginfo_t ksi; /* Realtime signal info */
- struct task biotask;
+ TAILQ_ENTRY(aiocblist) list; /* (b) internal list for backend */
+ TAILQ_ENTRY(aiocblist) plist; /* (a) list of jobs for each backend */
+ TAILQ_ENTRY(aiocblist) allist; /* (a) list of all jobs in proc */
+ int jobflags; /* (a) job flags */
+ int jobstate; /* (b) job state */
+ int inputcharge; /* (*) input blocks */
+ int outputcharge; /* (*) output blocks */
+ struct buf *bp; /* (*) private to BIO backend,
+ * buffer pointer
+ */
+ struct proc *userproc; /* (*) user process */
+ struct ucred *cred; /* (*) active credential when created */
+ struct file *fd_file; /* (*) pointer to file structure */
+ struct aioliojob *lio; /* (*) optional lio job */
+ struct aiocb *uuaiocb; /* (*) pointer in userspace of aiocb */
+ struct knlist klist; /* (a) list of knotes */
+ struct aiocb uaiocb; /* (*) kernel I/O control block */
+ ksiginfo_t ksi; /* (a) realtime signal info */
+ struct task biotask; /* (*) private to BIO backend */
};
/* jobflags */
@@ -215,22 +241,22 @@ struct aiocblist {
#define AIOP_FREE 0x1 /* proc on free queue */
struct aiothreadlist {
- int aiothreadflags; /* AIO proc flags */
- TAILQ_ENTRY(aiothreadlist) list; /* List of processes */
- struct thread *aiothread; /* The AIO thread */
+ int aiothreadflags; /* (c) AIO proc flags */
+ TAILQ_ENTRY(aiothreadlist) list; /* (c) list of processes */
+ struct thread *aiothread; /* (*) the AIO thread */
};
/*
* data-structure for lio signal management
*/
struct aioliojob {
- int lioj_flags;
- int lioj_count;
- int lioj_finished_count;
- struct sigevent lioj_signal; /* signal on all I/O done */
- TAILQ_ENTRY(aioliojob) lioj_list;
- struct knlist klist; /* list of knotes */
- ksiginfo_t lioj_ksi; /* Realtime signal info */
+ int lioj_flags; /* (a) listio flags */
+ int lioj_count; /* (a) listio job count */
+ int lioj_finished_count; /* (a) listio finished job count */
+ struct sigevent lioj_signal; /* (a) signal on all I/O done */
+ TAILQ_ENTRY(aioliojob) lioj_list; /* (a) lio list */
+ struct knlist klist; /* (a) list of knotes */
+ ksiginfo_t lioj_ksi; /* (a) Realtime signal info */
};
#define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */
@@ -241,29 +267,31 @@ struct aioliojob {
* per process aio data structure
*/
struct kaioinfo {
- int kaio_flags; /* per process kaio flags */
- int kaio_maxactive_count; /* maximum number of AIOs */
- int kaio_active_count; /* number of currently used AIOs */
- int kaio_qallowed_count; /* maxiumu size of AIO queue */
- int kaio_count; /* size of AIO queue */
- int kaio_ballowed_count; /* maximum number of buffers */
- int kaio_buffer_count; /* number of physio buffers */
- TAILQ_HEAD(,aiocblist) kaio_all; /* all AIOs in the process */
- TAILQ_HEAD(,aiocblist) kaio_done; /* done queue for process */
- TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */
- TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */
- TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */
- TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */
+ int kaio_flags; /* (a) per process kaio flags */
+ int kaio_maxactive_count; /* (*) maximum number of AIOs */
+ int kaio_active_count; /* (c) number of currently used AIOs */
+ int kaio_qallowed_count; /* (*) maximum size of AIO queue */
+ int kaio_count; /* (a) size of AIO queue */
+ int kaio_ballowed_count; /* (*) maximum number of buffers */
+ int kaio_buffer_count; /* (a) number of physio buffers */
+ TAILQ_HEAD(,aiocblist) kaio_all; /* (a) all AIOs in the process */
+ TAILQ_HEAD(,aiocblist) kaio_done; /* (a) done queue for process */
+ TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
+ TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* (a) job queue for process */
+ TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* (a) buffer job queue for process */
+ TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* (a) queue for aios waiting on sockets,
+ * not used yet.
+ */
};
#define KAIO_RUNDOWN 0x1 /* process is being run down */
#define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */
-static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */
+static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* (c) Idle daemons */
static struct sema aio_newproc_sem;
static struct mtx aio_job_mtx;
static struct mtx aio_sock_mtx;
-static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */
+static TAILQ_HEAD(,aiocblist) aio_jobs; /* (c) Async job list */
static struct unrhdr *aiod_unr;
static void aio_init_aioinfo(struct proc *p);
@@ -587,6 +615,7 @@ aio_proc_rundown(void *arg, struct proc *p)
struct aiocblist *cbe, *cbn;
struct file *fp;
struct socket *so;
+ int remove;
KASSERT(curthread->td_proc == p,
("%s: called on non-curproc", __func__));
@@ -603,35 +632,30 @@ restart:
* Try to cancel all pending requests. This code simulates
* aio_cancel on all pending I/O requests.
*/
- while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) {
- fp = cbe->fd_file;
- so = fp->f_data;
- mtx_lock(&aio_sock_mtx);
- TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
- mtx_unlock(&aio_sock_mtx);
- TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
- TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist);
- cbe->jobstate = JOBST_JOBQGLOBAL;
- }
-
TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
+ remove = 0;
mtx_lock(&aio_job_mtx);
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
TAILQ_REMOVE(&aio_jobs, cbe, list);
- mtx_unlock(&aio_job_mtx);
+ remove = 1;
+ } else if (cbe->jobstate == JOBST_JOBQSOCK) {
+ fp = cbe->fd_file;
+ MPASS(fp->f_type == DTYPE_SOCKET);
+ so = fp->f_data;
+ TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
+ remove = 1;
+ }
+ mtx_unlock(&aio_job_mtx);
+
+ if (remove) {
cbe->jobstate = JOBST_JOBFINISHED;
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
aio_bio_done_notify(p, cbe, DONE_QUEUE);
- } else {
- mtx_unlock(&aio_job_mtx);
}
}
- if (TAILQ_FIRST(&ki->kaio_sockqueue))
- goto restart;
-
/* Wait for all running I/O to be finished */
if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
TAILQ_FIRST(&ki->kaio_jobqueue)) {
@@ -693,14 +717,7 @@ aio_selectjob(struct aiothreadlist *aiop)
* and this code should work in all instances for every type of file, including
* pipes, sockets, fifos, and regular files.
*
- * XXX I don't think these code work well with pipes, sockets and fifo, the
- * problem is the aiod threads can be blocked if there is not data or no
- * buffer space, and file was not opened with O_NONBLOCK, all aiod threads
- * will be blocked if there is couple of such processes. We need a FOF_OFFSET
- * like flag to override f_flag to tell low level system to do non-blocking
- * I/O, we can not muck O_NONBLOCK because there is full of race between
- * userland and aiod threads, although there is a trigger mechanism for socket,
- * but it also does not work well if userland is misbehaviored.
+ * XXX I don't think it works well for socket, pipe, and fifo.
*/
static void
aio_process(struct aiocblist *aiocbe)
@@ -1186,47 +1203,33 @@ static void
aio_swake_cb(struct socket *so, struct sockbuf *sb)
{
struct aiocblist *cb, *cbn;
- struct proc *p;
- struct kaioinfo *ki = NULL;
int opcode, wakecount = 0;
struct aiothreadlist *aiop;
- if (sb == &so->so_snd) {
+ if (sb == &so->so_snd)
opcode = LIO_WRITE;
- SOCKBUF_LOCK(&so->so_snd);
- so->so_snd.sb_flags &= ~SB_AIO;
- SOCKBUF_UNLOCK(&so->so_snd);
- } else {
+ else
opcode = LIO_READ;
- SOCKBUF_LOCK(&so->so_rcv);
- so->so_rcv.sb_flags &= ~SB_AIO;
- SOCKBUF_UNLOCK(&so->so_rcv);
- }
- mtx_lock(&aio_sock_mtx);
+ SOCKBUF_LOCK(sb);
+ sb->sb_flags &= ~SB_AIO;
+ mtx_lock(&aio_job_mtx);
TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
if (opcode == cb->uaiocb.aio_lio_opcode) {
if (cb->jobstate != JOBST_JOBQSOCK)
panic("invalid queue value");
- p = cb->userproc;
- ki = p->p_aioinfo;
- TAILQ_REMOVE(&so->so_aiojobq, cb, list);
- PROC_LOCK(p);
- TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
- /*
- * XXX check AIO_RUNDOWN, and don't put on
- * jobqueue if it was set.
+ /* XXX
+ * We don't have an actual socket backend yet,
+ * so we simply move the requests to the generic
+ * file I/O backend.
*/
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
- cb->jobstate = JOBST_JOBQGLOBAL;
- mtx_lock(&aio_job_mtx);
+ TAILQ_REMOVE(&so->so_aiojobq, cb, list);
TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
- mtx_unlock(&aio_job_mtx);
- PROC_UNLOCK(p);
wakecount++;
}
}
- mtx_unlock(&aio_sock_mtx);
+ mtx_unlock(&aio_job_mtx);
+ SOCKBUF_UNLOCK(sb);
while (wakecount--) {
mtx_lock(&aio_job_mtx);
@@ -1426,14 +1429,15 @@ no_kqueue:
SOCKBUF_LOCK(sb);
if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
LIO_WRITE) && (!sowriteable(so)))) {
- mtx_lock(&aio_sock_mtx);
+ sb->sb_flags |= SB_AIO;
+
+ mtx_lock(&aio_job_mtx);
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
- mtx_unlock(&aio_sock_mtx);
+ mtx_unlock(&aio_job_mtx);
- sb->sb_flags |= SB_AIO;
PROC_LOCK(p);
- TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
aiocbe->jobstate = JOBST_JOBQSOCK;
ki->kaio_count++;
if (lj)
@@ -1651,6 +1655,7 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
struct file *fp;
struct socket *so;
int error;
+ int remove;
int cancelled = 0;
int notcancelled = 0;
struct vnode *vp;
@@ -1671,32 +1676,6 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
td->td_retval[0] = AIO_NOTCANCELED;
return (0);
}
- } else if (fp->f_type == DTYPE_SOCKET) {
- so = fp->f_data;
- mtx_lock(&aio_sock_mtx);
- TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) {
- if (cbe->userproc == p &&
- (uap->aiocbp == NULL ||
- uap->aiocbp == cbe->uuaiocb)) {
- TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
- PROC_LOCK(p);
- TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
- cbe->jobstate = JOBST_JOBRUNNING;
- cbe->uaiocb._aiocb_private.status = -1;
- cbe->uaiocb._aiocb_private.error = ECANCELED;
- aio_bio_done_notify(p, cbe, DONE_QUEUE);
- PROC_UNLOCK(p);
- cancelled++;
- if (uap->aiocbp != NULL)
- break;
- }
- }
- mtx_unlock(&aio_sock_mtx);
- if (cancelled && uap->aiocbp != NULL) {
- fdrop(fp, td);
- td->td_retval[0] = AIO_CANCELED;
- return (0);
- }
}
PROC_LOCK(p);
@@ -1704,33 +1683,55 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
if ((uap->fd == cbe->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
(uap->aiocbp == cbe->uuaiocb))) {
+ remove = 0;
+
mtx_lock(&aio_job_mtx);
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
TAILQ_REMOVE(&aio_jobs, cbe, list);
- mtx_unlock(&aio_job_mtx);
+ remove = 1;
+ } else if (cbe->jobstate == JOBST_JOBQSOCK) {
+ MPASS(fp->f_type == DTYPE_SOCKET);
+ so = fp->f_data;
+ TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
+ remove = 1;
+ }
+ mtx_unlock(&aio_job_mtx);
+
+ if (remove) {
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
aio_bio_done_notify(p, cbe, DONE_QUEUE);
cancelled++;
} else {
- mtx_unlock(&aio_job_mtx);
notcancelled++;
}
+ if (uap->aiocbp != NULL)
+ break;
}
}
PROC_UNLOCK(p);
done:
fdrop(fp, td);
+
+ if (uap->aiocbp != NULL) {
+ if (cancelled) {
+ td->td_retval[0] = AIO_CANCELED;
+ return (0);
+ }
+ }
+
if (notcancelled) {
td->td_retval[0] = AIO_NOTCANCELED;
return (0);
}
+
if (cancelled) {
td->td_retval[0] = AIO_CANCELED;
return (0);
}
+
td->td_retval[0] = AIO_ALLDONE;
return (0);
OpenPOWER on IntegriCloud