summaryrefslogtreecommitdiffstats
path: root/sbin/hastd
diff options
context:
space:
mode:
Diffstat (limited to 'sbin/hastd')
-rw-r--r--sbin/hastd/control.c11
-rw-r--r--sbin/hastd/hast.h5
-rw-r--r--sbin/hastd/hastd.82
-rw-r--r--sbin/hastd/hastd.c12
-rw-r--r--sbin/hastd/nv.c2
-rw-r--r--sbin/hastd/primary.c243
-rw-r--r--sbin/hastd/proto.c6
-rw-r--r--sbin/hastd/secondary.c24
8 files changed, 155 insertions, 150 deletions
diff --git a/sbin/hastd/control.c b/sbin/hastd/control.c
index 922f507..364225b 100644
--- a/sbin/hastd/control.c
+++ b/sbin/hastd/control.c
@@ -215,6 +215,16 @@ control_status_worker(struct hast_resource *res, struct nv *nvout,
"stat_delete_error%u", no);
nv_add_uint64(nvout, nv_get_uint64(cnvin, "stat_flush_error"),
"stat_flush_error%u", no);
+ nv_add_uint64(nvout, nv_get_uint64(cnvin, "idle_queue_size"),
+ "idle_queue_size%u", no);
+ nv_add_uint64(nvout, nv_get_uint64(cnvin, "local_queue_size"),
+ "local_queue_size%u", no);
+ nv_add_uint64(nvout, nv_get_uint64(cnvin, "send_queue_size"),
+ "send_queue_size%u", no);
+ nv_add_uint64(nvout, nv_get_uint64(cnvin, "recv_queue_size"),
+ "recv_queue_size%u", no);
+ nv_add_uint64(nvout, nv_get_uint64(cnvin, "done_queue_size"),
+ "done_queue_size%u", no);
end:
if (cnvin != NULL)
nv_free(cnvin);
@@ -478,6 +488,7 @@ ctrl_thread(void *arg)
nv_add_uint64(nvout, res->hr_stat_flush_error +
res->hr_stat_activemap_flush_error,
"stat_flush_error");
+ res->output_status_aux(nvout);
nv_add_int16(nvout, 0, "error");
break;
case CONTROL_RELOAD:
diff --git a/sbin/hastd/hast.h b/sbin/hastd/hast.h
index 65c24f8..c529de5 100644
--- a/sbin/hastd/hast.h
+++ b/sbin/hastd/hast.h
@@ -137,6 +137,8 @@ struct hastd_config {
#define HAST_CHECKSUM_CRC32 1
#define HAST_CHECKSUM_SHA256 2
+struct nv;
+
/*
* Structure that describes single resource.
*/
@@ -254,6 +256,9 @@ struct hast_resource {
/* Number of activemap flush errors. */
uint64_t hr_stat_activemap_flush_error;
+ /* Function to output worker specific info on control status request. */
+ void (*output_status_aux)(struct nv *);
+
/* Next resource. */
TAILQ_ENTRY(hast_resource) hr_next;
};
diff --git a/sbin/hastd/hastd.8 b/sbin/hastd/hastd.8
index 017e895..68c98cb 100644
--- a/sbin/hastd/hastd.8
+++ b/sbin/hastd/hastd.8
@@ -171,7 +171,7 @@ The default location is
.Pa /var/run/hastd.pid .
.El
.Sh FILES
-.Bl -tag -width ".Pa /var/run/hastctl" -compact
+.Bl -tag -width ".Pa /var/run/hastd.pid" -compact
.It Pa /etc/hast.conf
The configuration file for
.Nm
diff --git a/sbin/hastd/hastd.c b/sbin/hastd/hastd.c
index 06b38e9..bac390a 100644
--- a/sbin/hastd/hastd.c
+++ b/sbin/hastd/hastd.c
@@ -806,12 +806,6 @@ listen_accept(struct hastd_listen *lst)
*/
version = 1;
}
- if (version > HAST_PROTO_VERSION) {
- pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
- version, (unsigned char)HAST_PROTO_VERSION);
- version = HAST_PROTO_VERSION;
- }
- pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
token = nv_get_uint8_array(nvin, &size, "token");
/*
* NULL token means that this is first connection.
@@ -925,6 +919,12 @@ listen_accept(struct hastd_listen *lst)
*/
if (token == NULL) {
+ if (version > HAST_PROTO_VERSION) {
+ pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
+ version, (unsigned char)HAST_PROTO_VERSION);
+ version = HAST_PROTO_VERSION;
+ }
+ pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
res->hr_version = version;
arc4random_buf(res->hr_token, sizeof(res->hr_token));
nvout = nv_alloc();
diff --git a/sbin/hastd/nv.c b/sbin/hastd/nv.c
index 8dcf697..fefc2df 100644
--- a/sbin/hastd/nv.c
+++ b/sbin/hastd/nv.c
@@ -566,7 +566,7 @@ nv_get_string(struct nv *nv, const char *namefmt, ...)
return (NULL);
PJDLOG_ASSERT((nvh->nvh_type & NV_ORDER_MASK) == NV_ORDER_HOST);
PJDLOG_ASSERT(nvh->nvh_dsize >= 1);
- str = NVH_DATA(nvh);
+ str = (char *)NVH_DATA(nvh);
PJDLOG_ASSERT(str[nvh->nvh_dsize - 1] == '\0');
PJDLOG_ASSERT(strlen(str) == nvh->nvh_dsize - 1);
return (str);
diff --git a/sbin/hastd/primary.c b/sbin/hastd/primary.c
index bd484e3..385a52a 100644
--- a/sbin/hastd/primary.c
+++ b/sbin/hastd/primary.c
@@ -94,6 +94,15 @@ struct hio {
*/
bool hio_done;
/*
+ * Number of components we are still waiting before sending write
+ * completion ack to GEOM Gate. Used for memsync.
+ */
+ refcnt_t hio_writecount;
+ /*
+ * Memsync request was acknowleged by remote.
+ */
+ bool hio_memsyncacked;
+ /*
* Remember replication from the time the request was initiated,
* so we won't get confused when replication changes on reload.
*/
@@ -108,6 +117,7 @@ struct hio {
* until some in-progress requests are freed.
*/
static TAILQ_HEAD(, hio) hio_free_list;
+static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
@@ -116,20 +126,26 @@ static pthread_cond_t hio_free_list_cond;
* responsible for managing his own send list.
*/
static TAILQ_HEAD(, hio) *hio_send_list;
+static size_t *hio_send_list_size;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
+#define hio_send_local_list_size hio_send_list_size[0]
+#define hio_send_remote_list_size hio_send_list_size[1]
/*
* There is one recv list for every component, although local components don't
* use recv lists as local requests are done synchronously.
*/
static TAILQ_HEAD(, hio) *hio_recv_list;
+static size_t *hio_recv_list_size;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
+#define hio_recv_remote_list_size hio_recv_list_size[1]
/*
* Request is placed on done list by the slowest component (the one that
* decreased hio_countdown from 1 to 0).
*/
static TAILQ_HEAD(, hio) hio_done_list;
+static size_t hio_done_list_size;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
@@ -164,25 +180,21 @@ static pthread_mutex_t metadata_lock;
((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
#define QUEUE_INSERT1(hio, name, ncomp) do { \
- bool _wakeup; \
- \
mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
- _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \
+ if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)])) \
+ cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \
TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \
hio_next[(ncomp)]); \
- mtx_unlock(&hio_##name##_list_lock[ncomp]); \
- if (_wakeup) \
- cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \
+ hio_##name##_list_size[(ncomp)]++; \
+ mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \
} while (0)
#define QUEUE_INSERT2(hio, name) do { \
- bool _wakeup; \
- \
mtx_lock(&hio_##name##_list_lock); \
- _wakeup = TAILQ_EMPTY(&hio_##name##_list); \
+ if (TAILQ_EMPTY(&hio_##name##_list)) \
+ cv_broadcast(&hio_##name##_list_cond); \
TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
+ hio_##name##_list_size++; \
mtx_unlock(&hio_##name##_list_lock); \
- if (_wakeup) \
- cv_broadcast(&hio_##name##_list_cond); \
} while (0)
#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \
bool _last; \
@@ -196,6 +208,8 @@ static pthread_mutex_t metadata_lock;
_last = true; \
} \
if (hio != NULL) { \
+ PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0); \
+ hio_##name##_list_size[(ncomp)]--; \
TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
hio_next[(ncomp)]); \
} \
@@ -207,10 +221,16 @@ static pthread_mutex_t metadata_lock;
cv_wait(&hio_##name##_list_cond, \
&hio_##name##_list_lock); \
} \
+ PJDLOG_ASSERT(hio_##name##_list_size != 0); \
+ hio_##name##_list_size--; \
TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \
mtx_unlock(&hio_##name##_list_lock); \
} while (0)
+#define ISFULLSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
+#define ISMEMSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
+#define ISASYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_ASYNC)
+
#define SYNCREQ(hio) do { \
(hio)->hio_ggio.gctl_unit = -1; \
(hio)->hio_ggio.gctl_seq = 1; \
@@ -219,6 +239,9 @@ static pthread_mutex_t metadata_lock;
#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2)
+#define ISMEMSYNCWRITE(hio) (ISMEMSYNC(hio) && \
+ (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
+
static struct hast_resource *gres;
static pthread_mutex_t range_lock;
@@ -239,6 +262,22 @@ static void *sync_thread(void *arg);
static void *guard_thread(void *arg);
static void
+output_status_aux(struct nv *nvout)
+{
+
+ nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
+ "idle_queue_size");
+ nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
+ "local_queue_size");
+ nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
+ "send_queue_size");
+ nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
+ "recv_queue_size");
+ nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
+ "done_queue_size");
+}
+
+static void
cleanup(struct hast_resource *res)
{
int rerrno;
@@ -355,6 +394,12 @@ init_environment(struct hast_resource *res __unused)
"Unable to allocate %zu bytes of memory for send lists.",
sizeof(hio_send_list[0]) * ncomps);
}
+ hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
+ if (hio_send_list_size == NULL) {
+ primary_exitx(EX_TEMPFAIL,
+ "Unable to allocate %zu bytes of memory for send list counters.",
+ sizeof(hio_send_list_size[0]) * ncomps);
+ }
hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
if (hio_send_list_lock == NULL) {
primary_exitx(EX_TEMPFAIL,
@@ -373,6 +418,12 @@ init_environment(struct hast_resource *res __unused)
"Unable to allocate %zu bytes of memory for recv lists.",
sizeof(hio_recv_list[0]) * ncomps);
}
+ hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
+ if (hio_recv_list_size == NULL) {
+ primary_exitx(EX_TEMPFAIL,
+ "Unable to allocate %zu bytes of memory for recv list counters.",
+ sizeof(hio_recv_list_size[0]) * ncomps);
+ }
hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
if (hio_recv_list_lock == NULL) {
primary_exitx(EX_TEMPFAIL,
@@ -393,16 +444,18 @@ init_environment(struct hast_resource *res __unused)
}
/*
- * Initialize lists, their locks and theirs condition variables.
+ * Initialize lists, their counters, locks and condition variables.
*/
TAILQ_INIT(&hio_free_list);
mtx_init(&hio_free_list_lock);
cv_init(&hio_free_list_cond);
for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
TAILQ_INIT(&hio_send_list[ii]);
+ hio_send_list_size[ii] = 0;
mtx_init(&hio_send_list_lock[ii]);
cv_init(&hio_send_list_cond[ii]);
TAILQ_INIT(&hio_recv_list[ii]);
+ hio_recv_list_size[ii] = 0;
mtx_init(&hio_recv_list_lock[ii]);
cv_init(&hio_recv_list_cond[ii]);
rw_init(&hio_remote_lock[ii]);
@@ -445,6 +498,7 @@ init_environment(struct hast_resource *res __unused)
hio->hio_ggio.gctl_length = MAXPHYS;
hio->hio_ggio.gctl_error = 0;
TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
+ hio_free_list_size++;
}
}
@@ -963,6 +1017,7 @@ hastd_primary(struct hast_resource *res)
}
gres = res;
+ res->output_status_aux = output_status_aux;
mode = pjdlog_mode_get();
debuglevel = pjdlog_debug_get();
@@ -1299,6 +1354,10 @@ ggate_recv_thread(void *arg)
} else {
mtx_unlock(&res->hr_amp_lock);
}
+ if (ISMEMSYNC(hio)) {
+ hio->hio_memsyncacked = false;
+ refcnt_init(&hio->hio_writecount, ncomps);
+ }
break;
case BIO_DELETE:
res->hr_stat_delete++;
@@ -1309,13 +1368,7 @@ ggate_recv_thread(void *arg)
}
pjdlog_debug(2,
"ggate_recv: (%p) Moving request to the send queues.", hio);
- if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
- ggio->gctl_cmd == BIO_WRITE) {
- /* Each remote request needs two responses in memsync. */
- refcnt_init(&hio->hio_countdown, ncomps + 1);
- } else {
- refcnt_init(&hio->hio_countdown, ncomps);
- }
+ refcnt_init(&hio->hio_countdown, ncomps);
for (ii = ncomp; ii < ncomps; ii++)
QUEUE_INSERT1(hio, send, ii);
}
@@ -1386,8 +1439,7 @@ local_send_thread(void *arg)
ret, (intmax_t)ggio->gctl_length);
} else {
hio->hio_errors[ncomp] = 0;
- if (hio->hio_replication ==
- HAST_REPLICATION_ASYNC) {
+ if (ISASYNC(hio)) {
ggio->gctl_error = 0;
write_complete(res, hio);
}
@@ -1425,42 +1477,13 @@ local_send_thread(void *arg)
}
break;
}
-
- if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
- ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
- if (refcnt_release(&hio->hio_countdown) > 0)
- continue;
- } else {
- /*
- * Depending on hio_countdown value, requests finished
- * in the following order:
- * 0: remote memsync, remote final, local write
- * 1: remote memsync, local write, (remote final)
- * 2: local write, (remote memsync), (remote final)
- */
- switch (refcnt_release(&hio->hio_countdown)) {
- case 0:
- /*
- * Local write finished as last.
- */
- break;
- case 1:
- /*
- * Local write finished after remote memsync
- * reply arrvied. We can complete the write now.
- */
- if (hio->hio_errors[0] == 0)
- write_complete(res, hio);
- continue;
- case 2:
- /*
- * Local write finished as first.
- */
- continue;
- default:
- PJDLOG_ABORT("Invalid hio_countdown.");
+ if (ISMEMSYNCWRITE(hio)) {
+ if (refcnt_release(&hio->hio_writecount) == 0) {
+ write_complete(res, hio);
}
}
+ if (refcnt_release(&hio->hio_countdown) > 0)
+ continue;
if (ISSYNCREQ(hio)) {
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@@ -1582,10 +1605,8 @@ remote_send_thread(void *arg)
nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
nv_add_uint64(nv, offset, "offset");
nv_add_uint64(nv, length, "length");
- if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
- ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+ if (ISMEMSYNCWRITE(hio))
nv_add_uint8(nv, 1, "memsync");
- }
if (nv_error(nv) != 0) {
hio->hio_errors[ncomp] = nv_error(nv);
pjdlog_debug(2,
@@ -1617,6 +1638,7 @@ remote_send_thread(void *arg)
mtx_lock(&hio_recv_list_lock[ncomp]);
wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
+ hio_recv_list_size[ncomp]++;
mtx_unlock(&hio_recv_list_lock[ncomp]);
if (hast_proto_send(res, res->hr_remoteout, nv, data,
data != NULL ? length : 0) == -1) {
@@ -1628,17 +1650,9 @@ remote_send_thread(void *arg)
"Unable to send request (%s): ",
strerror(hio->hio_errors[ncomp]));
remote_close(res, ncomp);
- /*
- * Take request back from the receive queue and move
- * it immediately to the done queue.
- */
- mtx_lock(&hio_recv_list_lock[ncomp]);
- TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
- hio_next[ncomp]);
- mtx_unlock(&hio_recv_list_lock[ncomp]);
- goto done_queue;
+ } else {
+ rw_unlock(&hio_remote_lock[ncomp]);
}
- rw_unlock(&hio_remote_lock[ncomp]);
nv_free(nv);
if (wakeup)
cv_signal(&hio_recv_list_cond[ncomp]);
@@ -1662,8 +1676,12 @@ done_queue:
} else {
mtx_unlock(&res->hr_amp_lock);
}
- if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
- (void)refcnt_release(&hio->hio_countdown);
+ if (ISMEMSYNCWRITE(hio)) {
+ if (refcnt_release(&hio->hio_writecount) == 0) {
+ if (hio->hio_errors[0] == 0)
+ write_complete(res, hio);
+ }
+ }
}
if (refcnt_release(&hio->hio_countdown) > 0)
continue;
@@ -1719,7 +1737,9 @@ remote_recv_thread(void *arg)
PJDLOG_ASSERT(hio != NULL);
TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
hio_next[ncomp]);
+ hio_recv_list_size[ncomp]--;
mtx_unlock(&hio_recv_list_lock[ncomp]);
+ hio->hio_errors[ncomp] = ENOTCONN;
goto done_queue;
}
if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
@@ -1742,6 +1762,7 @@ remote_recv_thread(void *arg)
if (hio->hio_ggio.gctl_seq == seq) {
TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
hio_next[ncomp]);
+ hio_recv_list_size[ncomp]--;
break;
}
}
@@ -1792,80 +1813,34 @@ remote_recv_thread(void *arg)
hio->hio_errors[ncomp] = 0;
nv_free(nv);
done_queue:
- if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
- hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
- if (refcnt_release(&hio->hio_countdown) > 0)
- continue;
- } else {
- /*
- * Depending on hio_countdown value, requests finished
- * in the following order:
- *
- * 0: local write, remote memsync, remote final
- * or
- * 0: remote memsync, local write, remote final
- *
- * 1: local write, remote memsync, (remote final)
- * or
- * 1: remote memsync, remote final, (local write)
- *
- * 2: remote memsync, (local write), (remote final)
- * or
- * 2: remote memsync, (remote final), (local write)
- */
- switch (refcnt_release(&hio->hio_countdown)) {
- case 0:
- /*
- * Remote final reply arrived.
- */
- PJDLOG_ASSERT(!memsyncack);
- break;
- case 1:
- if (memsyncack) {
- /*
- * Local request already finished, so we
- * can complete the write.
- */
+ if (ISMEMSYNCWRITE(hio)) {
+ if (!hio->hio_memsyncacked) {
+ PJDLOG_ASSERT(memsyncack ||
+ hio->hio_errors[ncomp] != 0);
+ /* Remote ack arrived. */
+ if (refcnt_release(&hio->hio_writecount) == 0) {
if (hio->hio_errors[0] == 0)
write_complete(res, hio);
- /*
- * We still need to wait for final
- * remote reply.
- */
+ }
+ hio->hio_memsyncacked = true;
+ if (hio->hio_errors[ncomp] == 0) {
pjdlog_debug(2,
- "remote_recv: (%p) Moving request back to the recv queue.",
- hio);
+ "remote_recv: (%p) Moving request "
+ "back to the recv queue.", hio);
mtx_lock(&hio_recv_list_lock[ncomp]);
TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
hio, hio_next[ncomp]);
+ hio_recv_list_size[ncomp]++;
mtx_unlock(&hio_recv_list_lock[ncomp]);
- } else {
- /*
- * Remote final reply arrived before
- * local write finished.
- * Nothing to do in such case.
- */
+ continue;
}
- continue;
- case 2:
- /*
- * We received remote memsync reply even before
- * local write finished.
- */
- PJDLOG_ASSERT(memsyncack);
-
- pjdlog_debug(2,
- "remote_recv: (%p) Moving request back to the recv queue.",
- hio);
- mtx_lock(&hio_recv_list_lock[ncomp]);
- TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
- hio_next[ncomp]);
- mtx_unlock(&hio_recv_list_lock[ncomp]);
- continue;
- default:
- PJDLOG_ABORT("Invalid hio_countdown.");
+ } else {
+ PJDLOG_ASSERT(!memsyncack);
+ /* Remote final reply arrived. */
}
}
+ if (refcnt_release(&hio->hio_countdown) > 0)
+ continue;
if (ISSYNCREQ(hio)) {
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
diff --git a/sbin/hastd/proto.c b/sbin/hastd/proto.c
index 73487c0..53bbf7a 100644
--- a/sbin/hastd/proto.c
+++ b/sbin/hastd/proto.c
@@ -298,8 +298,8 @@ proto_connection_send(const struct proto_conn *conn, struct proto_conn *mconn)
protoname = mconn->pc_proto->prt_name;
PJDLOG_ASSERT(protoname != NULL);
- ret = conn->pc_proto->prt_send(conn->pc_ctx, protoname,
- strlen(protoname) + 1, fd);
+ ret = conn->pc_proto->prt_send(conn->pc_ctx,
+ (const unsigned char *)protoname, strlen(protoname) + 1, fd);
proto_close(mconn);
if (ret != 0) {
errno = ret;
@@ -325,7 +325,7 @@ proto_connection_recv(const struct proto_conn *conn, bool client,
bzero(protoname, sizeof(protoname));
- ret = conn->pc_proto->prt_recv(conn->pc_ctx, protoname,
+ ret = conn->pc_proto->prt_recv(conn->pc_ctx, (unsigned char *)protoname,
sizeof(protoname) - 1, &fd);
if (ret != 0) {
errno = ret;
diff --git a/sbin/hastd/secondary.c b/sbin/hastd/secondary.c
index 067c5d9..5e1207a 100644
--- a/sbin/hastd/secondary.c
+++ b/sbin/hastd/secondary.c
@@ -82,18 +82,21 @@ static struct hast_resource *gres;
* until some in-progress requests are freed.
*/
static TAILQ_HEAD(, hio) hio_free_list;
+static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
* Disk thread (the one that does I/O requests) takes requests from this list.
*/
static TAILQ_HEAD(, hio) hio_disk_list;
+static size_t hio_disk_list_size;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
* Thread that sends requests back to primary takes requests from this list.
*/
static TAILQ_HEAD(, hio) hio_send_list;
+static size_t hio_send_list_size;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;
@@ -107,14 +110,12 @@ static void *disk_thread(void *arg);
static void *send_thread(void *arg);
#define QUEUE_INSERT(name, hio) do { \
- bool _wakeup; \
- \
mtx_lock(&hio_##name##_list_lock); \
- _wakeup = TAILQ_EMPTY(&hio_##name##_list); \
+ if (TAILQ_EMPTY(&hio_##name##_list)) \
+ cv_broadcast(&hio_##name##_list_cond); \
TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \
+ hio_##name##_list_size++; \
mtx_unlock(&hio_##name##_list_lock); \
- if (_wakeup) \
- cv_broadcast(&hio_##name##_list_cond); \
} while (0)
#define QUEUE_TAKE(name, hio) do { \
mtx_lock(&hio_##name##_list_lock); \
@@ -122,11 +123,22 @@ static void *send_thread(void *arg);
cv_wait(&hio_##name##_list_cond, \
&hio_##name##_list_lock); \
} \
+ PJDLOG_ASSERT(hio_##name##_list_size != 0); \
+ hio_##name##_list_size--; \
TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \
mtx_unlock(&hio_##name##_list_lock); \
} while (0)
static void
+output_status_aux(struct nv *nvout)
+{
+
+ nv_add_uint64(nvout, (uint64_t)hio_free_list_size, "idle_queue_size");
+ nv_add_uint64(nvout, (uint64_t)hio_disk_list_size, "local_queue_size");
+ nv_add_uint64(nvout, (uint64_t)hio_send_list_size, "send_queue_size");
+}
+
+static void
hio_clear(struct hio *hio)
{
@@ -190,6 +202,7 @@ init_environment(void)
}
hio_clear(hio);
TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
+ hio_free_list_size++;
}
}
@@ -441,6 +454,7 @@ hastd_secondary(struct hast_resource *res, struct nv *nvin)
}
gres = res;
+ res->output_status_aux = output_status_aux;
mode = pjdlog_mode_get();
debuglevel = pjdlog_debug_get();
OpenPOWER on IntegriCloud