diff options
-rw-r--r-- | include/linux/ceph/messenger.h | 12 | ||||
-rw-r--r-- | net/ceph/messenger.c | 130 |
2 files changed, 68 insertions, 74 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index a310d7f..d9c2b8f 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -117,18 +117,6 @@ struct ceph_msg_pos { #define BACKOFF 15 /* - * ceph_connection states - */ -#define CONNECTING 1 -#define NEGOTIATING 2 -#define CONNECTED 5 -#define STANDBY 8 /* no outgoing messages, socket closed. we keep - * the ceph_connection around to maintain shared - * state with the peer. */ -#define CLOSED 10 /* we've closed the connection */ -#define OPENING 13 /* open connection w/ (possibly new) peer */ - -/* * A single connection with another host. * * We maintain a queue of outgoing messages, and some session state to diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e7320cd..563e46a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -77,6 +77,17 @@ #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ +/* + * connection states + */ +#define CON_STATE_CLOSED 1 /* -> PREOPEN */ +#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ +#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ +#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ +#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ +#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ + + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con) mutex_lock(&con->mutex); dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); - clear_bit(NEGOTIATING, &con->state); - clear_bit(CONNECTING, &con->state); - clear_bit(CONNECTED, &con->state); - clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ - set_bit(CLOSED, &con->state); + con->state = CON_STATE_CLOSED; clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(KEEPALIVE_PENDING, &con->flags); @@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con, { mutex_lock(&con->mutex); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); - set_bit(OPENING, &con->state); - WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + + BUG_ON(con->state != CON_STATE_CLOSED); + con->state = CON_STATE_PREOPEN; con->peer_name.type = (__u8) entity_type; con->peer_name.num = cpu_to_le64(entity_num); @@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); - set_bit(CLOSED, &con->state); + con->state = CON_STATE_CLOSED; } EXPORT_SYMBOL(ceph_con_init); @@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection if (!con->ops->get_authorizer) { con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; con->out_connect.authorizer_len = 0; - return NULL; } /* Can't hold the mutex while getting authorizer */ - mutex_unlock(&con->mutex); - auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); - mutex_lock(&con->mutex); if (IS_ERR(auth)) return auth; - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) + if (con->state != CON_STATE_NEGOTIATING) return ERR_PTR(-EAGAIN); con->auth_reply_buf = auth->authorizer_reply_buf; con->auth_reply_buf_len = auth->authorizer_reply_buf_len; - - return auth; } @@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con) static void fail_protocol(struct ceph_connection *con) { reset_connection(con); - set_bit(CLOSED, &con->state); /* in case there's queued work */ + BUG_ON(con->state != CON_STATE_NEGOTIATING); + con->state = CON_STATE_CLOSED; } static int process_connect(struct ceph_connection *con) @@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con) if (con->ops->peer_reset) con->ops->peer_reset(con); mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) + if (con->state != CON_STATE_NEGOTIATING) return -EAGAIN; break; @@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con) fail_protocol(con); return -1; } - clear_bit(NEGOTIATING, &con->state); - set_bit(CONNECTED, &con->state); + + BUG_ON(con->state != CON_STATE_NEGOTIATING); + con->state = CON_STATE_OPEN; + con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; con->peer_features = server_feat; @@ -1994,8 +1998,9 @@ more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); /* open the socket first? */ - if (con->sock == NULL) { - set_bit(CONNECTING, &con->state); + if (con->state == CON_STATE_PREOPEN) { + BUG_ON(con->sock); + con->state = CON_STATE_CONNECTING; con_out_kvec_reset(con); prepare_write_banner(con); @@ -2046,8 +2051,7 @@ more_kvec: } do_next: - if (!test_bit(CONNECTING, &con->state) && - !test_bit(NEGOTIATING, &con->state)) { + if (con->state == CON_STATE_OPEN) { /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con) { int ret = -1; - if (!con->sock) - return 0; - - if (test_bit(STANDBY, &con->state)) +more: + dout("try_read start on %p state %lu\n", con, con->state); + if (con->state != CON_STATE_CONNECTING && + con->state != CON_STATE_NEGOTIATING && + con->state != CON_STATE_OPEN) return 0; - dout("try_read start on %p\n", con); + BUG_ON(!con->sock); -more: dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, con->in_base_pos); - /* - * process_connect and process_message drop and re-take - * con->mutex. make sure we handle a racing close or reopen. - */ - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) { - ret = -EAGAIN; - goto out; - } - - if (test_bit(CONNECTING, &con->state)) { + if (con->state == CON_STATE_CONNECTING) { dout("try_read connecting\n"); ret = read_partial_banner(con); if (ret <= 0) @@ -2112,8 +2106,8 @@ more: if (ret < 0) goto out; - clear_bit(CONNECTING, &con->state); - set_bit(NEGOTIATING, &con->state); + BUG_ON(con->state != CON_STATE_CONNECTING); + con->state = CON_STATE_NEGOTIATING; /* Banner is good, exchange connection info */ ret = prepare_write_connect(con); @@ -2125,7 +2119,7 @@ more: goto out; } - if (test_bit(NEGOTIATING, &con->state)) { + if (con->state == CON_STATE_NEGOTIATING) { dout("try_read negotiating\n"); ret = read_partial_connect(con); if (ret <= 0) @@ -2136,6 +2130,8 @@ more: goto more; } + BUG_ON(con->state != CON_STATE_OPEN); + if (con->in_base_pos < 0) { /* * skipping + discarding content. @@ -2169,8 +2165,8 @@ more: prepare_read_ack(con); break; case CEPH_MSGR_TAG_CLOSE: - clear_bit(CONNECTED, &con->state); - set_bit(CLOSED, &con->state); /* fixme */ + con_close_socket(con); + con->state = CON_STATE_CLOSED; goto out; default: goto bad_tag; @@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTED, &con->state)) - con->error_msg = "socket closed"; - else if (test_and_clear_bit(NEGOTIATING, &con->state)) - con->error_msg = "negotiation failed"; - else if (test_and_clear_bit(CONNECTING, &con->state)) + switch (con->state) { + case CON_STATE_CONNECTING: con->error_msg = "connection failed"; - else + break; + case CON_STATE_NEGOTIATING: + con->error_msg = "negotiation failed"; + break; + case CON_STATE_OPEN: + con->error_msg = "socket closed"; + break; + default: + dout("unrecognized con state %d\n", (int)con->state); con->error_msg = "unrecognized con state"; + BUG(); + } goto fault; } @@ -2271,17 +2274,16 @@ restart: } } - if (test_bit(STANDBY, &con->state)) { + if (con->state == CON_STATE_STANDBY) { dout("con_work %p STANDBY\n", con); goto done; } - if (test_bit(CLOSED, &con->state)) { + if (con->state == CON_STATE_CLOSED) { dout("con_work %p CLOSED\n", con); BUG_ON(con->sock); goto done; } - if (test_and_clear_bit(OPENING, &con->state)) { - /* reopen w/ new peer */ + if (con->state == CON_STATE_PREOPEN) { dout("con_work OPENING\n"); BUG_ON(con->sock); } @@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con) dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(CLOSED, &con->state)) - goto out_unlock; + BUG_ON(con->state != CON_STATE_CONNECTING && + con->state != CON_STATE_NEGOTIATING && + con->state != CON_STATE_OPEN); con_close_socket(con); if (test_bit(LOSSYTX, &con->flags)) { - dout("fault on LOSSYTX channel\n"); + dout("fault on LOSSYTX channel, marking CLOSED\n"); + con->state = CON_STATE_CLOSED; goto out_unlock; } @@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con) !test_bit(KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); clear_bit(WRITE_PENDING, &con->flags); - set_bit(STANDBY, &con->state); + con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ + con->state = CON_STATE_PREOPEN; if (con->delay == 0) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) @@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init); static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ - if (test_and_clear_bit(STANDBY, &con->state)) { + if (con->state == CON_STATE_STANDBY) { dout("clear_standby %p and ++connect_seq\n", con); + con->state = CON_STATE_PREOPEN; con->connect_seq++; WARN_ON(test_bit(WRITE_PENDING, &con->flags)); WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); @@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state)) { + if (con->state == CON_STATE_CLOSED) { dout("con_send %p closed, dropping %p\n", con, msg); ceph_msg_put(msg); mutex_unlock(&con->mutex); |