From e5e372da9a469dfe3ece40277090a7056c566838 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 11:41:43 -0500 Subject: libceph: eliminate connection state "DEAD" The ceph connection state "DEAD" is never set and is therefore not needed. Eliminate it. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 6 ------ 1 file changed, 6 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1a80907..42ca8aa 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2087,12 +2087,6 @@ bad_tag: */ static void queue_con(struct ceph_connection *con) { - if (test_bit(DEAD, &con->state)) { - dout("queue_con %p ignoring: DEAD\n", - con); - return; - } - if (!con->ops->get(con)) { dout("queue_con %p ref count 0\n", con); return; -- cgit v1.1 From 6384bb8b8e88a9c6bf2ae0d9517c2c0199177c34 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 29 May 2012 21:47:38 -0500 Subject: libceph: kill bad_proto ceph connection op No code sets a bad_proto method in its ceph connection operations vector, so just get rid of it. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 5 ----- 1 file changed, 5 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 42ca8aa..07af994 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1356,11 +1356,6 @@ static void fail_protocol(struct ceph_connection *con) { reset_connection(con); set_bit(CLOSED, &con->state); /* in case there's queued work */ - - mutex_unlock(&con->mutex); - if (con->ops->bad_proto) - con->ops->bad_proto(con); - mutex_lock(&con->mutex); } static int process_connect(struct ceph_connection *con) -- cgit v1.1 From 327800bdc2cb9b71f4b458ca07aa9d522668dde0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 11:41:43 -0500 Subject: libceph: rename socket callbacks Change the names of the three socket callback functions to make it more obvious they're specifically associated with a connection's socket (not the ceph connection that uses it). Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh Reviewed-by: Sage Weil --- net/ceph/messenger.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 07af994..5452558 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -153,46 +153,46 @@ EXPORT_SYMBOL(ceph_msgr_flush); */ /* data available on socket, or listen socket received a connect */ -static void ceph_data_ready(struct sock *sk, int count_unused) +static void ceph_sock_data_ready(struct sock *sk, int count_unused) { struct ceph_connection *con = sk->sk_user_data; if (sk->sk_state != TCP_CLOSE_WAIT) { - dout("ceph_data_ready on %p state = %lu, queueing work\n", + dout("%s on %p state = %lu, queueing work\n", __func__, con, con->state); queue_con(con); } } /* socket has buffer space for writing */ -static void ceph_write_space(struct sock *sk) +static void ceph_sock_write_space(struct sock *sk) { struct ceph_connection *con = sk->sk_user_data; /* only queue to workqueue if there is data we want to write, * and there is sufficient space in the socket buffer to accept - * more data. clear SOCK_NOSPACE so that ceph_write_space() + * more data. clear SOCK_NOSPACE so that ceph_sock_write_space() * doesn't get called again until try_write() fills the socket * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ if (test_bit(WRITE_PENDING, &con->state)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { - dout("ceph_write_space %p queueing write work\n", con); + dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); queue_con(con); } } else { - dout("ceph_write_space %p nothing to write\n", con); + dout("%s %p nothing to write\n", __func__, con); } } /* socket's state has changed */ -static void ceph_state_change(struct sock *sk) +static void ceph_sock_state_change(struct sock *sk) { struct ceph_connection *con = sk->sk_user_data; - dout("ceph_state_change %p state = %lu sk_state = %u\n", + dout("%s %p state = %lu sk_state = %u\n", __func__, con, con->state, sk->sk_state); if (test_bit(CLOSED, &con->state)) @@ -200,9 +200,9 @@ static void ceph_state_change(struct sock *sk) switch (sk->sk_state) { case TCP_CLOSE: - dout("ceph_state_change TCP_CLOSE\n"); + dout("%s TCP_CLOSE\n", __func__); case TCP_CLOSE_WAIT: - dout("ceph_state_change TCP_CLOSE_WAIT\n"); + dout("%s TCP_CLOSE_WAIT\n", __func__); if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { if (test_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; @@ -212,7 +212,7 @@ static void ceph_state_change(struct sock *sk) } break; case TCP_ESTABLISHED: - dout("ceph_state_change TCP_ESTABLISHED\n"); + dout("%s TCP_ESTABLISHED\n", __func__); queue_con(con); break; default: /* Everything else is uninteresting */ @@ -228,9 +228,9 @@ static void set_sock_callbacks(struct socket *sock, { struct sock *sk = sock->sk; sk->sk_user_data = con; - sk->sk_data_ready = ceph_data_ready; - sk->sk_write_space = ceph_write_space; - sk->sk_state_change = ceph_state_change; + sk->sk_data_ready = ceph_sock_data_ready; + sk->sk_write_space = ceph_sock_write_space; + sk->sk_state_change = ceph_sock_state_change; } -- cgit v1.1 From e22004235a900213625acd6583ac913d5a30c155 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 23 May 2012 14:35:23 -0500 Subject: libceph: rename kvec_reset and kvec_add functions The functions ceph_con_out_kvec_reset() and ceph_con_out_kvec_add() are entirely private functions, so drop the "ceph_" prefix in their name to make them slightly more wieldy. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh Reviewed-by: Sage Weil --- net/ceph/messenger.c | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5452558..2ca491f 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -486,14 +486,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) return ret; } -static void ceph_con_out_kvec_reset(struct ceph_connection *con) +static void con_out_kvec_reset(struct ceph_connection *con) { con->out_kvec_left = 0; con->out_kvec_bytes = 0; con->out_kvec_cur = &con->out_kvec[0]; } -static void ceph_con_out_kvec_add(struct ceph_connection *con, +static void con_out_kvec_add(struct ceph_connection *con, size_t size, void *data) { int index; @@ -534,7 +534,7 @@ static void prepare_write_message(struct ceph_connection *con) struct ceph_msg *m; u32 crc; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); con->out_kvec_is_msg = true; con->out_msg_done = false; @@ -542,9 +542,9 @@ static void prepare_write_message(struct ceph_connection *con) * TCP packet that's a good thing. */ if (con->in_seq > con->in_seq_acked) { con->in_seq_acked = con->in_seq; - ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), + con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); } @@ -572,12 +572,12 @@ static void prepare_write_message(struct ceph_connection *con) BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); /* tag + hdr + front + middle */ - ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); - ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); + con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); + con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); + con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); if (m->middle) - ceph_con_out_kvec_add(con, m->middle->vec.iov_len, + con_out_kvec_add(con, m->middle->vec.iov_len, m->middle->vec.iov_base); /* fill in crc (except data pages), footer */ @@ -626,12 +626,12 @@ static void prepare_write_ack(struct ceph_connection *con) con->in_seq_acked, con->in_seq); con->in_seq_acked = con->in_seq; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); - ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), + con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ @@ -644,8 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con) static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); - ceph_con_out_kvec_reset(con); - ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); + con_out_kvec_reset(con); + con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); set_bit(WRITE_PENDING, &con->state); } @@ -690,8 +690,8 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection */ static void prepare_write_banner(struct ceph_connection *con) { - ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); - ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), + con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); + con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), &con->msgr->my_enc_addr); con->out_more = 0; @@ -738,10 +738,10 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.authorizer_len = auth ? cpu_to_le32(auth->authorizer_buf_len) : 0; - ceph_con_out_kvec_add(con, sizeof (con->out_connect), + con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); if (auth && auth->authorizer_buf_len) - ceph_con_out_kvec_add(con, auth->authorizer_buf_len, + con_out_kvec_add(con, auth->authorizer_buf_len, auth->authorizer_buf); con->out_more = 0; @@ -935,7 +935,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) /* prepare and queue up footer, too */ if (!do_datacrc) con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); prepare_write_message_footer(con); ret = 1; out: @@ -1398,7 +1398,7 @@ static int process_connect(struct ceph_connection *con) return -1; } con->auth_retry = 1; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1419,7 +1419,7 @@ static int process_connect(struct ceph_connection *con) ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1445,7 +1445,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1462,7 +1462,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.global_seq)); get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1869,7 +1869,7 @@ more: /* open the socket first? */ if (con->sock == NULL) { - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); prepare_write_banner(con); ret = prepare_write_connect(con); if (ret < 0) -- cgit v1.1 From 15d9882c336db2db73ccf9871ae2398e452f694c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: embed ceph messenger structure in ceph_client A ceph client has a pointer to a ceph messenger structure in it. There is always exactly one ceph messenger for a ceph client, so there is no need to allocate it separate from the ceph client structure. Switch the ceph_client structure to embed its ceph_messenger structure. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh Reviewed-by: Sage Weil --- net/ceph/ceph_common.c | 18 +++++------------- net/ceph/messenger.c | 30 +++++++++--------------------- net/ceph/mon_client.c | 6 +++--- net/ceph/osd_client.c | 4 ++-- 4 files changed, 19 insertions(+), 39 deletions(-) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index cc91319..2de3ea1 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -468,19 +468,15 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, /* msgr */ if (ceph_test_opt(client, MYIP)) myaddr = &client->options->my_addr; - client->msgr = ceph_messenger_create(myaddr, - client->supported_features, - client->required_features); - if (IS_ERR(client->msgr)) { - err = PTR_ERR(client->msgr); - goto fail; - } - client->msgr->nocrc = ceph_test_opt(client, NOCRC); + ceph_messenger_init(&client->msgr, myaddr, + client->supported_features, + client->required_features, + ceph_test_opt(client, NOCRC)); /* subsystems */ err = ceph_monc_init(&client->monc, client); if (err < 0) - goto fail_msgr; + goto fail; err = ceph_osdc_init(&client->osdc, client); if (err < 0) goto fail_monc; @@ -489,8 +485,6 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, fail_monc: ceph_monc_stop(&client->monc); -fail_msgr: - ceph_messenger_destroy(client->msgr); fail: kfree(client); return ERR_PTR(err); @@ -515,8 +509,6 @@ void ceph_destroy_client(struct ceph_client *client) ceph_debugfs_client_cleanup(client); - ceph_messenger_destroy(client->msgr); - ceph_destroy_options(client->options); kfree(client); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 2ca491f..d8423a3 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2245,18 +2245,14 @@ out: /* - * create a new messenger instance + * initialize a new messenger instance */ -struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features) +void ceph_messenger_init(struct ceph_messenger *msgr, + struct ceph_entity_addr *myaddr, + u32 supported_features, + u32 required_features, + bool nocrc) { - struct ceph_messenger *msgr; - - msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); - if (msgr == NULL) - return ERR_PTR(-ENOMEM); - msgr->supported_features = supported_features; msgr->required_features = required_features; @@ -2269,19 +2265,11 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, msgr->inst.addr.type = 0; get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); encode_my_addr(msgr); + msgr->nocrc = nocrc; - dout("messenger_create %p\n", msgr); - return msgr; -} -EXPORT_SYMBOL(ceph_messenger_create); - -void ceph_messenger_destroy(struct ceph_messenger *msgr) -{ - dout("destroy %p\n", msgr); - kfree(msgr); - dout("destroyed messenger %p\n", msgr); + dout("%s %p\n", __func__, msgr); } -EXPORT_SYMBOL(ceph_messenger_destroy); +EXPORT_SYMBOL(ceph_messenger_init); static void clear_standby(struct ceph_connection *con) { diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1845cde..704dc95 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -763,7 +763,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); if (!monc->con) goto out_monmap; - ceph_con_init(monc->client->msgr, monc->con); + ceph_con_init(&monc->client->msgr, monc->con); monc->con->private = monc; monc->con->ops = &mon_con_ops; @@ -880,8 +880,8 @@ static void handle_auth_reply(struct ceph_mon_client *monc, } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { dout("authenticated, starting session\n"); - monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; - monc->client->msgr->inst.name.num = + monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; + monc->client->msgr.inst.name.num = cpu_to_le64(monc->auth->global_id); __send_subscribe(monc); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index b098e7b..cca4c7f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -639,7 +639,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) INIT_LIST_HEAD(&osd->o_osd_lru); osd->o_incarnation = 1; - ceph_con_init(osdc->client->msgr, &osd->o_con); + ceph_con_init(&osdc->client->msgr, &osd->o_con); osd->o_con.private = osd; osd->o_con.ops = &osd_con_ops; osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; @@ -1391,7 +1391,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) epoch, maplen); newmap = osdmap_apply_incremental(&p, next, osdc->osdmap, - osdc->client->msgr); + &osdc->client->msgr); if (IS_ERR(newmap)) { err = PTR_ERR(newmap); goto bad; -- cgit v1.1 From 928443cd9644e7cfd46f687dbeffda2d1a357ff9 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 11:41:43 -0500 Subject: libceph: start separating connection flags from state A ceph_connection holds a mixture of connection state (as in "state machine" state) and connection flags in a single "state" field. To make the distinction more clear, define a new "flags" field and use it rather than the "state" field to hold Boolean flag values. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d8423a3..e84e4fd8 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -176,7 +176,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(WRITE_PENDING, &con->state)) { + if (test_bit(WRITE_PENDING, &con->flags)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -203,7 +203,7 @@ static void ceph_sock_state_change(struct sock *sk) dout("%s TCP_CLOSE\n", __func__); case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); - if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { + if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { if (test_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; else @@ -395,9 +395,9 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); set_bit(CLOSED, &con->state); /* in case there's queued work */ clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ - clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ - clear_bit(KEEPALIVE_PENDING, &con->state); - clear_bit(WRITE_PENDING, &con->state); + clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ + clear_bit(KEEPALIVE_PENDING, &con->flags); + clear_bit(WRITE_PENDING, &con->flags); mutex_lock(&con->mutex); reset_connection(con); con->peer_global_seq = 0; @@ -614,7 +614,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } /* @@ -635,7 +635,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } /* @@ -646,7 +646,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } /* @@ -675,7 +675,7 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection if (IS_ERR(auth)) return auth; - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) + if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) return ERR_PTR(-EAGAIN); con->auth_reply_buf = auth->authorizer_reply_buf; @@ -695,7 +695,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } static int prepare_write_connect(struct ceph_connection *con) @@ -745,7 +745,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); return 0; } @@ -1492,7 +1492,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(LOSSYTX, &con->state); + set_bit(LOSSYTX, &con->flags); prepare_read_tag(con); break; @@ -1933,14 +1933,14 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { + if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(WRITE_PENDING, &con->state); + clear_bit(WRITE_PENDING, &con->flags); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2106,7 +2106,7 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: - if (test_and_clear_bit(BACKOFF, &con->state)) { + if (test_and_clear_bit(BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); if (queue_delayed_work(ceph_msgr_wq, &con->work, round_jiffies_relative(con->delay))) { @@ -2135,7 +2135,7 @@ restart: con_close_socket(con); } - if (test_and_clear_bit(SOCK_CLOSED, &con->state)) + if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) goto fault; ret = try_read(con); @@ -2174,7 +2174,7 @@ static void ceph_fault(struct ceph_connection *con) dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(LOSSYTX, &con->state)) { + if (test_bit(LOSSYTX, &con->flags)) { dout("fault on LOSSYTX channel\n"); goto out; } @@ -2196,9 +2196,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(KEEPALIVE_PENDING, &con->state)) { + !test_bit(KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(WRITE_PENDING, &con->state); + clear_bit(WRITE_PENDING, &con->flags); set_bit(STANDBY, &con->state); } else { /* retry after a delay. */ @@ -2222,7 +2222,7 @@ static void ceph_fault(struct ceph_connection *con) * that when con_work restarts we schedule the * delay then. */ - set_bit(BACKOFF, &con->state); + set_bit(BACKOFF, &con->flags); } } @@ -2278,8 +2278,8 @@ static void clear_standby(struct ceph_connection *con) mutex_lock(&con->mutex); dout("clear_standby %p and ++connect_seq\n", con); con->connect_seq++; - WARN_ON(test_bit(WRITE_PENDING, &con->state)); - WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); + WARN_ON(test_bit(WRITE_PENDING, &con->flags)); + WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); mutex_unlock(&con->mutex); } } @@ -2317,7 +2317,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ clear_standby(con); - if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) + if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2384,8 +2384,8 @@ void ceph_con_keepalive(struct ceph_connection *con) { dout("con_keepalive %p\n", con); clear_standby(con); - if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && - test_and_set_bit(WRITE_PENDING, &con->state) == 0) + if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && + test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); -- cgit v1.1 From ce2c8903e76e690846a00a0284e4bd9ee954d680 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 22:15:49 -0500 Subject: libceph: start tracking connection socket state Start explicitly keeping track of the state of a ceph connection's socket, separate from the state of the connection itself. Create placeholder functions to encapsulate the state transitions. -------- | NEW* | transient initial state -------- | con_sock_state_init() v ---------- | CLOSED | initialized, but no socket (and no ---------- TCP connection) ^ \ | \ con_sock_state_connecting() | ---------------------- | \ + con_sock_state_closed() \ |\ \ | \ \ | ----------- \ | | CLOSING | socket event; \ | ----------- await close \ | ^ | | | | | + con_sock_state_closing() | | / \ | | / --------------- | | / \ v | / -------------- | / -----------------| CONNECTING | socket created, TCP | | / -------------- connect initiated | | | con_sock_state_connected() | | v ------------- | CONNECTED | TCP connection established ------------- Make the socket state an atomic variable, reinforcing that it's a distinct transtion with no possible "intermediate/both" states. This is almost certainly overkill at this point, though the transitions into CONNECTED and CLOSING state do get called via socket callback (the rest of the transitions occur with the connection mutex held). We can back out the atomicity later. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e84e4fd8..a4ac3de 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -29,6 +29,14 @@ * the sender. */ +/* State values for ceph_connection->sock_state; NEW is assumed to be 0 */ + +#define CON_SOCK_STATE_NEW 0 /* -> CLOSED */ +#define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */ +#define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */ +#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ +#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -147,6 +155,55 @@ void ceph_msgr_flush(void) } EXPORT_SYMBOL(ceph_msgr_flush); +/* Connection socket state transition functions */ + +static void con_sock_state_init(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); + if (WARN_ON(old_state != CON_SOCK_STATE_NEW)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_connecting(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); + if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_connected(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); + if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_closing(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); + if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING && + old_state != CON_SOCK_STATE_CONNECTED && + old_state != CON_SOCK_STATE_CLOSING)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_closed(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); + if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && + old_state != CON_SOCK_STATE_CLOSING)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} /* * socket callback functions @@ -203,6 +260,7 @@ static void ceph_sock_state_change(struct sock *sk) dout("%s TCP_CLOSE\n", __func__); case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); + con_sock_state_closing(con); if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { if (test_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; @@ -213,6 +271,7 @@ static void ceph_sock_state_change(struct sock *sk) break; case TCP_ESTABLISHED: dout("%s TCP_ESTABLISHED\n", __func__); + con_sock_state_connected(con); queue_con(con); break; default: /* Everything else is uninteresting */ @@ -277,6 +336,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } con->sock = sock; + con_sock_state_connecting(con); return 0; } @@ -343,6 +403,7 @@ static int con_close_socket(struct ceph_connection *con) sock_release(con->sock); con->sock = NULL; clear_bit(SOCK_CLOSED, &con->state); + con_sock_state_closed(con); return rc; } @@ -462,6 +523,9 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) memset(con, 0, sizeof(*con)); atomic_set(&con->nref, 1); con->msgr = msgr; + + con_sock_state_init(con); + mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); -- cgit v1.1 From e10006f807ffc4d5b1d861305d18d9e8145891ca Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: provide osd number when creating osd Pass the osd number to the create_osd() routine, and move the initialization of fields that depend on it therein. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/osd_client.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index cca4c7f..e30efbc 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -624,7 +624,7 @@ static void osd_reset(struct ceph_connection *con) /* * Track open sessions with osds. */ -static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) +static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) { struct ceph_osd *osd; @@ -634,6 +634,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) atomic_set(&osd->o_ref, 1); osd->o_osdc = osdc; + osd->o_osd = onum; INIT_LIST_HEAD(&osd->o_requests); INIT_LIST_HEAD(&osd->o_linger_requests); INIT_LIST_HEAD(&osd->o_osd_lru); @@ -643,6 +644,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) osd->o_con.private = osd; osd->o_con.ops = &osd_con_ops; osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; + osd->o_con.peer_name.num = cpu_to_le64(onum); INIT_LIST_HEAD(&osd->o_keepalive_item); return osd; @@ -998,15 +1000,13 @@ static int __map_request(struct ceph_osd_client *osdc, req->r_osd = __lookup_osd(osdc, o); if (!req->r_osd && o >= 0) { err = -ENOMEM; - req->r_osd = create_osd(osdc); + req->r_osd = create_osd(osdc, o); if (!req->r_osd) { list_move(&req->r_req_lru_item, &osdc->req_notarget); goto out; } dout("map_request osd %p is osd%d\n", req->r_osd, o); - req->r_osd->o_osd = o; - req->r_osd->o_con.peer_name.num = cpu_to_le64(o); __insert_osd(osdc, req->r_osd); ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); -- cgit v1.1 From a5988c490ef66cb04ea2f610681949b25c773b3c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 29 May 2012 11:04:58 -0500 Subject: libceph: set CLOSED state bit in con_init Once a connection is fully initialized, it is really in a CLOSED state, so make that explicit by setting the bit in its state field. It is possible for a connection in NEGOTIATING state to get a failure, leading to ceph_fault() and ultimately ceph_con_close(). Clear that bits if it is set in that case, to reflect that the connection truly is closed and is no longer participating in a connect sequence. Issue a warning if ceph_con_open() is called on a connection that is not in CLOSED state. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a4ac3de..36b440a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -454,11 +454,14 @@ void ceph_con_close(struct ceph_connection *con) { dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); - set_bit(CLOSED, &con->state); /* in case there's queued work */ + clear_bit(NEGOTIATING, &con->state); clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ + set_bit(CLOSED, &con->state); + clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(KEEPALIVE_PENDING, &con->flags); clear_bit(WRITE_PENDING, &con->flags); + mutex_lock(&con->mutex); reset_connection(con); con->peer_global_seq = 0; @@ -475,7 +478,8 @@ void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) { dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); set_bit(OPENING, &con->state); - clear_bit(CLOSED, &con->state); + WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + memcpy(&con->peer_addr, addr, sizeof(*addr)); con->delay = 0; /* reset backoff memory */ queue_con(con); @@ -530,6 +534,8 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); + + set_bit(CLOSED, &con->state); } EXPORT_SYMBOL(ceph_con_init); @@ -1933,14 +1939,15 @@ more: /* open the socket first? */ if (con->sock == NULL) { + clear_bit(NEGOTIATING, &con->state); + set_bit(CONNECTING, &con->state); + con_out_kvec_reset(con); prepare_write_banner(con); ret = prepare_write_connect(con); if (ret < 0) goto out; prepare_read_banner(con); - set_bit(CONNECTING, &con->state); - clear_bit(NEGOTIATING, &con->state); BUG_ON(con->in_msg); con->in_tag = CEPH_MSGR_TAG_READY; -- cgit v1.1 From ab8cb34a4b2f60281a4b18b1f1ad23bc2313d91b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 4 Jun 2012 14:43:32 -0500 Subject: libceph: osd_client: don't drop reply reference too early In ceph_osdc_release_request(), a reference to the r_reply message is dropped. But just after that, that same message is revoked if it was in use to receive an incoming reply. Reorder these so we are sure we hold a reference until we're actually done with the message. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/osd_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e30efbc..d8b6d31 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -139,8 +139,6 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); - if (req->r_reply) - ceph_msg_put(req->r_reply); if (req->r_con_filling_msg) { dout("release_request revoking pages %p from con %p\n", req->r_pages, req->r_con_filling_msg); @@ -148,6 +146,8 @@ void ceph_osdc_release_request(struct kref *kref) req->r_reply); ceph_con_put(req->r_con_filling_msg); } + if (req->r_reply) + ceph_msg_put(req->r_reply); if (req->r_own_pages) ceph_release_page_vector(req->r_pages, req->r_num_pages); -- cgit v1.1 From 0d47766f14211a73eaf54cab234db134ece79f49 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 31 May 2012 20:22:18 -0700 Subject: libceph: use con get/put ops from osd_client There were a few direct calls to ceph_con_{get,put}() instead of the con ops from osd_client.c. This is a bug since those ops aren't defined to be ceph_con_get/put. This breaks refcounting on the ceph_osd structs that contain the ceph_connections, and could lead to all manner of strangeness. The purpose of the ->get and ->put methods in a ceph connection are to allow the connection to indicate it has a reference to something external to the messaging system, *not* to indicate something external has a reference to the connection. [elder@inktank.com: added that last sentence] Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osd_client.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d8b6d31..5b41a69 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -144,7 +144,7 @@ void ceph_osdc_release_request(struct kref *kref) req->r_pages, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); - ceph_con_put(req->r_con_filling_msg); + req->r_con_filling_msg->ops->put(req->r_con_filling_msg); } if (req->r_reply) ceph_msg_put(req->r_reply); @@ -1216,7 +1216,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, if (req->r_con_filling_msg == con && req->r_reply == msg) { dout(" dropping con_filling_msg ref %p\n", con); req->r_con_filling_msg = NULL; - ceph_con_put(con); + con->ops->put(con); } if (!req->r_got_reply) { @@ -2028,7 +2028,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, dout("get_reply revoking msg %p from old con %p\n", req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); - ceph_con_put(req->r_con_filling_msg); + req->r_con_filling_msg->ops->put(req->r_con_filling_msg); req->r_con_filling_msg = NULL; } @@ -2063,7 +2063,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, #endif } *skip = 0; - req->r_con_filling_msg = ceph_con_get(con); + req->r_con_filling_msg = con->ops->get(con); dout("get_reply tid %lld %p\n", tid, m); out: -- cgit v1.1 From 67130934fb579fdf0f2f6d745960264378b57dc8 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: embed ceph connection structure in mon_client A monitor client has a pointer to a ceph connection structure in it. This is the only one of the three ceph client types that do it this way; the OSD and MDS clients embed the connection into their main structures. There is always exactly one ceph connection for a monitor client, so there is no need to allocate it separate from the monitor client structure. So switch the ceph_mon_client structure to embed its ceph_connection structure. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/mon_client.c | 47 ++++++++++++++++++++--------------------------- 1 file changed, 20 insertions(+), 27 deletions(-) (limited to 'net') diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 704dc95..ac4d6b1 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) monc->pending_auth = 1; monc->m_auth->front.iov_len = len; monc->m_auth->hdr.front_len = cpu_to_le32(len); - ceph_con_revoke(monc->con, monc->m_auth); + ceph_con_revoke(&monc->con, monc->m_auth); ceph_msg_get(monc->m_auth); /* keep our ref */ - ceph_con_send(monc->con, monc->m_auth); + ceph_con_send(&monc->con, monc->m_auth); } /* @@ -117,8 +117,8 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) static void __close_session(struct ceph_mon_client *monc) { dout("__close_session closing mon%d\n", monc->cur_mon); - ceph_con_revoke(monc->con, monc->m_auth); - ceph_con_close(monc->con); + ceph_con_revoke(&monc->con, monc->m_auth); + ceph_con_close(&monc->con); monc->cur_mon = -1; monc->pending_auth = 0; ceph_auth_reset(monc->auth); @@ -142,9 +142,9 @@ static int __open_session(struct ceph_mon_client *monc) monc->want_next_osdmap = !!monc->want_next_osdmap; dout("open_session mon%d opening\n", monc->cur_mon); - monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; - monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); - ceph_con_open(monc->con, + monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON; + monc->con.peer_name.num = cpu_to_le64(monc->cur_mon); + ceph_con_open(&monc->con, &monc->monmap->mon_inst[monc->cur_mon].addr); /* initiatiate authentication handshake */ @@ -226,8 +226,8 @@ static void __send_subscribe(struct ceph_mon_client *monc) msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - ceph_con_revoke(monc->con, msg); - ceph_con_send(monc->con, ceph_msg_get(msg)); + ceph_con_revoke(&monc->con, msg); + ceph_con_send(&monc->con, ceph_msg_get(msg)); monc->sub_sent = jiffies | 1; /* never 0 */ } @@ -247,7 +247,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, if (monc->hunting) { pr_info("mon%d %s session established\n", monc->cur_mon, - ceph_pr_addr(&monc->con->peer_addr.in_addr)); + ceph_pr_addr(&monc->con.peer_addr.in_addr)); monc->hunting = false; } dout("handle_subscribe_ack after %d seconds\n", seconds); @@ -461,7 +461,7 @@ static int do_generic_request(struct ceph_mon_client *monc, req->request->hdr.tid = cpu_to_le64(req->tid); __insert_generic_request(monc, req); monc->num_generic_requests++; - ceph_con_send(monc->con, ceph_msg_get(req->request)); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); mutex_unlock(&monc->mutex); err = wait_for_completion_interruptible(&req->completion); @@ -684,8 +684,8 @@ static void __resend_generic_request(struct ceph_mon_client *monc) for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_mon_generic_request, node); - ceph_con_revoke(monc->con, req->request); - ceph_con_send(monc->con, ceph_msg_get(req->request)); + ceph_con_revoke(&monc->con, req->request); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); } } @@ -705,7 +705,7 @@ static void delayed_work(struct work_struct *work) __close_session(monc); __open_session(monc); /* continue hunting */ } else { - ceph_con_keepalive(monc->con); + ceph_con_keepalive(&monc->con); __validate_auth(monc); @@ -760,19 +760,16 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) goto out; /* connection */ - monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); - if (!monc->con) - goto out_monmap; - ceph_con_init(&monc->client->msgr, monc->con); - monc->con->private = monc; - monc->con->ops = &mon_con_ops; + ceph_con_init(&monc->client->msgr, &monc->con); + monc->con.private = monc; + monc->con.ops = &mon_con_ops; /* authentication */ monc->auth = ceph_auth_init(cl->options->name, cl->options->key); if (IS_ERR(monc->auth)) { err = PTR_ERR(monc->auth); - goto out_con; + goto out_monmap; } monc->auth->want_keys = CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | @@ -824,8 +821,6 @@ out_subscribe_ack: ceph_msg_put(monc->m_subscribe_ack); out_auth: ceph_auth_destroy(monc->auth); -out_con: - monc->con->ops->put(monc->con); out_monmap: kfree(monc->monmap); out: @@ -841,9 +836,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_lock(&monc->mutex); __close_session(monc); - monc->con->private = NULL; - monc->con->ops->put(monc->con); - monc->con = NULL; + monc->con.private = NULL; mutex_unlock(&monc->mutex); @@ -1021,7 +1014,7 @@ static void mon_fault(struct ceph_connection *con) if (!monc->hunting) pr_info("mon%d %s session lost, " "hunting for new mon\n", monc->cur_mon, - ceph_pr_addr(&monc->con->peer_addr.in_addr)); + ceph_pr_addr(&monc->con.peer_addr.in_addr)); __close_session(monc); if (!monc->hunting) { -- cgit v1.1 From ec87ef4309d33bd9c87a53bb5152a86ae7a65f25 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 31 May 2012 20:27:50 -0700 Subject: libceph: drop connection refcounting for mon_client All references to the embedded ceph_connection come from the msgr workqueue, which is drained prior to mon_client destruction. That means we can ignore con refcounting entirely. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/mon_client.c | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index ac4d6b1..062b724 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -1029,9 +1029,23 @@ out: mutex_unlock(&monc->mutex); } +/* + * We can ignore refcounting on the connection struct, as all references + * will come from the messenger workqueue, which is drained prior to + * mon_client destruction. + */ +static struct ceph_connection *con_get(struct ceph_connection *con) +{ + return con; +} + +static void con_put(struct ceph_connection *con) +{ +} + static const struct ceph_connection_operations mon_con_ops = { - .get = ceph_con_get, - .put = ceph_con_put, + .get = con_get, + .put = con_put, .dispatch = dispatch, .fault = mon_fault, .alloc_msg = mon_alloc_msg, -- cgit v1.1 From 20581c1faf7b15ae1f8b80c0ec757877b0b53151 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: init monitor connection when opening Hold off initializing a monitor client's connection until just before it gets opened for use. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/mon_client.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 062b724..6adbea7 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -119,6 +119,7 @@ static void __close_session(struct ceph_mon_client *monc) dout("__close_session closing mon%d\n", monc->cur_mon); ceph_con_revoke(&monc->con, monc->m_auth); ceph_con_close(&monc->con); + monc->con.private = NULL; monc->cur_mon = -1; monc->pending_auth = 0; ceph_auth_reset(monc->auth); @@ -141,9 +142,13 @@ static int __open_session(struct ceph_mon_client *monc) monc->sub_renew_after = jiffies; /* i.e., expired */ monc->want_next_osdmap = !!monc->want_next_osdmap; - dout("open_session mon%d opening\n", monc->cur_mon); + ceph_con_init(&monc->client->msgr, &monc->con); + monc->con.private = monc; + monc->con.ops = &mon_con_ops; monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON; monc->con.peer_name.num = cpu_to_le64(monc->cur_mon); + + dout("open_session mon%d opening\n", monc->cur_mon); ceph_con_open(&monc->con, &monc->monmap->mon_inst[monc->cur_mon].addr); @@ -760,10 +765,6 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) goto out; /* connection */ - ceph_con_init(&monc->client->msgr, &monc->con); - monc->con.private = monc; - monc->con.ops = &mon_con_ops; - /* authentication */ monc->auth = ceph_auth_init(cl->options->name, cl->options->key); @@ -836,8 +837,6 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_lock(&monc->mutex); __close_session(monc); - monc->con.private = NULL; - mutex_unlock(&monc->mutex); ceph_auth_destroy(monc->auth); -- cgit v1.1 From 1bfd89f4e6e1adc6a782d94aa5d4c53be1e404d7 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: fully initialize connection in con_init() Move the initialization of a ceph connection's private pointer, operations vector pointer, and peer name information into ceph_con_init(). Rearrange the arguments so the connection pointer is first. Hide the byte-swapping of the peer entity number inside ceph_con_init() Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 9 ++++++++- net/ceph/mon_client.c | 8 +++----- net/ceph/osd_client.c | 7 ++----- 3 files changed, 13 insertions(+), 11 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 36b440a..3b65f6e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -521,15 +521,22 @@ void ceph_con_put(struct ceph_connection *con) /* * initialize a new connection. */ -void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) +void ceph_con_init(struct ceph_connection *con, void *private, + const struct ceph_connection_operations *ops, + struct ceph_messenger *msgr, __u8 entity_type, __u64 entity_num) { dout("con_init %p\n", con); memset(con, 0, sizeof(*con)); + con->private = private; + con->ops = ops; atomic_set(&con->nref, 1); con->msgr = msgr; con_sock_state_init(con); + con->peer_name.type = (__u8) entity_type; + con->peer_name.num = cpu_to_le64(entity_num); + mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 6adbea7..ab6b24a 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -142,11 +142,9 @@ static int __open_session(struct ceph_mon_client *monc) monc->sub_renew_after = jiffies; /* i.e., expired */ monc->want_next_osdmap = !!monc->want_next_osdmap; - ceph_con_init(&monc->client->msgr, &monc->con); - monc->con.private = monc; - monc->con.ops = &mon_con_ops; - monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON; - monc->con.peer_name.num = cpu_to_le64(monc->cur_mon); + ceph_con_init(&monc->con, monc, &mon_con_ops, + &monc->client->msgr, + CEPH_ENTITY_TYPE_MON, monc->cur_mon); dout("open_session mon%d opening\n", monc->cur_mon); ceph_con_open(&monc->con, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 5b41a69..448c9da 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -640,11 +640,8 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) INIT_LIST_HEAD(&osd->o_osd_lru); osd->o_incarnation = 1; - ceph_con_init(&osdc->client->msgr, &osd->o_con); - osd->o_con.private = osd; - osd->o_con.ops = &osd_con_ops; - osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; - osd->o_con.peer_name.num = cpu_to_le64(onum); + ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr, + CEPH_ENTITY_TYPE_OSD, onum); INIT_LIST_HEAD(&osd->o_keepalive_item); return osd; -- cgit v1.1 From 1c20f2d26795803fc4f5155fe4fca5717a5944b6 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 4 Jun 2012 14:43:32 -0500 Subject: libceph: tweak ceph_alloc_msg() The function ceph_alloc_msg() is only used to allocate a message that will be assigned to a connection's in_msg pointer. Rename the function so this implied usage is more clear. In addition, make that assignment inside the function (again, since that's precisely what it's intended to be used for). This allows us to return what is now provided via the passed-in address of a "skip" variable. The return type is now Boolean to be explicit that there are only two possible outcomes. Make sure the result of an ->alloc_msg method call always sets the value of *skip properly. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 61 ++++++++++++++++++++++++++++----------------------- net/ceph/mon_client.c | 3 +++ net/ceph/osd_client.c | 1 + 3 files changed, 38 insertions(+), 27 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3b65f6e..98ca237 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1655,9 +1655,8 @@ static int read_partial_message_section(struct ceph_connection *con, return 1; } -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr, - int *skip); +static bool ceph_con_in_msg_alloc(struct ceph_connection *con, + struct ceph_msg_header *hdr); static int read_partial_message_pages(struct ceph_connection *con, @@ -1740,7 +1739,6 @@ static int read_partial_message(struct ceph_connection *con) int ret; unsigned front_len, middle_len, data_len; bool do_datacrc = !con->msgr->nocrc; - int skip; u64 seq; u32 crc; @@ -1793,9 +1791,7 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); - skip = 0; - con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); - if (skip) { + if (ceph_con_in_msg_alloc(con, &con->in_hdr)) { /* skip this message */ dout("alloc_msg said skip message\n"); BUG_ON(con->in_msg); @@ -2577,46 +2573,57 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) } /* - * Generic message allocator, for incoming messages. + * Allocate a message for receiving an incoming message on a + * connection, and save the result in con->in_msg. Uses the + * connection's private alloc_msg op if available. + * + * Returns true if the message should be skipped, false otherwise. + * If true is returned (skip message), con->in_msg will be NULL. + * If false is returned, con->in_msg will contain a pointer to the + * newly-allocated message, or NULL in case of memory exhaustion. */ -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr, - int *skip) +static bool ceph_con_in_msg_alloc(struct ceph_connection *con, + struct ceph_msg_header *hdr) { int type = le16_to_cpu(hdr->type); int front_len = le32_to_cpu(hdr->front_len); int middle_len = le32_to_cpu(hdr->middle_len); - struct ceph_msg *msg = NULL; int ret; + BUG_ON(con->in_msg != NULL); + if (con->ops->alloc_msg) { + int skip = 0; + mutex_unlock(&con->mutex); - msg = con->ops->alloc_msg(con, hdr, skip); + con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); - if (!msg || *skip) - return NULL; + if (skip) + con->in_msg = NULL; + + if (!con->in_msg) + return skip != 0; } - if (!msg) { - *skip = 0; - msg = ceph_msg_new(type, front_len, GFP_NOFS, false); - if (!msg) { + if (!con->in_msg) { + con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); + if (!con->in_msg) { pr_err("unable to allocate msg type %d len %d\n", type, front_len); - return NULL; + return false; } - msg->page_alignment = le16_to_cpu(hdr->data_off); + con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } - memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); + memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); - if (middle_len && !msg->middle) { - ret = ceph_alloc_middle(con, msg); + if (middle_len && !con->in_msg->middle) { + ret = ceph_alloc_middle(con, con->in_msg); if (ret < 0) { - ceph_msg_put(msg); - return NULL; + ceph_msg_put(con->in_msg); + con->in_msg = NULL; } } - return msg; + return false; } diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index ab6b24a..8462cce 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -442,6 +442,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, m = NULL; } else { dout("get_generic_reply %lld got %p\n", tid, req->reply); + *skip = 0; m = ceph_msg_get(req->reply); /* * we don't need to track the connection reading into @@ -982,6 +983,8 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, case CEPH_MSG_MDS_MAP: case CEPH_MSG_OSD_MAP: m = ceph_msg_new(type, front_len, GFP_NOFS, false); + if (!m) + return NULL; /* ENOMEM--return skip == 0 */ break; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 448c9da..24b427b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2077,6 +2077,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, int type = le16_to_cpu(hdr->type); int front = le32_to_cpu(hdr->front_len); + *skip = 0; switch (type) { case CEPH_MSG_OSD_MAP: case CEPH_MSG_WATCH_NOTIFY: -- cgit v1.1 From 38941f8031bf042dba3ced6394ba3a3b16c244ea Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Jun 2012 14:56:43 -0500 Subject: libceph: have messages point to their connection When a ceph message is queued for sending it is placed on a list of pending messages (ceph_connection->out_queue). When they are actually sent over the wire, they are moved from that list to another (ceph_connection->out_sent). When acknowledgement for the message is received, it is removed from the sent messages list. During that entire time the message is "in the possession" of a single ceph connection. Keep track of that connection in the message. This will be used in the next patch (and is a helpful bit of information for debugging anyway). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 98ca237..68b49b5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con) static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); + BUG_ON(msg->con == NULL); + msg->con = NULL; + ceph_msg_put(msg); } static void ceph_msg_remove_list(struct list_head *head) @@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con) ceph_msg_remove_list(&con->out_sent); if (con->in_msg) { + BUG_ON(con->in_msg->con != con); + con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; } @@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con) &con->out_temp_ack); } + BUG_ON(list_empty(&con->out_queue)); m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); con->out_msg = m; + BUG_ON(m->con != con); /* put message on sent list */ ceph_msg_get(m); @@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con) "error allocating memory for incoming message"; return -ENOMEM; } + + BUG_ON(con->in_msg->con != con); m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ if (m->middle) @@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con) { struct ceph_msg *msg; + BUG_ON(con->in_msg->con != con); + con->in_msg->con = NULL; msg = con->in_msg; con->in_msg = NULL; @@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con) con_close_socket(con); if (con->in_msg) { + BUG_ON(con->in_msg->con != con); + con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; } @@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* queue */ mutex_lock(&con->mutex); + BUG_ON(msg->con != NULL); + msg->con = con; BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, @@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) { mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { - dout("con_revoke %p msg %p - was on queue\n", con, msg); + dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); + BUG_ON(msg->con == NULL); + msg->con = NULL; + ceph_msg_put(msg); msg->hdr.seq = 0; } if (con->out_msg == msg) { - dout("con_revoke %p msg %p - was sending\n", con, msg); + dout("%s %p msg %p - was sending\n", __func__, con, msg); con->out_msg = NULL; if (con->out_kvec_is_msg) { con->out_skip = con->out_kvec_bytes; @@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, if (m == NULL) goto out; kref_init(&m->kref); + + m->con = NULL; INIT_LIST_HEAD(&m->list_head); m->hdr.tid = 0; @@ -2598,6 +2618,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, mutex_unlock(&con->mutex); con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); + if (con->in_msg) + con->in_msg->con = con; if (skip) con->in_msg = NULL; @@ -2611,6 +2633,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, type, front_len); return false; } + con->in_msg->con = con; con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); -- cgit v1.1 From 92ce034b5a740046cc643a21ea21eaad589e0043 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 4 Jun 2012 14:43:33 -0500 Subject: libceph: have messages take a connection reference There are essentially two types of ceph messages: incoming and outgoing. Outgoing messages are always allocated via ceph_msg_new(), and at the time of their allocation they are not associated with any particular connection. Incoming messages are always allocated via ceph_con_in_msg_alloc(), and they are initially associated with the connection from which incoming data will be placed into the message. When an outgoing message gets sent, it becomes associated with a connection and remains that way until the message is successfully sent. The association of an incoming message goes away at the point it is sent to an upper layer via a con->ops->dispatch method. This patch implements reference counting for all ceph messages, such that every message holds a reference (and a pointer) to a connection if and only if it is associated with that connection (as described above). For background, here is an explanation of the ceph message lifecycle, emphasizing when an association exists between a message and a connection. Outgoing Messages An outgoing message is "owned" by its allocator, from the time it is allocated in ceph_msg_new() up to the point it gets queued for sending in ceph_con_send(). Prior to that point the message's msg->con pointer is null; at the point it is queued for sending its message pointer is assigned to refer to the connection. At that time the message is inserted into a connection's out_queue list. When a message on the out_queue list has been sent to the socket layer to be put on the wire, it is transferred out of that list and into the connection's out_sent list. At that point it is still owned by the connection, and will remain so until an acknowledgement is received from the recipient that indicates the message was successfully transferred. When such an acknowledgement is received (in process_ack()), the message is removed from its list (in ceph_msg_remove()), at which point it is no longer associated with the connection. So basically, any time a message is on one of a connection's lists, it is associated with that connection. Reference counting outgoing messages can thus be done at the points a message is added to the out_queue (in ceph_con_send()) and the point it is removed from either its two lists (in ceph_msg_remove())--at which point its connection pointer becomes null. Incoming Messages When an incoming message on a connection is getting read (in read_partial_message()) and there is no message in con->in_msg, a new one is allocated using ceph_con_in_msg_alloc(). At that point the message is associated with the connection. Once that message has been completely and successfully read, it is passed to upper layer code using the connection's con->ops->dispatch method. At that point the association between the message and the connection no longer exists. Reference counting of connections for incoming messages can be done by taking a reference to the connection when the message gets allocated, and releasing that reference when it gets handed off using the dispatch method. We should never fail to get a connection reference for a message--the since the caller should already hold one. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 68b49b5..88ac083 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -415,6 +415,7 @@ static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); + ceph_con_put(msg->con); msg->con = NULL; ceph_msg_put(msg); @@ -440,6 +441,7 @@ static void reset_connection(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; + ceph_con_put(con->in_msg->con); } con->connect_seq = 0; @@ -1914,6 +1916,7 @@ static void process_message(struct ceph_connection *con) con->in_msg->con = NULL; msg = con->in_msg; con->in_msg = NULL; + ceph_con_put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2275,6 +2278,7 @@ static void ceph_fault(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; + ceph_con_put(con); } /* Requeue anything that hasn't been acked */ @@ -2391,8 +2395,11 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* queue */ mutex_lock(&con->mutex); + BUG_ON(msg->con != NULL); - msg->con = con; + msg->con = ceph_con_get(con); + BUG_ON(msg->con == NULL); + BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, @@ -2421,10 +2428,11 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); + ceph_con_put(msg->con); msg->con = NULL; + msg->hdr.seq = 0; ceph_msg_put(msg); - msg->hdr.seq = 0; } if (con->out_msg == msg) { dout("%s %p msg %p - was sending\n", __func__, con, msg); @@ -2433,8 +2441,9 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) con->out_skip = con->out_kvec_bytes; con->out_kvec_is_msg = false; } - ceph_msg_put(msg); msg->hdr.seq = 0; + + ceph_msg_put(msg); } mutex_unlock(&con->mutex); } @@ -2618,8 +2627,10 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, mutex_unlock(&con->mutex); con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); - if (con->in_msg) - con->in_msg->con = con; + if (con->in_msg) { + con->in_msg->con = ceph_con_get(con); + BUG_ON(con->in_msg->con == NULL); + } if (skip) con->in_msg = NULL; @@ -2633,7 +2644,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, type, front_len); return false; } - con->in_msg->con = con; + con->in_msg->con = ceph_con_get(con); + BUG_ON(con->in_msg->con == NULL); con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); -- cgit v1.1 From 6740a845b2543cc46e1902ba21bac743fbadd0dc Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Jun 2012 14:56:43 -0500 Subject: libceph: make ceph_con_revoke() a msg operation ceph_con_revoke() is passed both a message and a ceph connection. Now that any message associated with a connection holds a pointer to that connection, there's no need to provide the connection when revoking a message. This has the added benefit of precluding the possibility of the providing the wrong connection pointer. If the message's connection pointer is null, it is not being tracked by any connection, so revoking it is a no-op. This is supported as a convenience for upper layers, so they can revoke a message that is not actually "in flight." Rename the function ceph_msg_revoke() to reflect that it is really an operation on a message, not a connection. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 7 ++++++- net/ceph/mon_client.c | 8 ++++---- net/ceph/osd_client.c | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 88ac083..d636903 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2421,8 +2421,13 @@ EXPORT_SYMBOL(ceph_con_send); /* * Revoke a message that was previously queued for send */ -void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) +void ceph_msg_revoke(struct ceph_msg *msg) { + struct ceph_connection *con = msg->con; + + if (!con) + return; /* Message not in our possession */ + mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("%s %p msg %p - was on queue\n", __func__, con, msg); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 8462cce..7a16750 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -106,7 +106,7 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) monc->pending_auth = 1; monc->m_auth->front.iov_len = len; monc->m_auth->hdr.front_len = cpu_to_le32(len); - ceph_con_revoke(&monc->con, monc->m_auth); + ceph_msg_revoke(monc->m_auth); ceph_msg_get(monc->m_auth); /* keep our ref */ ceph_con_send(&monc->con, monc->m_auth); } @@ -117,7 +117,7 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) static void __close_session(struct ceph_mon_client *monc) { dout("__close_session closing mon%d\n", monc->cur_mon); - ceph_con_revoke(&monc->con, monc->m_auth); + ceph_msg_revoke(monc->m_auth); ceph_con_close(&monc->con); monc->con.private = NULL; monc->cur_mon = -1; @@ -229,7 +229,7 @@ static void __send_subscribe(struct ceph_mon_client *monc) msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - ceph_con_revoke(&monc->con, msg); + ceph_msg_revoke(msg); ceph_con_send(&monc->con, ceph_msg_get(msg)); monc->sub_sent = jiffies | 1; /* never 0 */ @@ -688,7 +688,7 @@ static void __resend_generic_request(struct ceph_mon_client *monc) for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_mon_generic_request, node); - ceph_con_revoke(&monc->con, req->request); + ceph_msg_revoke(req->request); ceph_con_send(&monc->con, ceph_msg_get(req->request)); } } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 24b427b..ad78705 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -852,7 +852,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, if (req->r_osd) { /* make sure the original request isn't in flight. */ - ceph_con_revoke(&req->r_osd->o_con, req->r_request); + ceph_msg_revoke(req->r_request); list_del_init(&req->r_osd_item); if (list_empty(&req->r_osd->o_requests) && @@ -879,7 +879,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, static void __cancel_request(struct ceph_osd_request *req) { if (req->r_sent && req->r_osd) { - ceph_con_revoke(&req->r_osd->o_con, req->r_request); + ceph_msg_revoke(req->r_request); req->r_sent = 0; } } -- cgit v1.1 From 8921d114f5574c6da2cdd00749d185633ecf88f3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Jun 2012 14:56:43 -0500 Subject: libceph: make ceph_con_revoke_message() a msg op ceph_con_revoke_message() is passed both a message and a ceph connection. A ceph_msg allocated for incoming messages on a connection always has a pointer to that connection, so there's no need to provide the connection when revoking such a message. Note that the existing logic does not preclude the message supplied being a null/bogus message pointer. The only user of this interface is the OSD client, and the only value an osd client passes is a request's r_reply field. That is always non-null (except briefly in an error path in ceph_osdc_alloc_request(), and that drops the only reference so the request won't ever have a reply to revoke). So we can safely assume the passed-in message is non-null, but add a BUG_ON() to make it very obvious we are imposing this restriction. Rename the function ceph_msg_revoke_incoming() to reflect that it is really an operation on an incoming message. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 22 ++++++++++++++++------ net/ceph/osd_client.c | 9 ++++----- 2 files changed, 20 insertions(+), 11 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d636903..3857f81 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2456,17 +2456,27 @@ void ceph_msg_revoke(struct ceph_msg *msg) /* * Revoke a message that we may be reading data into */ -void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) +void ceph_msg_revoke_incoming(struct ceph_msg *msg) { + struct ceph_connection *con; + + BUG_ON(msg == NULL); + if (!msg->con) { + dout("%s msg %p null con\n", __func__, msg); + + return; /* Message not in our possession */ + } + + con = msg->con; mutex_lock(&con->mutex); - if (con->in_msg && con->in_msg == msg) { + if (con->in_msg == msg) { unsigned front_len = le32_to_cpu(con->in_hdr.front_len); unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); unsigned data_len = le32_to_cpu(con->in_hdr.data_len); /* skip rest of message */ - dout("con_revoke_pages %p msg %p revoked\n", con, msg); - con->in_base_pos = con->in_base_pos - + dout("%s %p msg %p revoked\n", __func__, con, msg); + con->in_base_pos = con->in_base_pos - sizeof(struct ceph_msg_header) - front_len - middle_len - @@ -2477,8 +2487,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) con->in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; } else { - dout("con_revoke_pages %p msg %p pages %p no-op\n", - con, con->in_msg, msg); + dout("%s %p in_msg %p msg %p no-op\n", + __func__, con, con->in_msg, msg); } mutex_unlock(&con->mutex); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ad78705..c178c77 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); if (req->r_con_filling_msg) { - dout("release_request revoking pages %p from con %p\n", + dout("%s revoking pages %p from con %p\n", __func__, req->r_pages, req->r_con_filling_msg); - ceph_con_revoke_message(req->r_con_filling_msg, - req->r_reply); + ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); } if (req->r_reply) @@ -2022,9 +2021,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } if (req->r_con_filling_msg) { - dout("get_reply revoking msg %p from old con %p\n", + dout("%s revoking msg %p from old con %p\n", __func__, req->r_reply, req->r_con_filling_msg); - ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); + ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); req->r_con_filling_msg = NULL; } -- cgit v1.1 From 43643528cce60ca184fe8197efa8e8da7c89a037 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 6 Jun 2012 19:35:55 -0500 Subject: rbd: Clear ceph_msg->bio_iter for retransmitted message The bug can cause NULL pointer dereference in write_partial_msg_pages Signed-off-by: Zheng Yan Reviewed-by: Alex Elder --- net/ceph/messenger.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3857f81..769a2c9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -649,6 +649,10 @@ static void prepare_write_message(struct ceph_connection *con) m->hdr.seq = cpu_to_le64(++con->out_seq); m->needs_out_seq = false; } +#ifdef CONFIG_BLOCK + else + m->bio_iter = NULL; +#endif dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", m, con->out_seq, le16_to_cpu(m->hdr.type), -- cgit v1.1 From ad3b904c07dfa88603689bf9a67bffbb9b99beb5 Mon Sep 17 00:00:00 2001 From: Xi Wang Date: Wed, 6 Jun 2012 19:35:55 -0500 Subject: libceph: fix overflow in __decode_pool_names() `len' is read from network and thus needs validation. Otherwise a large `len' would cause out-of-bounds access via the memcpy() call. In addition, len = 0xffffffff would overflow the kmalloc() size, leading to out-of-bounds write. This patch adds a check of `len' via ceph_decode_need(). Also use kstrndup rather than kmalloc/memcpy. [elder@inktank.com: added -ENOMEM return for null kstrndup() result] Signed-off-by: Xi Wang Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 1892c52..df47871 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -488,15 +488,16 @@ static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) ceph_decode_32_safe(p, end, pool, bad); ceph_decode_32_safe(p, end, len, bad); dout(" pool %d len %d\n", pool, len); + ceph_decode_need(p, end, len, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) { + char *name = kstrndup(*p, len, GFP_NOFS); + + if (!name) + return -ENOMEM; kfree(pi->name); - pi->name = kmalloc(len + 1, GFP_NOFS); - if (pi->name) { - memcpy(pi->name, *p, len); - pi->name[len] = '\0'; - dout(" name is %s\n", pi->name); - } + pi->name = name; + dout(" name is %s\n", pi->name); } *p += len; } -- cgit v1.1 From e91a9b639a691e0982088b5954eaafb5a25c8f1c Mon Sep 17 00:00:00 2001 From: Xi Wang Date: Wed, 6 Jun 2012 19:35:55 -0500 Subject: libceph: fix overflow in osdmap_decode() On 32-bit systems, a large `n' would overflow `n * sizeof(u32)' and bypass the check ceph_decode_need(p, end, n * sizeof(u32), bad). It would also overflow the subsequent kmalloc() size, leading to out-of-bounds write. Signed-off-by: Xi Wang Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'net') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index df47871..d70aaca 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -667,6 +667,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); ceph_decode_copy(p, &pgid, sizeof(pgid)); n = ceph_decode_32(p); + err = -EINVAL; + if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) + goto bad; ceph_decode_need(p, end, n * sizeof(u32), bad); err = -ENOMEM; pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); -- cgit v1.1 From a5506049500b30dbc5edb4d07a3577477c1f3643 Mon Sep 17 00:00:00 2001 From: Xi Wang Date: Wed, 6 Jun 2012 19:35:55 -0500 Subject: libceph: fix overflow in osdmap_apply_incremental() On 32-bit systems, a large `pglen' would overflow `pglen*sizeof(u32)' and bypass the check ceph_decode_need(p, end, pglen*sizeof(u32), bad). It would also overflow the subsequent kmalloc() size, leading to out-of-bounds write. Signed-off-by: Xi Wang Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index d70aaca..d3de09f 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -893,6 +893,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, (void) __remove_pg_mapping(&map->pg_temp, pgid); /* insert */ + if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) { + err = -EINVAL; + goto bad; + } pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); if (!pg) { err = -ENOMEM; -- cgit v1.1 From 89a86be0ce20022f6ede8bccec078dbb3d63caaa Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 9 Jun 2012 14:19:21 -0700 Subject: libceph: transition socket state prior to actual connect Once we call ->connect(), we are racing against the actual connection, and a subsequent transition from CONNECTING -> CONNECTED. Set the state to CONNECTING before that, under the protection of the mutex, to avoid the race. This was introduced in 928443cd9644e7cfd46f687dbeffda2d1a357ff9, with the original socket state code. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 769a2c9..bdbecac 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -321,6 +321,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); + con_sock_state_connecting(con); ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), O_NONBLOCK); if (ret == -EINPROGRESS) { @@ -336,8 +337,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } con->sock = sock; - con_sock_state_connecting(con); - return 0; } -- cgit v1.1 From f3dea7edd3d449fe7a6d402c1ce56a294b985261 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sun, 10 Jun 2012 20:43:56 -0700 Subject: libceph: flush msgr queue during mon_client shutdown We need to flush the msgr workqueue during mon_client shutdown to ensure that any work affecting our embedded ceph_connection is finished so that we can be safely destroyed. Previously, we were flushing the work queue after osd_client shutdown and before mon_client shutdown to ensure that any osd connection refs to authorizers are flushed. Remove the redundant flush, and document in the comment that the mon_client flush is needed to cover that case as well. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/ceph_common.c | 7 ------- net/ceph/mon_client.c | 8 ++++++++ 2 files changed, 8 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 2de3ea1..c815f31 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -498,13 +498,6 @@ void ceph_destroy_client(struct ceph_client *client) /* unmount */ ceph_osdc_stop(&client->osdc); - /* - * make sure osd connections close out before destroying the - * auth module, which is needed to free those connections' - * ceph_authorizers. - */ - ceph_msgr_flush(); - ceph_monc_stop(&client->monc); ceph_debugfs_client_cleanup(client); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 7a16750..dc16595 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -838,6 +838,14 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_unlock(&monc->mutex); + /* + * flush msgr queue before we destroy ourselves to ensure that: + * - any work that references our embedded con is finished. + * - any osd_client or other work that may reference an authorizer + * finishes before we shut down the auth subsystem. + */ + ceph_msgr_flush(); + ceph_auth_destroy(monc->auth); ceph_msg_put(monc->m_auth); -- cgit v1.1 From 26ce171915f348abd1f41da1ed139d93750d987f Mon Sep 17 00:00:00 2001 From: Dan Carpenter Date: Tue, 19 Jun 2012 08:52:33 -0500 Subject: libceph: fix NULL dereference in reset_connection() We dereference "con->in_msg" on the line after it was set to NULL. Signed-off-by: Dan Carpenter Reviewed-by: Alex Elder --- net/ceph/messenger.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5e9f61d..23073cf 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -440,7 +440,7 @@ static void reset_connection(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - ceph_con_put(con->in_msg->con); + ceph_con_put(con); } con->connect_seq = 0; -- cgit v1.1 From 36eb71aa57e6a33d61fd90a2fd87f00c6844bc86 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 Jun 2012 12:47:08 -0700 Subject: libceph: use con get/put methods The ceph_con_get/put() helpers manipulate the embedded con ref count, which isn't used now that ceph_connections are embedded in other structures. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 23073cf..fc0cee7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -414,7 +414,7 @@ static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); - ceph_con_put(msg->con); + msg->con->ops->put(msg->con); msg->con = NULL; ceph_msg_put(msg); @@ -440,7 +440,7 @@ static void reset_connection(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - ceph_con_put(con); + con->ops->put(con); } con->connect_seq = 0; @@ -1919,7 +1919,7 @@ static void process_message(struct ceph_connection *con) con->in_msg->con = NULL; msg = con->in_msg; con->in_msg = NULL; - ceph_con_put(con); + con->ops->put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2281,7 +2281,7 @@ static void ceph_fault(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - ceph_con_put(con); + con->ops->put(con); } /* Requeue anything that hasn't been acked */ @@ -2400,7 +2400,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) mutex_lock(&con->mutex); BUG_ON(msg->con != NULL); - msg->con = ceph_con_get(con); + msg->con = con->ops->get(con); BUG_ON(msg->con == NULL); BUG_ON(!list_empty(&msg->list_head)); @@ -2436,7 +2436,7 @@ void ceph_msg_revoke(struct ceph_msg *msg) dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); - ceph_con_put(msg->con); + msg->con->ops->put(msg->con); msg->con = NULL; msg->hdr.seq = 0; @@ -2646,7 +2646,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); if (con->in_msg) { - con->in_msg->con = ceph_con_get(con); + con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); } if (skip) @@ -2662,7 +2662,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, type, front_len); return false; } - con->in_msg->con = ceph_con_get(con); + con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } -- cgit v1.1 From d59315ca8c0de00df9b363f94a2641a30961ca1c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 Jun 2012 12:49:23 -0700 Subject: libceph: drop ceph_con_get/put helpers and nref member These are no longer used. Every ceph_connection instance is embedded in another structure, and refcounts manipulated via the get/put ops. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fc0cee7..ab690e2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -501,30 +501,6 @@ bool ceph_con_opened(struct ceph_connection *con) } /* - * generic get/put - */ -struct ceph_connection *ceph_con_get(struct ceph_connection *con) -{ - int nref = __atomic_add_unless(&con->nref, 1, 0); - - dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); - - return nref ? con : NULL; -} - -void ceph_con_put(struct ceph_connection *con) -{ - int nref = atomic_dec_return(&con->nref); - - BUG_ON(nref < 0); - if (nref == 0) { - BUG_ON(con->sock); - kfree(con); - } - dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); -} - -/* * initialize a new connection. */ void ceph_con_init(struct ceph_connection *con, void *private, @@ -535,7 +511,6 @@ void ceph_con_init(struct ceph_connection *con, void *private, memset(con, 0, sizeof(*con)); con->private = private; con->ops = ops; - atomic_set(&con->nref, 1); con->msgr = msgr; con_sock_state_init(con); @@ -1951,8 +1926,7 @@ static int try_write(struct ceph_connection *con) { int ret = 1; - dout("try_write start %p state %lu nref %d\n", con, con->state, - atomic_read(&con->nref)); + dout("try_write start %p state %lu\n", con, con->state); more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); -- cgit v1.1 From 739c905baa018c99003564ebc367d93aa44d4861 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: encapsulate out message data setup Move the code that prepares to write the data portion of a message into its own function. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ab690e2..5644866 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -565,6 +565,24 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +static void prepare_write_message_data(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->out_msg; + + BUG_ON(!msg); + BUG_ON(!msg->hdr.data_len); + + /* initialize page iterator */ + con->out_msg_pos.page = 0; + if (msg->pages) + con->out_msg_pos.page_pos = msg->page_alignment; + else + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.data_pos = 0; + con->out_msg_pos.did_page_crc = false; + con->out_more = 1; /* data + footer will follow */ +} + /* * Prepare footer for currently outgoing message, and finish things * off. Assumes out_kvec* are already valid.. we just add on to the end. @@ -657,26 +675,17 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg->footer.middle_crc = cpu_to_le32(crc); } else con->out_msg->footer.middle_crc = 0; - con->out_msg->footer.data_crc = 0; - dout("prepare_write_message front_crc %u data_crc %u\n", + dout("%s front_crc %u middle_crc %u\n", __func__, le32_to_cpu(con->out_msg->footer.front_crc), le32_to_cpu(con->out_msg->footer.middle_crc)); /* is there a data payload? */ - if (le32_to_cpu(m->hdr.data_len) > 0) { - /* initialize page iterator */ - con->out_msg_pos.page = 0; - if (m->pages) - con->out_msg_pos.page_pos = m->page_alignment; - else - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.data_pos = 0; - con->out_msg_pos.did_page_crc = false; - con->out_more = 1; /* data + footer will follow */ - } else { + con->out_msg->footer.data_crc = 0; + if (m->hdr.data_len) + prepare_write_message_data(con); + else /* no, queue up footer too and be done */ prepare_write_message_footer(con); - } set_bit(WRITE_PENDING, &con->flags); } -- cgit v1.1 From 84ca8fc87fcf4ab97bb8acdb59bf97bb4820cb14 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: encapsulate advancing msg page In write_partial_msg_pages(), once all the data from a page has been sent we advance to the next one. Put the code that takes care of this into its own function. While modifying write_partial_msg_pages(), make its local variable "in_trail" be Boolean, and use the local variable "msg" (which is just the connection's current out_msg pointer) consistently. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 58 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 24 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5644866..1b92e3b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -891,6 +891,33 @@ static void iter_bio_next(struct bio **bio_iter, int *seg) } #endif +static void out_msg_pos_next(struct ceph_connection *con, struct page *page, + size_t len, size_t sent, bool in_trail) +{ + struct ceph_msg *msg = con->out_msg; + + BUG_ON(!msg); + BUG_ON(!sent); + + con->out_msg_pos.data_pos += sent; + con->out_msg_pos.page_pos += sent; + if (sent == len) { + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.page++; + con->out_msg_pos.did_page_crc = false; + if (in_trail) + list_move_tail(&page->lru, + &msg->trail->head); + else if (msg->pagelist) + list_move_tail(&page->lru, + &msg->pagelist->head); +#ifdef CONFIG_BLOCK + else if (msg->bio) + iter_bio_next(&msg->bio_iter, &msg->bio_seg); +#endif + } +} + /* * Write as much message data payload as we can. If we finish, queue * up the footer. @@ -906,11 +933,11 @@ static int write_partial_msg_pages(struct ceph_connection *con) bool do_datacrc = !con->msgr->nocrc; int ret; int total_max_write; - int in_trail = 0; + bool in_trail = false; size_t trail_len = (msg->trail ? msg->trail->length : 0); dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", - con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, + con, msg, con->out_msg_pos.page, msg->nr_pages, con->out_msg_pos.page_pos); #ifdef CONFIG_BLOCK @@ -934,13 +961,12 @@ static int write_partial_msg_pages(struct ceph_connection *con) /* have we reached the trail part of the data? */ if (con->out_msg_pos.data_pos >= data_len - trail_len) { - in_trail = 1; + in_trail = true; total_max_write = data_len - con->out_msg_pos.data_pos; page = list_first_entry(&msg->trail->head, struct page, lru); - max_write = PAGE_SIZE; } else if (msg->pages) { page = msg->pages[con->out_msg_pos.page]; } else if (msg->pagelist) { @@ -964,14 +990,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (do_datacrc && !con->out_msg_pos.did_page_crc) { void *base; u32 crc; - u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); + u32 tmpcrc = le32_to_cpu(msg->footer.data_crc); char *kaddr; kaddr = kmap(page); BUG_ON(kaddr == NULL); base = kaddr + con->out_msg_pos.page_pos + bio_offset; crc = crc32c(tmpcrc, base, len); - con->out_msg->footer.data_crc = cpu_to_le32(crc); + msg->footer.data_crc = cpu_to_le32(crc); con->out_msg_pos.did_page_crc = true; } ret = ceph_tcp_sendpage(con->sock, page, @@ -984,30 +1010,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (ret <= 0) goto out; - con->out_msg_pos.data_pos += ret; - con->out_msg_pos.page_pos += ret; - if (ret == len) { - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.page++; - con->out_msg_pos.did_page_crc = false; - if (in_trail) - list_move_tail(&page->lru, - &msg->trail->head); - else if (msg->pagelist) - list_move_tail(&page->lru, - &msg->pagelist->head); -#ifdef CONFIG_BLOCK - else if (msg->bio) - iter_bio_next(&msg->bio_iter, &msg->bio_seg); -#endif - } + out_msg_pos_next(con, page, len, (size_t) ret, in_trail); } dout("write_partial_msg_pages %p msg %p done\n", con, msg); /* prepare and queue up footer, too */ if (!do_datacrc) - con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; + msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; con_out_kvec_reset(con); prepare_write_message_footer(con); ret = 1; -- cgit v1.1 From fd154f3c75465abd83b7a395033e3755908a1e6e Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: don't mark footer complete before it is This is a nit, but prepare_write_message() sets the FOOTER_COMPLETE flag before the CRC for the data portion (recorded in the footer) has been completely computed. Hold off setting the complete flag until we've decided it's ready to send. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1b92e3b..5354d59 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -592,6 +592,8 @@ static void prepare_write_message_footer(struct ceph_connection *con) struct ceph_msg *m = con->out_msg; int v = con->out_kvec_left; + m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; + dout("prepare_write_message_footer %p\n", con); con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; @@ -665,7 +667,7 @@ static void prepare_write_message(struct ceph_connection *con) /* fill in crc (except data pages), footer */ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); con->out_msg->hdr.crc = cpu_to_le32(crc); - con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; + con->out_msg->footer.flags = 0; crc = crc32c(0, m->front.iov_base, m->front.iov_len); con->out_msg->footer.front_crc = cpu_to_le32(crc); -- cgit v1.1 From df6ad1f97342ebc4270128222e896541405eecdb Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: move init_bio_*() functions up Move init_bio_iter() and iter_bio_next() up in their source file so the'll be defined before they're needed. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5354d59..7b5ff45 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -565,6 +565,31 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +#ifdef CONFIG_BLOCK +static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) +{ + if (!bio) { + *iter = NULL; + *seg = 0; + return; + } + *iter = bio; + *seg = bio->bi_idx; +} + +static void iter_bio_next(struct bio **bio_iter, int *seg) +{ + if (*bio_iter == NULL) + return; + + BUG_ON(*seg >= (*bio_iter)->bi_vcnt); + + (*seg)++; + if (*seg == (*bio_iter)->bi_vcnt) + init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); +} +#endif + static void prepare_write_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; @@ -868,31 +893,6 @@ out: return ret; /* done! */ } -#ifdef CONFIG_BLOCK -static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) -{ - if (!bio) { - *iter = NULL; - *seg = 0; - return; - } - *iter = bio; - *seg = bio->bi_idx; -} - -static void iter_bio_next(struct bio **bio_iter, int *seg) -{ - if (*bio_iter == NULL) - return; - - BUG_ON(*seg >= (*bio_iter)->bi_vcnt); - - (*seg)++; - if (*seg == (*bio_iter)->bi_vcnt) - init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); -} -#endif - static void out_msg_pos_next(struct ceph_connection *con, struct page *page, size_t len, size_t sent, bool in_trail) { -- cgit v1.1 From 572c588edadaa3da3992bd8a0fed830bbcc861f8 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: move init of bio_iter If a message has a non-null bio pointer, its bio_iter field is initialized in write_partial_msg_pages() if this has not been done already. This is really a one-time setup operation for sending a message's (bio) data, so move that initialization code into prepare_write_message_data() which serves that purpose. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 7b5ff45..fedad91 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -603,6 +603,10 @@ static void prepare_write_message_data(struct ceph_connection *con) con->out_msg_pos.page_pos = msg->page_alignment; else con->out_msg_pos.page_pos = 0; +#ifdef CONFIG_BLOCK + if (msg->bio && !msg->bio_iter) + init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); +#endif con->out_msg_pos.data_pos = 0; con->out_msg_pos.did_page_crc = false; con->out_more = 1; /* data + footer will follow */ @@ -942,11 +946,6 @@ static int write_partial_msg_pages(struct ceph_connection *con) con, msg, con->out_msg_pos.page, msg->nr_pages, con->out_msg_pos.page_pos); -#ifdef CONFIG_BLOCK - if (msg->bio && !msg->bio_iter) - init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); -#endif - while (data_len > con->out_msg_pos.data_pos) { struct page *page = NULL; int max_write = PAGE_SIZE; -- cgit v1.1 From abdaa6a849af1d63153682c11f5bbb22dacb1f6b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: don't use bio_iter as a flag Recently a bug was fixed in which the bio_iter field in a ceph message was not being properly re-initialized when a message got re-transmitted: commit 43643528cce60ca184fe8197efa8e8da7c89a037 Author: Yan, Zheng rbd: Clear ceph_msg->bio_iter for retransmitted message We are now only initializing the bio_iter field when we are about to start to write message data (in prepare_write_message_data()), rather than every time we are attempting to write any portion of the message data (in write_partial_msg_pages()). This means we no longer need to use the msg->bio_iter field as a flag. So just don't do that any more. Trust prepare_write_message_data() to ensure msg->bio_iter is properly initialized, every time we are about to begin writing (or re-writing) a message's bio data. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fedad91..3a43303 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -604,7 +604,7 @@ static void prepare_write_message_data(struct ceph_connection *con) else con->out_msg_pos.page_pos = 0; #ifdef CONFIG_BLOCK - if (msg->bio && !msg->bio_iter) + if (msg->bio) init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); #endif con->out_msg_pos.data_pos = 0; @@ -672,10 +672,6 @@ static void prepare_write_message(struct ceph_connection *con) m->hdr.seq = cpu_to_le64(++con->out_seq); m->needs_out_seq = false; } -#ifdef CONFIG_BLOCK - else - m->bio_iter = NULL; -#endif dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", m, con->out_seq, le16_to_cpu(m->hdr.type), -- cgit v1.1 From a8d00e3cdef4c1c4f194414b72b24cd995439a05 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: SOCK_CLOSED is a flag, not a state The following commit changed it so SOCK_CLOSED bit was stored in a connection's new "flags" field rather than its "state" field. libceph: start separating connection flags from state commit 928443cd That bit is used in con_close_socket() to protect against setting an error message more than once in the socket event handler function. Unfortunately, the field being operated on in that function was not updated to be "flags" as it should have been. This fixes that error. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3a43303..3965394 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -397,11 +397,11 @@ static int con_close_socket(struct ceph_connection *con) dout("con_close_socket on %p sock %p\n", con, con->sock); if (!con->sock) return 0; - set_bit(SOCK_CLOSED, &con->state); + set_bit(SOCK_CLOSED, &con->flags); rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); con->sock = NULL; - clear_bit(SOCK_CLOSED, &con->state); + clear_bit(SOCK_CLOSED, &con->flags); con_sock_state_closed(con); return rc; } -- cgit v1.1 From 188048bce311ee41e5178bc3255415d0eae28423 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: don't change socket state on sock event Currently the socket state change event handler records an error message on a connection to distinguish a close while connecting from a close while a connection was already established. Changing connection information during handling of a socket event is not very clean, so instead move this assignment inside con_work(), where it can be done during normal connection-level processing (and under protection of the connection mutex as well). Move the handling of a socket closed event up to the top of the processing loop in con_work(); there's no point in handling backoff etc. if we have a newly-closed socket to take care of. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3965394..56381b9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -261,13 +261,8 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { - if (test_bit(CONNECTING, &con->state)) - con->error_msg = "connection failed"; - else - con->error_msg = "socket closed"; + if (!test_and_set_bit(SOCK_CLOSED, &con->flags)) queue_con(con); - } break; case TCP_ESTABLISHED: dout("%s TCP_ESTABLISHED\n", __func__); @@ -2187,6 +2182,14 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: + if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { + if (test_bit(CONNECTING, &con->state)) + con->error_msg = "connection failed"; + else + con->error_msg = "socket closed"; + goto fault; + } + if (test_and_clear_bit(BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); if (queue_delayed_work(ceph_msgr_wq, &con->work, @@ -2216,9 +2219,6 @@ restart: con_close_socket(con); } - if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) - goto fault; - ret = try_read(con); if (ret == -EAGAIN) goto restart; -- cgit v1.1 From d65c9e0b9eb43d14ece9dd843506ccba06162ee7 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: just set SOCK_CLOSED when state changes When a TCP_CLOSE or TCP_CLOSE_WAIT event occurs, the SOCK_CLOSED connection flag bit is set, and if it had not been previously set queue_con() is called to ensure con_work() will get a chance to handle the changed state. con_work() atomically checks--and if set, clears--the SOCK_CLOSED bit if it was set. This means that even if the bit were set repeatedly, the related processing in con_work() only gets called once per transition of the bit from 0 to 1. What's important then is that we ensure con_work() gets called *at least* once when a socket close event occurs, not that it gets called *exactly* once. The work queue mechanism already takes care of queueing work only if it is not already queued, so there's no need for us to call queue_con() conditionally. So this patch just makes it so the SOCK_CLOSED flag gets set unconditionally in ceph_sock_state_change(). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 56381b9..cebef85 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -261,8 +261,8 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - if (!test_and_set_bit(SOCK_CLOSED, &con->flags)) - queue_con(con); + set_bit(SOCK_CLOSED, &con->flags); + queue_con(con); break; case TCP_ESTABLISHED: dout("%s TCP_ESTABLISHED\n", __func__); -- cgit v1.1 From 456ea46865787283088b23a8a7f69244513b95f0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: don't touch con state in con_close_socket() In con_close_socket(), a connection's SOCK_CLOSED flag gets set and then cleared while its shutdown method is called and its reference gets dropped. Previously, that flag got set only if it had not already been set, so setting it in con_close_socket() might have prevented additional processing being done on a socket being shut down. We no longer set SOCK_CLOSED in the socket event routine conditionally, so setting that bit here no longer provides whatever benefit it might have provided before. A race condition could still leave the SOCK_CLOSED bit set even after we've issued the call to con_close_socket(), so we still clear that bit after shutting the socket down. Add a comment explaining the reason for this. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cebef85..cfcca1f 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -392,10 +392,16 @@ static int con_close_socket(struct ceph_connection *con) dout("con_close_socket on %p sock %p\n", con, con->sock); if (!con->sock) return 0; - set_bit(SOCK_CLOSED, &con->flags); rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); con->sock = NULL; + + /* + * Forcibly clear the SOCK_CLOSE flag. It gets set + * independent of the connection mutex, and we could have + * received a socket close event before we had the chance to + * shut the socket down. + */ clear_bit(SOCK_CLOSED, &con->flags); con_sock_state_closed(con); return rc; -- cgit v1.1 From bb9e6bba5d8b85b631390f8dbe8a24ae1ff5b48a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: clear CONNECTING in ceph_con_close() A connection that is closed will no longer be connecting. So clear the CONNECTING state bit in ceph_con_close(). Similarly, if the socket has been closed we no longer are in connecting state (a new connect sequence will need to be initiated). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cfcca1f..beee382 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -462,6 +462,7 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); clear_bit(NEGOTIATING, &con->state); + clear_bit(CONNECTING, &con->state); clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ set_bit(CLOSED, &con->state); @@ -2189,7 +2190,7 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_bit(CONNECTING, &con->state)) + if (test_and_clear_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; else con->error_msg = "socket closed"; -- cgit v1.1 From 3ec50d1868a9e0493046400bb1fdd054c7f64ebd Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 23 May 2012 14:35:23 -0500 Subject: libceph: clear NEGOTIATING when done A connection state's NEGOTIATING bit gets set while in CONNECTING state after we have successfully exchanged a ceph banner and IP addresses with the connection's peer (the server). But that bit is not cleared again--at least not until another connection attempt is initiated. Instead, clear it as soon as the connection is fully established. Also, clear it when a socket connection gets prematurely closed in the midst of establishing a ceph connection (in case we had reached the point where it was set). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index beee382..500207b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1562,6 +1562,7 @@ static int process_connect(struct ceph_connection *con) fail_protocol(con); return -1; } + clear_bit(NEGOTIATING, &con->state); clear_bit(CONNECTING, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; @@ -1951,7 +1952,6 @@ more: /* open the socket first? */ if (con->sock == NULL) { - clear_bit(NEGOTIATING, &con->state); set_bit(CONNECTING, &con->state); con_out_kvec_reset(con); @@ -2190,10 +2190,12 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTING, &con->state)) + if (test_and_clear_bit(CONNECTING, &con->state)) { + clear_bit(NEGOTIATING, &con->state); con->error_msg = "connection failed"; - else + } else { con->error_msg = "socket closed"; + } goto fault; } -- cgit v1.1 From e27947c767f5bed15048f4e4dad3e2eb69133697 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 23 May 2012 14:35:23 -0500 Subject: libceph: define and use an explicit CONNECTED state There is no state explicitly defined when a ceph connection is fully operational. So define one. It's set when the connection sequence completes successfully, and is cleared when the connection gets closed. Be a little more careful when examining the old state when a socket disconnect event is reported. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 500207b..83bcf97 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -463,6 +463,7 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); clear_bit(NEGOTIATING, &con->state); clear_bit(CONNECTING, &con->state); + clear_bit(CONNECTED, &con->state); clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ set_bit(CLOSED, &con->state); @@ -1564,6 +1565,7 @@ static int process_connect(struct ceph_connection *con) } clear_bit(NEGOTIATING, &con->state); clear_bit(CONNECTING, &con->state); + set_bit(CONNECTED, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; con->peer_features = server_feat; @@ -2114,6 +2116,7 @@ more: prepare_read_ack(con); break; case CEPH_MSGR_TAG_CLOSE: + clear_bit(CONNECTED, &con->state); set_bit(CLOSED, &con->state); /* fixme */ goto out; default: @@ -2190,11 +2193,13 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTING, &con->state)) { + if (test_and_clear_bit(CONNECTED, &con->state)) + con->error_msg = "socket closed"; + else if (test_and_clear_bit(CONNECTING, &con->state)) { clear_bit(NEGOTIATING, &con->state); con->error_msg = "connection failed"; } else { - con->error_msg = "socket closed"; + con->error_msg = "unrecognized con state"; } goto fault; } -- cgit v1.1 From ab166d5aa3bc036fba7efaca6e4e43a7e9510acf Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 31 May 2012 11:37:29 -0500 Subject: libceph: separate banner and connect writes There are two phases in the process of linking together the two ends of a ceph connection. The first involves exchanging a banner and IP addresses, and if that is successful a second phase exchanges some detail about each side's connection capabilities. When initiating a connection, the client side now queues to send its information for both phases of this process at the same time. This is probably a bit more efficient, but it is slightly messier from a layering perspective in the code. So rearrange things so that the client doesn't send the connection information until it has received and processed the response in the initial banner phase (in process_banner()). Move the code (in the (con->sock == NULL) case in try_write()) that prepares for writing the connection information, delaying doing that until the banner exchange has completed. Move the code that begins the transition to this second "NEGOTIATING" phase out of process_banner() and into its caller, so preparing to write the connection information and preparing to read the response are adjacent to each other. Finally, preparing to write the connection information now requires the output kvec to be reset in all cases, so move that into the prepare_write_connect() and delete it from all callers. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 83bcf97..5e67be3 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -841,6 +841,7 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.authorizer_len = auth ? cpu_to_le32(auth->authorizer_buf_len) : 0; + con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); if (auth && auth->authorizer_buf_len) @@ -1430,8 +1431,6 @@ static int process_banner(struct ceph_connection *con) ceph_pr_addr(&con->msgr->inst.addr.in_addr)); } - set_bit(NEGOTIATING, &con->state); - prepare_read_connect(con); return 0; } @@ -1481,7 +1480,6 @@ static int process_connect(struct ceph_connection *con) return -1; } con->auth_retry = 1; - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1502,7 +1500,6 @@ static int process_connect(struct ceph_connection *con) ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1528,7 +1525,6 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1545,7 +1541,6 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.global_seq)); get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1958,9 +1953,6 @@ more: con_out_kvec_reset(con); prepare_write_banner(con); - ret = prepare_write_connect(con); - if (ret < 0) - goto out; prepare_read_banner(con); BUG_ON(con->in_msg); @@ -2073,6 +2065,16 @@ more: ret = process_banner(con); if (ret < 0) goto out; + + /* Banner is good, exchange connection info */ + ret = prepare_write_connect(con); + if (ret < 0) + goto out; + prepare_read_connect(con); + set_bit(NEGOTIATING, &con->state); + + /* Send connection info before awaiting response */ + goto out; } ret = read_partial_connect(con); if (ret <= 0) -- cgit v1.1 From 7593af920baac37752190a0db703d2732bed4a3b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 24 May 2012 11:55:03 -0500 Subject: libceph: distinguish two phases of connect sequence Currently a ceph connection enters a "CONNECTING" state when it begins the process of (re-)connecting with its peer. Once the two ends have successfully exchanged their banner and addresses, an additional NEGOTIATING bit is set in the ceph connection's state to indicate the connection information exhange has begun. The CONNECTING bit/state continues to be set during this phase. Rather than have the CONNECTING state continue while the NEGOTIATING bit is set, interpret these two phases as distinct states. In other words, when NEGOTIATING is set, clear CONNECTING. That way only one of them will be active at a time. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 52 ++++++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 24 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5e67be3..32a3a2a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1559,7 +1559,6 @@ static int process_connect(struct ceph_connection *con) return -1; } clear_bit(NEGOTIATING, &con->state); - clear_bit(CONNECTING, &con->state); set_bit(CONNECTED, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; @@ -2000,7 +1999,8 @@ more_kvec: } do_next: - if (!test_bit(CONNECTING, &con->state)) { + if (!test_bit(CONNECTING, &con->state) && + !test_bit(NEGOTIATING, &con->state)) { /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2057,25 +2057,29 @@ more: } if (test_bit(CONNECTING, &con->state)) { - if (!test_bit(NEGOTIATING, &con->state)) { - dout("try_read connecting\n"); - ret = read_partial_banner(con); - if (ret <= 0) - goto out; - ret = process_banner(con); - if (ret < 0) - goto out; - - /* Banner is good, exchange connection info */ - ret = prepare_write_connect(con); - if (ret < 0) - goto out; - prepare_read_connect(con); - set_bit(NEGOTIATING, &con->state); - - /* Send connection info before awaiting response */ + dout("try_read connecting\n"); + ret = read_partial_banner(con); + if (ret <= 0) goto out; - } + ret = process_banner(con); + if (ret < 0) + goto out; + + clear_bit(CONNECTING, &con->state); + set_bit(NEGOTIATING, &con->state); + + /* Banner is good, exchange connection info */ + ret = prepare_write_connect(con); + if (ret < 0) + goto out; + prepare_read_connect(con); + + /* Send connection info before awaiting response */ + goto out; + } + + if (test_bit(NEGOTIATING, &con->state)) { + dout("try_read negotiating\n"); ret = read_partial_connect(con); if (ret <= 0) goto out; @@ -2197,12 +2201,12 @@ restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { if (test_and_clear_bit(CONNECTED, &con->state)) con->error_msg = "socket closed"; - else if (test_and_clear_bit(CONNECTING, &con->state)) { - clear_bit(NEGOTIATING, &con->state); + else if (test_and_clear_bit(NEGOTIATING, &con->state)) + con->error_msg = "negotiation failed"; + else if (test_and_clear_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; - } else { + else con->error_msg = "unrecognized con state"; - } goto fault; } -- cgit v1.1 From 5821bd8ccdf5d17ab2c391c773756538603838c3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: small changes to messenger.c This patch gathers a few small changes in "net/ceph/messenger.c": out_msg_pos_next() - small logic change that mostly affects indentation write_partial_msg_pages(). - use a local variable trail_off to represent the offset into a message of the trail portion of the data (if present) - once we are in the trail portion we will always be there, so we don't always need to check against our data position - avoid computing len twice after we've reached the trail - get rid of the variable tmpcrc, which is not needed - trail_off and trail_len never change so mark them const - update some comments read_partial_message_bio() - bio_iovec_idx() will never return an error, so don't bother checking for it Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 63 ++++++++++++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 32 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 32a3a2a..4578e99 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -907,21 +907,23 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, con->out_msg_pos.data_pos += sent; con->out_msg_pos.page_pos += sent; - if (sent == len) { - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.page++; - con->out_msg_pos.did_page_crc = false; - if (in_trail) - list_move_tail(&page->lru, - &msg->trail->head); - else if (msg->pagelist) - list_move_tail(&page->lru, - &msg->pagelist->head); + if (sent < len) + return; + + BUG_ON(sent != len); + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.page++; + con->out_msg_pos.did_page_crc = false; + if (in_trail) + list_move_tail(&page->lru, + &msg->trail->head); + else if (msg->pagelist) + list_move_tail(&page->lru, + &msg->pagelist->head); #ifdef CONFIG_BLOCK - else if (msg->bio) - iter_bio_next(&msg->bio_iter, &msg->bio_seg); + else if (msg->bio) + iter_bio_next(&msg->bio_iter, &msg->bio_seg); #endif - } } /* @@ -940,30 +942,31 @@ static int write_partial_msg_pages(struct ceph_connection *con) int ret; int total_max_write; bool in_trail = false; - size_t trail_len = (msg->trail ? msg->trail->length : 0); + const size_t trail_len = (msg->trail ? msg->trail->length : 0); + const size_t trail_off = data_len - trail_len; dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", con, msg, con->out_msg_pos.page, msg->nr_pages, con->out_msg_pos.page_pos); + /* + * Iterate through each page that contains data to be + * written, and send as much as possible for each. + * + * If we are calculating the data crc (the default), we will + * need to map the page. If we have no pages, they have + * been revoked, so use the zero page. + */ while (data_len > con->out_msg_pos.data_pos) { struct page *page = NULL; int max_write = PAGE_SIZE; int bio_offset = 0; - total_max_write = data_len - trail_len - - con->out_msg_pos.data_pos; - - /* - * if we are calculating the data crc (the default), we need - * to map the page. if our pages[] has been revoked, use the - * zero page. - */ - - /* have we reached the trail part of the data? */ - if (con->out_msg_pos.data_pos >= data_len - trail_len) { - in_trail = true; + in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off; + if (!in_trail) + total_max_write = trail_off - con->out_msg_pos.data_pos; + if (in_trail) { total_max_write = data_len - con->out_msg_pos.data_pos; page = list_first_entry(&msg->trail->head, @@ -990,14 +993,13 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (do_datacrc && !con->out_msg_pos.did_page_crc) { void *base; - u32 crc; - u32 tmpcrc = le32_to_cpu(msg->footer.data_crc); + u32 crc = le32_to_cpu(msg->footer.data_crc); char *kaddr; kaddr = kmap(page); BUG_ON(kaddr == NULL); base = kaddr + con->out_msg_pos.page_pos + bio_offset; - crc = crc32c(tmpcrc, base, len); + crc = crc32c(crc, base, len); msg->footer.data_crc = cpu_to_le32(crc); con->out_msg_pos.did_page_crc = true; } @@ -1702,9 +1704,6 @@ static int read_partial_message_bio(struct ceph_connection *con, void *p; int ret, left; - if (IS_ERR(bv)) - return PTR_ERR(bv); - left = min((int)(data_len - con->in_msg_pos.data_pos), (int)(bv->bv_len - con->in_msg_pos.page_pos)); -- cgit v1.1 From bc18f4b1c850ab355e38373fbb60fd28568d84b5 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: add some fine ASCII art Sage liked the state diagram I put in my commit description so I'm putting it in with the code. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4578e99..dcc50e4 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -29,7 +29,47 @@ * the sender. */ -/* State values for ceph_connection->sock_state; NEW is assumed to be 0 */ +/* + * We track the state of the socket on a given connection using + * values defined below. The transition to a new socket state is + * handled by a function which verifies we aren't coming from an + * unexpected state. + * + * -------- + * | NEW* | transient initial state + * -------- + * | con_sock_state_init() + * v + * ---------- + * | CLOSED | initialized, but no socket (and no + * ---------- TCP connection) + * ^ \ + * | \ con_sock_state_connecting() + * | ---------------------- + * | \ + * + con_sock_state_closed() \ + * |\ \ + * | \ \ + * | ----------- \ + * | | CLOSING | socket event; \ + * | ----------- await close \ + * | ^ | + * | | | + * | + con_sock_state_closing() | + * | / \ | + * | / --------------- | + * | / \ v + * | / -------------- + * | / -----------------| CONNECTING | socket created, TCP + * | | / -------------- connect initiated + * | | | con_sock_state_connected() + * | | v + * ------------- + * | CONNECTED | TCP connection established + * ------------- + * + * State values for ceph_connection->sock_state; NEW is assumed to be 0. + */ #define CON_SOCK_STATE_NEW 0 /* -> CLOSED */ #define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */ -- cgit v1.1 From b7a9e5dd40f17a48a72f249b8bbc989b63bae5fd Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 27 Jun 2012 12:24:08 -0700 Subject: libceph: set peer name on con_open, not init The peer name may change on each open attempt, even when the connection is reused. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 12 +++++++----- net/ceph/mon_client.c | 4 ++-- net/ceph/osd_client.c | 10 ++++++---- 3 files changed, 15 insertions(+), 11 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index dcc50e4..ae082d9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -523,12 +523,17 @@ EXPORT_SYMBOL(ceph_con_close); /* * Reopen a closed connection, with a new peer address. */ -void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) +void ceph_con_open(struct ceph_connection *con, + __u8 entity_type, __u64 entity_num, + struct ceph_entity_addr *addr) { dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); set_bit(OPENING, &con->state); WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + con->peer_name.type = (__u8) entity_type; + con->peer_name.num = cpu_to_le64(entity_num); + memcpy(&con->peer_addr, addr, sizeof(*addr)); con->delay = 0; /* reset backoff memory */ queue_con(con); @@ -548,7 +553,7 @@ bool ceph_con_opened(struct ceph_connection *con) */ void ceph_con_init(struct ceph_connection *con, void *private, const struct ceph_connection_operations *ops, - struct ceph_messenger *msgr, __u8 entity_type, __u64 entity_num) + struct ceph_messenger *msgr) { dout("con_init %p\n", con); memset(con, 0, sizeof(*con)); @@ -558,9 +563,6 @@ void ceph_con_init(struct ceph_connection *con, void *private, con_sock_state_init(con); - con->peer_name.type = (__u8) entity_type; - con->peer_name.num = cpu_to_le64(entity_num); - mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index e9db3de..bcc80a0 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -143,11 +143,11 @@ static int __open_session(struct ceph_mon_client *monc) monc->want_next_osdmap = !!monc->want_next_osdmap; ceph_con_init(&monc->con, monc, &mon_con_ops, - &monc->client->msgr, - CEPH_ENTITY_TYPE_MON, monc->cur_mon); + &monc->client->msgr); dout("open_session mon%d opening\n", monc->cur_mon); ceph_con_open(&monc->con, + CEPH_ENTITY_TYPE_MON, monc->cur_mon, &monc->monmap->mon_inst[monc->cur_mon].addr); /* initiatiate authentication handshake */ diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index db2da54..c252711 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -639,8 +639,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) INIT_LIST_HEAD(&osd->o_osd_lru); osd->o_incarnation = 1; - ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr, - CEPH_ENTITY_TYPE_OSD, onum); + ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); INIT_LIST_HEAD(&osd->o_keepalive_item); return osd; @@ -750,7 +749,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) ret = -EAGAIN; } else { ceph_con_close(&osd->o_con); - ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, + &osdc->osdmap->osd_addr[osd->o_osd]); osd->o_incarnation++; } return ret; @@ -1005,7 +1005,9 @@ static int __map_request(struct ceph_osd_client *osdc, dout("map_request osd %p is osd%d\n", req->r_osd, o); __insert_osd(osdc, req->r_osd); - ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); + ceph_con_open(&req->r_osd->o_con, + CEPH_ENTITY_TYPE_OSD, o, + &osdc->osdmap->osd_addr[o]); } if (req->r_osd) { -- cgit v1.1 From 735a72ef952d42a256f79ae3e6dc1c17a45c041b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 27 Jun 2012 12:24:34 -0700 Subject: libceph: initialize mon_client con only once Do not re-initialize the con on every connection attempt. When we ceph_con_close, there may still be work queued on the socket (e.g., to close it), and re-initializing will clobber the work_struct state. Signed-off-by: Sage Weil --- net/ceph/mon_client.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index bcc80a0..bfd21a8 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -119,7 +119,6 @@ static void __close_session(struct ceph_mon_client *monc) dout("__close_session closing mon%d\n", monc->cur_mon); ceph_msg_revoke(monc->m_auth); ceph_con_close(&monc->con); - monc->con.private = NULL; monc->cur_mon = -1; monc->pending_auth = 0; ceph_auth_reset(monc->auth); @@ -142,9 +141,6 @@ static int __open_session(struct ceph_mon_client *monc) monc->sub_renew_after = jiffies; /* i.e., expired */ monc->want_next_osdmap = !!monc->want_next_osdmap; - ceph_con_init(&monc->con, monc, &mon_con_ops, - &monc->client->msgr); - dout("open_session mon%d opening\n", monc->cur_mon); ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, @@ -798,6 +794,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) if (!monc->m_auth) goto out_auth_reply; + ceph_con_init(&monc->con, monc, &mon_con_ops, + &monc->client->msgr); + monc->cur_mon = -1; monc->hunting = true; monc->sub_renew_after = jiffies; -- cgit v1.1 From fbb85a478f6d4cce6942f1c25c6a68ec5b1e7e7f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 27 Jun 2012 12:31:02 -0700 Subject: libceph: allow sock transition from CONNECTING to CLOSED It is possible to close a socket that is in the OPENING state. For example, it can happen if ceph_con_close() is called on the con before the TCP connection is established. con_work() will come around and shut down the socket. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ae082d9..09ada79 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -48,17 +48,17 @@ * | ---------------------- * | \ * + con_sock_state_closed() \ - * |\ \ - * | \ \ - * | ----------- \ - * | | CLOSING | socket event; \ - * | ----------- await close \ - * | ^ | - * | | | - * | + con_sock_state_closing() | - * | / \ | - * | / --------------- | - * | / \ v + * |+--------------------------- \ + * | \ \ \ + * | ----------- \ \ + * | | CLOSING | socket event; \ \ + * | ----------- await close \ \ + * | ^ \ | + * | | \ | + * | + con_sock_state_closing() \ | + * | / \ | | + * | / --------------- | | + * | / \ v v * | / -------------- * | / -----------------| CONNECTING | socket created, TCP * | | / -------------- connect initiated @@ -241,7 +241,8 @@ static void con_sock_state_closed(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && - old_state != CON_SOCK_STATE_CLOSING)) + old_state != CON_SOCK_STATE_CLOSING && + old_state != CON_SOCK_STATE_CONNECTING)) printk("%s: unexpected old state %d\n", __func__, old_state); } -- cgit v1.1 From d50b409fb8698571d8209e5adfe122e287e31290 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 9 Jul 2012 14:22:34 -0700 Subject: libceph: initialize msgpool message types Initialize the type field for messages in a msgpool. The caller was doing this for osd ops, but not for the reply messages. Reported-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/msgpool.c | 7 ++++--- net/ceph/osd_client.c | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index 11d5f41..ddec1c1 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c @@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg) struct ceph_msgpool *pool = arg; struct ceph_msg *msg; - msg = ceph_msg_new(0, pool->front_len, gfp_mask, true); + msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); if (!msg) { dout("msgpool_alloc %s failed\n", pool->name); } else { @@ -32,10 +32,11 @@ static void msgpool_free(void *element, void *arg) ceph_msg_put(msg); } -int ceph_msgpool_init(struct ceph_msgpool *pool, +int ceph_msgpool_init(struct ceph_msgpool *pool, int type, int front_len, int size, bool blocking, const char *name) { dout("msgpool %s init\n", name); + pool->type = type; pool->front_len = front_len; pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); if (!pool->pool) @@ -61,7 +62,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, WARN_ON(1); /* try to alloc a fresh message */ - return ceph_msg_new(0, front_len, GFP_NOFS, false); + return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); } msg = mempool_alloc(pool->pool, GFP_NOFS); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index c252711..4475d17 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -242,6 +242,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } ceph_pagelist_init(req->r_trail); } + /* create request message; allow space for oid */ msg_size += MAX_OBJ_NAME_SIZE; if (snapc) @@ -255,7 +256,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, return NULL; } - msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); memset(msg->front.iov_base, 0, msg->front.iov_len); req->r_request = msg; @@ -1837,11 +1837,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->req_mempool) goto out; - err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, + err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, + OSD_OP_FRONT_LEN, 10, true, "osd_op"); if (err < 0) goto out_mempool; - err = ceph_msgpool_init(&osdc->msgpool_op_reply, + err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, OSD_OPREPLY_FRONT_LEN, 10, true, "osd_op_reply"); if (err < 0) -- cgit v1.1 From cd43045c2de60f40a0aea49bfb252a2eafe58f8c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 9 Jul 2012 14:31:41 -0700 Subject: libceph: initialize rb, list nodes in ceph_osd_request These don't strictly need to be initialized based on how they are used, but it is good practice to do so. Reported-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/osd_client.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 4475d17..07920ca 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -213,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, kref_init(&req->r_kref); init_completion(&req->r_completion); init_completion(&req->r_safe_completion); + rb_init_node(&req->r_node); INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_linger_item); INIT_LIST_HEAD(&req->r_linger_osd); INIT_LIST_HEAD(&req->r_req_lru_item); + INIT_LIST_HEAD(&req->r_osd_item); + req->r_flags = flags; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); -- cgit v1.1 From a16cb1f70799c851410d9dca0a24122e258df06c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 10 Jul 2012 11:53:34 -0700 Subject: libceph: fix messenger retry In ancient times, the messenger could both initiate and accept connections. An artifact if that was data structures to store/process an incoming ceph_msg_connect request and send an outgoing ceph_msg_connect_reply. Sadly, the negotiation code was referencing those structures and ignoring important information (like the peer's connect_seq) from the correct ones. Among other things, this fixes tight reconnect loops where the server sends RETRY_SESSION and we (the client) retries with the same connect_seq as last time. This bug pretty easily triggered by injecting socket failures on the MDS and running some fs workload like workunits/direct_io/test_sync_io. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 09ada79..16814d1 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1540,7 +1540,7 @@ static int process_connect(struct ceph_connection *con) * dropped messages. */ dout("process_connect got RESET peer seq %u\n", - le32_to_cpu(con->in_connect.connect_seq)); + le32_to_cpu(con->in_reply.connect_seq)); pr_err("%s%lld %s connection reset\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); @@ -1566,10 +1566,10 @@ static int process_connect(struct ceph_connection *con) * If we sent a smaller connect_seq than the peer has, try * again with a larger value. */ - dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", + dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", le32_to_cpu(con->out_connect.connect_seq), - le32_to_cpu(con->in_connect.connect_seq)); - con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); + le32_to_cpu(con->in_reply.connect_seq)); + con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1583,9 +1583,9 @@ static int process_connect(struct ceph_connection *con) */ dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", con->peer_global_seq, - le32_to_cpu(con->in_connect.global_seq)); + le32_to_cpu(con->in_reply.global_seq)); get_global_seq(con->msgr, - le32_to_cpu(con->in_connect.global_seq)); + le32_to_cpu(con->in_reply.global_seq)); ret = prepare_write_connect(con); if (ret < 0) return ret; -- cgit v1.1 From a2a3258417eb6a1799cf893350771428875a8287 Mon Sep 17 00:00:00 2001 From: Guanjun He Date: Sun, 8 Jul 2012 19:50:33 -0700 Subject: libceph: prevent the race of incoming work during teardown Add an atomic variable 'stopping' as flag in struct ceph_messenger, set this flag to 1 in function ceph_destroy_client(), and add the condition code in function ceph_data_ready() to test the flag value, if true(1), just return. Signed-off-by: Guanjun He Reviewed-by: Sage Weil --- net/ceph/ceph_common.c | 2 ++ net/ceph/messenger.c | 5 +++++ 2 files changed, 7 insertions(+) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 58b09ef..3b45e01 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -495,6 +495,8 @@ void ceph_destroy_client(struct ceph_client *client) { dout("destroy_client %p\n", client); + atomic_set(&client->msgr.stopping, 1); + /* unmount */ ceph_osdc_stop(&client->osdc); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 16814d1..63e1252 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -254,6 +254,9 @@ static void con_sock_state_closed(struct ceph_connection *con) static void ceph_sock_data_ready(struct sock *sk, int count_unused) { struct ceph_connection *con = sk->sk_user_data; + if (atomic_read(&con->msgr->stopping)) { + return; + } if (sk->sk_state != TCP_CLOSE_WAIT) { dout("%s on %p state = %lu, queueing work\n", __func__, @@ -2413,6 +2416,8 @@ void ceph_messenger_init(struct ceph_messenger *msgr, encode_my_addr(msgr); msgr->nocrc = nocrc; + atomic_set(&msgr->stopping, 0); + dout("%s %p\n", __func__, msgr); } EXPORT_SYMBOL(ceph_messenger_init); -- cgit v1.1 From 1fe60e51a3744528f3939b1b1167ca909133d9ae Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:23:22 -0700 Subject: libceph: move feature bits to separate header This is simply cleanup that will keep things more closely synced with the userland code. Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/ceph_common.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 3b45e01..69e38db 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -460,9 +461,9 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, client->auth_err = 0; client->extra_mon_dispatch = NULL; - client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT | + client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT | supported_features; - client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT | + client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT | required_features; /* msgr */ -- cgit v1.1 From 546f04ef716dd49521774653d8b032a7d64c05d9 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:15:23 -0700 Subject: libceph: support crush tunables The server side recently added support for tuning some magic crush variables. Decode these variables if they are present, or use the default values if they are not present. Corresponds to ceph.git commit 89af369c25f274fe62ef730e5e8aad0c54f1e5a5. Signed-off-by: caleb miles Reviewed-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/crush/mapper.c | 13 +++++++------ net/ceph/osdmap.c | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index d7edc24..35fce75 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -306,7 +306,6 @@ static int crush_choose(const struct crush_map *map, int item = 0; int itemtype; int collide, reject; - const unsigned int orig_tries = 5; /* attempts before we fall back to search */ dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", bucket->id, x, outpos, numrep); @@ -351,8 +350,9 @@ static int crush_choose(const struct crush_map *map, reject = 1; goto reject; } - if (flocal >= (in->size>>1) && - flocal > orig_tries) + if (map->choose_local_fallback_tries > 0 && + flocal >= (in->size>>1) && + flocal > map->choose_local_fallback_tries) item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); @@ -422,13 +422,14 @@ reject: ftotal++; flocal++; - if (collide && flocal < 3) + if (collide && flocal <= map->choose_local_tries) /* retry locally a few times */ retry_bucket = 1; - else if (flocal <= in->size + orig_tries) + else if (map->choose_local_fallback_tries > 0 && + flocal <= in->size + map->choose_local_fallback_tries) /* exhaustive bucket search */ retry_bucket = 1; - else if (ftotal < 20) + else if (ftotal <= map->choose_total_tries) /* then retry descent */ retry_descent = 1; else diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 9600674..3124b71 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -135,6 +135,21 @@ bad: return -EINVAL; } +static int skip_name_map(void **p, void *end) +{ + int len; + ceph_decode_32_safe(p, end, len ,bad); + while (len--) { + int strlen; + *p += sizeof(u32); + ceph_decode_32_safe(p, end, strlen, bad); + *p += strlen; +} + return 0; +bad: + return -EINVAL; +} + static struct crush_map *crush_decode(void *pbyval, void *end) { struct crush_map *c; @@ -143,6 +158,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) void **p = &pbyval; void *start = pbyval; u32 magic; + u32 num_name_maps; dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); @@ -150,6 +166,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) if (c == NULL) return ERR_PTR(-ENOMEM); + /* set tunables to default values */ + c->choose_local_tries = 2; + c->choose_local_fallback_tries = 5; + c->choose_total_tries = 19; + ceph_decode_need(p, end, 4*sizeof(u32), bad); magic = ceph_decode_32(p); if (magic != CRUSH_MAGIC) { @@ -297,7 +318,25 @@ static struct crush_map *crush_decode(void *pbyval, void *end) } /* ignore trailing name maps. */ + for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) { + err = skip_name_map(p, end); + if (err < 0) + goto done; + } + + /* tunables */ + ceph_decode_need(p, end, 3*sizeof(u32), done); + c->choose_local_tries = ceph_decode_32(p); + c->choose_local_fallback_tries = ceph_decode_32(p); + c->choose_total_tries = ceph_decode_32(p); + dout("crush decode tunable choose_local_tries = %d", + c->choose_local_tries); + dout("crush decode tunable choose_local_fallback_tries = %d", + c->choose_local_fallback_tries); + dout("crush decode tunable choose_total_tries = %d", + c->choose_total_tries); +done: dout("crush_decode success\n"); return c; -- cgit v1.1 From 3a140a0d5c4b9e35373b016e41dfc85f1e526bdb Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:24:21 -0700 Subject: libceph: report socket read/write error message We need to set error_msg to something useful before calling ceph_fault(); do so here for try_{read,write}(). This is more informative than libceph: osd0 192.168.106.220:6801 (null) Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 63e1252..6e2f678 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2287,14 +2287,18 @@ restart: ret = try_read(con); if (ret == -EAGAIN) goto restart; - if (ret < 0) + if (ret < 0) { + con->error_msg = "socket error on read"; goto fault; + } ret = try_write(con); if (ret == -EAGAIN) goto restart; - if (ret < 0) + if (ret < 0) { + con->error_msg = "socket error on write"; goto fault; + } done: mutex_unlock(&con->mutex); -- cgit v1.1 From 8c50c817566dfa4581f82373aac39f3e608a7dc8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:24:37 -0700 Subject: libceph: fix mutex coverage for ceph_con_close Hold the mutex while twiddling all of the state bits to avoid possible races. While we're here, make not of why we cannot close the socket directly. Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 6e2f678..e65b15d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -503,6 +503,7 @@ static void reset_connection(struct ceph_connection *con) */ void ceph_con_close(struct ceph_connection *con) { + mutex_lock(&con->mutex); dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); clear_bit(NEGOTIATING, &con->state); @@ -515,11 +516,16 @@ void ceph_con_close(struct ceph_connection *con) clear_bit(KEEPALIVE_PENDING, &con->flags); clear_bit(WRITE_PENDING, &con->flags); - mutex_lock(&con->mutex); reset_connection(con); con->peer_global_seq = 0; cancel_delayed_work(&con->work); mutex_unlock(&con->mutex); + + /* + * We cannot close the socket directly from here because the + * work threads use it without holding the mutex. Instead, let + * con_work() do it. + */ queue_con(con); } EXPORT_SYMBOL(ceph_con_close); -- cgit v1.1 From 6194ea895e447fdf4adfd23f67873a32bf4f15ae Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:19:28 -0700 Subject: libceph: resubmit linger ops when pg mapping changes The linger op registration (i.e., watch) modifies the object state. As such, the OSD will reply with success if it has already applied without doing the associated side-effects (setting up the watch session state). If we lose the ACK and resubmit, we will see success but the watch will not be correctly registered and we won't get notifies. To fix this, always resubmit the linger op with a new tid. We accomplish this by re-registering as a linger (i.e., 'registered') if we are not yet registered. Then the second loop will treat this just like a normal case of re-registering. This mirrors a similar fix on the userland ceph.git, commit 5dd68b95, and ceph bug #2796. Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/osd_client.c | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 07920ca..c605705 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -891,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc, { dout("__register_linger_request %p\n", req); list_add_tail(&req->r_linger_item, &osdc->req_linger); - list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); + if (req->r_osd) + list_add_tail(&req->r_linger_osd, + &req->r_osd->o_linger_requests); } static void __unregister_linger_request(struct ceph_osd_client *osdc, @@ -1305,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); mutex_lock(&osdc->request_mutex); - for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { + for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); + p = rb_next(p); err = __map_request(osdc, req, force_resend); if (err < 0) continue; /* error */ @@ -1314,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("%p tid %llu maps to no osd\n", req, req->r_tid); needmap++; /* request a newer map */ } else if (err > 0) { - dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1); - if (!req->r_linger) + if (!req->r_linger) { + dout("%p tid %llu requeued on osd%d\n", req, + req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); req->r_flags |= CEPH_OSD_FLAG_RETRY; + } + } + if (req->r_linger && list_empty(&req->r_linger_item)) { + /* + * register as a linger so that we will + * re-submit below and get a new tid + */ + dout("%p tid %llu restart on osd%d\n", + req, req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); + __register_linger_request(osdc, req); + __unregister_request(osdc, req); } } -- cgit v1.1 From a4107026976f06c9a6ce8cc84a763564ee39d901 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:20:25 -0700 Subject: libceph: (re)initialize bio_iter on start of message receive Previously, we were opportunistically initializing the bio_iter if it appeared to be uninitialized in the middle of the read path. The problem is that a sequence like: - start reading message - initialize bio_iter - read half a message - messenger fault, reconnect - restart reading message - ** bio_iter now non-NULL, not reinitialized ** - read past end of bio, crash Instead, initialize the bio_iter unconditionally when we allocate/claim the message for read. Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e65b15d..f1bd3bb 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1872,6 +1872,11 @@ static int read_partial_message(struct ceph_connection *con) else con->in_msg_pos.page_pos = 0; con->in_msg_pos.data_pos = 0; + +#ifdef CONFIG_BLOCK + if (m->bio) + init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); +#endif } /* front */ @@ -1888,10 +1893,6 @@ static int read_partial_message(struct ceph_connection *con) if (ret <= 0) return ret; } -#ifdef CONFIG_BLOCK - if (m->bio && !m->bio_iter) - init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); -#endif /* (page) data */ while (con->in_msg_pos.data_pos < data_len) { @@ -1902,7 +1903,7 @@ static int read_partial_message(struct ceph_connection *con) return ret; #ifdef CONFIG_BLOCK } else if (m->bio) { - + BUG_ON(!m->bio_iter); ret = read_partial_message_bio(con, &m->bio_iter, &m->bio_seg, data_len, do_datacrc); -- cgit v1.1 From 5469155f2bc83bb2c88b0a0370c3d54d87eed06e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:21:40 -0700 Subject: libceph: protect ceph_con_open() with mutex Take the con mutex while we are initiating a ceph open. This is necessary because the may have previously been in use and then closed, which could result in a racing workqueue running con_work(). Signed-off-by: Sage Weil Reviewed-by: Yehuda Sadeh Reviewed-by: Alex Elder --- net/ceph/messenger.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f1bd3bb..a477998 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -537,6 +537,7 @@ void ceph_con_open(struct ceph_connection *con, __u8 entity_type, __u64 entity_num, struct ceph_entity_addr *addr) { + mutex_lock(&con->mutex); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); set_bit(OPENING, &con->state); WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); @@ -546,6 +547,7 @@ void ceph_con_open(struct ceph_connection *con, memcpy(&con->peer_addr, addr, sizeof(*addr)); con->delay = 0; /* reset backoff memory */ + mutex_unlock(&con->mutex); queue_con(con); } EXPORT_SYMBOL(ceph_con_open); -- cgit v1.1 From 85effe183dd45854d1ad1a370b88cddb403c4c91 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:22:05 -0700 Subject: libceph: reset connection retry on successfully negotiation We exponentially back off when we encounter connection errors. If several errors accumulate, we will eventually wait ages before even trying to reconnect. Fix this by resetting the backoff counter after a successful negotiation/ connection with the remote node. Fixes ceph issue #2802. Signed-off-by: Sage Weil Reviewed-by: Yehuda Sadeh Reviewed-by: Alex Elder --- net/ceph/messenger.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a477998..07204f1 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1629,6 +1629,8 @@ static int process_connect(struct ceph_connection *con) if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) set_bit(LOSSYTX, &con->flags); + con->delay = 0; /* reset backoff memory */ + prepare_read_tag(con); break; -- cgit v1.1 From 048a9d2d069e3d63c9169de82649be00de65a8f6 Mon Sep 17 00:00:00 2001 From: Jiaju Zhang Date: Fri, 20 Jul 2012 08:18:36 -0500 Subject: libceph: trivial fix for the incorrect debug output This is a trivial fix for the debug output, as it is inconsistent with the function name so may confuse people when debugging. [elder@inktank.com: switched to use __func__] Signed-off-by: Jiaju Zhang Reviewed-by: Alex Elder --- net/ceph/osd_client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index c605705..ad427e6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) static void remove_all_osds(struct ceph_osd_client *osdc) { - dout("__remove_old_osds %p\n", osdc); + dout("%s %p\n", __func__, osdc); mutex_lock(&osdc->request_mutex); while (!RB_EMPTY_ROOT(&osdc->osds)) { struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), -- cgit v1.1 From 3b5ede07b55b52c3be27749d183d87257d032065 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:22:53 -0700 Subject: libceph: fix fault locking; close socket on lossy fault If we fault on a lossy connection, we should still close the socket immediately, and do so under the con mutex. We should also take the con mutex before printing out the state bits in the debug output. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 07204f1..9aaf539 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2330,22 +2330,23 @@ fault: */ static void ceph_fault(struct ceph_connection *con) { + mutex_lock(&con->mutex); + pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(LOSSYTX, &con->flags)) { - dout("fault on LOSSYTX channel\n"); - goto out; - } - - mutex_lock(&con->mutex); if (test_bit(CLOSED, &con->state)) goto out_unlock; con_close_socket(con); + if (test_bit(LOSSYTX, &con->flags)) { + dout("fault on LOSSYTX channel\n"); + goto out_unlock; + } + if (con->in_msg) { BUG_ON(con->in_msg->con != con); con->in_msg->con = NULL; @@ -2392,7 +2393,6 @@ static void ceph_fault(struct ceph_connection *con) out_unlock: mutex_unlock(&con->mutex); -out: /* * in case we faulted due to authentication, invalidate our * current tickets so that we can get new ones. -- cgit v1.1 From 00650931e52e97fe64096bec167f5a6780dfd94a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:33:04 -0700 Subject: libceph: move msgr clear_standby under con mutex protection Avoid dropping and retaking con->mutex in the ceph_con_send() case by leaving locking up to the caller. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 9aaf539..1a3cb4a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2441,12 +2441,10 @@ static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ if (test_and_clear_bit(STANDBY, &con->state)) { - mutex_lock(&con->mutex); dout("clear_standby %p and ++connect_seq\n", con); con->connect_seq++; WARN_ON(test_bit(WRITE_PENDING, &con->flags)); WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); - mutex_unlock(&con->mutex); } } @@ -2483,11 +2481,12 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) le32_to_cpu(msg->hdr.front_len), le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len)); + + clear_standby(con); mutex_unlock(&con->mutex); /* if there wasn't anything waiting to send before, queue * new work */ - clear_standby(con); if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); } @@ -2574,7 +2573,9 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg) void ceph_con_keepalive(struct ceph_connection *con) { dout("con_keepalive %p\n", con); + mutex_lock(&con->mutex); clear_standby(con); + mutex_unlock(&con->mutex); if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); -- cgit v1.1 From a59b55a602b6c741052d79c1e3643f8440cddd27 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:34:04 -0700 Subject: libceph: move ceph_con_send() closed check under the con mutex Take the con mutex before checking whether the connection is closed to avoid racing with someone else closing it. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1a3cb4a..20e60a8 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2453,22 +2453,20 @@ static void clear_standby(struct ceph_connection *con) */ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) { - if (test_bit(CLOSED, &con->state)) { - dout("con_send %p closed, dropping %p\n", con, msg); - ceph_msg_put(msg); - return; - } - /* set src+dst */ msg->hdr.src = con->msgr->inst.name; - BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); - msg->needs_out_seq = true; - /* queue */ mutex_lock(&con->mutex); + if (test_bit(CLOSED, &con->state)) { + dout("con_send %p closed, dropping %p\n", con, msg); + ceph_msg_put(msg); + mutex_unlock(&con->mutex); + return; + } + BUG_ON(msg->con != NULL); msg->con = con->ops->get(con); BUG_ON(msg->con == NULL); -- cgit v1.1 From 2e8cb10063820af7ed7638e3fd9013eee21266e7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:40:04 -0700 Subject: libceph: drop gratuitous socket close calls in con_work If the state is CLOSED or OPENING, we shouldn't have a socket. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 20e60a8..32ab7cd 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2284,15 +2284,15 @@ restart: dout("con_work %p STANDBY\n", con); goto done; } - if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ - dout("con_work CLOSED\n"); - con_close_socket(con); + if (test_bit(CLOSED, &con->state)) { + dout("con_work %p CLOSED\n", con); + BUG_ON(con->sock); goto done; } if (test_and_clear_bit(OPENING, &con->state)) { /* reopen w/ new peer */ dout("con_work OPENING\n"); - con_close_socket(con); + BUG_ON(con->sock); } ret = try_read(con); -- cgit v1.1 From ee76e0736db8455e3b11827d6899bd2a4e1d0584 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 16:45:49 -0700 Subject: libceph: close socket directly from ceph_con_close() It is simpler to do this immediately, since we already hold the con mutex. It also avoids the need to deal with a not-quite-CLOSED socket in con_work. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 32ab7cd..46ce113 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -519,14 +519,8 @@ void ceph_con_close(struct ceph_connection *con) reset_connection(con); con->peer_global_seq = 0; cancel_delayed_work(&con->work); + con_close_socket(con); mutex_unlock(&con->mutex); - - /* - * We cannot close the socket directly from here because the - * work threads use it without holding the mutex. Instead, let - * con_work() do it. - */ - queue_con(con); } EXPORT_SYMBOL(ceph_con_close); -- cgit v1.1 From d7353dd5aaf22ed611fbcd0d4a4a12fb30659290 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:19:43 -0700 Subject: libceph: drop unnecessary CLOSED check in socket state change callback If we are CLOSED, the socket is closed and we won't get these. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 46ce113..e7320cd 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -296,9 +296,6 @@ static void ceph_sock_state_change(struct sock *sk) dout("%s %p state = %lu sk_state = %u\n", __func__, con, con->state, sk->sk_state); - if (test_bit(CLOSED, &con->state)) - return; - switch (sk->sk_state) { case TCP_CLOSE: dout("%s TCP_CLOSE\n", __func__); -- cgit v1.1 From 8dacc7da69a491c515851e68de6036f21b5663ce Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:24:40 -0700 Subject: libceph: replace connection state bits with states Use a simple set of 6 enumerated values for the socket states (CON_STATE_*) and use those instead of the state bits. All of the con->state checks are now under the protection of the con mutex, so this is safe. It also simplifies many of the state checks because we can check for anything other than the expected state instead of various bits for races we can think of. This appears to hold up well to stress testing both with and without socket failure injection on the server side. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 130 +++++++++++++++++++++++++++------------------------ 1 file changed, 68 insertions(+), 62 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e7320cd..563e46a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -77,6 +77,17 @@ #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ +/* + * connection states + */ +#define CON_STATE_CLOSED 1 /* -> PREOPEN */ +#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ +#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ +#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ +#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ +#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ + + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con) mutex_lock(&con->mutex); dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); - clear_bit(NEGOTIATING, &con->state); - clear_bit(CONNECTING, &con->state); - clear_bit(CONNECTED, &con->state); - clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ - set_bit(CLOSED, &con->state); + con->state = CON_STATE_CLOSED; clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(KEEPALIVE_PENDING, &con->flags); @@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con, { mutex_lock(&con->mutex); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); - set_bit(OPENING, &con->state); - WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + + BUG_ON(con->state != CON_STATE_CLOSED); + con->state = CON_STATE_PREOPEN; con->peer_name.type = (__u8) entity_type; con->peer_name.num = cpu_to_le64(entity_num); @@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); - set_bit(CLOSED, &con->state); + con->state = CON_STATE_CLOSED; } EXPORT_SYMBOL(ceph_con_init); @@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection if (!con->ops->get_authorizer) { con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; con->out_connect.authorizer_len = 0; - return NULL; } /* Can't hold the mutex while getting authorizer */ - mutex_unlock(&con->mutex); - auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); - mutex_lock(&con->mutex); if (IS_ERR(auth)) return auth; - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) + if (con->state != CON_STATE_NEGOTIATING) return ERR_PTR(-EAGAIN); con->auth_reply_buf = auth->authorizer_reply_buf; con->auth_reply_buf_len = auth->authorizer_reply_buf_len; - - return auth; } @@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con) static void fail_protocol(struct ceph_connection *con) { reset_connection(con); - set_bit(CLOSED, &con->state); /* in case there's queued work */ + BUG_ON(con->state != CON_STATE_NEGOTIATING); + con->state = CON_STATE_CLOSED; } static int process_connect(struct ceph_connection *con) @@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con) if (con->ops->peer_reset) con->ops->peer_reset(con); mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) + if (con->state != CON_STATE_NEGOTIATING) return -EAGAIN; break; @@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con) fail_protocol(con); return -1; } - clear_bit(NEGOTIATING, &con->state); - set_bit(CONNECTED, &con->state); + + BUG_ON(con->state != CON_STATE_NEGOTIATING); + con->state = CON_STATE_OPEN; + con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; con->peer_features = server_feat; @@ -1994,8 +1998,9 @@ more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); /* open the socket first? */ - if (con->sock == NULL) { - set_bit(CONNECTING, &con->state); + if (con->state == CON_STATE_PREOPEN) { + BUG_ON(con->sock); + con->state = CON_STATE_CONNECTING; con_out_kvec_reset(con); prepare_write_banner(con); @@ -2046,8 +2051,7 @@ more_kvec: } do_next: - if (!test_bit(CONNECTING, &con->state) && - !test_bit(NEGOTIATING, &con->state)) { + if (con->state == CON_STATE_OPEN) { /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con) { int ret = -1; - if (!con->sock) - return 0; - - if (test_bit(STANDBY, &con->state)) +more: + dout("try_read start on %p state %lu\n", con, con->state); + if (con->state != CON_STATE_CONNECTING && + con->state != CON_STATE_NEGOTIATING && + con->state != CON_STATE_OPEN) return 0; - dout("try_read start on %p\n", con); + BUG_ON(!con->sock); -more: dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, con->in_base_pos); - /* - * process_connect and process_message drop and re-take - * con->mutex. make sure we handle a racing close or reopen. - */ - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) { - ret = -EAGAIN; - goto out; - } - - if (test_bit(CONNECTING, &con->state)) { + if (con->state == CON_STATE_CONNECTING) { dout("try_read connecting\n"); ret = read_partial_banner(con); if (ret <= 0) @@ -2112,8 +2106,8 @@ more: if (ret < 0) goto out; - clear_bit(CONNECTING, &con->state); - set_bit(NEGOTIATING, &con->state); + BUG_ON(con->state != CON_STATE_CONNECTING); + con->state = CON_STATE_NEGOTIATING; /* Banner is good, exchange connection info */ ret = prepare_write_connect(con); @@ -2125,7 +2119,7 @@ more: goto out; } - if (test_bit(NEGOTIATING, &con->state)) { + if (con->state == CON_STATE_NEGOTIATING) { dout("try_read negotiating\n"); ret = read_partial_connect(con); if (ret <= 0) @@ -2136,6 +2130,8 @@ more: goto more; } + BUG_ON(con->state != CON_STATE_OPEN); + if (con->in_base_pos < 0) { /* * skipping + discarding content. @@ -2169,8 +2165,8 @@ more: prepare_read_ack(con); break; case CEPH_MSGR_TAG_CLOSE: - clear_bit(CONNECTED, &con->state); - set_bit(CLOSED, &con->state); /* fixme */ + con_close_socket(con); + con->state = CON_STATE_CLOSED; goto out; default: goto bad_tag; @@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTED, &con->state)) - con->error_msg = "socket closed"; - else if (test_and_clear_bit(NEGOTIATING, &con->state)) - con->error_msg = "negotiation failed"; - else if (test_and_clear_bit(CONNECTING, &con->state)) + switch (con->state) { + case CON_STATE_CONNECTING: con->error_msg = "connection failed"; - else + break; + case CON_STATE_NEGOTIATING: + con->error_msg = "negotiation failed"; + break; + case CON_STATE_OPEN: + con->error_msg = "socket closed"; + break; + default: + dout("unrecognized con state %d\n", (int)con->state); con->error_msg = "unrecognized con state"; + BUG(); + } goto fault; } @@ -2271,17 +2274,16 @@ restart: } } - if (test_bit(STANDBY, &con->state)) { + if (con->state == CON_STATE_STANDBY) { dout("con_work %p STANDBY\n", con); goto done; } - if (test_bit(CLOSED, &con->state)) { + if (con->state == CON_STATE_CLOSED) { dout("con_work %p CLOSED\n", con); BUG_ON(con->sock); goto done; } - if (test_and_clear_bit(OPENING, &con->state)) { - /* reopen w/ new peer */ + if (con->state == CON_STATE_PREOPEN) { dout("con_work OPENING\n"); BUG_ON(con->sock); } @@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con) dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(CLOSED, &con->state)) - goto out_unlock; + BUG_ON(con->state != CON_STATE_CONNECTING && + con->state != CON_STATE_NEGOTIATING && + con->state != CON_STATE_OPEN); con_close_socket(con); if (test_bit(LOSSYTX, &con->flags)) { - dout("fault on LOSSYTX channel\n"); + dout("fault on LOSSYTX channel, marking CLOSED\n"); + con->state = CON_STATE_CLOSED; goto out_unlock; } @@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con) !test_bit(KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); clear_bit(WRITE_PENDING, &con->flags); - set_bit(STANDBY, &con->state); + con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ + con->state = CON_STATE_PREOPEN; if (con->delay == 0) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) @@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init); static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ - if (test_and_clear_bit(STANDBY, &con->state)) { + if (con->state == CON_STATE_STANDBY) { dout("clear_standby %p and ++connect_seq\n", con); + con->state = CON_STATE_PREOPEN; con->connect_seq++; WARN_ON(test_bit(WRITE_PENDING, &con->flags)); WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); @@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state)) { + if (con->state == CON_STATE_CLOSED) { dout("con_send %p closed, dropping %p\n", con, msg); ceph_msg_put(msg); mutex_unlock(&con->mutex); -- cgit v1.1 From 4a8616920860920abaa51193146fe36b38ef09aa Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:29:55 -0700 Subject: libceph: clean up con flags Rename flags with CON_FLAG prefix, move the definitions into the c file, and (better) document their meaning. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 62 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 26 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 563e46a..b872db5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -87,6 +87,15 @@ #define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ #define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ +/* + * ceph_connection flag bits + */ +#define CON_FLAG_LOSSYTX 0 /* we can close channel or drop + * messages on errors */ +#define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ +#define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */ +#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ +#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; @@ -288,7 +297,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(WRITE_PENDING, &con->flags)) { + if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -313,7 +322,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - set_bit(SOCK_CLOSED, &con->flags); + set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); queue_con(con); break; case TCP_ESTABLISHED: @@ -449,12 +458,12 @@ static int con_close_socket(struct ceph_connection *con) con->sock = NULL; /* - * Forcibly clear the SOCK_CLOSE flag. It gets set + * Forcibly clear the SOCK_CLOSED flag. It gets set * independent of the connection mutex, and we could have * received a socket close event before we had the chance to * shut the socket down. */ - clear_bit(SOCK_CLOSED, &con->flags); + clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); con_sock_state_closed(con); return rc; } @@ -516,9 +525,9 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); con->state = CON_STATE_CLOSED; - clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ - clear_bit(KEEPALIVE_PENDING, &con->flags); - clear_bit(WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ + clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); + clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); reset_connection(con); con->peer_global_seq = 0; @@ -770,7 +779,7 @@ static void prepare_write_message(struct ceph_connection *con) /* no, queue up footer too and be done */ prepare_write_message_footer(con); - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } /* @@ -791,7 +800,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } /* @@ -802,7 +811,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } /* @@ -845,7 +854,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } static int prepare_write_connect(struct ceph_connection *con) @@ -896,7 +905,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); return 0; } @@ -1622,7 +1631,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(LOSSYTX, &con->flags); + set_bit(CON_FLAG_LOSSYTX, &con->flags); con->delay = 0; /* reset backoff memory */ @@ -2061,14 +2070,15 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) { + if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, + &con->flags)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2241,7 +2251,7 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: - if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { + if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { switch (con->state) { case CON_STATE_CONNECTING: con->error_msg = "connection failed"; @@ -2260,7 +2270,7 @@ restart: goto fault; } - if (test_and_clear_bit(BACKOFF, &con->flags)) { + if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); if (queue_delayed_work(ceph_msgr_wq, &con->work, round_jiffies_relative(con->delay))) { @@ -2336,7 +2346,7 @@ static void ceph_fault(struct ceph_connection *con) con_close_socket(con); - if (test_bit(LOSSYTX, &con->flags)) { + if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CON_STATE_CLOSED; goto out_unlock; @@ -2356,9 +2366,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(KEEPALIVE_PENDING, &con->flags)) { + !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ @@ -2383,7 +2393,7 @@ static void ceph_fault(struct ceph_connection *con) * that when con_work restarts we schedule the * delay then. */ - set_bit(BACKOFF, &con->flags); + set_bit(CON_FLAG_BACKOFF, &con->flags); } } @@ -2440,8 +2450,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CON_STATE_PREOPEN; con->connect_seq++; - WARN_ON(test_bit(WRITE_PENDING, &con->flags)); - WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); + WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); + WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); } } @@ -2482,7 +2492,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ - if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) + if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2571,8 +2581,8 @@ void ceph_con_keepalive(struct ceph_connection *con) mutex_lock(&con->mutex); clear_standby(con); mutex_unlock(&con->mutex); - if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && - test_and_set_bit(WRITE_PENDING, &con->flags) == 0) + if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && + test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); -- cgit v1.1 From 43c7427d100769451601b8a36988ac0528ce0124 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:30:40 -0700 Subject: libceph: clear all flags on con_close Signed-off-by: Sage Weil --- net/ceph/messenger.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index b872db5..fa16f2c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -528,6 +528,8 @@ void ceph_con_close(struct ceph_connection *con) clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); + clear_bit(CON_FLAG_BACKOFF, &con->flags); reset_connection(con); con->peer_global_seq = 0; -- cgit v1.1 From 756a16a5d53c072cd1f937b261641c1a3b53fdd7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:26:13 -0700 Subject: libceph: be less chatty about stray replies There are many (normal) conditions that can lead to us getting unexpected replies, include cluster topology changes, osd failures, and timeouts. There's no need to spam the console about it. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osd_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ad427e6..42119c0 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2037,8 +2037,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (!req) { *skip = 1; m = NULL; - pr_info("get_reply unknown tid %llu from osd%d\n", tid, - osd->o_osd); + dout("get_reply unknown tid %llu from osd%d\n", tid, + osd->o_osd); goto out; } -- cgit v1.1 From 8007b8d626b49c34fb146ec16dc639d8b10c862f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:16:16 -0700 Subject: libceph: fix handling of immediate socket connect failure If the connect() call immediately fails such that sock == NULL, we still need con_close_socket() to reset our socket state to CLOSED. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fa16f2c..a6a0c7a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -224,6 +224,8 @@ static void con_sock_state_init(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); if (WARN_ON(old_state != CON_SOCK_STATE_NEW)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CLOSED); } static void con_sock_state_connecting(struct ceph_connection *con) @@ -233,6 +235,8 @@ static void con_sock_state_connecting(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CONNECTING); } static void con_sock_state_connected(struct ceph_connection *con) @@ -242,6 +246,8 @@ static void con_sock_state_connected(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CONNECTED); } static void con_sock_state_closing(struct ceph_connection *con) @@ -253,6 +259,8 @@ static void con_sock_state_closing(struct ceph_connection *con) old_state != CON_SOCK_STATE_CONNECTED && old_state != CON_SOCK_STATE_CLOSING)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CLOSING); } static void con_sock_state_closed(struct ceph_connection *con) @@ -262,8 +270,11 @@ static void con_sock_state_closed(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && old_state != CON_SOCK_STATE_CLOSING && - old_state != CON_SOCK_STATE_CONNECTING)) + old_state != CON_SOCK_STATE_CONNECTING && + old_state != CON_SOCK_STATE_CLOSED)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CLOSED); } /* @@ -448,14 +459,14 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, */ static int con_close_socket(struct ceph_connection *con) { - int rc; + int rc = 0; dout("con_close_socket on %p sock %p\n", con, con->sock); - if (!con->sock) - return 0; - rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); - sock_release(con->sock); - con->sock = NULL; + if (con->sock) { + rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); + sock_release(con->sock); + con->sock = NULL; + } /* * Forcibly clear the SOCK_CLOSED flag. It gets set @@ -464,6 +475,7 @@ static int con_close_socket(struct ceph_connection *con) * shut the socket down. */ clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_sock_state_closed(con); return rc; } -- cgit v1.1 From 4f471e4a9c7db0256834e1b376ea50c82e345c3c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:16:40 -0700 Subject: libceph: revoke mon_client messages on session restart Revoke all mon_client messages when we shut down the old connection. This is mostly moot since we are re-using the same ceph_connection, but it is cleaner. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/mon_client.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net') diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index bfd21a8..105d533 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -118,6 +118,9 @@ static void __close_session(struct ceph_mon_client *monc) { dout("__close_session closing mon%d\n", monc->cur_mon); ceph_msg_revoke(monc->m_auth); + ceph_msg_revoke_incoming(monc->m_auth_reply); + ceph_msg_revoke(monc->m_subscribe); + ceph_msg_revoke_incoming(monc->m_subscribe_ack); ceph_con_close(&monc->con); monc->cur_mon = -1; monc->pending_auth = 0; @@ -685,6 +688,7 @@ static void __resend_generic_request(struct ceph_mon_client *monc) for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_mon_generic_request, node); ceph_msg_revoke(req->request); + ceph_msg_revoke_incoming(req->reply); ceph_con_send(&monc->con, ceph_msg_get(req->request)); } } -- cgit v1.1 From 7b862e07b1a4d5c963d19027f10ea78085f27f9b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:16:56 -0700 Subject: libceph: verify state after retaking con lock after dispatch We drop the con mutex when delivering a message. When we retake the lock, we need to verify we are still in the OPEN state before preparing to read the next tag, or else we risk stepping on a connection that has been closed. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a6a0c7a..feb5a2a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2003,7 +2003,6 @@ static void process_message(struct ceph_connection *con) con->ops->dispatch(con, msg); mutex_lock(&con->mutex); - prepare_read_tag(con); } @@ -2213,6 +2212,8 @@ more: if (con->in_tag == CEPH_MSGR_TAG_READY) goto more; process_message(con); + if (con->state == CON_STATE_OPEN) + prepare_read_tag(con); goto more; } if (con->in_tag == CEPH_MSGR_TAG_ACK) { -- cgit v1.1 From 8636ea672f0c5ab7478c42c5b6705ebd1db7eb6a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:17:13 -0700 Subject: libceph: avoid dropping con mutex before fault The ceph_fault() function takes the con mutex, so we should avoid dropping it before calling it. This fixes a potential race with another thread calling ceph_con_close(), or _open(), or similar (we don't reverify con->state after retaking the lock). Add annotation so that lockdep realizes we will drop the mutex before returning. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index feb5a2a..c3b628c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2336,7 +2336,6 @@ done_unlocked: return; fault: - mutex_unlock(&con->mutex); ceph_fault(con); /* error/fault path */ goto done_unlocked; } @@ -2347,9 +2346,8 @@ fault: * exponential backoff */ static void ceph_fault(struct ceph_connection *con) + __releases(con->mutex) { - mutex_lock(&con->mutex); - pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); dout("fault %p state %lu to peer %s\n", -- cgit v1.1 From 4740a623d20c51d167da7f752b63e2b8714b2543 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:19:30 -0700 Subject: libceph: change ceph_con_in_msg_alloc convention to be less weird This function's calling convention is very limiting. In particular, we can't return any error other than ENOMEM (and only implicitly), which is a problem (see next patch). Instead, return an normal 0 or error code, and make the skip a pointer output parameter. Drop the useless in_hdr argument (we have the con pointer). Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 56 +++++++++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 25 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index c3b628c..13b549b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1733,9 +1733,7 @@ static int read_partial_message_section(struct ceph_connection *con, return 1; } -static bool ceph_con_in_msg_alloc(struct ceph_connection *con, - struct ceph_msg_header *hdr); - +static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip); static int read_partial_message_pages(struct ceph_connection *con, struct page **pages, @@ -1864,9 +1862,14 @@ static int read_partial_message(struct ceph_connection *con) /* allocate message? */ if (!con->in_msg) { + int skip = 0; + dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); - if (ceph_con_in_msg_alloc(con, &con->in_hdr)) { + ret = ceph_con_in_msg_alloc(con, &skip); + if (ret < 0) + return ret; + if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); BUG_ON(con->in_msg); @@ -1876,12 +1879,8 @@ static int read_partial_message(struct ceph_connection *con) con->in_seq++; return 0; } - if (!con->in_msg) { - con->error_msg = - "error allocating memory for incoming message"; - return -ENOMEM; - } + BUG_ON(!con->in_msg); BUG_ON(con->in_msg->con != con); m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ @@ -2715,43 +2714,50 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) * connection, and save the result in con->in_msg. Uses the * connection's private alloc_msg op if available. * - * Returns true if the message should be skipped, false otherwise. - * If true is returned (skip message), con->in_msg will be NULL. - * If false is returned, con->in_msg will contain a pointer to the - * newly-allocated message, or NULL in case of memory exhaustion. + * Returns 0 on success, or a negative error code. + * + * On success, if we set *skip = 1: + * - the next message should be skipped and ignored. + * - con->in_msg == NULL + * or if we set *skip = 0: + * - con->in_msg is non-null. + * On error (ENOMEM, EAGAIN, ...), + * - con->in_msg == NULL */ -static bool ceph_con_in_msg_alloc(struct ceph_connection *con, - struct ceph_msg_header *hdr) +static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) { + struct ceph_msg_header *hdr = &con->in_hdr; int type = le16_to_cpu(hdr->type); int front_len = le32_to_cpu(hdr->front_len); int middle_len = le32_to_cpu(hdr->middle_len); - int ret; + int ret = 0; BUG_ON(con->in_msg != NULL); if (con->ops->alloc_msg) { - int skip = 0; - mutex_unlock(&con->mutex); - con->in_msg = con->ops->alloc_msg(con, hdr, &skip); + con->in_msg = con->ops->alloc_msg(con, hdr, skip); mutex_lock(&con->mutex); if (con->in_msg) { con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); } - if (skip) + if (*skip) { con->in_msg = NULL; - - if (!con->in_msg) - return skip != 0; + return 0; + } + if (!con->in_msg) { + con->error_msg = + "error allocating memory for incoming message"; + return -ENOMEM; + } } if (!con->in_msg) { con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); if (!con->in_msg) { pr_err("unable to allocate msg type %d len %d\n", type, front_len); - return false; + return -ENOMEM; } con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); @@ -2767,7 +2773,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, } } - return false; + return ret; } -- cgit v1.1 From 6139919133377652992a5fe134e22abce3e9c25e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:19:45 -0700 Subject: libceph: recheck con state after allocating incoming message We drop the lock when calling the ->alloc_msg() con op, which means we need to (a) not clobber con->in_msg without the mutex held, and (b) we need to verify that we are still in the OPEN state when we retake it to avoid causing any mayhem. If the state does change, -EAGAIN will get us back to con_work() and loop. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 13b549b..b6655b1 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2735,9 +2735,16 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) BUG_ON(con->in_msg != NULL); if (con->ops->alloc_msg) { + struct ceph_msg *msg; + mutex_unlock(&con->mutex); - con->in_msg = con->ops->alloc_msg(con, hdr, skip); + msg = con->ops->alloc_msg(con, hdr, skip); mutex_lock(&con->mutex); + if (con->state != CON_STATE_OPEN) { + ceph_msg_put(msg); + return -EAGAIN; + } + con->in_msg = msg; if (con->in_msg) { con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); -- cgit v1.1