summaryrefslogtreecommitdiffstats
path: root/fs/btrfs/async-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/btrfs/async-thread.c')
-rw-r--r--fs/btrfs/async-thread.c230
1 files changed, 184 insertions, 46 deletions
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 019e8af..6ea5cd0 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -48,6 +48,9 @@ struct btrfs_worker_thread {
/* number of things on the pending list */
atomic_t num_pending;
+ /* reference counter for this struct */
+ atomic_t refs;
+
unsigned long sequence;
/* protects the pending list. */
@@ -93,17 +96,40 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
}
}
-static noinline int run_ordered_completions(struct btrfs_workers *workers,
- struct btrfs_work *work)
+static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
+ struct btrfs_workers *workers = worker->workers;
unsigned long flags;
+ rmb();
+ if (!workers->atomic_start_pending)
+ return;
+
+ spin_lock_irqsave(&workers->lock, flags);
+ if (!workers->atomic_start_pending)
+ goto out;
+
+ workers->atomic_start_pending = 0;
+ if (workers->num_workers >= workers->max_workers)
+ goto out;
+
+ spin_unlock_irqrestore(&workers->lock, flags);
+ btrfs_start_workers(workers, 1);
+ return;
+
+out:
+ spin_unlock_irqrestore(&workers->lock, flags);
+}
+
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+ struct btrfs_work *work)
+{
if (!workers->ordered)
return 0;
set_bit(WORK_DONE_BIT, &work->flags);
- spin_lock_irqsave(&workers->lock, flags);
+ spin_lock(&workers->order_lock);
while (1) {
if (!list_empty(&workers->prio_order_list)) {
@@ -126,45 +152,117 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
break;
- spin_unlock_irqrestore(&workers->lock, flags);
+ spin_unlock(&workers->order_lock);
work->ordered_func(work);
/* now take the lock again and call the freeing code */
- spin_lock_irqsave(&workers->lock, flags);
+ spin_lock(&workers->order_lock);
list_del(&work->order_list);
work->ordered_free(work);
}
- spin_unlock_irqrestore(&workers->lock, flags);
+ spin_unlock(&workers->order_lock);
return 0;
}
+static void put_worker(struct btrfs_worker_thread *worker)
+{
+ if (atomic_dec_and_test(&worker->refs))
+ kfree(worker);
+}
+
+static int try_worker_shutdown(struct btrfs_worker_thread *worker)
+{
+ int freeit = 0;
+
+ spin_lock_irq(&worker->lock);
+ spin_lock_irq(&worker->workers->lock);
+ if (worker->workers->num_workers > 1 &&
+ worker->idle &&
+ !worker->working &&
+ !list_empty(&worker->worker_list) &&
+ list_empty(&worker->prio_pending) &&
+ list_empty(&worker->pending)) {
+ freeit = 1;
+ list_del_init(&worker->worker_list);
+ worker->workers->num_workers--;
+ }
+ spin_unlock_irq(&worker->workers->lock);
+ spin_unlock_irq(&worker->lock);
+
+ if (freeit)
+ put_worker(worker);
+ return freeit;
+}
+
+static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
+ struct list_head *prio_head,
+ struct list_head *head)
+{
+ struct btrfs_work *work = NULL;
+ struct list_head *cur = NULL;
+
+ if(!list_empty(prio_head))
+ cur = prio_head->next;
+
+ smp_mb();
+ if (!list_empty(&worker->prio_pending))
+ goto refill;
+
+ if (!list_empty(head))
+ cur = head->next;
+
+ if (cur)
+ goto out;
+
+refill:
+ spin_lock_irq(&worker->lock);
+ list_splice_tail_init(&worker->prio_pending, prio_head);
+ list_splice_tail_init(&worker->pending, head);
+
+ if (!list_empty(prio_head))
+ cur = prio_head->next;
+ else if (!list_empty(head))
+ cur = head->next;
+ spin_unlock_irq(&worker->lock);
+
+ if (!cur)
+ goto out_fail;
+
+out:
+ work = list_entry(cur, struct btrfs_work, list);
+
+out_fail:
+ return work;
+}
+
/*
* main loop for servicing work items
*/
static int worker_loop(void *arg)
{
struct btrfs_worker_thread *worker = arg;
- struct list_head *cur;
+ struct list_head head;
+ struct list_head prio_head;
struct btrfs_work *work;
+
+ INIT_LIST_HEAD(&head);
+ INIT_LIST_HEAD(&prio_head);
+
do {
- spin_lock_irq(&worker->lock);
-again_locked:
+again:
while (1) {
- if (!list_empty(&worker->prio_pending))
- cur = worker->prio_pending.next;
- else if (!list_empty(&worker->pending))
- cur = worker->pending.next;
- else
+
+
+ work = get_next_work(worker, &prio_head, &head);
+ if (!work)
break;
- work = list_entry(cur, struct btrfs_work, list);
list_del(&work->list);
clear_bit(WORK_QUEUED_BIT, &work->flags);
work->worker = worker;
- spin_unlock_irq(&worker->lock);
work->func(work);
@@ -175,9 +273,13 @@ again_locked:
*/
run_ordered_completions(worker->workers, work);
- spin_lock_irq(&worker->lock);
- check_idle_worker(worker);
+ check_pending_worker_creates(worker);
+
}
+
+ spin_lock_irq(&worker->lock);
+ check_idle_worker(worker);
+
if (freezing(current)) {
worker->working = 0;
spin_unlock_irq(&worker->lock);
@@ -216,8 +318,10 @@ again_locked:
spin_lock_irq(&worker->lock);
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&worker->pending) ||
- !list_empty(&worker->prio_pending))
- goto again_locked;
+ !list_empty(&worker->prio_pending)) {
+ spin_unlock_irq(&worker->lock);
+ goto again;
+ }
/*
* this makes sure we get a wakeup when someone
@@ -226,8 +330,13 @@ again_locked:
worker->working = 0;
spin_unlock_irq(&worker->lock);
- if (!kthread_should_stop())
- schedule();
+ if (!kthread_should_stop()) {
+ schedule_timeout(HZ * 120);
+ if (!worker->working &&
+ try_worker_shutdown(worker)) {
+ return 0;
+ }
+ }
}
__set_current_state(TASK_RUNNING);
}
@@ -242,16 +351,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
{
struct list_head *cur;
struct btrfs_worker_thread *worker;
+ int can_stop;
+ spin_lock_irq(&workers->lock);
list_splice_init(&workers->idle_list, &workers->worker_list);
while (!list_empty(&workers->worker_list)) {
cur = workers->worker_list.next;
worker = list_entry(cur, struct btrfs_worker_thread,
worker_list);
- kthread_stop(worker->task);
- list_del(&worker->worker_list);
- kfree(worker);
+
+ atomic_inc(&worker->refs);
+ workers->num_workers -= 1;
+ if (!list_empty(&worker->worker_list)) {
+ list_del_init(&worker->worker_list);
+ put_worker(worker);
+ can_stop = 1;
+ } else
+ can_stop = 0;
+ spin_unlock_irq(&workers->lock);
+ if (can_stop)
+ kthread_stop(worker->task);
+ spin_lock_irq(&workers->lock);
+ put_worker(worker);
}
+ spin_unlock_irq(&workers->lock);
return 0;
}
@@ -266,10 +389,13 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
INIT_LIST_HEAD(&workers->order_list);
INIT_LIST_HEAD(&workers->prio_order_list);
spin_lock_init(&workers->lock);
+ spin_lock_init(&workers->order_lock);
workers->max_workers = max;
workers->idle_thresh = 32;
workers->name = name;
workers->ordered = 0;
+ workers->atomic_start_pending = 0;
+ workers->atomic_worker_start = 0;
}
/*
@@ -293,7 +419,9 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
INIT_LIST_HEAD(&worker->prio_pending);
INIT_LIST_HEAD(&worker->worker_list);
spin_lock_init(&worker->lock);
+
atomic_set(&worker->num_pending, 0);
+ atomic_set(&worker->refs, 1);
worker->workers = workers;
worker->task = kthread_run(worker_loop, worker,
"btrfs-%s-%d", workers->name,
@@ -303,7 +431,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
kfree(worker);
goto fail;
}
-
spin_lock_irq(&workers->lock);
list_add_tail(&worker->worker_list, &workers->idle_list);
worker->idle = 1;
@@ -367,28 +494,18 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
struct btrfs_worker_thread *worker;
unsigned long flags;
+ struct list_head *fallback;
again:
spin_lock_irqsave(&workers->lock, flags);
worker = next_worker(workers);
- spin_unlock_irqrestore(&workers->lock, flags);
if (!worker) {
- spin_lock_irqsave(&workers->lock, flags);
if (workers->num_workers >= workers->max_workers) {
- struct list_head *fallback = NULL;
- /*
- * we have failed to find any workers, just
- * return the force one
- */
- if (!list_empty(&workers->worker_list))
- fallback = workers->worker_list.next;
- if (!list_empty(&workers->idle_list))
- fallback = workers->idle_list.next;
- BUG_ON(!fallback);
- worker = list_entry(fallback,
- struct btrfs_worker_thread, worker_list);
- spin_unlock_irqrestore(&workers->lock, flags);
+ goto fallback;
+ } else if (workers->atomic_worker_start) {
+ workers->atomic_start_pending = 1;
+ goto fallback;
} else {
spin_unlock_irqrestore(&workers->lock, flags);
/* we're below the limit, start another worker */
@@ -396,6 +513,23 @@ again:
goto again;
}
}
+ spin_unlock_irqrestore(&workers->lock, flags);
+ return worker;
+
+fallback:
+ fallback = NULL;
+ /*
+ * we have failed to find any workers, just
+ * return the first one we can find.
+ */
+ if (!list_empty(&workers->worker_list))
+ fallback = workers->worker_list.next;
+ if (!list_empty(&workers->idle_list))
+ fallback = workers->idle_list.next;
+ BUG_ON(!fallback);
+ worker = list_entry(fallback,
+ struct btrfs_worker_thread, worker_list);
+ spin_unlock_irqrestore(&workers->lock, flags);
return worker;
}
@@ -435,9 +569,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
worker->working = 1;
}
- spin_unlock_irqrestore(&worker->lock, flags);
if (wake)
wake_up_process(worker->task);
+ spin_unlock_irqrestore(&worker->lock, flags);
out:
return 0;
@@ -463,14 +597,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
worker = find_worker(workers);
if (workers->ordered) {
- spin_lock_irqsave(&workers->lock, flags);
+ /*
+ * you're not allowed to do ordered queues from an
+ * interrupt handler
+ */
+ spin_lock(&workers->order_lock);
if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
list_add_tail(&work->order_list,
&workers->prio_order_list);
} else {
list_add_tail(&work->order_list, &workers->order_list);
}
- spin_unlock_irqrestore(&workers->lock, flags);
+ spin_unlock(&workers->order_lock);
} else {
INIT_LIST_HEAD(&work->order_list);
}
@@ -492,10 +630,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
wake = 1;
worker->working = 1;
- spin_unlock_irqrestore(&worker->lock, flags);
-
if (wake)
wake_up_process(worker->task);
+ spin_unlock_irqrestore(&worker->lock, flags);
+
out:
return 0;
}
OpenPOWER on IntegriCloud