summaryrefslogtreecommitdiffstats
path: root/sbin/hastd/primary.c
diff options
context:
space:
mode:
Diffstat (limited to 'sbin/hastd/primary.c')
-rw-r--r--sbin/hastd/primary.c186
1 files changed, 165 insertions, 21 deletions
diff --git a/sbin/hastd/primary.c b/sbin/hastd/primary.c
index 88159cb..fb49ef6 100644
--- a/sbin/hastd/primary.c
+++ b/sbin/hastd/primary.c
@@ -35,7 +35,6 @@ __FBSDID("$FreeBSD$");
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
-#include <sys/refcount.h>
#include <sys/stat.h>
#include <geom/gate/g_gate.h>
@@ -65,6 +64,7 @@ __FBSDID("$FreeBSD$");
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
+#include "refcnt.h"
#include "subr.h"
#include "synch.h"
@@ -543,7 +543,7 @@ primary_connect(struct hast_resource *res, struct proto_conn **connp)
return (0);
}
-
+
/*
* Function instructs GEOM_GATE to handle reads directly from within the kernel.
*/
@@ -577,6 +577,7 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
int32_t extentsize;
int64_t datasize;
uint32_t mapsize;
+ uint8_t version;
size_t size;
int error;
@@ -597,6 +598,7 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
*/
nvout = nv_alloc();
nv_add_string(nvout, res->hr_name, "resource");
+ nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
if (nv_error(nvout) != 0) {
pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
"Unable to allocate header for connection with %s",
@@ -626,6 +628,20 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
nv_free(nvin);
goto close;
}
+ version = nv_get_uint8(nvin, "version");
+ if (version == 0) {
+ /*
+ * If no version is sent, it means this is protocol version 1.
+ */
+ version = 1;
+ }
+ if (version > HAST_PROTO_VERSION) {
+ pjdlog_warning("Invalid version received (%hhu).", version);
+ nv_free(nvin);
+ goto close;
+ }
+ res->hr_version = version;
+ pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
token = nv_get_uint8_array(nvin, &size, "token");
if (token == NULL) {
pjdlog_warning("Handshake header from %s has no 'token' field.",
@@ -776,6 +792,16 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
#endif
pjdlog_info("Connected to %s.", res->hr_remoteaddr);
+ if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
+ res->hr_version < 2) {
+ pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
+ res->hr_replication = HAST_REPLICATION_FULLSYNC;
+ } else if (res->hr_replication != res->hr_original_replication) {
+ /*
+ * This is in case hastd disconnected and was upgraded.
+ */
+ res->hr_replication = res->hr_original_replication;
+ }
if (inp != NULL && outp != NULL) {
*inp = in;
*outp = out;
@@ -1009,7 +1035,8 @@ hastd_primary(struct hast_resource *res)
}
static void
-reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
+reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
+ const char *fmt, ...)
{
char msg[1024];
va_list ap;
@@ -1020,21 +1047,18 @@ reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt
switch (ggio->gctl_cmd) {
case BIO_READ:
(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
- (uintmax_t)ggio->gctl_offset,
- (uintmax_t)ggio->gctl_length);
+ (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
break;
case BIO_DELETE:
(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
- (uintmax_t)ggio->gctl_offset,
- (uintmax_t)ggio->gctl_length);
+ (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
break;
case BIO_FLUSH:
(void)snprlcat(msg, sizeof(msg), "FLUSH.");
break;
case BIO_WRITE:
(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
- (uintmax_t)ggio->gctl_offset,
- (uintmax_t)ggio->gctl_length);
+ (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
break;
default:
(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
@@ -1274,8 +1298,13 @@ ggate_recv_thread(void *arg)
}
pjdlog_debug(2,
"ggate_recv: (%p) Moving request to the send queues.", hio);
- refcount_init(&hio->hio_countdown, ncomps);
- for (ii = ncomp; ii < ncomp + ncomps; ii++)
+ hio->hio_countdown = ncomps;
+ if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
+ ggio->gctl_cmd == BIO_WRITE) {
+ /* Each remote request needs two responses in memsync. */
+ hio->hio_countdown++;
+ }
+ for (ii = ncomp; ii < ncomps; ii++)
QUEUE_INSERT1(hio, send, ii);
}
/* NOTREACHED */
@@ -1346,8 +1375,7 @@ local_send_thread(void *arg)
} else {
hio->hio_errors[ncomp] = 0;
if (hio->hio_replication ==
- HAST_REPLICATION_ASYNC &&
- !ISSYNCREQ(hio)) {
+ HAST_REPLICATION_ASYNC) {
ggio->gctl_error = 0;
write_complete(res, hio);
}
@@ -1385,8 +1413,42 @@ local_send_thread(void *arg)
}
break;
}
- if (!refcount_release(&hio->hio_countdown))
- continue;
+
+ if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
+ ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
+ if (refcnt_release(&hio->hio_countdown) > 0)
+ continue;
+ } else {
+ /*
+ * Depending on hio_countdown value, requests finished
+ * in the following order:
+ * 0: remote memsync, remote final, local write
+ * 1: remote memsync, local write, (remote final)
+ * 2: local write, (remote memsync), (remote final)
+ */
+ switch (refcnt_release(&hio->hio_countdown)) {
+ case 0:
+ /*
+ * Local write finished as last.
+ */
+ break;
+ case 1:
+ /*
+ * Local write finished after remote memsync
+ * reply arrvied. We can complete the write now.
+ */
+ if (hio->hio_errors[0] == 0)
+ write_complete(res, hio);
+ continue;
+ case 2:
+ /*
+ * Local write finished as first.
+ */
+ continue;
+ default:
+ PJDLOG_ABORT("Invalid hio_countdown.");
+ }
+ }
if (ISSYNCREQ(hio)) {
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@@ -1508,6 +1570,10 @@ remote_send_thread(void *arg)
nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
nv_add_uint64(nv, offset, "offset");
nv_add_uint64(nv, length, "length");
+ if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
+ ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+ nv_add_uint8(nv, 1, "memsync");
+ }
if (nv_error(nv) != 0) {
hio->hio_errors[ncomp] = nv_error(nv);
pjdlog_debug(2,
@@ -1568,7 +1634,7 @@ remote_send_thread(void *arg)
done_queue:
nv_free(nv);
if (ISSYNCREQ(hio)) {
- if (!refcount_release(&hio->hio_countdown))
+ if (refcnt_release(&hio->hio_countdown) > 0)
continue;
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@@ -1583,8 +1649,10 @@ done_queue:
(void)hast_activemap_flush(res);
}
mtx_unlock(&res->hr_amp_lock);
+ if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
+ (void)refcnt_release(&hio->hio_countdown);
}
- if (!refcount_release(&hio->hio_countdown))
+ if (refcnt_release(&hio->hio_countdown) > 0)
continue;
pjdlog_debug(2,
"remote_send: (%p) Moving request to the done queue.",
@@ -1608,6 +1676,7 @@ remote_recv_thread(void *arg)
struct nv *nv;
unsigned int ncomp;
uint64_t seq;
+ bool memsyncack;
int error;
/* Remote component is 1 for now. */
@@ -1623,6 +1692,8 @@ remote_recv_thread(void *arg)
}
mtx_unlock(&hio_recv_list_lock[ncomp]);
+ memsyncack = false;
+
rw_rlock(&hio_remote_lock[ncomp]);
if (!ISCONNECTED(res, ncomp)) {
rw_unlock(&hio_remote_lock[ncomp]);
@@ -1652,6 +1723,7 @@ remote_recv_thread(void *arg)
nv_free(nv);
continue;
}
+ memsyncack = nv_exists(nv, "received");
mtx_lock(&hio_recv_list_lock[ncomp]);
TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
if (hio->hio_ggio.gctl_seq == seq) {
@@ -1707,8 +1779,80 @@ remote_recv_thread(void *arg)
hio->hio_errors[ncomp] = 0;
nv_free(nv);
done_queue:
- if (!refcount_release(&hio->hio_countdown))
- continue;
+ if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
+ hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
+ if (refcnt_release(&hio->hio_countdown) > 0)
+ continue;
+ } else {
+ /*
+ * Depending on hio_countdown value, requests finished
+ * in the following order:
+ *
+ * 0: local write, remote memsync, remote final
+ * or
+ * 0: remote memsync, local write, remote final
+ *
+ * 1: local write, remote memsync, (remote final)
+ * or
+ * 1: remote memsync, remote final, (local write)
+ *
+ * 2: remote memsync, (local write), (remote final)
+ * or
+ * 2: remote memsync, (remote final), (local write)
+ */
+ switch (refcnt_release(&hio->hio_countdown)) {
+ case 0:
+ /*
+ * Remote final reply arrived.
+ */
+ PJDLOG_ASSERT(!memsyncack);
+ break;
+ case 1:
+ if (memsyncack) {
+ /*
+ * Local request already finished, so we
+ * can complete the write.
+ */
+ if (hio->hio_errors[0] == 0)
+ write_complete(res, hio);
+ /*
+ * We still need to wait for final
+ * remote reply.
+ */
+ pjdlog_debug(2,
+ "remote_recv: (%p) Moving request back to the recv queue.",
+ hio);
+ mtx_lock(&hio_recv_list_lock[ncomp]);
+ TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
+ hio, hio_next[ncomp]);
+ mtx_unlock(&hio_recv_list_lock[ncomp]);
+ } else {
+ /*
+ * Remote final reply arrived before
+ * local write finished.
+ * Nothing to do in such case.
+ */
+ }
+ continue;
+ case 2:
+ /*
+ * We received remote memsync reply even before
+ * local write finished.
+ */
+ PJDLOG_ASSERT(memsyncack);
+
+ pjdlog_debug(2,
+ "remote_recv: (%p) Moving request back to the recv queue.",
+ hio);
+ mtx_lock(&hio_recv_list_lock[ncomp]);
+ TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
+ hio_next[ncomp]);
+ mtx_unlock(&hio_recv_list_lock[ncomp]);
+ continue;
+ default:
+ PJDLOG_ABORT("Invalid hio_countdown.");
+ }
+ }
if (ISSYNCREQ(hio)) {
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@@ -1977,7 +2121,7 @@ sync_thread(void *arg __unused)
ncomp = 1;
}
mtx_unlock(&metadata_lock);
- refcount_init(&hio->hio_countdown, 1);
+ hio->hio_countdown = 1;
QUEUE_INSERT1(hio, send, ncomp);
/*
@@ -2027,7 +2171,7 @@ sync_thread(void *arg __unused)
pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
hio);
- refcount_init(&hio->hio_countdown, 1);
+ hio->hio_countdown = 1;
QUEUE_INSERT1(hio, send, ncomp);
/*
OpenPOWER on IntegriCloud