diff options
author | pjd <pjd@FreeBSD.org> | 2011-02-27 19:41:40 +0000 |
---|---|---|
committer | pjd <pjd@FreeBSD.org> | 2011-02-27 19:41:40 +0000 |
commit | 1b03c5bf41222b723415638f03e00ed12cac076a (patch) | |
tree | ef515cadc08bf427e4d3f1360199ec9827b1596b /sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c | |
parent | c67d387baf03726323703774b1b320235fb1f24b (diff) | |
download | FreeBSD-src-1b03c5bf41222b723415638f03e00ed12cac076a.zip FreeBSD-src-1b03c5bf41222b723415638f03e00ed12cac076a.tar.gz |
Finally... Import the latest open-source ZFS version - (SPA) 28.
A few new things are available from now on:
- Data deduplication.
- Triple parity RAIDZ (RAIDZ3).
- zfs diff.
- zpool split.
- Snapshot holds.
- zpool import -F. Allows rewinding a corrupted pool to an earlier
transaction group.
- Ability to import a pool in read-only mode.
MFC after: 1 month
Diffstat (limited to 'sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c')
-rw-r--r-- | sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c | 175 |
1 files changed, 125 insertions, 50 deletions
diff --git a/sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c b/sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c index c69c117..0885f27 100644 --- a/sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c +++ b/sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c @@ -19,14 +19,15 @@ * CDDL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. + * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. */ #include <sys/zfs_context.h> #include <sys/txg_impl.h> #include <sys/dmu_impl.h> +#include <sys/dmu_tx.h> #include <sys/dsl_pool.h> +#include <sys/dsl_scan.h> #include <sys/callb.h> /* @@ -36,24 +37,13 @@ static void txg_sync_thread(void *arg); static void txg_quiesce_thread(void *arg); -int zfs_txg_timeout = 30; /* max seconds worth of delta per txg */ -extern int zfs_txg_synctime; -extern uint64_t zfs_write_limit_override; +int zfs_txg_timeout = 5; /* max seconds worth of delta per txg */ SYSCTL_DECL(_vfs_zfs); -SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, - "ZFS transaction groups (TXG)"); +SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG"); TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout); SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0, "Maximum seconds worth of delta per txg"); -TUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime); -SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime, - 0, "Target seconds to sync a txg"); -TUNABLE_QUAD("vfs.zfs.txg.write_limit_override", &zfs_write_limit_override); -SYSCTL_UQUAD(_vfs_zfs_txg, OID_AUTO, write_limit_override, CTLFLAG_RW, - &zfs_write_limit_override, 0, - "Override maximum size of a txg to this size in bytes, " - "value of 0 means don't override"); /* * Prepare the txg subsystem. 
@@ -74,10 +64,12 @@ txg_init(dsl_pool_t *dp, uint64_t txg) for (i = 0; i < TXG_SIZE; i++) { cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, NULL); + list_create(&tx->tx_cpu[c].tc_callbacks[i], + sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); } } - rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL); mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); @@ -100,7 +92,6 @@ txg_fini(dsl_pool_t *dp) ASSERT(tx->tx_threads == 0); - rw_destroy(&tx->tx_suspend); mutex_destroy(&tx->tx_sync_lock); cv_destroy(&tx->tx_sync_more_cv); @@ -113,10 +104,15 @@ txg_fini(dsl_pool_t *dp) int i; mutex_destroy(&tx->tx_cpu[c].tc_lock); - for (i = 0; i < TXG_SIZE; i++) + for (i = 0; i < TXG_SIZE; i++) { cv_destroy(&tx->tx_cpu[c].tc_cv[i]); + list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); + } } + if (tx->tx_commit_cb_taskq != NULL) + taskq_destroy(tx->tx_commit_cb_taskq); + kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); bzero(tx, sizeof (tx_state_t)); @@ -196,7 +192,11 @@ txg_sync_stop(dsl_pool_t *dp) * Finish off any work in progress. */ ASSERT(tx->tx_threads == 2); - txg_wait_synced(dp, 0); + + /* + * We need to ensure that we've vacated the deferred space_maps. + */ + txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE); /* * Wake all sync threads and wait for them to die. 
@@ -246,6 +246,17 @@ txg_rele_to_quiesce(txg_handle_t *th) } void +txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) +{ + tx_cpu_t *tc = th->th_cpu; + int g = th->th_txg & TXG_MASK; + + mutex_enter(&tc->tc_lock); + list_move_tail(&tc->tc_callbacks[g], tx_callbacks); + mutex_exit(&tc->tc_lock); +} + +void txg_rele_to_sync(txg_handle_t *th) { tx_cpu_t *tc = th->th_cpu; @@ -296,9 +307,61 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg) } static void +txg_do_callbacks(void *arg) +{ + list_t *cb_list = arg; + + dmu_tx_do_callbacks(cb_list, 0); + + list_destroy(cb_list); + + kmem_free(cb_list, sizeof (list_t)); +} + +/* + * Dispatch the commit callbacks registered on this txg to worker threads. + */ +static void +txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) +{ + int c; + tx_state_t *tx = &dp->dp_tx; + list_t *cb_list; + + for (c = 0; c < max_ncpus; c++) { + tx_cpu_t *tc = &tx->tx_cpu[c]; + /* No need to lock tx_cpu_t at this point */ + + int g = txg & TXG_MASK; + + if (list_is_empty(&tc->tc_callbacks[g])) + continue; + + if (tx->tx_commit_cb_taskq == NULL) { + /* + * Commit callback taskq hasn't been created yet. 
+ */ + tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", + max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2, + TASKQ_PREPOPULATE); + } + + cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); + list_create(cb_list, sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); + + list_move_tail(&tc->tc_callbacks[g], cb_list); + + (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) + txg_do_callbacks, cb_list, TQ_SLEEP); + } +} + +static void txg_sync_thread(void *arg) { dsl_pool_t *dp = arg; + spa_t *spa = dp->dp_spa; tx_state_t *tx = &dp->dp_tx; callb_cpr_t cpr; uint64_t start, delta; @@ -311,20 +374,19 @@ txg_sync_thread(void *arg) uint64_t txg; /* - * We sync when we're scrubbing, there's someone waiting + * We sync when we're scanning, there's someone waiting * on us, or the quiesce thread has handed off a txg to * us, or we have reached our timeout. */ timer = (delta >= timeout ? 0 : timeout - delta); - while ((dp->dp_scrub_func == SCRUB_FUNC_NONE || - spa_shutting_down(dp->dp_spa)) && + while (!dsl_scan_active(dp->dp_scan) && !tx->tx_exiting && timer > 0 && tx->tx_synced_txg >= tx->tx_sync_txg_waiting && tx->tx_quiesced_txg == 0) { dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); - delta = LBOLT - start; + delta = ddi_get_lbolt() - start; timer = (delta > timeout ? 0 : timeout - delta); } @@ -342,8 +404,6 @@ txg_sync_thread(void *arg) if (tx->tx_exiting) txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); - rw_enter(&tx->tx_suspend, RW_WRITER); - /* * Consume the quiesced txg which has been handed off to * us. 
This may cause the quiescing thread to now be @@ -353,22 +413,24 @@ txg_sync_thread(void *arg) tx->tx_quiesced_txg = 0; tx->tx_syncing_txg = txg; cv_broadcast(&tx->tx_quiesce_more_cv); - rw_exit(&tx->tx_suspend); dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); mutex_exit(&tx->tx_sync_lock); - start = LBOLT; - spa_sync(dp->dp_spa, txg); - delta = LBOLT - start; + start = ddi_get_lbolt(); + spa_sync(spa, txg); + delta = ddi_get_lbolt() - start; mutex_enter(&tx->tx_sync_lock); - rw_enter(&tx->tx_suspend, RW_WRITER); tx->tx_synced_txg = txg; tx->tx_syncing_txg = 0; - rw_exit(&tx->tx_suspend); cv_broadcast(&tx->tx_sync_done_cv); + + /* + * Dispatch commit callbacks to worker threads. + */ + txg_dispatch_callbacks(dp, txg); } } @@ -426,7 +488,7 @@ void txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) { tx_state_t *tx = &dp->dp_tx; - int timeout = LBOLT + ticks; + int timeout = ddi_get_lbolt() + ticks; /* don't delay if this txg could transition to quiesing immediately */ if (tx->tx_open_txg > txg || @@ -439,10 +501,10 @@ txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) return; } - while (LBOLT < timeout && + while (ddi_get_lbolt() < timeout && tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, - timeout - LBOLT); + timeout - ddi_get_lbolt()); mutex_exit(&tx->tx_sync_lock); } @@ -455,7 +517,7 @@ txg_wait_synced(dsl_pool_t *dp, uint64_t txg) mutex_enter(&tx->tx_sync_lock); ASSERT(tx->tx_threads == 2); if (txg == 0) - txg = tx->tx_open_txg; + txg = tx->tx_open_txg + TXG_DEFER_SIZE; if (tx->tx_sync_txg_waiting < txg) tx->tx_sync_txg_waiting = txg; dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", @@ -506,21 +568,6 @@ txg_sync_waiting(dsl_pool_t *dp) tx->tx_quiesced_txg != 0); } -void -txg_suspend(dsl_pool_t *dp) -{ - tx_state_t *tx = &dp->dp_tx; - /* XXX some code paths suspend when they are already suspended! 
*/ - rw_enter(&tx->tx_suspend, RW_READER); -} - -void -txg_resume(dsl_pool_t *dp) -{ - tx_state_t *tx = &dp->dp_tx; - rw_exit(&tx->tx_suspend); -} - /* * Per-txg object lists. */ @@ -578,6 +625,34 @@ txg_list_add(txg_list_t *tl, void *p, uint64_t txg) } /* + * Add an entry to the end of the list (walks list to find end). + * Returns 0 if it's a new entry, 1 if it's already there. + */ +int +txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg) +{ + int t = txg & TXG_MASK; + txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); + int already_on_list; + + mutex_enter(&tl->tl_lock); + already_on_list = tn->tn_member[t]; + if (!already_on_list) { + txg_node_t **tp; + + for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t]) + continue; + + tn->tn_member[t] = 1; + tn->tn_next[t] = NULL; + *tp = tn; + } + mutex_exit(&tl->tl_lock); + + return (already_on_list); +} + +/* * Remove the head of the list and return it. */ void * |