diff options
author | pjd <pjd@FreeBSD.org> | 2013-02-17 21:12:34 +0000 |
---|---|---|
committer | pjd <pjd@FreeBSD.org> | 2013-02-17 21:12:34 +0000 |
commit | 0bf19fd81213327b0ddfd5708b76528fbdb507af (patch) | |
tree | 583fbe402107b2dad647b0a09363284f8a367ac2 /sbin/hastd/primary.c | |
parent | e5238fcb15d4fbacf835cb112ac49dcb28ff894d (diff) | |
download | FreeBSD-src-0bf19fd81213327b0ddfd5708b76528fbdb507af.zip FreeBSD-src-0bf19fd81213327b0ddfd5708b76528fbdb507af.tar.gz |
- Add support for 'memsync' mode. This is the fastest replication mode that's
why it will now be the default.
- Bump protocol version to 2 and add backward compatibility for version 1.
- Allow to specify hosts by kern.hostid as well (in addition to hostname and
kern.hostuuid) in configuration file.
Sponsored by: Panzura
Tested by: trociny
Diffstat (limited to 'sbin/hastd/primary.c')
-rw-r--r-- | sbin/hastd/primary.c | 186 |
1 files changed, 165 insertions, 21 deletions
diff --git a/sbin/hastd/primary.c b/sbin/hastd/primary.c index 88159cb..fb49ef6 100644 --- a/sbin/hastd/primary.c +++ b/sbin/hastd/primary.c @@ -35,7 +35,6 @@ __FBSDID("$FreeBSD$"); #include <sys/time.h> #include <sys/bio.h> #include <sys/disk.h> -#include <sys/refcount.h> #include <sys/stat.h> #include <geom/gate/g_gate.h> @@ -65,6 +64,7 @@ __FBSDID("$FreeBSD$"); #include "metadata.h" #include "proto.h" #include "pjdlog.h" +#include "refcnt.h" #include "subr.h" #include "synch.h" @@ -543,7 +543,7 @@ primary_connect(struct hast_resource *res, struct proto_conn **connp) return (0); } - + /* * Function instructs GEOM_GATE to handle reads directly from within the kernel. */ @@ -577,6 +577,7 @@ init_remote(struct hast_resource *res, struct proto_conn **inp, int32_t extentsize; int64_t datasize; uint32_t mapsize; + uint8_t version; size_t size; int error; @@ -597,6 +598,7 @@ init_remote(struct hast_resource *res, struct proto_conn **inp, */ nvout = nv_alloc(); nv_add_string(nvout, res->hr_name, "resource"); + nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); if (nv_error(nvout) != 0) { pjdlog_common(LOG_WARNING, 0, nv_error(nvout), "Unable to allocate header for connection with %s", @@ -626,6 +628,20 @@ init_remote(struct hast_resource *res, struct proto_conn **inp, nv_free(nvin); goto close; } + version = nv_get_uint8(nvin, "version"); + if (version == 0) { + /* + * If no version is sent, it means this is protocol version 1. + */ + version = 1; + } + if (version > HAST_PROTO_VERSION) { + pjdlog_warning("Invalid version received (%hhu).", version); + nv_free(nvin); + goto close; + } + res->hr_version = version; + pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); token = nv_get_uint8_array(nvin, &size, "token"); if (token == NULL) { pjdlog_warning("Handshake header from %s has no 'token' field.", @@ -776,6 +792,16 @@ init_remote(struct hast_resource *res, struct proto_conn **inp, pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); #endif pjdlog_info("Connected to %s.", res->hr_remoteaddr); + if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && + res->hr_version < 2) { + pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); + res->hr_replication = HAST_REPLICATION_FULLSYNC; + } else if (res->hr_replication != res->hr_original_replication) { + /* + * This is in case hastd disconnected and was upgraded. + */ + res->hr_replication = res->hr_original_replication; + } if (inp != NULL && outp != NULL) { *inp = in; *outp = out; @@ -1009,7 +1035,8 @@ hastd_primary(struct hast_resource *res) } static void -reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) +reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, + const char *fmt, ...) { char msg[1024]; va_list ap; @@ -1020,21 +1047,18 @@ reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt switch (ggio->gctl_cmd) { case BIO_READ: (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; case BIO_DELETE: (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; case BIO_FLUSH: (void)snprlcat(msg, sizeof(msg), "FLUSH."); break; case BIO_WRITE: (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; default: (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", @@ -1274,8 +1298,13 @@ ggate_recv_thread(void *arg) } pjdlog_debug(2, "ggate_recv: (%p) Moving request to the send queues.", hio); - refcount_init(&hio->hio_countdown, ncomps); - for (ii = ncomp; ii < ncomp + ncomps; ii++) + hio->hio_countdown = ncomps; + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && + ggio->gctl_cmd == BIO_WRITE) { + /* Each remote request needs two responses in memsync. */ + hio->hio_countdown++; + } + for (ii = ncomp; ii < ncomps; ii++) QUEUE_INSERT1(hio, send, ii); } /* NOTREACHED */ @@ -1346,8 +1375,7 @@ local_send_thread(void *arg) } else { hio->hio_errors[ncomp] = 0; if (hio->hio_replication == - HAST_REPLICATION_ASYNC && - !ISSYNCREQ(hio)) { + HAST_REPLICATION_ASYNC) { ggio->gctl_error = 0; write_complete(res, hio); } @@ -1385,8 +1413,42 @@ local_send_thread(void *arg) } break; } - if (!refcount_release(&hio->hio_countdown)) - continue; + + if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || + ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { + if (refcnt_release(&hio->hio_countdown) > 0) + continue; + } else { + /* + * Depending on hio_countdown value, requests finished + * in the following order: + * 0: remote memsync, remote final, local write + * 1: remote memsync, local write, (remote final) + * 2: local write, (remote memsync), (remote final) + */ + switch (refcnt_release(&hio->hio_countdown)) { + case 0: + /* + * Local write finished as last. + */ + break; + case 1: + /* + * Local write finished after remote memsync + * reply arrvied. We can complete the write now. + */ + if (hio->hio_errors[0] == 0) + write_complete(res, hio); + continue; + case 2: + /* + * Local write finished as first. + */ + continue; + default: + PJDLOG_ABORT("Invalid hio_countdown."); + } + } if (ISSYNCREQ(hio)) { mtx_lock(&sync_lock); SYNCREQDONE(hio); @@ -1508,6 +1570,10 @@ remote_send_thread(void *arg) nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); nv_add_uint64(nv, offset, "offset"); nv_add_uint64(nv, length, "length"); + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && + ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { + nv_add_uint8(nv, 1, "memsync"); + } if (nv_error(nv) != 0) { hio->hio_errors[ncomp] = nv_error(nv); pjdlog_debug(2, @@ -1568,7 +1634,7 @@ remote_send_thread(void *arg) done_queue: nv_free(nv); if (ISSYNCREQ(hio)) { - if (!refcount_release(&hio->hio_countdown)) + if (refcnt_release(&hio->hio_countdown) > 0) continue; mtx_lock(&sync_lock); SYNCREQDONE(hio); @@ -1583,8 +1649,10 @@ done_queue: (void)hast_activemap_flush(res); } mtx_unlock(&res->hr_amp_lock); + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) + (void)refcnt_release(&hio->hio_countdown); } - if (!refcount_release(&hio->hio_countdown)) + if (refcnt_release(&hio->hio_countdown) > 0) continue; pjdlog_debug(2, "remote_send: (%p) Moving request to the done queue.", @@ -1608,6 +1676,7 @@ remote_recv_thread(void *arg) struct nv *nv; unsigned int ncomp; uint64_t seq; + bool memsyncack; int error; /* Remote component is 1 for now. */ @@ -1623,6 +1692,8 @@ remote_recv_thread(void *arg) } mtx_unlock(&hio_recv_list_lock[ncomp]); + memsyncack = false; + rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { rw_unlock(&hio_remote_lock[ncomp]); @@ -1652,6 +1723,7 @@ remote_recv_thread(void *arg) nv_free(nv); continue; } + memsyncack = nv_exists(nv, "received"); mtx_lock(&hio_recv_list_lock[ncomp]); TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { if (hio->hio_ggio.gctl_seq == seq) { @@ -1707,8 +1779,80 @@ remote_recv_thread(void *arg) hio->hio_errors[ncomp] = 0; nv_free(nv); done_queue: - if (!refcount_release(&hio->hio_countdown)) - continue; + if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || + hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { + if (refcnt_release(&hio->hio_countdown) > 0) + continue; + } else { + /* + * Depending on hio_countdown value, requests finished + * in the following order: + * + * 0: local write, remote memsync, remote final + * or + * 0: remote memsync, local write, remote final + * + * 1: local write, remote memsync, (remote final) + * or + * 1: remote memsync, remote final, (local write) + * + * 2: remote memsync, (local write), (remote final) + * or + * 2: remote memsync, (remote final), (local write) + */ + switch (refcnt_release(&hio->hio_countdown)) { + case 0: + /* + * Remote final reply arrived. + */ + PJDLOG_ASSERT(!memsyncack); + break; + case 1: + if (memsyncack) { + /* + * Local request already finished, so we + * can complete the write. + */ + if (hio->hio_errors[0] == 0) + write_complete(res, hio); + /* + * We still need to wait for final + * remote reply. + */ + pjdlog_debug(2, + "remote_recv: (%p) Moving request back to the recv queue.", + hio); + mtx_lock(&hio_recv_list_lock[ncomp]); + TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], + hio, hio_next[ncomp]); + mtx_unlock(&hio_recv_list_lock[ncomp]); + } else { + /* + * Remote final reply arrived before + * local write finished. + * Nothing to do in such case. + */ + } + continue; + case 2: + /* + * We received remote memsync reply even before + * local write finished. + */ + PJDLOG_ASSERT(memsyncack); + + pjdlog_debug(2, + "remote_recv: (%p) Moving request back to the recv queue.", + hio); + mtx_lock(&hio_recv_list_lock[ncomp]); + TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, + hio_next[ncomp]); + mtx_unlock(&hio_recv_list_lock[ncomp]); + continue; + default: + PJDLOG_ABORT("Invalid hio_countdown."); + } + } if (ISSYNCREQ(hio)) { mtx_lock(&sync_lock); SYNCREQDONE(hio); @@ -1977,7 +2121,7 @@ sync_thread(void *arg __unused) ncomp = 1; } mtx_unlock(&metadata_lock); - refcount_init(&hio->hio_countdown, 1); + hio->hio_countdown = 1; QUEUE_INSERT1(hio, send, ncomp); /* @@ -2027,7 +2171,7 @@ sync_thread(void *arg __unused) pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", hio); - refcount_init(&hio->hio_countdown, 1); + hio->hio_countdown = 1; QUEUE_INSERT1(hio, send, ncomp); /* |