Diffstat (limited to 'sbin/hastd')
-rw-r--r--  sbin/hastd/control.c    |  11
-rw-r--r--  sbin/hastd/hast.h       |   5
-rw-r--r--  sbin/hastd/hastd.8      |   2
-rw-r--r--  sbin/hastd/hastd.c      |  12
-rw-r--r--  sbin/hastd/nv.c         |   2
-rw-r--r--  sbin/hastd/primary.c    | 243
-rw-r--r--  sbin/hastd/proto.c      |   6
-rw-r--r--  sbin/hastd/secondary.c  |  24
8 files changed, 155 insertions, 150 deletions
diff --git a/sbin/hastd/control.c b/sbin/hastd/control.c
index 922f507..364225b 100644
--- a/sbin/hastd/control.c
+++ b/sbin/hastd/control.c
@@ -215,6 +215,16 @@ control_status_worker(struct hast_resource *res, struct nv *nvout,
 	    "stat_delete_error%u", no);
 	nv_add_uint64(nvout, nv_get_uint64(cnvin, "stat_flush_error"),
 	    "stat_flush_error%u", no);
+	nv_add_uint64(nvout, nv_get_uint64(cnvin, "idle_queue_size"),
+	    "idle_queue_size%u", no);
+	nv_add_uint64(nvout, nv_get_uint64(cnvin, "local_queue_size"),
+	    "local_queue_size%u", no);
+	nv_add_uint64(nvout, nv_get_uint64(cnvin, "send_queue_size"),
+	    "send_queue_size%u", no);
+	nv_add_uint64(nvout, nv_get_uint64(cnvin, "recv_queue_size"),
+	    "recv_queue_size%u", no);
+	nv_add_uint64(nvout, nv_get_uint64(cnvin, "done_queue_size"),
+	    "done_queue_size%u", no);
 end:
 	if (cnvin != NULL)
 		nv_free(cnvin);
@@ -478,6 +488,7 @@ ctrl_thread(void *arg)
 			nv_add_uint64(nvout, res->hr_stat_flush_error +
 			    res->hr_stat_activemap_flush_error,
 			    "stat_flush_error");
+			res->output_status_aux(nvout);
 			nv_add_int16(nvout, 0, "error");
 			break;
 		case CONTROL_RELOAD:
diff --git a/sbin/hastd/hast.h b/sbin/hastd/hast.h
index 65c24f8..c529de5 100644
--- a/sbin/hastd/hast.h
+++ b/sbin/hastd/hast.h
@@ -137,6 +137,8 @@ struct hastd_config {
 #define	HAST_CHECKSUM_CRC32	1
 #define	HAST_CHECKSUM_SHA256	2
 
+struct nv;
+
 /*
  * Structure that describes single resource.
  */
@@ -254,6 +256,9 @@ struct hast_resource {
 	/* Number of activemap flush errors. */
 	uint64_t	hr_stat_activemap_flush_error;
 
+	/* Function to output worker specific info on control status request. */
+	void (*output_status_aux)(struct nv *);
+
 	/* Next resource. */
 	TAILQ_ENTRY(hast_resource) hr_next;
 };
diff --git a/sbin/hastd/hastd.8 b/sbin/hastd/hastd.8
index 017e895..68c98cb 100644
--- a/sbin/hastd/hastd.8
+++ b/sbin/hastd/hastd.8
@@ -171,7 +171,7 @@ The default location is
 .Pa /var/run/hastd.pid .
 .El
 .Sh FILES
-.Bl -tag -width ".Pa /var/run/hastctl" -compact
+.Bl -tag -width ".Pa /var/run/hastd.pid" -compact
 .It Pa /etc/hast.conf
 The configuration file for
 .Nm
diff --git a/sbin/hastd/hastd.c b/sbin/hastd/hastd.c
index 06b38e9..bac390a 100644
--- a/sbin/hastd/hastd.c
+++ b/sbin/hastd/hastd.c
@@ -806,12 +806,6 @@ listen_accept(struct hastd_listen *lst)
 		 */
 		version = 1;
 	}
-	if (version > HAST_PROTO_VERSION) {
-		pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
-		    version, (unsigned char)HAST_PROTO_VERSION);
-		version = HAST_PROTO_VERSION;
-	}
-	pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
 	token = nv_get_uint8_array(nvin, &size, "token");
 	/*
 	 * NULL token means that this is first connection.
@@ -925,6 +919,12 @@ listen_accept(struct hastd_listen *lst)
 	 */
 	if (token == NULL) {
+		if (version > HAST_PROTO_VERSION) {
+			pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
+			    version, (unsigned char)HAST_PROTO_VERSION);
+			version = HAST_PROTO_VERSION;
+		}
+		pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
 		res->hr_version = version;
 		arc4random_buf(res->hr_token, sizeof(res->hr_token));
 		nvout = nv_alloc();
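The hastd.c hunks above move the protocol-version clamp into the token-less (first) connection path, so the second connection of a pair reuses the already negotiated version instead of re-running the fallback and logging it twice. The following stand-alone sketch, with LOCAL_PROTO_VERSION as an assumed stand-in for HAST_PROTO_VERSION and not the hastd sources themselves, illustrates the clamping rule:

#include <stdint.h>
#include <stdio.h>

#define	LOCAL_PROTO_VERSION	2	/* assumed stand-in for HAST_PROTO_VERSION */

static uint8_t
negotiate_version(uint8_t remote_version)
{

	/* Speak the lower of the two versions, as listen_accept() does. */
	if (remote_version > LOCAL_PROTO_VERSION) {
		printf("Remote protocol version %hhu is not supported, "
		    "falling back to version %hhu.\n", remote_version,
		    (uint8_t)LOCAL_PROTO_VERSION);
		remote_version = LOCAL_PROTO_VERSION;
	}
	printf("Negotiated protocol version %hhu.\n", remote_version);
	return (remote_version);
}

int
main(void)
{

	(void)negotiate_version(1);	/* older peer: keep 1 */
	(void)negotiate_version(7);	/* newer peer: clamp to 2 */
	return (0);
}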
diff --git a/sbin/hastd/nv.c b/sbin/hastd/nv.c
index 8dcf697..fefc2df 100644
--- a/sbin/hastd/nv.c
+++ b/sbin/hastd/nv.c
@@ -566,7 +566,7 @@ nv_get_string(struct nv *nv, const char *namefmt, ...)
 		return (NULL);
 	PJDLOG_ASSERT((nvh->nvh_type & NV_ORDER_MASK) == NV_ORDER_HOST);
 	PJDLOG_ASSERT(nvh->nvh_dsize >= 1);
-	str = NVH_DATA(nvh);
+	str = (char *)NVH_DATA(nvh);
 	PJDLOG_ASSERT(str[nvh->nvh_dsize - 1] == '\0');
 	PJDLOG_ASSERT(strlen(str) == nvh->nvh_dsize - 1);
 	return (str);
diff --git a/sbin/hastd/primary.c b/sbin/hastd/primary.c
index bd484e3..385a52a 100644
--- a/sbin/hastd/primary.c
+++ b/sbin/hastd/primary.c
@@ -94,6 +94,15 @@ struct hio {
 	 */
 	bool			 hio_done;
 	/*
+	 * Number of components we are still waiting before sending write
+	 * completion ack to GEOM Gate. Used for memsync.
+	 */
+	refcnt_t		 hio_writecount;
+	/*
+	 * Memsync request was acknowleged by remote.
+	 */
+	bool			 hio_memsyncacked;
+	/*
 	 * Remember replication from the time the request was initiated,
 	 * so we won't get confused when replication changes on reload.
 	 */
@@ -108,6 +117,7 @@ struct hio {
  * until some in-progress requests are freed.
  */
 static TAILQ_HEAD(, hio) hio_free_list;
+static size_t hio_free_list_size;
 static pthread_mutex_t hio_free_list_lock;
 static pthread_cond_t hio_free_list_cond;
 /*
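The two fields just added to struct hio replace the earlier trick of giving hio_countdown an extra reference for memsync writes. A self-contained sketch of the resulting two-counter scheme follows; the refcnt_t helpers are C11-atomic stand-ins, assumed (from how the diff tests their result) to return the counter value after the decrement, and the whole thing is illustrative rather than the hastd code:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

typedef atomic_uint refcnt_t;		/* stand-in for hastd's refcnt_t */

static void
refcnt_init(refcnt_t *rc, unsigned int v)
{

	atomic_init(rc, v);
}

static unsigned int
refcnt_release(refcnt_t *rc)
{

	/* Assumed semantics: return the value after the decrement. */
	return (atomic_fetch_sub(rc, 1) - 1);
}

struct hio {
	refcnt_t hio_writecount;	/* local write + remote memsync ack */
	refcnt_t hio_countdown;		/* local write + final remote reply */
	bool	 hio_memsyncacked;
	int	 hio_errors[2];
};

static void
write_complete(struct hio *hio)
{

	(void)hio;
	printf("ack the write to GEOM Gate\n");
}

/* Called by the local write path (local_send_thread's role). */
static void
local_write_done(struct hio *hio)
{

	if (refcnt_release(&hio->hio_writecount) == 0)
		write_complete(hio);
	if (refcnt_release(&hio->hio_countdown) == 0)
		printf("move the request to the done queue\n");
}

/* Called when the first (memsync) remote reply arrives. */
static void
remote_memsync_ack(struct hio *hio)
{

	if (refcnt_release(&hio->hio_writecount) == 0 &&
	    hio->hio_errors[0] == 0)
		write_complete(hio);
	hio->hio_memsyncacked = true;	/* keep waiting for the final reply */
}

/* Called when the second (final, on-disk) remote reply arrives. */
static void
remote_final_reply(struct hio *hio)
{

	if (refcnt_release(&hio->hio_countdown) == 0)
		printf("move the request to the done queue\n");
}

int
main(void)
{
	struct hio hio = { .hio_memsyncacked = false };

	refcnt_init(&hio.hio_writecount, 2);	/* ncomps */
	refcnt_init(&hio.hio_countdown, 2);	/* ncomps */
	/* One possible ordering: the memsync ack beats the local write. */
	remote_memsync_ack(&hio);
	local_write_done(&hio);
	remote_final_reply(&hio);
	return (0);
}

With both counters initialized to the number of components, the GEOM Gate ack no longer depends on the order in which the local write, the memsync ack, and the final remote reply arrive, which is what the removed hio_countdown switch statements further down had to enumerate case by case.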
@@ -116,20 +126,26 @@ static pthread_cond_t hio_free_list_cond;
  * responsible for managing his own send list.
  */
 static TAILQ_HEAD(, hio) *hio_send_list;
+static size_t *hio_send_list_size;
 static pthread_mutex_t *hio_send_list_lock;
 static pthread_cond_t *hio_send_list_cond;
+#define	hio_send_local_list_size	hio_send_list_size[0]
+#define	hio_send_remote_list_size	hio_send_list_size[1]
 /*
  * There is one recv list for every component, although local components don't
  * use recv lists as local requests are done synchronously.
  */
 static TAILQ_HEAD(, hio) *hio_recv_list;
+static size_t *hio_recv_list_size;
 static pthread_mutex_t *hio_recv_list_lock;
 static pthread_cond_t *hio_recv_list_cond;
+#define	hio_recv_remote_list_size	hio_recv_list_size[1]
 /*
  * Request is placed on done list by the slowest component (the one that
  * decreased hio_countdown from 1 to 0).
  */
 static TAILQ_HEAD(, hio) hio_done_list;
+static size_t hio_done_list_size;
 static pthread_mutex_t hio_done_list_lock;
 static pthread_cond_t hio_done_list_cond;
 /*
@@ -164,25 +180,21 @@ static pthread_mutex_t metadata_lock;
     ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
 
 #define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
-	bool _wakeup;							\
-									\
 	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
-	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
+	if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)]))			\
+		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
 	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
 	    hio_next[(ncomp)]);						\
-	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
-	if (_wakeup)							\
-		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
+	hio_##name##_list_size[(ncomp)]++;				\
+	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
 } while (0)
 #define	QUEUE_INSERT2(hio, name)	do {				\
-	bool _wakeup;							\
-									\
 	mtx_lock(&hio_##name##_list_lock);				\
-	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
+	if (TAILQ_EMPTY(&hio_##name##_list))				\
+		cv_broadcast(&hio_##name##_list_cond);			\
 	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
+	hio_##name##_list_size++;					\
 	mtx_unlock(&hio_##name##_list_lock);				\
-	if (_wakeup)							\
-		cv_broadcast(&hio_##name##_list_cond);			\
 } while (0)
 #define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
 	bool _last;							\
@@ -196,6 +208,8 @@ static pthread_mutex_t metadata_lock;
 			_last = true;					\
 	}								\
 	if (hio != NULL) {						\
+		PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0);	\
+		hio_##name##_list_size[(ncomp)]--;			\
 		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
 		    hio_next[(ncomp)]);					\
 	}								\
@@ -207,10 +221,16 @@ static pthread_mutex_t metadata_lock;
 		cv_wait(&hio_##name##_list_cond,			\
 		    &hio_##name##_list_lock);				\
 	}								\
+	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
+	hio_##name##_list_size--;					\
 	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
 	mtx_unlock(&hio_##name##_list_lock);				\
 } while (0)
 
+#define	ISFULLSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
+#define	ISMEMSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
+#define	ISASYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_ASYNC)
+
 #define	SYNCREQ(hio)		do {					\
 	(hio)->hio_ggio.gctl_unit = -1;					\
 	(hio)->hio_ggio.gctl_seq = 1;					\
@@ -219,6 +239,9 @@ static pthread_mutex_t metadata_lock;
 #define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
 #define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
 
+#define	ISMEMSYNCWRITE(hio)	(ISMEMSYNC(hio) &&			\
+	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
+
 static struct hast_resource *gres;
 
 static pthread_mutex_t range_lock;
@@ -239,6 +262,22 @@ static void *sync_thread(void *arg);
 static void *guard_thread(void *arg);
 
 static void
+output_status_aux(struct nv *nvout)
+{
+
+	nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
+	    "idle_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
+	    "local_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
+	    "send_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
+	    "recv_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
+	    "done_queue_size");
+}
+
+static void
 cleanup(struct hast_resource *res)
 {
 	int rerrno;
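The rewritten queue macros above fold three changes together: a size counter updated under the list lock, an underflow assertion on the take side, and a wakeup that is now issued while the mutex is still held (on the empty-to-non-empty transition) rather than after the unlock. A generic pthreads rendition of the same pattern, with plain assert standing in for PJDLOG_ASSERT:

#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/queue.h>

struct hio {
	TAILQ_ENTRY(hio) hio_next;
};

static TAILQ_HEAD(, hio) hio_list = TAILQ_HEAD_INITIALIZER(hio_list);
static size_t hio_list_size;
static pthread_mutex_t hio_list_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t hio_list_cond = PTHREAD_COND_INITIALIZER;

static void
queue_insert(struct hio *hio)
{

	pthread_mutex_lock(&hio_list_lock);
	/* Wake waiters only on the empty-to-non-empty transition. */
	if (TAILQ_EMPTY(&hio_list))
		pthread_cond_broadcast(&hio_list_cond);
	TAILQ_INSERT_TAIL(&hio_list, hio, hio_next);
	hio_list_size++;		/* counter kept consistent under the lock */
	pthread_mutex_unlock(&hio_list_lock);
}

static struct hio *
queue_take(void)
{
	struct hio *hio;

	pthread_mutex_lock(&hio_list_lock);
	while ((hio = TAILQ_FIRST(&hio_list)) == NULL)
		pthread_cond_wait(&hio_list_cond, &hio_list_lock);
	assert(hio_list_size != 0);	/* cf. the PJDLOG_ASSERT additions */
	hio_list_size--;
	TAILQ_REMOVE(&hio_list, hio, hio_next);
	pthread_mutex_unlock(&hio_list_lock);
	return (hio);
}

int
main(void)
{
	struct hio hio;

	queue_insert(&hio);
	return (queue_take() == &hio ? 0 : 1);
}

Broadcasting before the insert is safe because a waiter cannot return from pthread_cond_wait until the mutex is released, and signalling only on the empty transition keeps the common non-empty insert cheap.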
@@ -355,6 +394,12 @@ init_environment(struct hast_resource *res __unused)
 		    "Unable to allocate %zu bytes of memory for send lists.",
 		    sizeof(hio_send_list[0]) * ncomps);
 	}
+	hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
+	if (hio_send_list_size == NULL) {
+		primary_exitx(EX_TEMPFAIL,
+		    "Unable to allocate %zu bytes of memory for send list counters.",
+		    sizeof(hio_send_list_size[0]) * ncomps);
+	}
 	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
 	if (hio_send_list_lock == NULL) {
 		primary_exitx(EX_TEMPFAIL,
@@ -373,6 +418,12 @@ init_environment(struct hast_resource *res __unused)
 		    "Unable to allocate %zu bytes of memory for recv lists.",
 		    sizeof(hio_recv_list[0]) * ncomps);
 	}
+	hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
+	if (hio_recv_list_size == NULL) {
+		primary_exitx(EX_TEMPFAIL,
+		    "Unable to allocate %zu bytes of memory for recv list counters.",
+		    sizeof(hio_recv_list_size[0]) * ncomps);
+	}
 	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
 	if (hio_recv_list_lock == NULL) {
 		primary_exitx(EX_TEMPFAIL,
@@ -393,16 +444,18 @@ init_environment(struct hast_resource *res __unused)
 	}
 
 	/*
-	 * Initialize lists, their locks and theirs condition variables.
+	 * Initialize lists, their counters, locks and condition variables.
 	 */
 	TAILQ_INIT(&hio_free_list);
 	mtx_init(&hio_free_list_lock);
 	cv_init(&hio_free_list_cond);
 	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
 		TAILQ_INIT(&hio_send_list[ii]);
+		hio_send_list_size[ii] = 0;
 		mtx_init(&hio_send_list_lock[ii]);
 		cv_init(&hio_send_list_cond[ii]);
 		TAILQ_INIT(&hio_recv_list[ii]);
+		hio_recv_list_size[ii] = 0;
 		mtx_init(&hio_recv_list_lock[ii]);
 		cv_init(&hio_recv_list_cond[ii]);
 		rw_init(&hio_remote_lock[ii]);
@@ -445,6 +498,7 @@ init_environment(struct hast_resource *res __unused)
 		hio->hio_ggio.gctl_length = MAXPHYS;
 		hio->hio_ggio.gctl_error = 0;
 		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
+		hio_free_list_size++;
 	}
 }
 
@@ -963,6 +1017,7 @@ hastd_primary(struct hast_resource *res)
 	}
 
 	gres = res;
+	res->output_status_aux = output_status_aux;
 	mode = pjdlog_mode_get();
 	debuglevel = pjdlog_debug_get();
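The @@ -963 hunk is where the callback declared in hast.h gets wired up: the worker points output_status_aux at its own implementation before its ctrl_thread() can service a status request, and the secondary does the same at the end of this diff. A minimal stand-alone sketch of that plumbing, with a printf-based stand-in for the real nv API:

#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

struct nv;				/* opaque, as forward-declared in hast.h */

struct hast_resource {
	/* Worker specific status callback, as added to hast.h. */
	void (*output_status_aux)(struct nv *);
};

/* printf-based stand-in for the real nv_add_uint64(). */
static void
nv_add_uint64(struct nv *nv, uint64_t value, const char *name)
{

	(void)nv;
	printf("%s=%" PRIu64 "\n", name, value);
}

static size_t hio_free_list_size = 3;	/* illustrative value */

static void
primary_output_status_aux(struct nv *nvout)
{

	nv_add_uint64(nvout, (uint64_t)hio_free_list_size, "idle_queue_size");
}

int
main(void)
{
	struct hast_resource res;

	res.output_status_aux = primary_output_status_aux; /* worker start-up */
	res.output_status_aux(NULL);	/* ctrl_thread() answering a request */
	return (0);
}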
@@ -1299,6 +1354,10 @@ ggate_recv_thread(void *arg)
 			} else {
 				mtx_unlock(&res->hr_amp_lock);
 			}
+			if (ISMEMSYNC(hio)) {
+				hio->hio_memsyncacked = false;
+				refcnt_init(&hio->hio_writecount, ncomps);
+			}
 			break;
 		case BIO_DELETE:
 			res->hr_stat_delete++;
@@ -1309,13 +1368,7 @@ ggate_recv_thread(void *arg)
 		}
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Moving request to the send queues.", hio);
-		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
-		    ggio->gctl_cmd == BIO_WRITE) {
-			/* Each remote request needs two responses in memsync. */
-			refcnt_init(&hio->hio_countdown, ncomps + 1);
-		} else {
-			refcnt_init(&hio->hio_countdown, ncomps);
-		}
+		refcnt_init(&hio->hio_countdown, ncomps);
 		for (ii = ncomp; ii < ncomps; ii++)
 			QUEUE_INSERT1(hio, send, ii);
 	}
@@ -1386,8 +1439,7 @@ local_send_thread(void *arg)
 				    ret, (intmax_t)ggio->gctl_length);
 			} else {
 				hio->hio_errors[ncomp] = 0;
-				if (hio->hio_replication ==
-				    HAST_REPLICATION_ASYNC) {
+				if (ISASYNC(hio)) {
 					ggio->gctl_error = 0;
 					write_complete(res, hio);
 				}
@@ -1425,42 +1477,13 @@ local_send_thread(void *arg)
 			}
 			break;
 		}
-
-		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
-		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
-			if (refcnt_release(&hio->hio_countdown) > 0)
-				continue;
-		} else {
-			/*
-			 * Depending on hio_countdown value, requests finished
-			 * in the following order:
-			 * 0: remote memsync, remote final, local write
-			 * 1: remote memsync, local write, (remote final)
-			 * 2: local write, (remote memsync), (remote final)
-			 */
-			switch (refcnt_release(&hio->hio_countdown)) {
-			case 0:
-				/*
-				 * Local write finished as last.
-				 */
-				break;
-			case 1:
-				/*
-				 * Local write finished after remote memsync
-				 * reply arrvied. We can complete the write now.
-				 */
-				if (hio->hio_errors[0] == 0)
-					write_complete(res, hio);
-				continue;
-			case 2:
-				/*
-				 * Local write finished as first.
-				 */
-				continue;
-			default:
-				PJDLOG_ABORT("Invalid hio_countdown.");
+		if (ISMEMSYNCWRITE(hio)) {
+			if (refcnt_release(&hio->hio_writecount) == 0) {
+				write_complete(res, hio);
 			}
 		}
+		if (refcnt_release(&hio->hio_countdown) > 0)
+			continue;
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1582,10 +1605,8 @@ remote_send_thread(void *arg)
 		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
 		nv_add_uint64(nv, offset, "offset");
 		nv_add_uint64(nv, length, "length");
-		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
-		    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+		if (ISMEMSYNCWRITE(hio))
 			nv_add_uint8(nv, 1, "memsync");
-		}
 		if (nv_error(nv) != 0) {
 			hio->hio_errors[ncomp] = nv_error(nv);
 			pjdlog_debug(2,
@@ -1617,6 +1638,7 @@ remote_send_thread(void *arg)
 		mtx_lock(&hio_recv_list_lock[ncomp]);
 		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
 		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
+		hio_recv_list_size[ncomp]++;
 		mtx_unlock(&hio_recv_list_lock[ncomp]);
 		if (hast_proto_send(res, res->hr_remoteout, nv, data,
 		    data != NULL ? length : 0) == -1) {
@@ -1628,17 +1650,9 @@ remote_send_thread(void *arg)
 			    "Unable to send request (%s): ",
 			    strerror(hio->hio_errors[ncomp]));
 			remote_close(res, ncomp);
-			/*
-			 * Take request back from the receive queue and move
-			 * it immediately to the done queue.
-			 */
-			mtx_lock(&hio_recv_list_lock[ncomp]);
-			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
-			    hio_next[ncomp]);
-			mtx_unlock(&hio_recv_list_lock[ncomp]);
-			goto done_queue;
+		} else {
+			rw_unlock(&hio_remote_lock[ncomp]);
 		}
-		rw_unlock(&hio_remote_lock[ncomp]);
 		nv_free(nv);
 		if (wakeup)
 			cv_signal(&hio_recv_list_cond[ncomp]);
@@ -1662,8 +1676,12 @@ done_queue:
 			} else {
 				mtx_unlock(&res->hr_amp_lock);
 			}
-			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
-				(void)refcnt_release(&hio->hio_countdown);
+			if (ISMEMSYNCWRITE(hio)) {
+				if (refcnt_release(&hio->hio_writecount) == 0) {
+					if (hio->hio_errors[0] == 0)
+						write_complete(res, hio);
+				}
+			}
 		}
 		if (refcnt_release(&hio->hio_countdown) > 0)
 			continue;
@@ -1719,7 +1737,9 @@ remote_recv_thread(void *arg)
 			PJDLOG_ASSERT(hio != NULL);
 			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
 			    hio_next[ncomp]);
+			hio_recv_list_size[ncomp]--;
 			mtx_unlock(&hio_recv_list_lock[ncomp]);
+			hio->hio_errors[ncomp] = ENOTCONN;
 			goto done_queue;
 		}
 		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
@@ -1742,6 +1762,7 @@ remote_recv_thread(void *arg)
 			if (hio->hio_ggio.gctl_seq == seq) {
 				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
 				    hio_next[ncomp]);
+				hio_recv_list_size[ncomp]--;
 				break;
 			}
 		}
@@ -1792,80 +1813,34 @@ remote_recv_thread(void *arg)
 		hio->hio_errors[ncomp] = 0;
 		nv_free(nv);
 done_queue:
-		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
-		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
-			if (refcnt_release(&hio->hio_countdown) > 0)
-				continue;
-		} else {
-			/*
-			 * Depending on hio_countdown value, requests finished
-			 * in the following order:
-			 *
-			 * 0: local write, remote memsync, remote final
-			 * or
-			 * 0: remote memsync, local write, remote final
-			 *
-			 * 1: local write, remote memsync, (remote final)
-			 * or
-			 * 1: remote memsync, remote final, (local write)
-			 *
-			 * 2: remote memsync, (local write), (remote final)
-			 * or
-			 * 2: remote memsync, (remote final), (local write)
-			 */
-			switch (refcnt_release(&hio->hio_countdown)) {
-			case 0:
-				/*
-				 * Remote final reply arrived.
-				 */
-				PJDLOG_ASSERT(!memsyncack);
-				break;
-			case 1:
-				if (memsyncack) {
-					/*
-					 * Local request already finished, so we
-					 * can complete the write.
-					 */
+		if (ISMEMSYNCWRITE(hio)) {
+			if (!hio->hio_memsyncacked) {
+				PJDLOG_ASSERT(memsyncack ||
+				    hio->hio_errors[ncomp] != 0);
+				/* Remote ack arrived. */
+				if (refcnt_release(&hio->hio_writecount) == 0) {
 					if (hio->hio_errors[0] == 0)
 						write_complete(res, hio);
-					/*
-					 * We still need to wait for final
-					 * remote reply.
-					 */
+				}
+				hio->hio_memsyncacked = true;
+				if (hio->hio_errors[ncomp] == 0) {
 					pjdlog_debug(2,
-					    "remote_recv: (%p) Moving request back to the recv queue.",
-					    hio);
+					    "remote_recv: (%p) Moving request "
+					    "back to the recv queue.", hio);
 					mtx_lock(&hio_recv_list_lock[ncomp]);
 					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
 					    hio, hio_next[ncomp]);
+					hio_recv_list_size[ncomp]++;
 					mtx_unlock(&hio_recv_list_lock[ncomp]);
-				} else {
-					/*
-					 * Remote final reply arrived before
-					 * local write finished.
-					 * Nothing to do in such case.
-					 */
+					continue;
 				}
-				continue;
-			case 2:
-				/*
-				 * We received remote memsync reply even before
-				 * local write finished.
-				 */
-				PJDLOG_ASSERT(memsyncack);
-
-				pjdlog_debug(2,
-				    "remote_recv: (%p) Moving request back to the recv queue.",
-				    hio);
-				mtx_lock(&hio_recv_list_lock[ncomp]);
-				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
-				    hio_next[ncomp]);
-				mtx_unlock(&hio_recv_list_lock[ncomp]);
-				continue;
-			default:
-				PJDLOG_ABORT("Invalid hio_countdown.");
+			} else {
+				PJDLOG_ASSERT(!memsyncack);
+				/* Remote final reply arrived. */
 			}
 		}
+		if (refcnt_release(&hio->hio_countdown) > 0)
+			continue;
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
diff --git a/sbin/hastd/proto.c b/sbin/hastd/proto.c
index 73487c0..53bbf7a 100644
--- a/sbin/hastd/proto.c
+++ b/sbin/hastd/proto.c
@@ -298,8 +298,8 @@ proto_connection_send(const struct proto_conn *conn, struct proto_conn *mconn)
 	protoname = mconn->pc_proto->prt_name;
 	PJDLOG_ASSERT(protoname != NULL);
 
-	ret = conn->pc_proto->prt_send(conn->pc_ctx, protoname,
-	    strlen(protoname) + 1, fd);
+	ret = conn->pc_proto->prt_send(conn->pc_ctx,
+	    (const unsigned char *)protoname, strlen(protoname) + 1, fd);
 	proto_close(mconn);
 	if (ret != 0) {
 		errno = ret;
@@ -325,7 +325,7 @@ proto_connection_recv(const struct proto_conn *conn, bool client,
 
 	bzero(protoname, sizeof(protoname));
 
-	ret = conn->pc_proto->prt_recv(conn->pc_ctx, protoname,
+	ret = conn->pc_proto->prt_recv(conn->pc_ctx, (unsigned char *)protoname,
 	    sizeof(protoname) - 1, &fd);
 	if (ret != 0) {
 		errno = ret;
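The proto.c casts are needed because char * and unsigned char * are distinct, incompatible pointer types in C, so handing a protocol name string to a byte-oriented prt_send/prt_recv callback requires an explicit conversion. A tiny illustration, with send_bytes as an assumed stand-in for the callback's shape:

#include <string.h>

/* Assumed stand-in for the prt_send callback's byte-buffer interface. */
static int
send_bytes(const unsigned char *data, size_t size)
{

	(void)data;
	(void)size;
	return (0);
}

int
main(void)
{
	const char *protoname = "tcp";

	/*
	 * send_bytes(protoname, ...) would draw an incompatible-pointer-type
	 * diagnostic; the explicit cast states the reinterpretation, exactly
	 * as the proto.c hunks now do.
	 */
	return (send_bytes((const unsigned char *)protoname,
	    strlen(protoname) + 1));
}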
diff --git a/sbin/hastd/secondary.c b/sbin/hastd/secondary.c
index 067c5d9..5e1207a 100644
--- a/sbin/hastd/secondary.c
+++ b/sbin/hastd/secondary.c
@@ -82,18 +82,21 @@ static struct hast_resource *gres;
  * until some in-progress requests are freed.
  */
 static TAILQ_HEAD(, hio) hio_free_list;
+static size_t hio_free_list_size;
 static pthread_mutex_t hio_free_list_lock;
 static pthread_cond_t hio_free_list_cond;
 /*
  * Disk thread (the one that does I/O requests) takes requests from this list.
  */
 static TAILQ_HEAD(, hio) hio_disk_list;
+static size_t hio_disk_list_size;
 static pthread_mutex_t hio_disk_list_lock;
 static pthread_cond_t hio_disk_list_cond;
 /*
  * Thread that sends requests back to primary takes requests from this list.
  */
 static TAILQ_HEAD(, hio) hio_send_list;
+static size_t hio_send_list_size;
 static pthread_mutex_t hio_send_list_lock;
 static pthread_cond_t hio_send_list_cond;
 
@@ -107,14 +110,12 @@ static void *disk_thread(void *arg);
 static void *send_thread(void *arg);
 
 #define	QUEUE_INSERT(name, hio)	do {					\
-	bool _wakeup;							\
-									\
 	mtx_lock(&hio_##name##_list_lock);				\
-	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
+	if (TAILQ_EMPTY(&hio_##name##_list))				\
+		cv_broadcast(&hio_##name##_list_cond);			\
 	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
+	hio_##name##_list_size++;					\
 	mtx_unlock(&hio_##name##_list_lock);				\
-	if (_wakeup)							\
-		cv_broadcast(&hio_##name##_list_cond);			\
 } while (0)
 #define	QUEUE_TAKE(name, hio)	do {					\
 	mtx_lock(&hio_##name##_list_lock);				\
@@ -122,11 +123,22 @@ static void *send_thread(void *arg);
 		cv_wait(&hio_##name##_list_cond,			\
 		    &hio_##name##_list_lock);				\
 	}								\
+	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
+	hio_##name##_list_size--;					\
 	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
 	mtx_unlock(&hio_##name##_list_lock);				\
 } while (0)
 
 static void
+output_status_aux(struct nv *nvout)
+{
+
+	nv_add_uint64(nvout, (uint64_t)hio_free_list_size, "idle_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_disk_list_size, "local_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_send_list_size, "send_queue_size");
+}
+
+static void
 hio_clear(struct hio *hio)
 {
 
@@ -190,6 +202,7 @@ init_environment(void)
 		}
 		hio_clear(hio);
 		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
+		hio_free_list_size++;
 	}
 }
 
@@ -441,6 +454,7 @@ hastd_secondary(struct hast_resource *res, struct nv *nvin)
 	}
 
 	gres = res;
+	res->output_status_aux = output_status_aux;
 	mode = pjdlog_mode_get();
 	debuglevel = pjdlog_debug_get();