path: root/sys/kern/vfs_aio.c
author     davidxu <davidxu@FreeBSD.org>  2006-01-22 05:59:27 +0000
committer  davidxu <davidxu@FreeBSD.org>  2006-01-22 05:59:27 +0000
commit     72c8645faa631d2798bf531a57784fb9f81c9952 (patch)
tree       b753e6f8c06e1f7d68d98aef60c6a0f0ad839b0e /sys/kern/vfs_aio.c
parent     af24439a4a62065bd0824fe04d44ebc883288dba (diff)
download   FreeBSD-src-72c8645faa631d2798bf531a57784fb9f81c9952.zip
download   FreeBSD-src-72c8645faa631d2798bf531a57784fb9f81c9952.tar.gz
Make aio code MP safe.
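The conversion replaces the Giant lock and the spl* calls with fine-grained
locking: the global async job list and the daemon free list are protected by
aio_job_mtx, the per-socket job queues by aio_sock_mtx, and all per-process
kaioinfo state by the proc lock. Daemon startup is synchronized through the
aio_newproc_sem semaphore, and daemon ids are allocated from a unit number
allocator (aiod_unr). Physio completion is moved out of interrupt context:
aio_physwakeup() now only enqueues a biohelper task on taskqueue_aiod_bio,
which performs the wakeups and signal delivery. The per-process done queues
are merged into kaio_done plus a kaio_all list, struct aio_liojob becomes a
reference-counted struct aioliojob, and NET_NEEDS_GIANT("aio") is removed.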
Diffstat (limited to 'sys/kern/vfs_aio.c')
-rw-r--r--  sys/kern/vfs_aio.c  1417
1 file changed, 574 insertions, 843 deletions
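As background (not part of the commit), here is a minimal userland sketch of
the POSIX AIO interface whose kernel side is reworked below. It assumes a
kernel built with the VFS_AIO option (or the aio module loaded) and elides
most error handling; the file name is arbitrary.

#include <aio.h>
#include <err.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	struct aiocb cb;
	const struct aiocb *list[1];
	char buf[512];
	int fd;

	if ((fd = open("/etc/motd", O_RDONLY)) < 0)
		err(1, "open");
	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = fd;
	cb.aio_buf = buf;
	cb.aio_nbytes = sizeof(buf);
	cb.aio_offset = 0;

	/* Queue the request; in the kernel this reaches aio_aqueue(). */
	if (aio_read(&cb) != 0)
		err(1, "aio_read");

	/* Block until completion; the kernel side is aio_suspend() below. */
	list[0] = &cb;
	if (aio_suspend(list, 1, NULL) != 0)
		err(1, "aio_suspend");

	/* Collect the result and free the kernel job (aio_return() below). */
	if (aio_error(&cb) == 0)
		printf("read %zd bytes\n", aio_return(&cb));
	close(fd);
	return (0);
}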
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
index 22a3bb7..b1b3288 100644
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -42,15 +42,20 @@ __FBSDID("$FreeBSD$");
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/protosw.h>
+#include <sys/sema.h>
+#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/syscall.h>
#include <sys/sysent.h>
#include <sys/sysctl.h>
#include <sys/sx.h>
+#include <sys/taskqueue.h>
#include <sys/vnode.h>
#include <sys/conf.h>
#include <sys/event.h>
+#include <machine/atomic.h>
+
#include <posix4/posix4.h>
#include <vm/vm.h>
#include <vm/vm_extern.h>
@@ -61,8 +66,6 @@ __FBSDID("$FreeBSD$");
#include "opt_vfs_aio.h"
-NET_NEEDS_GIANT("aio");
-
/*
* Counter for allocating reference ids to new jobs. Wrapped to 1 on
* overflow.
@@ -70,11 +73,11 @@ NET_NEEDS_GIANT("aio");
static long jobrefid;
#define JOBST_NULL 0x0
+#define JOBST_JOBQSOCK 0x1
#define JOBST_JOBQGLOBAL 0x2
#define JOBST_JOBRUNNING 0x3
#define JOBST_JOBFINISHED 0x4
#define JOBST_JOBQBUF 0x5
-#define JOBST_JOBBFINISHED 0x6
#ifndef MAX_AIO_PER_PROC
#define MAX_AIO_PER_PROC 32
@@ -184,6 +187,7 @@ typedef struct oaiocb {
struct aiocblist {
TAILQ_ENTRY(aiocblist) list; /* List of jobs */
TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */
+ TAILQ_ENTRY(aiocblist) allist;
int jobflags;
int jobstate;
int inputcharge;
@@ -192,16 +196,18 @@ struct aiocblist {
struct proc *userproc; /* User process */
struct ucred *cred; /* Active credential when created */
struct file *fd_file; /* Pointer to file structure */
- struct aio_liojob *lio; /* Optional lio job */
+ struct aioliojob *lio; /* Optional lio job */
struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */
struct knlist klist; /* list of knotes */
struct aiocb uaiocb; /* Kernel I/O control block */
ksiginfo_t ksi; /* Realtime signal info */
+ struct task biotask;
};
/* jobflags */
-#define AIOCBLIST_RUNDOWN 0x4
+#define AIOCBLIST_RUNDOWN 0x04
#define AIOCBLIST_DONE 0x10
+#define AIOCBLIST_BUFDONE 0x20
/*
* AIO process info
@@ -217,18 +223,17 @@ struct aiothreadlist {
/*
* data-structure for lio signal management
*/
-struct aio_liojob {
+struct aioliojob {
int lioj_flags;
- int lioj_buffer_count;
- int lioj_buffer_finished_count;
- int lioj_queue_count;
- int lioj_queue_finished_count;
- int lioj_total_count;
+ int lioj_count;
+ int lioj_finished_count;
+ int lioj_ref_count;
struct sigevent lioj_signal; /* signal on all I/O done */
- TAILQ_ENTRY(aio_liojob) lioj_list;
+ TAILQ_ENTRY(aioliojob) lioj_list;
struct knlist klist; /* list of knotes */
ksiginfo_t lioj_ksi; /* Realtime signal info */
};
+
#define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */
#define LIOJ_SIGNAL_POSTED 0x2 /* signal has been posted */
#define LIOJ_KEVENT_POSTED 0x4 /* kevent triggered */
@@ -241,16 +246,14 @@ struct kaioinfo {
int kaio_maxactive_count; /* maximum number of AIOs */
int kaio_active_count; /* number of currently used AIOs */
int kaio_qallowed_count; /* maximum size of AIO queue */
- int kaio_queue_count; /* size of AIO queue */
+ int kaio_count; /* size of AIO queue */
int kaio_ballowed_count; /* maximum number of buffers */
- int kaio_queue_finished_count; /* number of daemon jobs finished */
int kaio_buffer_count; /* number of physio buffers */
- int kaio_buffer_finished_count; /* count of I/O done */
- TAILQ_HEAD(,aio_liojob) kaio_liojoblist; /* list of lio jobs */
+ TAILQ_HEAD(,aiocblist) kaio_all; /* all AIOs in the process */
+ TAILQ_HEAD(,aiocblist) kaio_done; /* done queue for process */
+ TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */
TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */
- TAILQ_HEAD(,aiocblist) kaio_jobdone; /* done queue for process */
TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */
- TAILQ_HEAD(,aiocblist) kaio_bufdone; /* buffer done queue for process */
TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */
};
@@ -258,22 +261,24 @@ struct kaioinfo {
#define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */
static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */
-static struct mtx aio_freeproc_mtx;
-
+static struct sema aio_newproc_sem;
+static struct mtx aio_job_mtx;
+static struct mtx aio_sock_mtx;
static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */
+static struct unrhdr *aiod_unr;
static void aio_init_aioinfo(struct proc *p);
static void aio_onceonly(void);
static int aio_free_entry(struct aiocblist *aiocbe);
static void aio_process(struct aiocblist *aiocbe);
-static int aio_newproc(void);
+static int aio_newproc(int *);
static int aio_aqueue(struct thread *td, struct aiocb *job, int type,
int osigev);
static void aio_physwakeup(struct buf *bp);
static void aio_proc_rundown(void *arg, struct proc *p);
-static int aio_fphysio(struct aiocblist *aiocbe);
static int aio_qphysio(struct proc *p, struct aiocblist *iocb);
-static void aio_daemon(void *uproc);
+static void biohelper(void *, int);
+static void aio_daemon(void *param);
static void aio_swake_cb(struct socket *, struct sockbuf *);
static int aio_unload(void);
static int filt_aioattach(struct knote *kn);
@@ -306,6 +311,8 @@ static struct filterops lio_filtops =
static eventhandler_tag exit_tag, exec_tag;
+TASKQUEUE_DEFINE_THREAD(aiod_bio);
+
/*
* Main operations function for use as a kernel module.
*/
@@ -368,8 +375,11 @@ aio_onceonly(void)
kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
kqueue_add_filteropts(EVFILT_LIO, &lio_filtops);
TAILQ_INIT(&aio_freeproc);
- mtx_init(&aio_freeproc_mtx, "aio_freeproc", NULL, MTX_DEF);
+ sema_init(&aio_newproc_sem, 0, "aio_new_proc");
+ mtx_init(&aio_job_mtx, "aio_job", NULL, MTX_DEF);
+ mtx_init(&aio_sock_mtx, "aio_sock", NULL, MTX_DEF);
TAILQ_INIT(&aio_jobs);
+ aiod_unr = new_unrhdr(1, INT_MAX, NULL);
kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL,
@@ -378,7 +388,7 @@ aio_onceonly(void)
NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX*sizeof(intptr_t) , NULL,
NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
- aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aio_liojob), NULL,
+ aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aioliojob), NULL,
NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
aiod_timeout = AIOD_TIMEOUT_DEFAULT;
aiod_lifetime = AIOD_LIFETIME_DEFAULT;
@@ -411,12 +421,15 @@ aio_unload(void)
error = kqueue_del_filteropts(EVFILT_AIO);
if (error)
return error;
-
async_io_version = 0;
aio_swake = NULL;
+ taskqueue_free(taskqueue_aiod_bio);
+ delete_unrhdr(aiod_unr);
EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
- mtx_destroy(&aio_freeproc_mtx);
+ mtx_destroy(&aio_job_mtx);
+ mtx_destroy(&aio_sock_mtx);
+ sema_destroy(&aio_newproc_sem);
p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
@@ -437,13 +450,12 @@ aio_init_aioinfo(struct proc *p)
ki->kaio_maxactive_count = max_aio_per_proc;
ki->kaio_active_count = 0;
ki->kaio_qallowed_count = max_aio_queue_per_proc;
- ki->kaio_queue_count = 0;
+ ki->kaio_count = 0;
ki->kaio_ballowed_count = max_buf_aio;
ki->kaio_buffer_count = 0;
- ki->kaio_buffer_finished_count = 0;
- TAILQ_INIT(&ki->kaio_jobdone);
+ TAILQ_INIT(&ki->kaio_all);
+ TAILQ_INIT(&ki->kaio_done);
TAILQ_INIT(&ki->kaio_jobqueue);
- TAILQ_INIT(&ki->kaio_bufdone);
TAILQ_INIT(&ki->kaio_bufqueue);
TAILQ_INIT(&ki->kaio_liojoblist);
TAILQ_INIT(&ki->kaio_sockqueue);
@@ -457,7 +469,7 @@ aio_init_aioinfo(struct proc *p)
}
while (num_aio_procs < target_aio_procs)
- aio_newproc();
+ aio_newproc(NULL);
}
static int
@@ -481,54 +493,53 @@ static int
aio_free_entry(struct aiocblist *aiocbe)
{
struct kaioinfo *ki;
- struct aio_liojob *lj;
+ struct aioliojob *lj;
struct proc *p;
- int error;
- int s;
-
- if (aiocbe->jobstate == JOBST_NULL)
- panic("aio_free_entry: freeing already free job");
p = aiocbe->userproc;
- KASSERT(curthread->td_proc == p,
- ("%s: called for non-curproc", __func__));
+
+ PROC_LOCK_ASSERT(p, MA_OWNED);
+ MPASS(curproc == p);
+ MPASS(aiocbe->jobstate == JOBST_JOBFINISHED);
+
ki = p->p_aioinfo;
+ MPASS(ki != NULL);
+
+ atomic_subtract_int(&num_queue_count, 1);
+
+ ki->kaio_count--;
+ MPASS(ki->kaio_count >= 0);
+
lj = aiocbe->lio;
- if (ki == NULL)
- panic("aio_free_entry: missing p->p_aioinfo");
+ if (lj) {
+ lj->lioj_count--;
+ lj->lioj_finished_count--;
- while (aiocbe->jobstate == JOBST_JOBRUNNING) {
- aiocbe->jobflags |= AIOCBLIST_RUNDOWN;
- tsleep(aiocbe, PRIBIO, "jobwai", 0);
- }
- if (aiocbe->bp == NULL) {
- if (ki->kaio_queue_count <= 0)
- panic("aio_free_entry: process queue size <= 0");
- if (num_queue_count <= 0)
- panic("aio_free_entry: system wide queue size <= 0");
-
- if (lj) {
- lj->lioj_queue_count--;
- if (aiocbe->jobflags & AIOCBLIST_DONE)
- lj->lioj_queue_finished_count--;
- }
- ki->kaio_queue_count--;
- if (aiocbe->jobflags & AIOCBLIST_DONE)
- ki->kaio_queue_finished_count--;
- num_queue_count--;
- } else {
- if (lj) {
- lj->lioj_buffer_count--;
- if (aiocbe->jobflags & AIOCBLIST_DONE)
- lj->lioj_buffer_finished_count--;
+ if (lj->lioj_count == 0 && lj->lioj_ref_count == 0) {
+ TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
+ /* lio is going away, we need to destroy any knotes */
+ knlist_delete(&lj->klist, curthread, 1);
+ sigqueue_take(&lj->lioj_ksi);
+ uma_zfree(aiolio_zone, lj);
}
- if (aiocbe->jobflags & AIOCBLIST_DONE)
- ki->kaio_buffer_finished_count--;
- ki->kaio_buffer_count--;
- num_buf_aio--;
}
+ TAILQ_REMOVE(&ki->kaio_done, aiocbe, plist);
+ TAILQ_REMOVE(&ki->kaio_all, aiocbe, allist);
+
/* aiocbe is going away, we need to destroy any knotes */
+ knlist_delete(&aiocbe->klist, curthread, 1);
+ sigqueue_take(&aiocbe->ksi);
+
+ MPASS(aiocbe->bp == NULL);
+ aiocbe->jobstate = JOBST_NULL;
+
+ /* Wake up anyone who has interest to do cleanup work. */
+ if (ki->kaio_flags & (KAIO_WAKEUP | KAIO_RUNDOWN)) {
+ ki->kaio_flags &= ~KAIO_WAKEUP;
+ wakeup(&p->p_aioinfo);
+ }
+ PROC_UNLOCK(p);
/*
* The thread argument here is used to find the owning process
@@ -550,55 +561,11 @@ aio_free_entry(struct aiocblist *aiocbe)
* at open time, but this is already true of file descriptors in
* a multithreaded process.
*/
- if (lj)
- knlist_delete(&lj->klist, curthread, 0);
- knlist_delete(&aiocbe->klist, curthread, 0);
-
- if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags & KAIO_RUNDOWN)
- && ((ki->kaio_buffer_count == 0) && (ki->kaio_queue_count == 0)))) {
- ki->kaio_flags &= ~KAIO_WAKEUP;
- wakeup(p);
- }
-
- if (aiocbe->jobstate == JOBST_JOBQBUF) {
- if ((error = aio_fphysio(aiocbe)) != 0)
- return (error);
- if (aiocbe->jobstate != JOBST_JOBBFINISHED)
- panic("aio_free_entry: invalid physio finish-up state");
- s = splbio();
- TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
- splx(s);
- } else if (aiocbe->jobstate == JOBST_JOBQGLOBAL) {
- s = splnet();
- TAILQ_REMOVE(&aio_jobs, aiocbe, list);
- TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
- splx(s);
- } else if (aiocbe->jobstate == JOBST_JOBFINISHED)
- TAILQ_REMOVE(&ki->kaio_jobdone, aiocbe, plist);
- else if (aiocbe->jobstate == JOBST_JOBBFINISHED) {
- s = splbio();
- TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
- splx(s);
- if (aiocbe->bp) {
- vunmapbuf(aiocbe->bp);
- relpbuf(aiocbe->bp, NULL);
- aiocbe->bp = NULL;
- }
- }
- if (lj && (lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == 0)) {
- TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
- PROC_LOCK(p);
- sigqueue_take(&lj->lioj_ksi);
- PROC_UNLOCK(p);
- uma_zfree(aiolio_zone, lj);
- }
- aiocbe->jobstate = JOBST_NULL;
fdrop(aiocbe->fd_file, curthread);
crfree(aiocbe->cred);
- PROC_LOCK(p);
- sigqueue_take(&aiocbe->ksi);
- PROC_UNLOCK(p);
uma_zfree(aiocb_zone, aiocbe);
+ PROC_LOCK(p);
+
return (0);
}
@@ -608,10 +575,9 @@ aio_free_entry(struct aiocblist *aiocbe)
static void
aio_proc_rundown(void *arg, struct proc *p)
{
- int s;
struct kaioinfo *ki;
- struct aio_liojob *lj, *ljn;
- struct aiocblist *aiocbe, *aiocbn;
+ struct aioliojob *lj;
+ struct aiocblist *cbe, *cbn;
struct file *fp;
struct socket *so;
@@ -621,105 +587,72 @@ aio_proc_rundown(void *arg, struct proc *p)
if (ki == NULL)
return;
- mtx_lock(&Giant);
- ki->kaio_flags |= LIOJ_SIGNAL_POSTED;
- while ((ki->kaio_active_count > 0) || (ki->kaio_buffer_count >
- ki->kaio_buffer_finished_count)) {
- ki->kaio_flags |= KAIO_RUNDOWN;
- if (tsleep(p, PRIBIO, "kaiowt", aiod_timeout))
- break;
- }
+ PROC_LOCK(p);
+
+restart:
+ ki->kaio_flags |= KAIO_RUNDOWN;
/*
- * Move any aio ops that are waiting on socket I/O to the normal job
- * queues so they are cleaned up with any others.
+ * Try to cancel all pending requests. This code simulates
+ * aio_cancel on all pending I/O requests.
*/
- s = splnet();
- TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_sockqueue, plist, aiocbn) {
- fp = aiocbe->fd_file;
- if (fp != NULL) {
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, aiocbe, list);
- if (TAILQ_EMPTY(&so->so_aiojobq)) {
- SOCKBUF_LOCK(&so->so_snd);
- so->so_snd.sb_flags &= ~SB_AIO;
- SOCKBUF_UNLOCK(&so->so_snd);
- SOCKBUF_LOCK(&so->so_rcv);
- so->so_rcv.sb_flags &= ~SB_AIO;
- SOCKBUF_UNLOCK(&so->so_rcv);
- }
- }
- TAILQ_REMOVE(&ki->kaio_sockqueue, aiocbe, plist);
- TAILQ_INSERT_HEAD(&aio_jobs, aiocbe, list);
- TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, aiocbe, plist);
+ while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) {
+ fp = cbe->fd_file;
+ so = fp->f_data;
+ mtx_lock(&aio_sock_mtx);
+ TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
+ mtx_unlock(&aio_sock_mtx);
+ TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
+ TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist);
+ cbe->jobstate = JOBST_JOBQGLOBAL;
}
- splx(s);
-restart1:
- TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_jobdone, plist, aiocbn) {
- if (aio_free_entry(aiocbe))
- goto restart1;
+ TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
+ mtx_lock(&aio_job_mtx);
+ if (cbe->jobstate == JOBST_JOBQGLOBAL) {
+ TAILQ_REMOVE(&aio_jobs, cbe, list);
+ mtx_unlock(&aio_job_mtx);
+ cbe->jobstate = JOBST_JOBFINISHED;
+ cbe->uaiocb._aiocb_private.status = -1;
+ cbe->uaiocb._aiocb_private.error = ECANCELED;
+ TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
+ aio_bio_done_notify(p, cbe, DONE_QUEUE);
+ } else {
+ mtx_unlock(&aio_job_mtx);
+ }
}
-restart2:
- TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_jobqueue, plist, aiocbn) {
- if (aio_free_entry(aiocbe))
- goto restart2;
- }
+ if (TAILQ_FIRST(&ki->kaio_sockqueue))
+ goto restart;
-/*
- * Note the use of lots of splbio here, trying to avoid splbio for long chains
- * of I/O. Probably unnecessary.
- */
-restart3:
- s = splbio();
- while (TAILQ_FIRST(&ki->kaio_bufqueue)) {
+ /* Wait for all running I/O to be finished */
+ if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
+ TAILQ_FIRST(&ki->kaio_jobqueue)) {
ki->kaio_flags |= KAIO_WAKEUP;
- tsleep(p, PRIBIO, "aioprn", 0);
- splx(s);
- goto restart3;
+ msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO, "aioprn", hz);
+ goto restart;
}
- splx(s);
-
-restart4:
- s = splbio();
- TAILQ_FOREACH_SAFE(aiocbe, &ki->kaio_bufdone, plist, aiocbn) {
- if (aio_free_entry(aiocbe)) {
- splx(s);
- goto restart4;
- }
- }
- splx(s);
- /*
- * If we've slept, jobs might have moved from one queue to another.
- * Retry rundown if we didn't manage to empty the queues.
- */
- if (TAILQ_FIRST(&ki->kaio_jobdone) != NULL ||
- TAILQ_FIRST(&ki->kaio_jobqueue) != NULL ||
- TAILQ_FIRST(&ki->kaio_bufqueue) != NULL ||
- TAILQ_FIRST(&ki->kaio_bufdone) != NULL)
- goto restart1;
-
- TAILQ_FOREACH_SAFE(lj, &ki->kaio_liojoblist, lioj_list, ljn) {
- if ((lj->lioj_buffer_count == 0) && (lj->lioj_queue_count ==
- 0)) {
+ /* Free all completed I/O requests. */
+ while ((cbe = TAILQ_FIRST(&ki->kaio_done)) != NULL)
+ aio_free_entry(cbe);
+
+ while ((lj = TAILQ_FIRST(&ki->kaio_liojoblist)) != NULL) {
+ if (lj->lioj_count == 0 && lj->lioj_ref_count == 0) {
TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
+ knlist_delete(&lj->klist, curthread, 1);
+ sigqueue_take(&lj->lioj_ksi);
uma_zfree(aiolio_zone, lj);
} else {
-#ifdef DIAGNOSTIC
- printf("LIO job not cleaned up: B:%d, BF:%d, Q:%d, "
- "QF:%d\n", lj->lioj_buffer_count,
- lj->lioj_buffer_finished_count,
- lj->lioj_queue_count,
- lj->lioj_queue_finished_count);
-#endif
+ panic("LIO job not cleaned up: C:%d, FC:%d, RC:%d\n",
+ lj->lioj_count, lj->lioj_finished_count,
+ lj->lioj_ref_count);
}
}
uma_zfree(kaio_zone, ki);
p->p_aioinfo = NULL;
- mtx_unlock(&Giant);
+ PROC_UNLOCK(p);
}
/*
@@ -728,25 +661,24 @@ restart4:
static struct aiocblist *
aio_selectjob(struct aiothreadlist *aiop)
{
- int s;
struct aiocblist *aiocbe;
struct kaioinfo *ki;
struct proc *userp;
- s = splnet();
+ mtx_assert(&aio_job_mtx, MA_OWNED);
TAILQ_FOREACH(aiocbe, &aio_jobs, list) {
userp = aiocbe->userproc;
ki = userp->p_aioinfo;
if (ki->kaio_active_count < ki->kaio_maxactive_count) {
TAILQ_REMOVE(&aio_jobs, aiocbe, list);
- splx(s);
- return (aiocbe);
+ /* Account for currently active jobs. */
+ ki->kaio_active_count++;
+ aiocbe->jobstate = JOBST_JOBRUNNING;
+ break;
}
}
- splx(s);
-
- return (NULL);
+ return (aiocbe);
}
/*
@@ -754,6 +686,15 @@ aio_selectjob(struct aiothreadlist *aiop)
* the non-physio version of the operations. The normal vn operations are used,
* and this code should work in all instances for every type of file, including
* pipes, sockets, fifos, and regular files.
+ *
+ * XXX I don't think this code works well with pipes, sockets and fifos: the
+ * aiod threads can block when there is no data or no buffer space and the
+ * file was not opened with O_NONBLOCK, so a few such processes can tie up
+ * all aiod threads. We need a FOF_OFFSET-like flag to override f_flag and
+ * tell the lower layers to do non-blocking I/O; we cannot simply set
+ * O_NONBLOCK because that would race with userland. There is a trigger
+ * mechanism for sockets, but it too does not work well if userland
+ * misbehaves.
*/
static void
aio_process(struct aiocblist *aiocbe)
@@ -763,6 +704,7 @@ aio_process(struct aiocblist *aiocbe)
struct proc *mycp;
struct aiocb *cb;
struct file *fp;
+ struct socket *so;
struct uio auio;
struct iovec aiov;
int cnt;
@@ -811,9 +753,17 @@ aio_process(struct aiocblist *aiocbe)
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
error = 0;
if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
- PROC_LOCK(aiocbe->userproc);
- psignal(aiocbe->userproc, SIGPIPE);
- PROC_UNLOCK(aiocbe->userproc);
+ int sigpipe = 1;
+ if (fp->f_type == DTYPE_SOCKET) {
+ so = fp->f_data;
+ if (so->so_options & SO_NOSIGPIPE)
+ sigpipe = 0;
+ }
+ if (sigpipe) {
+ PROC_LOCK(aiocbe->userproc);
+ psignal(aiocbe->userproc, SIGPIPE);
+ PROC_UNLOCK(aiocbe->userproc);
+ }
}
}
@@ -824,77 +774,61 @@ aio_process(struct aiocblist *aiocbe)
}
static void
-aio_bio_done_notify( struct proc *userp, struct aiocblist *aiocbe, int type){
- int lj_done;
- struct aio_liojob *lj;
+aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
+{
+ struct aioliojob *lj;
struct kaioinfo *ki;
+ int lj_done;
+ PROC_LOCK_ASSERT(userp, MA_OWNED);
ki = userp->p_aioinfo;
lj = aiocbe->lio;
lj_done = 0;
if (lj) {
- if (type == DONE_QUEUE)
- lj->lioj_queue_finished_count++;
- else
- lj->lioj_buffer_finished_count++;
- if (lj->lioj_queue_finished_count +
- lj->lioj_buffer_finished_count ==
- lj->lioj_total_count)
+ lj->lioj_finished_count++;
+ if (lj->lioj_count == lj->lioj_finished_count)
lj_done = 1;
}
+ if (type == DONE_QUEUE) {
+ aiocbe->jobflags |= AIOCBLIST_DONE;
+ } else {
+ aiocbe->jobflags |= AIOCBLIST_BUFDONE;
+ ki->kaio_buffer_count--;
+ }
+ TAILQ_INSERT_TAIL(&ki->kaio_done, aiocbe, plist);
+ aiocbe->jobstate = JOBST_JOBFINISHED;
+ if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
+ aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID)
+ aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi);
- if (ki) {
- if (type == DONE_QUEUE) {
- ki->kaio_queue_finished_count++;
- TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist);
- } else {
- ki->kaio_buffer_finished_count++;
- TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
- }
- if (lj_done) {
- if (!knlist_empty(&lj->klist)
- && lj->lioj_signal.sigev_notify ==
- SIGEV_KEVENT) {
- lj->lioj_flags |= LIOJ_KEVENT_POSTED;
- KNOTE_UNLOCKED(&lj->klist, 0);
- }
- if ((lj->lioj_flags &
- (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
- == LIOJ_SIGNAL
- && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
- lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
- PROC_LOCK(userp);
- aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi);
- PROC_UNLOCK(userp);
- lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
- }
- }
- KNOTE_UNLOCKED(&aiocbe->klist, 0);
+ KNOTE_LOCKED(&aiocbe->klist, 1);
- if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
- ki->kaio_flags &= ~KAIO_WAKEUP;
- wakeup(userp);
+ if (lj_done) {
+ if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
+ lj->lioj_flags |= LIOJ_KEVENT_POSTED;
+ KNOTE_LOCKED(&lj->klist, 1);
+ }
+ if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
+ == LIOJ_SIGNAL
+ && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
+ lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
+ aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi);
+ lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
}
}
-
- if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
- aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID) {
- PROC_LOCK(userp);
- aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi);
- PROC_UNLOCK(userp);
+ if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
+ ki->kaio_flags &= ~KAIO_WAKEUP;
+ wakeup(&userp->p_aioinfo);
}
}
+
/*
* The AIO daemon, most of the actual work is done in aio_process,
* but the setup (and address space mgmt) is done in this routine.
*/
static void
-aio_daemon(void *uproc)
+aio_daemon(void *_id)
{
- int s;
- struct aiocb *cb;
struct aiocblist *aiocbe;
struct aiothreadlist *aiop;
struct kaioinfo *ki;
@@ -903,6 +837,7 @@ aio_daemon(void *uproc)
struct thread *td = curthread;
struct pgrp *newpgrp;
struct session *newsess;
+ int id = (intptr_t)_id;
/*
* Local copies of curproc (cp) and vmspace (myvm)
@@ -923,18 +858,16 @@ aio_daemon(void *uproc)
/*
* Place thread (lightweight process) onto the AIO free thread list.
*/
- mtx_lock(&aio_freeproc_mtx);
+ mtx_lock(&aio_job_mtx);
TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
- mtx_unlock(&aio_freeproc_mtx);
+ mtx_unlock(&aio_job_mtx);
/*
* Get rid of our current filedescriptors. AIOD's don't need any
* filedescriptors, except as temporarily inherited from the client.
*/
- mtx_lock(&Giant);
fdfree(td);
- mtx_unlock(&Giant);
/* The daemon resides in its own pgrp. */
MALLOC(newpgrp, struct pgrp *, sizeof(struct pgrp), M_PGRP,
M_WAITOK | M_ZERO);
@@ -944,14 +877,14 @@ aio_daemon(void *uproc)
sx_xlock(&proctree_lock);
enterpgrp(mycp, mycp->p_pid, newpgrp, newsess);
sx_xunlock(&proctree_lock);
- mtx_lock(&Giant);
/*
* Wakeup parent process. (Parent sleeps to keep from blasting away
* and creating too many daemons.)
*/
- wakeup(mycp);
+ sema_post(&aio_newproc_sem);
+ mtx_lock(&aio_job_mtx);
for (;;) {
/*
* curcp is the current daemon process context.
@@ -962,22 +895,18 @@ aio_daemon(void *uproc)
/*
* Take daemon off of free queue
*/
- mtx_lock(&aio_freeproc_mtx);
if (aiop->aiothreadflags & AIOP_FREE) {
TAILQ_REMOVE(&aio_freeproc, aiop, list);
aiop->aiothreadflags &= ~AIOP_FREE;
}
- mtx_unlock(&aio_freeproc_mtx);
/*
* Check for jobs.
*/
while ((aiocbe = aio_selectjob(aiop)) != NULL) {
- cb = &aiocbe->uaiocb;
+ mtx_unlock(&aio_job_mtx);
userp = aiocbe->userproc;
- aiocbe->jobstate = JOBST_JOBRUNNING;
-
/*
* Connect to process address space for user program.
*/
@@ -1012,29 +941,30 @@ aio_daemon(void *uproc)
ki = userp->p_aioinfo;
- /* Account for currently active jobs. */
- ki->kaio_active_count++;
-
/* Do the I/O function. */
aio_process(aiocbe);
- s = splbio();
- /* Decrement the active job count. */
- ki->kaio_active_count--;
-
- aiocbe->jobflags |= AIOCBLIST_DONE;
- aiocbe->jobstate = JOBST_JOBFINISHED;
+ PROC_LOCK(userp);
+ TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
aio_bio_done_notify(userp, aiocbe, DONE_QUEUE);
if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
wakeup(aiocbe);
aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
}
+ PROC_UNLOCK(userp);
+
+ mtx_lock(&aio_job_mtx);
+ /* Decrement the active job count. */
+ ki->kaio_active_count--;
}
/*
* Disconnect from user address space.
*/
if (curcp != mycp) {
+
+ mtx_unlock(&aio_job_mtx);
+
/* Get the user address space to disconnect from. */
tmpvm = mycp->p_vmspace;
@@ -1053,9 +983,18 @@ aio_daemon(void *uproc)
vmspace_free(tmpvm);
curcp = mycp;
+
+ mtx_lock(&aio_job_mtx);
+ /*
+ * We have to restart to avoid a race; we only sleep
+ * when no job can be selected, which implies
+ * curcp == mycp.
+ */
+ continue;
}
- mtx_lock(&aio_freeproc_mtx);
+ mtx_assert(&aio_job_mtx, MA_OWNED);
+
TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
aiop->aiothreadflags |= AIOP_FREE;
@@ -1063,18 +1002,16 @@ aio_daemon(void *uproc)
* If daemon is inactive for a long time, allow it to exit,
* thereby freeing resources.
*/
- if (msleep(aiop->aiothread, &aio_freeproc_mtx, PDROP | PRIBIO,
- "aiordy", aiod_lifetime)) {
- s = splnet();
+ if (msleep(aiop->aiothread, &aio_job_mtx, PRIBIO, "aiordy",
+ aiod_lifetime)) {
if (TAILQ_EMPTY(&aio_jobs)) {
- mtx_lock(&aio_freeproc_mtx);
if ((aiop->aiothreadflags & AIOP_FREE) &&
(num_aio_procs > target_aio_procs)) {
TAILQ_REMOVE(&aio_freeproc, aiop, list);
- mtx_unlock(&aio_freeproc_mtx);
- splx(s);
- uma_zfree(aiop_zone, aiop);
num_aio_procs--;
+ mtx_unlock(&aio_job_mtx);
+ uma_zfree(aiop_zone, aiop);
+ free_unr(aiod_unr, id);
#ifdef DIAGNOSTIC
if (mycp->p_vmspace->vm_refcnt <= 1) {
printf("AIOD: bad vm refcnt for"
@@ -1084,36 +1021,40 @@ aio_daemon(void *uproc)
#endif
kthread_exit(0);
}
- mtx_unlock(&aio_freeproc_mtx);
}
- splx(s);
}
}
+ mtx_unlock(&aio_job_mtx);
+ panic("shouldn't be here\n");
}
/*
- * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The
+ * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The
* AIO daemon modifies its environment itself.
*/
static int
-aio_newproc(void)
+aio_newproc(int *start)
{
int error;
struct proc *p;
+ int id;
- error = kthread_create(aio_daemon, curproc, &p, RFNOWAIT, 0, "aiod%d",
- num_aio_procs);
- if (error)
- return (error);
-
- /*
- * Wait until daemon is started, but continue on just in case to
- * handle error conditions.
- */
- error = tsleep(p, PZERO, "aiosta", aiod_timeout);
-
- num_aio_procs++;
-
+ id = alloc_unr(aiod_unr);
+ error = kthread_create(aio_daemon, (void *)(intptr_t)id, &p,
+ RFNOWAIT, 0, "aiod%d", id);
+ if (error == 0) {
+ /*
+ * Wait until daemon is started.
+ */
+ sema_wait(&aio_newproc_sem);
+ mtx_lock(&aio_job_mtx);
+ num_aio_procs++;
+ if (start != NULL)
+ (*start)--;
+ mtx_unlock(&aio_job_mtx);
+ } else {
+ free_unr(aiod_unr, id);
+ }
return (error);
}
@@ -1129,15 +1070,13 @@ aio_newproc(void)
static int
aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
{
- int error;
struct aiocb *cb;
struct file *fp;
struct buf *bp;
struct vnode *vp;
struct kaioinfo *ki;
- struct aio_liojob *lj;
- int s, lj_done = 0;
- int notify;
+ struct aioliojob *lj;
+ int error;
cb = &aiocbe->uaiocb;
fp = aiocbe->fd_file;
@@ -1173,16 +1112,18 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
if (ki->kaio_buffer_count >= ki->kaio_ballowed_count)
return (-1);
- ki->kaio_buffer_count++;
-
- lj = aiocbe->lio;
- if (lj)
- lj->lioj_buffer_count++;
-
/* Create and build a buffer header for a transfer. */
bp = (struct buf *)getpbuf(NULL);
BUF_KERNPROC(bp);
+ PROC_LOCK(p);
+ ki->kaio_count++;
+ ki->kaio_buffer_count++;
+ lj = aiocbe->lio;
+ if (lj)
+ lj->lioj_count++;
+ PROC_UNLOCK(p);
+
/*
* Get a copy of the kva from the physical buffer.
*/
@@ -1206,115 +1147,34 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
goto doerror;
}
- s = splbio();
+ PROC_LOCK(p);
aiocbe->bp = bp;
bp->b_caller1 = (void *)aiocbe;
TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
+ TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
aiocbe->jobstate = JOBST_JOBQBUF;
cb->_aiocb_private.status = cb->aio_nbytes;
- num_buf_aio++;
+ PROC_UNLOCK(p);
+
+ atomic_add_int(&num_queue_count, 1);
+ atomic_add_int(&num_buf_aio, 1);
+
bp->b_error = 0;
- splx(s);
+ TASK_INIT(&aiocbe->biotask, 0, biohelper, aiocbe);
/* Perform transfer. */
dev_strategy(vp->v_rdev, bp);
-
- notify = 0;
- s = splbio();
-
- /*
- * If we had an error invoking the request, or an error in processing
- * the request before we have returned, we process it as an error in
- * transfer. Note that such an I/O error is not indicated immediately,
- * but is returned using the aio_error mechanism. In this case,
- * aio_suspend will return immediately.
- */
- if (bp->b_error || (bp->b_ioflags & BIO_ERROR)) {
- struct aiocb *job = aiocbe->uuaiocb;
-
- aiocbe->uaiocb._aiocb_private.status = 0;
- suword(&job->_aiocb_private.status, 0);
- aiocbe->uaiocb._aiocb_private.error = bp->b_error;
- suword(&job->_aiocb_private.error, bp->b_error);
-
- if (lj) {
- lj->lioj_buffer_finished_count++;
- if (lj->lioj_queue_finished_count +
- lj->lioj_buffer_finished_count ==
- lj->lioj_total_count)
- lj_done = 1;
- }
-
- ki->kaio_buffer_finished_count++;
-
- if (aiocbe->jobstate != JOBST_JOBBFINISHED) {
- aiocbe->jobstate = JOBST_JOBBFINISHED;
- aiocbe->jobflags |= AIOCBLIST_DONE;
- TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
- notify = 1;
- }
- }
- splx(s);
- if (notify) {
- if (lj && !knlist_empty(&lj->klist)) {
- lj->lioj_flags |= LIOJ_KEVENT_POSTED;
- KNOTE_UNLOCKED(&lj->klist, 0);
- }
- KNOTE_UNLOCKED(&aiocbe->klist, 0);
-
- }
- if (cb->aio_lio_opcode == LIO_WRITE) {
- aiocbe->outputcharge += btodb(cb->aio_nbytes);
- } else if (cb->aio_lio_opcode == LIO_READ) {
- aiocbe->inputcharge += btodb(cb->aio_nbytes);
- }
return (0);
doerror:
+ PROC_LOCK(p);
+ ki->kaio_count--;
ki->kaio_buffer_count--;
if (lj)
- lj->lioj_buffer_count--;
+ lj->lioj_count--;
aiocbe->bp = NULL;
- relpbuf(bp, NULL);
- return (error);
-}
-
-/*
- * This waits/tests physio completion.
- */
-static int
-aio_fphysio(struct aiocblist *iocb)
-{
- int s;
- struct buf *bp;
- int error;
-
- bp = iocb->bp;
-
- s = splbio();
- while ((bp->b_flags & B_DONE) == 0) {
- if (tsleep(bp, PRIBIO, "physstr", aiod_timeout)) {
- if ((bp->b_flags & B_DONE) == 0) {
- splx(s);
- return (EINPROGRESS);
- } else
- break;
- }
- }
- splx(s);
-
- /* Release mapping into kernel space. */
- vunmapbuf(bp);
- iocb->bp = 0;
-
- error = 0;
-
- /* Check for an error. */
- if (bp->b_ioflags & BIO_ERROR)
- error = bp->b_error;
-
+ PROC_UNLOCK(p);
relpbuf(bp, NULL);
return (error);
}
@@ -1325,7 +1185,7 @@ aio_fphysio(struct aiocblist *iocb)
static void
aio_swake_cb(struct socket *so, struct sockbuf *sb)
{
- struct aiocblist *cb,*cbn;
+ struct aiocblist *cb, *cbn;
struct proc *p;
struct kaioinfo *ki = NULL;
int opcode, wakecount = 0;
@@ -1343,28 +1203,39 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb)
SOCKBUF_UNLOCK(&so->so_rcv);
}
+ mtx_lock(&aio_sock_mtx);
TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
if (opcode == cb->uaiocb.aio_lio_opcode) {
+ if (cb->jobstate != JOBST_JOBQGLOBAL)
+ panic("invalid queue value");
p = cb->userproc;
ki = p->p_aioinfo;
TAILQ_REMOVE(&so->so_aiojobq, cb, list);
+ PROC_LOCK(p);
TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
- TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
+ /*
+ * XXX check AIO_RUNDOWN, and don't put the job on
+ * the jobqueue if it was set.
+ */
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
+ cb->jobstate = JOBST_JOBQGLOBAL;
+ mtx_lock(&aio_job_mtx);
+ TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
+ mtx_unlock(&aio_job_mtx);
+ PROC_UNLOCK(p);
wakecount++;
- if (cb->jobstate != JOBST_JOBQGLOBAL)
- panic("invalid queue value");
}
}
+ mtx_unlock(&aio_sock_mtx);
while (wakecount--) {
- mtx_lock(&aio_freeproc_mtx);
+ mtx_lock(&aio_job_mtx);
if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
TAILQ_REMOVE(&aio_freeproc, aiop, list);
aiop->aiothreadflags &= ~AIOP_FREE;
wakeup(aiop->aiothread);
}
- mtx_unlock(&aio_freeproc_mtx);
+ mtx_unlock(&aio_job_mtx);
}
}
@@ -1373,16 +1244,12 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb)
* technique is done in this code.
*/
static int
-_aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj,
+_aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
int type, int oldsigev)
{
struct proc *p = td->td_proc;
struct file *fp;
- unsigned int fd;
struct socket *so;
- int s;
- int error;
- int opcode;
struct aiocblist *aiocbe;
struct aiothreadlist *aiop;
struct kaioinfo *ki;
@@ -1390,12 +1257,17 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj,
struct kqueue *kq;
struct file *kq_fp;
struct sockbuf *sb;
+ int opcode;
+ int error;
+ int fd;
+ int jid;
+
+ ki = p->p_aioinfo;
- aiocbe = uma_zalloc(aiocb_zone, M_WAITOK);
+ aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO);
aiocbe->inputcharge = 0;
aiocbe->outputcharge = 0;
- /* XXX - need a lock */
- knlist_init(&aiocbe->klist, NULL, NULL, NULL, NULL);
+ knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL);
suword(&job->_aiocb_private.status, -1);
suword(&job->_aiocb_private.error, 0);
@@ -1445,8 +1317,7 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj,
}
if (error) {
uma_zfree(aiocb_zone, aiocbe);
- if (type == 0)
- suword(&job->_aiocb_private.error, EBADF);
+ suword(&job->_aiocb_private.error, EBADF);
return (error);
}
aiocbe->fd_file = fp;
@@ -1455,37 +1326,34 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj,
error = EINVAL;
goto aqueue_fail;
}
- error = suword(&job->_aiocb_private.kernelinfo, jobrefid);
- if (error) {
- error = EINVAL;
- goto aqueue_fail;
- }
- aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jobrefid;
+
+ mtx_lock(&aio_job_mtx);
+ jid = jobrefid;
if (jobrefid == LONG_MAX)
jobrefid = 1;
else
jobrefid++;
+ mtx_unlock(&aio_job_mtx);
+
+ error = suword(&job->_aiocb_private.kernelinfo, jid);
+ if (error) {
+ error = EINVAL;
+ goto aqueue_fail;
+ }
+ aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jid;
if (opcode == LIO_NOP) {
fdrop(fp, td);
uma_zfree(aiocb_zone, aiocbe);
- if (type == 0) {
- suword(&job->_aiocb_private.error, 0);
- suword(&job->_aiocb_private.status, 0);
- suword(&job->_aiocb_private.kernelinfo, 0);
- }
return (0);
}
if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) {
- if (type == 0)
- suword(&job->_aiocb_private.status, 0);
error = EINVAL;
goto aqueue_fail;
}
if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue;
- kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr;
} else
goto no_kqueue;
error = fget(td, (u_int)kev.ident, &kq_fp);
@@ -1501,14 +1369,14 @@ _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj,
kev.filter = EVFILT_AIO;
kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
kev.data = (intptr_t)aiocbe;
+ kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr;
error = kqueue_register(kq, &kev, td, 1);
fdrop(kq_fp, td);
aqueue_fail:
if (error) {
fdrop(fp, td);
uma_zfree(aiocb_zone, aiocbe);
- if (type == 0)
- suword(&job->_aiocb_private.error, error);
+ suword(&job->_aiocb_private.error, error);
goto done;
}
no_kqueue:
@@ -1519,7 +1387,6 @@ no_kqueue:
aiocbe->cred = crhold(td->td_ucred);
aiocbe->jobflags = 0;
aiocbe->lio = lj;
- ki = p->p_aioinfo;
if (fp->f_type == DTYPE_SOCKET) {
/*
@@ -1538,47 +1405,54 @@ no_kqueue:
so = fp->f_data;
sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
SOCKBUF_LOCK(sb);
- s = splnet();
if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
LIO_WRITE) && (!sowriteable(so)))) {
+ mtx_lock(&aio_sock_mtx);
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
- TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
+ mtx_unlock(&aio_sock_mtx);
+
sb->sb_flags |= SB_AIO;
- aiocbe->jobstate = JOBST_JOBQGLOBAL; /* XXX */
- ki->kaio_queue_count++;
- num_queue_count++;
+ PROC_LOCK(p);
+ TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
+ TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
+ aiocbe->jobstate = JOBST_JOBQSOCK;
+ ki->kaio_count++;
+ if (lj)
+ lj->lioj_count++;
+ PROC_UNLOCK(p);
SOCKBUF_UNLOCK(sb);
- splx(s);
+ atomic_add_int(&num_queue_count, 1);
error = 0;
goto done;
}
SOCKBUF_UNLOCK(sb);
- splx(s);
}
if ((error = aio_qphysio(p, aiocbe)) == 0)
goto done;
+#if 0
if (error > 0) {
- suword(&job->_aiocb_private.status, 0);
aiocbe->uaiocb._aiocb_private.error = error;
suword(&job->_aiocb_private.error, error);
goto done;
}
-
+#endif
/* No buffer for daemon I/O. */
aiocbe->bp = NULL;
- ki->kaio_queue_count++;
+ PROC_LOCK(p);
+ ki->kaio_count++;
if (lj)
- lj->lioj_queue_count++;
- s = splnet();
+ lj->lioj_count++;
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
+ TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
+
+ mtx_lock(&aio_job_mtx);
TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
- splx(s);
aiocbe->jobstate = JOBST_JOBQGLOBAL;
+ PROC_UNLOCK(p);
- num_queue_count++;
- error = 0;
+ atomic_add_int(&num_queue_count, 1);
/*
* If we don't have a free AIO process, and we are below our quota, then
@@ -1587,8 +1461,8 @@ no_kqueue:
* (thread) due to resource issues, we return an error for now (EAGAIN),
* which is likely not the correct thing to do.
*/
- mtx_lock(&aio_freeproc_mtx);
retryproc:
+ error = 0;
if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
TAILQ_REMOVE(&aio_freeproc, aiop, list);
aiop->aiothreadflags &= ~AIOP_FREE;
@@ -1597,14 +1471,16 @@ retryproc:
((ki->kaio_active_count + num_aio_resv_start) <
ki->kaio_maxactive_count)) {
num_aio_resv_start++;
- mtx_unlock(&aio_freeproc_mtx);
- error = aio_newproc();
- mtx_lock(&aio_freeproc_mtx);
- num_aio_resv_start--;
- if (error)
+ mtx_unlock(&aio_job_mtx);
+ error = aio_newproc(&num_aio_resv_start);
+ mtx_lock(&aio_job_mtx);
+ if (error) {
+ num_aio_resv_start--;
goto retryproc;
+ }
}
- mtx_unlock(&aio_freeproc_mtx);
+ mtx_unlock(&aio_job_mtx);
+
done:
return (error);
}
@@ -1625,7 +1501,7 @@ aio_aqueue(struct thread *td, struct aiocb *job, int type, int oldsigev)
return (EAGAIN);
ki = p->p_aioinfo;
- if (ki->kaio_queue_count >= ki->kaio_qallowed_count)
+ if (ki->kaio_count >= ki->kaio_qallowed_count)
return (EAGAIN);
return _aio_aqueue(td, job, NULL, type, oldsigev);
@@ -1639,44 +1515,25 @@ int
aio_return(struct thread *td, struct aio_return_args *uap)
{
struct proc *p = td->td_proc;
- int s;
- long jobref;
- struct aiocblist *cb, *ncb;
- struct aiocb *ujob;
+ struct aiocblist *cb;
+ struct aiocb *uaiocb;
struct kaioinfo *ki;
-
- ujob = uap->aiocbp;
- jobref = fuword(&ujob->_aiocb_private.kernelinfo);
- if (jobref == -1 || jobref == 0)
- return (EINVAL);
+ int status, error;
ki = p->p_aioinfo;
if (ki == NULL)
return (EINVAL);
+ uaiocb = uap->aiocbp;
PROC_LOCK(p);
- TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
- if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) ==
- jobref)
- goto done;
- }
-
- s = splbio();
- /* aio_physwakeup */
- TAILQ_FOREACH_SAFE(cb, &ki->kaio_bufdone, plist, ncb) {
- if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo)
- == jobref) {
+ TAILQ_FOREACH(cb, &ki->kaio_done, plist) {
+ if (cb->uuaiocb == uaiocb)
break;
- }
}
- splx(s);
- done:
- PROC_UNLOCK(p);
if (cb != NULL) {
- if (ujob == cb->uuaiocb) {
- td->td_retval[0] =
- cb->uaiocb._aiocb_private.status;
- } else
- td->td_retval[0] = EFAULT;
+ MPASS(cb->jobstate == JOBST_JOBFINISHED);
+ status = cb->uaiocb._aiocb_private.status;
+ error = cb->uaiocb._aiocb_private.error;
+ td->td_retval[0] = status;
if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
p->p_stats->p_ru.ru_oublock +=
cb->outputcharge;
@@ -1686,9 +1543,13 @@ aio_return(struct thread *td, struct aio_return_args *uap)
cb->inputcharge = 0;
}
aio_free_entry(cb);
- return (0);
- }
- return (EINVAL);
+ suword(&uaiocb->_aiocb_private.error, error);
+ suword(&uaiocb->_aiocb_private.status, status);
+ error = 0;
+ } else
+ error = EINVAL;
+ PROC_UNLOCK(p);
+ return (error);
}
/*
@@ -1702,12 +1563,12 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap)
struct timespec ts;
struct aiocb *const *cbptr, *cbp;
struct kaioinfo *ki;
- struct aiocblist *cb;
- int i;
- int njoblist;
- int error, s, timo;
- long *ijoblist;
+ struct aiocblist *cb, *cbfirst;
struct aiocb **ujoblist;
+ int njoblist;
+ int error;
+ int timo;
+ int i;
if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX)
return (EINVAL);
@@ -1732,7 +1593,6 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap)
return (EAGAIN);
njoblist = 0;
- ijoblist = uma_zalloc(aiol_zone, M_WAITOK);
ujoblist = uma_zalloc(aiol_zone, M_WAITOK);
cbptr = uap->aiocbp;
@@ -1741,69 +1601,44 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap)
if (cbp == 0)
continue;
ujoblist[njoblist] = cbp;
- ijoblist[njoblist] = fuword(&cbp->_aiocb_private.kernelinfo);
njoblist++;
}
if (njoblist == 0) {
- uma_zfree(aiol_zone, ijoblist);
uma_zfree(aiol_zone, ujoblist);
return (0);
}
- error = 0;
+ PROC_LOCK(p);
for (;;) {
- PROC_LOCK(p);
- TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
- for (i = 0; i < njoblist; i++) {
- if (((intptr_t)
- cb->uaiocb._aiocb_private.kernelinfo) ==
- ijoblist[i]) {
- PROC_UNLOCK(p);
- if (ujoblist[i] != cb->uuaiocb)
- error = EINVAL;
- uma_zfree(aiol_zone, ijoblist);
- uma_zfree(aiol_zone, ujoblist);
- return (error);
- }
- }
- }
-
- s = splbio();
- TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) {
+ cbfirst = NULL;
+ error = 0;
+ TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
for (i = 0; i < njoblist; i++) {
- if (((intptr_t)
- cb->uaiocb._aiocb_private.kernelinfo) ==
- ijoblist[i]) {
- PROC_UNLOCK(p);
- splx(s);
- if (ujoblist[i] != cb->uuaiocb)
- error = EINVAL;
- uma_zfree(aiol_zone, ijoblist);
- uma_zfree(aiol_zone, ujoblist);
- return (error);
+ if (cb->uuaiocb == ujoblist[i]) {
+ if (cbfirst == NULL)
+ cbfirst = cb;
+ if (cb->jobstate == JOBST_JOBFINISHED)
+ goto RETURN;
}
}
}
+ /* All tasks were finished. */
+ if (cbfirst == NULL)
+ break;
ki->kaio_flags |= KAIO_WAKEUP;
- error = msleep(p, &p->p_mtx, PDROP | PRIBIO | PCATCH, "aiospn",
- timo);
- splx(s);
-
- if (error == ERESTART || error == EINTR) {
- uma_zfree(aiol_zone, ijoblist);
- uma_zfree(aiol_zone, ujoblist);
- return (EINTR);
- } else if (error == EWOULDBLOCK) {
- uma_zfree(aiol_zone, ijoblist);
- uma_zfree(aiol_zone, ujoblist);
- return (EAGAIN);
- }
+ error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
+ "aiospn", timo);
+ if (error == ERESTART)
+ error = EINTR;
+ if (error)
+ break;
}
-
-/* NOTREACHED */
- return (EINVAL);
+RETURN:
+ PROC_UNLOCK(p);
+ uma_zfree(aiol_zone, ujoblist);
+ return (error);
}
/*
@@ -1818,92 +1653,77 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
struct aiocblist *cbe, *cbn;
struct file *fp;
struct socket *so;
- struct proc *po;
- int s,error;
- int cancelled=0;
- int notcancelled=0;
+ int error;
+ int cancelled = 0;
+ int notcancelled = 0;
struct vnode *vp;
/* Lookup file object. */
- error = fget(td, (u_int)uap->fd, &fp);
+ error = fget(td, uap->fd, &fp);
if (error)
return (error);
+ ki = p->p_aioinfo;
+ if (ki == NULL)
+ goto done;
+
if (fp->f_type == DTYPE_VNODE) {
vp = fp->f_vnode;
-
- if (vn_isdisk(vp,&error)) {
+ if (vn_isdisk(vp, &error)) {
fdrop(fp, td);
td->td_retval[0] = AIO_NOTCANCELED;
return (0);
}
} else if (fp->f_type == DTYPE_SOCKET) {
so = fp->f_data;
-
- s = splnet();
-
+ mtx_lock(&aio_sock_mtx);
TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) {
- if ((uap->aiocbp == NULL) ||
- (uap->aiocbp == cbe->uuaiocb) ) {
- po = cbe->userproc;
- ki = po->p_aioinfo;
+ if (cbe->userproc == p &&
+ (uap->aiocbp == NULL ||
+ uap->aiocbp == cbe->uuaiocb)) {
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
+ PROC_LOCK(p);
TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe, plist);
- if (ki->kaio_flags & KAIO_WAKEUP) {
- wakeup(po);
- }
- cbe->jobstate = JOBST_JOBFINISHED;
- cbe->uaiocb._aiocb_private.status=-1;
- cbe->uaiocb._aiocb_private.error=ECANCELED;
+ cbe->jobstate = JOBST_JOBRUNNING;
+ cbe->uaiocb._aiocb_private.status = -1;
+ cbe->uaiocb._aiocb_private.error = ECANCELED;
+ aio_bio_done_notify(p, cbe, DONE_QUEUE);
+ PROC_UNLOCK(p);
cancelled++;
-/* XXX cancelled, knote? */
- if (cbe->uaiocb.aio_sigevent.sigev_notify ==
- SIGEV_SIGNAL ||
- cbe->uaiocb.aio_sigevent.sigev_notify ==
- SIGEV_THREAD_ID) {
- PROC_LOCK(cbe->userproc);
- aio_sendsig(cbe->userproc,
- &cbe->uaiocb.aio_sigevent,
- &cbe->ksi);
- PROC_UNLOCK(cbe->userproc);
- }
- if (uap->aiocbp)
+ if (uap->aiocbp != NULL)
break;
}
}
- splx(s);
-
- if ((cancelled) && (uap->aiocbp)) {
+ mtx_unlock(&aio_sock_mtx);
+ if (cancelled && uap->aiocbp != NULL) {
fdrop(fp, td);
td->td_retval[0] = AIO_CANCELED;
return (0);
}
}
- ki = p->p_aioinfo;
- if (ki == NULL)
- goto done;
- s = splnet();
+ PROC_LOCK(p);
TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
-
if ((uap->fd == cbe->uaiocb.aio_fildes) &&
- ((uap->aiocbp == NULL ) ||
+ ((uap->aiocbp == NULL) ||
(uap->aiocbp == cbe->uuaiocb))) {
-
+ mtx_lock(&aio_job_mtx);
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
TAILQ_REMOVE(&aio_jobs, cbe, list);
- cbe->jobstate = JOBST_JOBFINISHED;
- cancelled++;
+ mtx_unlock(&aio_job_mtx);
+ TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
- aio_bio_done_notify(cbe->userproc, cbe, DONE_QUEUE);
+ aio_bio_done_notify(p, cbe, DONE_QUEUE);
+ cancelled++;
} else {
+ mtx_unlock(&aio_job_mtx);
notcancelled++;
}
}
}
- splx(s);
+ PROC_UNLOCK(p);
+
done:
fdrop(fp, td);
if (notcancelled) {
@@ -1928,84 +1748,41 @@ int
aio_error(struct thread *td, struct aio_error_args *uap)
{
struct proc *p = td->td_proc;
- int s;
struct aiocblist *cb;
struct kaioinfo *ki;
- long jobref;
+ int status;
ki = p->p_aioinfo;
- if (ki == NULL)
- return (EINVAL);
-
- jobref = fuword(&uap->aiocbp->_aiocb_private.kernelinfo);
- if ((jobref == -1) || (jobref == 0))
- return (EINVAL);
-
- PROC_LOCK(p);
- TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
- jobref) {
- PROC_UNLOCK(p);
- td->td_retval[0] = cb->uaiocb._aiocb_private.error;
- return (0);
- }
- }
-
- s = splnet();
-
- TAILQ_FOREACH(cb, &ki->kaio_jobqueue, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
- jobref) {
- PROC_UNLOCK(p);
- td->td_retval[0] = EINPROGRESS;
- splx(s);
- return (0);
- }
- }
-
- TAILQ_FOREACH(cb, &ki->kaio_sockqueue, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
- jobref) {
- PROC_UNLOCK(p);
- td->td_retval[0] = EINPROGRESS;
- splx(s);
- return (0);
- }
- }
- splx(s);
-
- s = splbio();
- TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
- jobref) {
- PROC_UNLOCK(p);
- td->td_retval[0] = cb->uaiocb._aiocb_private.error;
- splx(s);
- return (0);
- }
+ if (ki == NULL) {
+ td->td_retval[0] = EINVAL;
+ return (0);
}
- TAILQ_FOREACH(cb, &ki->kaio_bufqueue, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
- jobref) {
+ PROC_LOCK(p);
+ TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
+ if (cb->uuaiocb == uap->aiocbp) {
+ if (cb->jobstate == JOBST_JOBFINISHED)
+ td->td_retval[0] =
+ cb->uaiocb._aiocb_private.error;
+ else
+ td->td_retval[0] = EINPROGRESS;
PROC_UNLOCK(p);
- td->td_retval[0] = EINPROGRESS;
- splx(s);
return (0);
}
}
- splx(s);
PROC_UNLOCK(p);
-#if (0)
/*
- * Hack for lio.
+ * Hack for failure of _aio_aqueue.
*/
status = fuword(&uap->aiocbp->_aiocb_private.status);
- if (status == -1)
- return fuword(&uap->aiocbp->_aiocb_private.error);
-#endif
- return (EINVAL);
+ if (status == -1) {
+ td->td_retval[0] = fuword(&uap->aiocbp->_aiocb_private.error);
+ return (0);
+ }
+
+ td->td_retval[0] = EINVAL;
+ return (0);
}
/* syscall - asynchronous read from a file (REALTIME) */
@@ -2056,15 +1833,14 @@ static int
do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
{
struct proc *p = td->td_proc;
- int nent, nentqueued;
struct aiocb *iocb, * const *cbptr;
- struct aiocblist *cb;
struct kaioinfo *ki;
- struct aio_liojob *lj;
+ struct aioliojob *lj;
struct kevent kev;
struct kqueue * kq;
struct file *kq_fp;
- int error, runningcode;
+ int nent;
+ int error;
int nerror;
int i;
@@ -2078,28 +1854,16 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
if (p->p_aioinfo == NULL)
aio_init_aioinfo(p);
- if ((nent + num_queue_count) > max_queue_count)
- return (EAGAIN);
-
ki = p->p_aioinfo;
- if ((nent + ki->kaio_queue_count) > ki->kaio_qallowed_count)
- return (EAGAIN);
lj = uma_zalloc(aiolio_zone, M_WAITOK);
- if (!lj)
- return (EAGAIN);
-
lj->lioj_flags = 0;
- lj->lioj_buffer_count = 0;
- lj->lioj_buffer_finished_count = 0;
- lj->lioj_queue_count = 0;
- lj->lioj_queue_finished_count = 0;
- lj->lioj_total_count = nent;
- knlist_init(&lj->klist, NULL, NULL, NULL, NULL);
+ lj->lioj_count = 0;
+ lj->lioj_finished_count = 0;
+ lj->lioj_ref_count = 0;
+ knlist_init(&lj->klist, &p->p_mtx, NULL, NULL, NULL);
ksiginfo_init(&lj->lioj_ksi);
- kev.ident = 0;
-
/*
* Setup signal.
*/
@@ -2115,10 +1879,8 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
/* Assume only new style KEVENT */
- kev.ident = lj->lioj_signal.sigev_notify_kqueue;
- kev.udata = lj->lioj_signal.sigev_value.sival_ptr;
-
- error = fget(td, (u_int)kev.ident, &kq_fp);
+ error = fget(td, lj->lioj_signal.sigev_notify_kqueue,
+ &kq_fp);
if (error) {
uma_zfree(aiolio_zone, lj);
return (error);
@@ -2133,160 +1895,137 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
kev.ident = (uintptr_t)lj; /* something unique */
kev.data = (intptr_t)lj;
+ /* pass user defined sigval data */
+ kev.udata = lj->lioj_signal.sigev_value.sival_ptr;
error = kqueue_register(kq, &kev, td, 1);
fdrop(kq_fp, td);
if (error) {
uma_zfree(aiolio_zone, lj);
return (error);
}
- } else if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
+ } else if (lj->lioj_signal.sigev_notify == SIGEV_NONE) {
+ ;
+ } else if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
uma_zfree(aiolio_zone, lj);
return EINVAL;
} else {
lj->lioj_flags |= LIOJ_SIGNAL;
- lj->lioj_flags &= ~LIOJ_SIGNAL_POSTED;
}
- } else
- lj->lioj_flags &= ~LIOJ_SIGNAL;
+ }
+ PROC_LOCK(p);
TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
/*
+ * Hold an extra aiocb count so that the lio cannot be freed
+ * by other threads doing aio_waitcomplete() or aio_return(),
+ * and so that no event is sent before we have queued
+ * all requests.
+ */
+ lj->lioj_count = 1;
+ PROC_UNLOCK(p);
+
+ /*
* Get pointers to the list of I/O requests.
*/
nerror = 0;
- nentqueued = 0;
cbptr = uap->acb_list;
for (i = 0; i < uap->nent; i++) {
iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
error = _aio_aqueue(td, iocb, lj, 0, oldsigev);
- if (error == 0)
- nentqueued++;
- else
+ if (error != 0)
nerror++;
}
}
- /*
- * If we haven't queued any, then just return error.
- */
- if (nentqueued == 0)
- return (0);
-
- /*
- * Calculate the appropriate error return.
- */
- runningcode = 0;
- if (nerror)
- runningcode = EIO;
-
+ error = 0;
+ PROC_LOCK(p);
if (uap->mode == LIO_WAIT) {
- int command, found;
- long jobref;
-
- for (;;) {
- found = 0;
- for (i = 0; i < uap->nent; i++) {
- /*
- * Fetch address of the control buf pointer in
- * user space.
- */
- iocb = (struct aiocb *)
- (intptr_t)fuword(&cbptr[i]);
- if (((intptr_t)iocb == -1) || ((intptr_t)iocb
- == 0))
- continue;
-
- /*
- * Fetch the associated command from user space.
- */
- command = fuword(&iocb->aio_lio_opcode);
- if (command == LIO_NOP) {
- found++;
- continue;
- }
-
- jobref =
- fuword(&iocb->_aiocb_private.kernelinfo);
-
- TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo)
- == jobref) {
- if (cb->uaiocb.aio_lio_opcode
- == LIO_WRITE) {
- p->p_stats->p_ru.ru_oublock
- +=
- cb->outputcharge;
- cb->outputcharge = 0;
- } else if (cb->uaiocb.aio_lio_opcode
- == LIO_READ) {
- p->p_stats->p_ru.ru_inblock
- += cb->inputcharge;
- cb->inputcharge = 0;
- }
- found++;
- break;
- }
- }
-
- TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) {
- if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo)
- == jobref) {
- found++;
- break;
- }
- }
- }
-
- /*
- * If all I/Os have been disposed of, then we can
- * return.
- */
- if (found == nentqueued)
- return (runningcode);
-
+ while (lj->lioj_count - 1 != lj->lioj_finished_count) {
ki->kaio_flags |= KAIO_WAKEUP;
- error = tsleep(p, PRIBIO | PCATCH, "aiospn", 0);
-
- if (error == EINTR)
- return (EINTR);
- else if (error == EWOULDBLOCK)
- return (EAGAIN);
+ error = msleep(&p->p_aioinfo, &p->p_mtx,
+ PRIBIO | PCATCH, "aiospn", 0);
+ if (error == ERESTART)
+ error = EINTR;
+ if (error)
+ break;
+ }
+ } else {
+ if (lj->lioj_count - 1 == lj->lioj_finished_count) {
+ if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
+ lj->lioj_flags |= LIOJ_KEVENT_POSTED;
+ KNOTE_LOCKED(&lj->klist, 1);
+ }
+ if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
+ == LIOJ_SIGNAL
+ && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
+ lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
+ aio_sendsig(p, &lj->lioj_signal,
+ &lj->lioj_ksi);
+ lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
+ }
}
}
+ lj->lioj_count--;
+ if (lj->lioj_count == 0) {
+ TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
+ knlist_delete(&lj->klist, curthread, 1);
+ sigqueue_take(&lj->lioj_ksi);
+ PROC_UNLOCK(p);
+ uma_zfree(aiolio_zone, lj);
+ } else
+ PROC_UNLOCK(p);
- return (runningcode);
+ if (nerror)
+ return (EIO);
+ return (error);
}
/*
- * Interrupt handler for physio, performs the necessary process wakeups, and
- * signals.
+ * Called from an interrupt thread for physio; we should return as
+ * quickly as possible, so we schedule a biohelper task.
*/
static void
aio_physwakeup(struct buf *bp)
{
struct aiocblist *aiocbe;
- struct proc *userp;
-
- mtx_lock(&Giant);
- bp->b_flags |= B_DONE;
- wakeup(bp);
aiocbe = (struct aiocblist *)bp->b_caller1;
- if (aiocbe) {
- userp = aiocbe->userproc;
+ taskqueue_enqueue(taskqueue_aiod_bio, &aiocbe->biotask);
+}
- aiocbe->jobstate = JOBST_JOBBFINISHED;
- aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
- aiocbe->uaiocb._aiocb_private.error = 0;
- aiocbe->jobflags |= AIOCBLIST_DONE;
+/*
+ * Task routine to perform the heavy work: process wakeups and signals.
+ */
+static void
+biohelper(void *context, int pending)
+{
+ struct aiocblist *aiocbe = context;
+ struct buf *bp;
+ struct proc *userp;
+ int nblks;
- if (bp->b_ioflags & BIO_ERROR)
- aiocbe->uaiocb._aiocb_private.error = bp->b_error;
+ bp = aiocbe->bp;
+ userp = aiocbe->userproc;
+ PROC_LOCK(userp);
+ aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
+ aiocbe->uaiocb._aiocb_private.error = 0;
+ if (bp->b_ioflags & BIO_ERROR)
+ aiocbe->uaiocb._aiocb_private.error = bp->b_error;
+ nblks = btodb(aiocbe->uaiocb.aio_nbytes);
+ if (aiocbe->uaiocb.aio_lio_opcode == LIO_WRITE)
+ aiocbe->outputcharge += nblks;
+ else
+ aiocbe->inputcharge += nblks;
+ aiocbe->bp = NULL;
+ TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, aiocbe, plist);
+ aio_bio_done_notify(userp, aiocbe, DONE_BUF);
+ PROC_UNLOCK(userp);
- aio_bio_done_notify(userp, aiocbe, DONE_BUF);
- }
- mtx_unlock(&Giant);
+ /* Release mapping into kernel space. */
+ vunmapbuf(bp);
+ relpbuf(bp, NULL);
+ atomic_subtract_int(&num_buf_aio, 1);
}
/* syscall - wait for the next completion of an aio request */
@@ -2297,10 +2036,11 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
struct timeval atv;
struct timespec ts;
struct kaioinfo *ki;
- struct aiocblist *cb = NULL;
- int error, s, timo;
+ struct aiocblist *cb;
+ struct aiocb *uuaiocb;
+ int error, status, timo;
- suword(uap->aiocbp, (int)NULL);
+ suword(uap->aiocbp, (long)NULL);
timo = 0;
if (uap->timeout) {
@@ -2322,50 +2062,41 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
aio_init_aioinfo(p);
ki = p->p_aioinfo;
- for (;;) {
- PROC_LOCK(p);
- if ((cb = TAILQ_FIRST(&ki->kaio_jobdone)) != 0) {
- PROC_UNLOCK(p);
- suword(uap->aiocbp, (uintptr_t)cb->uuaiocb);
- td->td_retval[0] = cb->uaiocb._aiocb_private.status;
- if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
- p->p_stats->p_ru.ru_oublock +=
- cb->outputcharge;
- cb->outputcharge = 0;
- } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
- p->p_stats->p_ru.ru_inblock += cb->inputcharge;
- cb->inputcharge = 0;
- }
- error = cb->uaiocb._aiocb_private.error;
- aio_free_entry(cb);
- return (error);
- }
-
- s = splbio();
- if ((cb = TAILQ_FIRST(&ki->kaio_bufdone)) != 0 ) {
- PROC_UNLOCK(p);
- splx(s);
- suword(uap->aiocbp, (uintptr_t)cb->uuaiocb);
- error = cb->uaiocb._aiocb_private.error;
- td->td_retval[0] = cb->uaiocb._aiocb_private.status;
- aio_free_entry(cb);
- return (error);
- }
-
+ error = 0;
+ cb = NULL;
+ PROC_LOCK(p);
+ while ((cb = TAILQ_FIRST(&ki->kaio_done)) == NULL) {
ki->kaio_flags |= KAIO_WAKEUP;
- error = msleep(p, &p->p_mtx, PDROP | PRIBIO | PCATCH, "aiowc",
- timo);
- splx(s);
-
+ error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
+ "aiowc", timo);
if (error == ERESTART)
- return (EINTR);
- else if (error < 0)
- return (error);
- else if (error == EINTR)
- return (EINTR);
- else if (error == EWOULDBLOCK)
- return (EAGAIN);
+ error = EINTR;
+ if (error)
+ break;
}
+
+ if (cb != NULL) {
+ MPASS(cb->jobstate == JOBST_JOBFINISHED);
+ uuaiocb = cb->uuaiocb;
+ status = cb->uaiocb._aiocb_private.status;
+ error = cb->uaiocb._aiocb_private.error;
+ td->td_retval[0] = status;
+ if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
+ p->p_stats->p_ru.ru_oublock += cb->outputcharge;
+ cb->outputcharge = 0;
+ } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
+ p->p_stats->p_ru.ru_inblock += cb->inputcharge;
+ cb->inputcharge = 0;
+ }
+ aio_free_entry(cb);
+ PROC_UNLOCK(p);
+ suword(uap->aiocbp, (long)uuaiocb);
+ suword(&uuaiocb->_aiocb_private.error, error);
+ suword(&uuaiocb->_aiocb_private.status, status);
+ } else
+ PROC_UNLOCK(p);
+
+ return (error);
}
/* kqueue attach function */
@@ -2406,8 +2137,7 @@ filt_aio(struct knote *kn, long hint)
struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
kn->kn_data = aiocbe->uaiocb._aiocb_private.error;
- if (aiocbe->jobstate != JOBST_JOBFINISHED &&
- aiocbe->jobstate != JOBST_JOBBFINISHED)
+ if (aiocbe->jobstate != JOBST_JOBFINISHED)
return (0);
kn->kn_flags |= EV_EOF;
return (1);
@@ -2417,10 +2147,10 @@ filt_aio(struct knote *kn, long hint)
static int
filt_lioattach(struct knote *kn)
{
- struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata;
+ struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata;
/*
- * The aio_liojob pointer must be validated before using it, so
+ * The aioliojob pointer must be validated before using it, so
* registration is restricted to the kernel; the user cannot
* set EV_FLAG1.
*/
@@ -2437,7 +2167,7 @@ filt_lioattach(struct knote *kn)
static void
filt_liodetach(struct knote *kn)
{
- struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata;
+ struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata;
if (!knlist_empty(&lj->klist))
knlist_remove(&lj->klist, kn, 0);
@@ -2448,6 +2178,7 @@ filt_liodetach(struct knote *kn)
static int
filt_lio(struct knote *kn, long hint)
{
- struct aio_liojob * lj = (struct aio_liojob *)kn->kn_sdata;
+ struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata;
+
return (lj->lioj_flags & LIOJ_KEVENT_POSTED);
}
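For completeness: the aio_waitcomplete() path rewritten above implements the
FreeBSD-specific "wait for any completion" interface. A minimal illustrative
sketch of its userland use, with error handling elided (prototype per
aio_waitcomplete(2)):

#include <aio.h>
#include <err.h>
#include <stdio.h>

/*
 * Reap whichever outstanding AIO request of this process finishes first.
 * A NULL timeout sleeps in the kernel (the "aiowc" msleep above).
 */
static void
reap_one(void)
{
	struct aiocb *iocb;
	ssize_t n;

	n = aio_waitcomplete(&iocb, NULL);
	if (n < 0)
		err(1, "aio_waitcomplete");
	printf("request %p done, %zd bytes transferred\n", (void *)iocb, n);
}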