Diffstat (limited to 'sbin/hastd/primary.c')
-rw-r--r--  sbin/hastd/primary.c | 243
1 file changed, 109 insertions(+), 134 deletions(-)
diff --git a/sbin/hastd/primary.c b/sbin/hastd/primary.c
index bd484e3..385a52a 100644
--- a/sbin/hastd/primary.c
+++ b/sbin/hastd/primary.c
@@ -94,6 +94,15 @@ struct hio {
 	 */
 	bool			 hio_done;
 	/*
+	 * Number of components we are still waiting for before sending
+	 * write completion ack to GEOM Gate. Used for memsync.
+	 */
+	refcnt_t		 hio_writecount;
+	/*
+	 * Memsync request was acknowledged by remote.
+	 */
+	bool			 hio_memsyncacked;
+	/*
 	 * Remember replication from the time the request was initiated,
 	 * so we won't get confused when replication changes on reload.
 	 */
@@ -108,6 +117,7 @@ struct hio {
  * until some in-progress requests are freed.
  */
 static TAILQ_HEAD(, hio) hio_free_list;
+static size_t hio_free_list_size;
 static pthread_mutex_t hio_free_list_lock;
 static pthread_cond_t hio_free_list_cond;
 /*
@@ -116,20 +126,26 @@ static pthread_cond_t hio_free_list_cond;
  * responsible for managing his own send list.
  */
 static TAILQ_HEAD(, hio) *hio_send_list;
+static size_t *hio_send_list_size;
 static pthread_mutex_t *hio_send_list_lock;
 static pthread_cond_t *hio_send_list_cond;
+#define	hio_send_local_list_size	hio_send_list_size[0]
+#define	hio_send_remote_list_size	hio_send_list_size[1]
 /*
  * There is one recv list for every component, although local components don't
  * use recv lists as local requests are done synchronously.
  */
 static TAILQ_HEAD(, hio) *hio_recv_list;
+static size_t *hio_recv_list_size;
 static pthread_mutex_t *hio_recv_list_lock;
 static pthread_cond_t *hio_recv_list_cond;
+#define	hio_recv_remote_list_size	hio_recv_list_size[1]
 /*
  * Request is placed on done list by the slowest component (the one that
  * decreased hio_countdown from 1 to 0).
  */
 static TAILQ_HEAD(, hio) hio_done_list;
+static size_t hio_done_list_size;
 static pthread_mutex_t hio_done_list_lock;
 static pthread_cond_t hio_done_list_cond;
 /*
@@ -164,25 +180,21 @@ static pthread_mutex_t metadata_lock;
 	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
 
 #define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
-	bool _wakeup;							\
-									\
 	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
-	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
+	if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)]))			\
+		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
 	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
 	    hio_next[(ncomp)]);						\
-	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
-	if (_wakeup)							\
-		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
+	hio_##name##_list_size[(ncomp)]++;				\
+	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
 } while (0)
 #define	QUEUE_INSERT2(hio, name)	do {				\
-	bool _wakeup;							\
-									\
 	mtx_lock(&hio_##name##_list_lock);				\
-	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
+	if (TAILQ_EMPTY(&hio_##name##_list))				\
+		cv_broadcast(&hio_##name##_list_cond);			\
 	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
+	hio_##name##_list_size++;					\
 	mtx_unlock(&hio_##name##_list_lock);				\
-	if (_wakeup)							\
-		cv_broadcast(&hio_##name##_list_cond);			\
 } while (0)
 #define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
 	bool _last;							\
@@ -196,6 +208,8 @@ static pthread_mutex_t metadata_lock;
 			_last = true;					\
 	}								\
 	if (hio != NULL) {						\
+		PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0);	\
+		hio_##name##_list_size[(ncomp)]--;			\
 		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
 		    hio_next[(ncomp)]);					\
 	}								\
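Two things change in the queue macros above: each list gains a size counter that is updated under the list lock, and the wakeup no longer uses a local _wakeup flag broadcast after the unlock, but broadcasts while the lock is still held. The following is a minimal, self-contained sketch of the resulting pattern using plain pthreads in place of hastd's mtx_*/cv_* wrappers (queue_insert/queue_take are illustrative names, not hastd functions):

    #include <sys/queue.h>
    #include <pthread.h>
    #include <stddef.h>

    struct item {
            TAILQ_ENTRY(item) next;
    };

    static TAILQ_HEAD(, item) queue = TAILQ_HEAD_INITIALIZER(queue);
    static size_t queue_size;       /* plays the role of hio_*_list_size */
    static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;

    static void
    queue_insert(struct item *it)
    {

            pthread_mutex_lock(&queue_lock);
            /*
             * Broadcasting while the lock is held, even before the insert,
             * is safe: a waiter blocked in pthread_cond_wait() must
             * reacquire the lock before returning, so it cannot look at
             * the queue until the insert below is done and the lock is
             * released.
             */
            if (TAILQ_EMPTY(&queue))
                    pthread_cond_broadcast(&queue_cond);
            TAILQ_INSERT_TAIL(&queue, it, next);
            queue_size++;           /* only ever touched under the lock */
            pthread_mutex_unlock(&queue_lock);
    }

    static struct item *
    queue_take(void)
    {
            struct item *it;

            pthread_mutex_lock(&queue_lock);
            while ((it = TAILQ_FIRST(&queue)) == NULL)
                    pthread_cond_wait(&queue_cond, &queue_lock);
            queue_size--;
            TAILQ_REMOVE(&queue, it, next);
            pthread_mutex_unlock(&queue_lock);
            return (it);
    }

Because the counter is only read and written under the corresponding list lock, no atomics are needed, and the PJDLOG_ASSERT added to QUEUE_TAKE1/QUEUE_TAKE2 cheaply catches any drift between the counter and the list.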
@@ -207,10 +221,16 @@ static pthread_mutex_t metadata_lock;
 		cv_wait(&hio_##name##_list_cond,			\
 		    &hio_##name##_list_lock);				\
 	}								\
+	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
+	hio_##name##_list_size--;					\
 	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
 	mtx_unlock(&hio_##name##_list_lock);				\
 } while (0)
 
+#define	ISFULLSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
+#define	ISMEMSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
+#define	ISASYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_ASYNC)
+
 #define	SYNCREQ(hio)		do {					\
 	(hio)->hio_ggio.gctl_unit = -1;					\
 	(hio)->hio_ggio.gctl_seq = 1;					\
@@ -219,6 +239,9 @@ static pthread_mutex_t metadata_lock;
 #define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
 #define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
 
+#define	ISMEMSYNCWRITE(hio)	(ISMEMSYNC(hio) &&			\
+	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
+
 static struct hast_resource *gres;
 
 static pthread_mutex_t range_lock;
@@ -239,6 +262,22 @@ static void *sync_thread(void *arg);
 static void *guard_thread(void *arg);
 
 static void
+output_status_aux(struct nv *nvout)
+{
+
+	nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
+	    "idle_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
+	    "local_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
+	    "send_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
+	    "recv_queue_size");
+	nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
+	    "done_queue_size");
+}
+
+static void
 cleanup(struct hast_resource *res)
 {
 	int rerrno;
@@ -355,6 +394,12 @@ init_environment(struct hast_resource *res __unused)
 		    "Unable to allocate %zu bytes of memory for send lists.",
 		    sizeof(hio_send_list[0]) * ncomps);
 	}
+	hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
+	if (hio_send_list_size == NULL) {
+		primary_exitx(EX_TEMPFAIL,
+		    "Unable to allocate %zu bytes of memory for send list counters.",
+		    sizeof(hio_send_list_size[0]) * ncomps);
+	}
 	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
 	if (hio_send_list_lock == NULL) {
 		primary_exitx(EX_TEMPFAIL,
@@ -373,6 +418,12 @@ init_environment(struct hast_resource *res __unused)
 		    "Unable to allocate %zu bytes of memory for recv lists.",
 		    sizeof(hio_recv_list[0]) * ncomps);
 	}
+	hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
+	if (hio_recv_list_size == NULL) {
+		primary_exitx(EX_TEMPFAIL,
+		    "Unable to allocate %zu bytes of memory for recv list counters.",
+		    sizeof(hio_recv_list_size[0]) * ncomps);
+	}
 	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
 	if (hio_recv_list_lock == NULL) {
 		primary_exitx(EX_TEMPFAIL,
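The memsync rework in the hunks below leans on one property of hastd's refcnt API: refcnt_release() returns the counter value after the decrement, so when several threads race to release, exactly one of them observes 0 and performs the completion work. A sketch of that contract in C11 atomics (an illustration of the assumed semantics, not hastd's actual refcnt.h):

    #include <stdatomic.h>

    typedef atomic_uint refcnt_t;

    static inline void
    refcnt_init(refcnt_t *count, unsigned int v)
    {

            atomic_store_explicit(count, v, memory_order_relaxed);
    }

    /*
     * Returns the value after the decrement: for a counter initialized
     * to N and released once by each of N threads, exactly one caller
     * sees 0. That is what makes the "last finisher does the completion
     * work" checks in the hunks below race-free.
     */
    static inline unsigned int
    refcnt_release(refcnt_t *count)
    {

            return (atomic_fetch_sub_explicit(count, 1,
                memory_order_acq_rel) - 1);
    }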
@@ -393,16 +444,18 @@ init_environment(struct hast_resource *res __unused)
 	}
 
 	/*
-	 * Initialize lists, their locks and theirs condition variables.
+	 * Initialize lists, their counters, locks and condition variables.
 	 */
 	TAILQ_INIT(&hio_free_list);
 	mtx_init(&hio_free_list_lock);
 	cv_init(&hio_free_list_cond);
 	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
 		TAILQ_INIT(&hio_send_list[ii]);
+		hio_send_list_size[ii] = 0;
 		mtx_init(&hio_send_list_lock[ii]);
 		cv_init(&hio_send_list_cond[ii]);
 		TAILQ_INIT(&hio_recv_list[ii]);
+		hio_recv_list_size[ii] = 0;
 		mtx_init(&hio_recv_list_lock[ii]);
 		cv_init(&hio_recv_list_cond[ii]);
 		rw_init(&hio_remote_lock[ii]);
@@ -445,6 +498,7 @@ init_environment(struct hast_resource *res __unused)
 		hio->hio_ggio.gctl_length = MAXPHYS;
 		hio->hio_ggio.gctl_error = 0;
 		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
+		hio_free_list_size++;
 	}
 }
 
@@ -963,6 +1017,7 @@ hastd_primary(struct hast_resource *res)
 	}
 
 	gres = res;
+	res->output_status_aux = output_status_aux;
 	mode = pjdlog_mode_get();
 	debuglevel = pjdlog_debug_get();
 
@@ -1299,6 +1354,10 @@ ggate_recv_thread(void *arg)
 			} else {
 				mtx_unlock(&res->hr_amp_lock);
 			}
+			if (ISMEMSYNC(hio)) {
+				hio->hio_memsyncacked = false;
+				refcnt_init(&hio->hio_writecount, ncomps);
+			}
 			break;
 		case BIO_DELETE:
 			res->hr_stat_delete++;
@@ -1309,13 +1368,7 @@ ggate_recv_thread(void *arg)
 		}
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Moving request to the send queues.", hio);
-		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
-		    ggio->gctl_cmd == BIO_WRITE) {
-			/* Each remote request needs two responses in memsync. */
-			refcnt_init(&hio->hio_countdown, ncomps + 1);
-		} else {
-			refcnt_init(&hio->hio_countdown, ncomps);
-		}
+		refcnt_init(&hio->hio_countdown, ncomps);
 		for (ii = ncomp; ii < ncomps; ii++)
 			QUEUE_INSERT1(hio, send, ii);
 	}
@@ -1386,8 +1439,7 @@ local_send_thread(void *arg)
 				    ret, (intmax_t)ggio->gctl_length);
 			} else {
 				hio->hio_errors[ncomp] = 0;
-				if (hio->hio_replication ==
-				    HAST_REPLICATION_ASYNC) {
+				if (ISASYNC(hio)) {
 					ggio->gctl_error = 0;
 					write_complete(res, hio);
 				}
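With the ggate_recv_thread hunks above and the local_send_thread hunk below, a memsync write carries two independent gates instead of the old overloaded hio_countdown of ncomps + 1: hio_writecount gates the completion ack to GEOM Gate, hio_countdown gates moving the request to the done queue. A condensed sketch of the three events that can finish such a write, valid in any arrival order (not verbatim hastd code; error handling is elided and move_to_done_queue() is a hypothetical stand-in for the QUEUE_INSERT2(hio, done) tail of the worker loops):

    static void
    memsync_write_start(struct hio *hio, unsigned int ncomps)
    {

            hio->hio_memsyncacked = false;
            refcnt_init(&hio->hio_writecount, ncomps);  /* ggate ack gate */
            refcnt_init(&hio->hio_countdown, ncomps);   /* done queue gate */
    }

    static void
    local_write_done(struct hio *hio)
    {

            /* Whoever drops hio_writecount to 0 acks GEOM Gate. */
            if (refcnt_release(&hio->hio_writecount) == 0)
                    write_complete(gres, hio);
            if (refcnt_release(&hio->hio_countdown) == 0)
                    move_to_done_queue(hio);
    }

    static void
    remote_memsync_ack_received(struct hio *hio)
    {

            /* First remote reply: the data reached the remote's memory. */
            if (refcnt_release(&hio->hio_writecount) == 0)
                    write_complete(gres, hio);
            hio->hio_memsyncacked = true;
            /* The request stays on the recv list for the final reply. */
    }

    static void
    remote_final_reply_received(struct hio *hio)
    {

            /* Second remote reply: the data is on the remote's disk. */
            if (refcnt_release(&hio->hio_countdown) == 0)
                    move_to_done_queue(hio);
    }

Each counter is released exactly ncomps times, so the GEOM Gate ack and the move to the done queue each fire exactly once, which is the invariant the deleted switch statements enumerated case by case.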
@@ -1425,42 +1477,13 @@ local_send_thread(void *arg)
 			}
 			break;
 		}
-
-		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
-		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
-			if (refcnt_release(&hio->hio_countdown) > 0)
-				continue;
-		} else {
-			/*
-			 * Depending on hio_countdown value, requests finished
-			 * in the following order:
-			 * 0: remote memsync, remote final, local write
-			 * 1: remote memsync, local write, (remote final)
-			 * 2: local write, (remote memsync), (remote final)
-			 */
-			switch (refcnt_release(&hio->hio_countdown)) {
-			case 0:
-				/*
-				 * Local write finished as last.
-				 */
-				break;
-			case 1:
-				/*
-				 * Local write finished after remote memsync
-				 * reply arrvied. We can complete the write now.
-				 */
-				if (hio->hio_errors[0] == 0)
-					write_complete(res, hio);
-				continue;
-			case 2:
-				/*
-				 * Local write finished as first.
-				 */
-				continue;
-			default:
-				PJDLOG_ABORT("Invalid hio_countdown.");
+		if (ISMEMSYNCWRITE(hio)) {
+			if (refcnt_release(&hio->hio_writecount) == 0) {
+				write_complete(res, hio);
 			}
 		}
+		if (refcnt_release(&hio->hio_countdown) > 0)
+			continue;
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1582,10 +1605,8 @@ remote_send_thread(void *arg)
 			nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
 			nv_add_uint64(nv, offset, "offset");
 			nv_add_uint64(nv, length, "length");
-			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
-			    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+			if (ISMEMSYNCWRITE(hio))
 				nv_add_uint8(nv, 1, "memsync");
-			}
 		if (nv_error(nv) != 0) {
 			hio->hio_errors[ncomp] = nv_error(nv);
 			pjdlog_debug(2,
@@ -1617,6 +1638,7 @@ remote_send_thread(void *arg)
 		mtx_lock(&hio_recv_list_lock[ncomp]);
 		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
 		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
+		hio_recv_list_size[ncomp]++;
 		mtx_unlock(&hio_recv_list_lock[ncomp]);
 		if (hast_proto_send(res, res->hr_remoteout, nv, data,
 		    data != NULL ? length : 0) == -1) {
@@ -1628,17 +1650,9 @@ remote_send_thread(void *arg)
 			    "Unable to send request (%s): ",
 			    strerror(hio->hio_errors[ncomp]));
 			remote_close(res, ncomp);
-			/*
-			 * Take request back from the receive queue and move
-			 * it immediately to the done queue.
-			 */
-			mtx_lock(&hio_recv_list_lock[ncomp]);
-			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
-			    hio_next[ncomp]);
-			mtx_unlock(&hio_recv_list_lock[ncomp]);
-			goto done_queue;
+		} else {
+			rw_unlock(&hio_remote_lock[ncomp]);
 		}
-		rw_unlock(&hio_remote_lock[ncomp]);
 		nv_free(nv);
 		if (wakeup)
 			cv_signal(&hio_recv_list_cond[ncomp]);
@@ -1662,8 +1676,12 @@ done_queue:
 		} else {
 			mtx_unlock(&res->hr_amp_lock);
 		}
-		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
-			(void)refcnt_release(&hio->hio_countdown);
+		if (ISMEMSYNCWRITE(hio)) {
+			if (refcnt_release(&hio->hio_writecount) == 0) {
+				if (hio->hio_errors[0] == 0)
+					write_complete(res, hio);
+			}
+		}
 		if (refcnt_release(&hio->hio_countdown) > 0)
 			continue;
@@ -1719,7 +1737,9 @@ remote_recv_thread(void *arg)
 			PJDLOG_ASSERT(hio != NULL);
 			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
 			    hio_next[ncomp]);
+			hio_recv_list_size[ncomp]--;
 			mtx_unlock(&hio_recv_list_lock[ncomp]);
+			hio->hio_errors[ncomp] = ENOTCONN;
 			goto done_queue;
 		}
 		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
@@ -1742,6 +1762,7 @@ remote_recv_thread(void *arg)
 			if (hio->hio_ggio.gctl_seq == seq) {
 				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
 				    hio_next[ncomp]);
+				hio_recv_list_size[ncomp]--;
 				break;
 			}
 		}
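The remote_send_thread hunks above also change the failure path: a request whose send fails now stays on the recv list instead of being pulled back by the sender, and remote_recv_thread fails it with ENOTCONN when it finds the connection gone (the added assignment in the @@ -1719 hunk). Only the receive thread removes in-flight requests now, which closes the window in which sender and receiver could both claim the same hio. Sketched in terms of the file's own lists (recv_take_failed is an illustrative name, not a hastd function):

    static struct hio *
    recv_take_failed(unsigned int ncomp)
    {
            struct hio *hio;

            mtx_lock(&hio_recv_list_lock[ncomp]);
            hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
            PJDLOG_ASSERT(hio != NULL);
            TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
            hio_recv_list_size[ncomp]--;
            mtx_unlock(&hio_recv_list_lock[ncomp]);
            /*
             * Failing the request here lets the common done_queue path
             * release hio_writecount/hio_countdown exactly as it would
             * for an error reply received over the wire.
             */
            hio->hio_errors[ncomp] = ENOTCONN;
            return (hio);
    }

This is what allows the final hunk below to assert PJDLOG_ASSERT(memsyncack || hio->hio_errors[ncomp] != 0): a memsync write that never got its first ack only reaches done_queue with a non-zero error, and its hio_writecount is still released exactly once.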
@@ -1792,80 +1813,34 @@ remote_recv_thread(void *arg)
 		hio->hio_errors[ncomp] = 0;
 		nv_free(nv);
 done_queue:
-		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
-		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
-			if (refcnt_release(&hio->hio_countdown) > 0)
-				continue;
-		} else {
-			/*
-			 * Depending on hio_countdown value, requests finished
-			 * in the following order:
-			 *
-			 * 0: local write, remote memsync, remote final
-			 * or
-			 * 0: remote memsync, local write, remote final
-			 *
-			 * 1: local write, remote memsync, (remote final)
-			 * or
-			 * 1: remote memsync, remote final, (local write)
-			 *
-			 * 2: remote memsync, (local write), (remote final)
-			 * or
-			 * 2: remote memsync, (remote final), (local write)
-			 */
-			switch (refcnt_release(&hio->hio_countdown)) {
-			case 0:
-				/*
-				 * Remote final reply arrived.
-				 */
-				PJDLOG_ASSERT(!memsyncack);
-				break;
-			case 1:
-				if (memsyncack) {
-					/*
-					 * Local request already finished, so we
-					 * can complete the write.
-					 */
+		if (ISMEMSYNCWRITE(hio)) {
+			if (!hio->hio_memsyncacked) {
+				PJDLOG_ASSERT(memsyncack ||
+				    hio->hio_errors[ncomp] != 0);
+				/* Remote ack arrived. */
+				if (refcnt_release(&hio->hio_writecount) == 0) {
 					if (hio->hio_errors[0] == 0)
 						write_complete(res, hio);
-					/*
-					 * We still need to wait for final
-					 * remote reply.
-					 */
+				}
+				hio->hio_memsyncacked = true;
+				if (hio->hio_errors[ncomp] == 0) {
 					pjdlog_debug(2,
-					    "remote_recv: (%p) Moving request back to the recv queue.",
-					    hio);
+					    "remote_recv: (%p) Moving request "
+					    "back to the recv queue.", hio);
 					mtx_lock(&hio_recv_list_lock[ncomp]);
 					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
 					    hio, hio_next[ncomp]);
+					hio_recv_list_size[ncomp]++;
 					mtx_unlock(&hio_recv_list_lock[ncomp]);
-				} else {
-					/*
-					 * Remote final reply arrived before
-					 * local write finished.
-					 * Nothing to do in such case.
-					 */
+					continue;
 				}
-				continue;
-			case 2:
-				/*
-				 * We received remote memsync reply even before
-				 * local write finished.
-				 */
-				PJDLOG_ASSERT(memsyncack);
-
-				pjdlog_debug(2,
-				    "remote_recv: (%p) Moving request back to the recv queue.",
-				    hio);
-				mtx_lock(&hio_recv_list_lock[ncomp]);
-				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
-				    hio_next[ncomp]);
-				mtx_unlock(&hio_recv_list_lock[ncomp]);
-				continue;
-			default:
-				PJDLOG_ABORT("Invalid hio_countdown.");
+			} else {
+				PJDLOG_ASSERT(!memsyncack);
+				/* Remote final reply arrived. */
 			}
 		}
+		if (refcnt_release(&hio->hio_countdown) > 0)
+			continue;
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
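The final hunk replaces the three-way hio_countdown switch with an explicit two-phase protocol keyed on hio_memsyncacked: the first reply from the secondary (data in remote memory) releases hio_writecount and requeues the request to wait for the second reply (data on remote disk), which then falls through to the common hio_countdown release. A sketch of the dispatch, where requeue_for_final_reply() and move_to_done_queue() are hypothetical stand-ins for the recv-list re-insert and the done-queue tail shown above (not verbatim hastd code):

    static void
    remote_reply_received(struct hio *hio, unsigned int ncomp, bool memsyncack)
    {

            if (ISMEMSYNCWRITE(hio) && !hio->hio_memsyncacked) {
                    /* First reply, or an ENOTCONN-failed request. */
                    PJDLOG_ASSERT(memsyncack || hio->hio_errors[ncomp] != 0);
                    if (refcnt_release(&hio->hio_writecount) == 0 &&
                        hio->hio_errors[0] == 0)
                            write_complete(gres, hio);
                    hio->hio_memsyncacked = true;
                    if (hio->hio_errors[ncomp] == 0) {
                            /* Wait for the final (on-disk) reply. */
                            requeue_for_final_reply(hio, ncomp);
                            return;
                    }
            }
            /* Final reply, or an error: release the done-queue gate once. */
            if (refcnt_release(&hio->hio_countdown) == 0)
                    move_to_done_queue(hio);
    }

Whatever order the local write, the memsync ack, and the final reply arrive in, each request releases hio_writecount once per component and hio_countdown once per component, so both completions happen exactly once without the old case-by-case bookkeeping.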