summaryrefslogtreecommitdiffstats
path: root/sys/kern
diff options
context:
space:
mode:
Diffstat (limited to 'sys/kern')
-rw-r--r--sys/kern/subr_taskqueue.c75
1 files changed, 45 insertions, 30 deletions
diff --git a/sys/kern/subr_taskqueue.c b/sys/kern/subr_taskqueue.c
index 8bd3868..fd6dd4f 100644
--- a/sys/kern/subr_taskqueue.c
+++ b/sys/kern/subr_taskqueue.c
@@ -46,12 +46,17 @@ static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void *taskqueue_giant_ih;
static void *taskqueue_ih;
+struct taskqueue_busy {
+ struct task *tb_running;
+ TAILQ_ENTRY(taskqueue_busy) tb_link;
+};
+
struct taskqueue {
STAILQ_HEAD(, task) tq_queue;
const char *tq_name;
taskqueue_enqueue_fn tq_enqueue;
void *tq_context;
- struct task *tq_running;
+ TAILQ_HEAD(, taskqueue_busy) tq_active;
struct mtx tq_mutex;
struct thread **tq_threads;
int tq_tcount;
@@ -102,6 +107,7 @@ _taskqueue_create(const char *name, int mflags,
return NULL;
STAILQ_INIT(&queue->tq_queue);
+ TAILQ_INIT(&queue->tq_active);
queue->tq_name = name;
queue->tq_enqueue = enqueue;
queue->tq_context = context;
@@ -140,6 +146,7 @@ taskqueue_free(struct taskqueue *queue)
TQ_LOCK(queue);
queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
taskqueue_terminate(queue->tq_threads, queue);
+ KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
mtx_destroy(&queue->tq_mutex);
free(queue->tq_threads, M_TASKQUEUE);
free(queue, M_TASKQUEUE);
@@ -214,13 +221,17 @@ taskqueue_unblock(struct taskqueue *queue)
TQ_UNLOCK(queue);
}
-void
-taskqueue_run(struct taskqueue *queue, struct task **tpp)
+static void
+taskqueue_run_locked(struct taskqueue *queue)
{
+ struct taskqueue_busy tb;
struct task *task;
int pending;
mtx_assert(&queue->tq_mutex, MA_OWNED);
+ tb.tb_running = NULL;
+ TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
+
while (STAILQ_FIRST(&queue->tq_queue)) {
/*
* Carefully remove the first task from the queue and
@@ -230,16 +241,38 @@ taskqueue_run(struct taskqueue *queue, struct task **tpp)
STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
pending = task->ta_pending;
task->ta_pending = 0;
- task->ta_running = tpp;
- *tpp = task;
+ tb.tb_running = task;
TQ_UNLOCK(queue);
task->ta_func(task->ta_context, pending);
TQ_LOCK(queue);
- *tpp = NULL;
+ tb.tb_running = NULL;
wakeup(task);
}
+ TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
+}
+
+void
+taskqueue_run(struct taskqueue *queue)
+{
+
+ TQ_LOCK(queue);
+ taskqueue_run_locked(queue);
+ TQ_UNLOCK(queue);
+}
+
+static int
+task_is_running(struct taskqueue *queue, struct task *task)
+{
+ struct taskqueue_busy *tb;
+
+ mtx_assert(&queue->tq_mutex, MA_OWNED);
+ TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
+ if (tb->tb_running == task)
+ return (1);
+ }
+ return (0);
}
void
@@ -250,10 +283,8 @@ taskqueue_drain(struct taskqueue *queue, struct task *task)
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
TQ_LOCK(queue);
- while (task->ta_pending != 0 ||
- (task->ta_running != NULL && task == *task->ta_running)) {
+ while (task->ta_pending != 0 || task_is_running(queue, task))
TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
- }
TQ_UNLOCK(queue);
}
@@ -266,9 +297,7 @@ taskqueue_swi_enqueue(void *context)
static void
taskqueue_swi_run(void *dummy)
{
- TQ_LOCK(taskqueue_swi);
- taskqueue_run(taskqueue_swi, &taskqueue_swi->tq_running);
- TQ_UNLOCK(taskqueue_swi);
+ taskqueue_run(taskqueue_swi);
}
static void
@@ -280,9 +309,7 @@ taskqueue_swi_giant_enqueue(void *context)
static void
taskqueue_swi_giant_run(void *dummy)
{
- TQ_LOCK(taskqueue_swi_giant);
- taskqueue_run(taskqueue_swi_giant, &taskqueue_swi_giant->tq_running);
- TQ_UNLOCK(taskqueue_swi_giant);
+ taskqueue_run(taskqueue_swi_giant);
}
int
@@ -344,22 +371,12 @@ void
taskqueue_thread_loop(void *arg)
{
struct taskqueue **tqp, *tq;
- struct task *running;
-
- /*
- * The kernel stack space is globaly addressable, and it would
- * be an error to ask whether a task is running after the
- * taskqueue has been released. So it is safe to have the
- * task point back to an address in the taskqueue's stack to
- * determine if the task is running.
- */
- running = NULL;
tqp = arg;
tq = *tqp;
TQ_LOCK(tq);
while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
- taskqueue_run(tq, &running);
+ taskqueue_run_locked(tq);
/*
* Because taskqueue_run() can drop tq_mutex, we need to
* check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
@@ -369,7 +386,7 @@ taskqueue_thread_loop(void *arg)
break;
TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
}
- taskqueue_run(tq, &running);
+ taskqueue_run_locked(tq);
/* rendezvous with thread that asked us to terminate */
tq->tq_tcount--;
@@ -426,9 +443,7 @@ taskqueue_fast_enqueue(void *context)
static void
taskqueue_fast_run(void *dummy)
{
- TQ_LOCK(taskqueue_fast);
- taskqueue_run(taskqueue_fast, &taskqueue_fast->tq_running);
- TQ_UNLOCK(taskqueue_fast);
+ taskqueue_run(taskqueue_fast);
}
TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
OpenPOWER on IntegriCloud