diff options
Diffstat (limited to 'sbin/ggate/ggatec/ggatec.c')
-rw-r--r-- | sbin/ggate/ggatec/ggatec.c | 516 |
1 files changed, 323 insertions, 193 deletions
diff --git a/sbin/ggate/ggatec/ggatec.c b/sbin/ggate/ggatec/ggatec.c index 84aa60e..70f667d 100644 --- a/sbin/ggate/ggatec/ggatec.c +++ b/sbin/ggate/ggatec/ggatec.c @@ -34,8 +34,12 @@ #include <string.h> #include <ctype.h> #include <libgen.h> +#include <pthread.h> +#include <signal.h> #include <err.h> #include <errno.h> +#include <assert.h> + #include <sys/param.h> #include <sys/ioctl.h> #include <sys/socket.h> @@ -51,21 +55,22 @@ #include "ggate.h" -enum { UNSET, ATTACH, CREATE, DESTROY, LIST } action = UNSET; +enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET; static const char *path = NULL; static const char *host = NULL; static int unit = -1; static unsigned flags = 0; static int force = 0; -static int nagle = 1; static unsigned queue_size = G_GATE_QUEUE_SIZE; static unsigned port = G_GATE_PORT; static off_t mediasize; static unsigned sectorsize = 0; static unsigned timeout = G_GATE_TIMEOUT; -static unsigned rcvbuf = G_GATE_RCVBUF; -static unsigned sndbuf = G_GATE_SNDBUF; +static int sendfd, recvfd; +static uint32_t token; +static pthread_t sendtd, recvtd; +static int reconnect; static void usage(void) @@ -74,129 +79,30 @@ usage(void) fprintf(stderr, "usage: %s create [-nv] [-o <ro|wo|rw>] [-p port] " "[-q queue_size] [-R rcvbuf] [-S sndbuf] [-s sectorsize] " "[-t timeout] [-u unit] <host> <path>\n", getprogname()); - fprintf(stderr, " %s attach [-nv] [-o <ro|wo|rw>] [-p port] " + fprintf(stderr, " %s rescue [-nv] [-o <ro|wo|rw>] [-p port] " "[-R rcvbuf] [-S sndbuf] <-u unit> <host> <path>\n", getprogname()); fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname()); fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname()); exit(EXIT_FAILURE); } -static int -handshake(void) -{ - struct g_gate_cinit cinit; - struct g_gate_sinit sinit; - struct sockaddr_in serv; - struct timeval tv; - size_t bsize; - int sfd; - - /* - * Do the network stuff. - */ - bzero(&serv, sizeof(serv)); - serv.sin_family = AF_INET; - serv.sin_addr.s_addr = g_gate_str2ip(host); - if (serv.sin_addr.s_addr == INADDR_NONE) { - g_gate_log(LOG_ERR, "Invalid IP/host name: %s.", host); - return (-1); - } - serv.sin_port = htons(port); - sfd = socket(AF_INET, SOCK_STREAM, 0); - if (sfd == -1) - g_gate_xlog("Can't open socket: %s.", strerror(errno)); - /* - * Some trivial network optimalization. - * This should be much more advanced. - */ - if (nagle) { - int on = 1; - - if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, - sizeof(on)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - } - bsize = rcvbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - bsize = sndbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - tv.tv_sec = timeout; - tv.tv_usec = 0; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 || - setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) { - g_gate_log(LOG_ERR, "Can't connect to server: %s.", - strerror(errno)); - return (-1); - } - - g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port); - - /* - * Creating and sending initial packet. - */ - if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >= - sizeof(cinit.gc_path)) { - g_gate_xlog("Path name too long."); - } - cinit.gc_flags = flags; - g_gate_log(LOG_DEBUG, "Sending initial packet."); - g_gate_swap2n_cinit(&cinit); - if (send(sfd, &cinit, sizeof(cinit), 0) == -1) { - g_gate_log(LOG_ERR, "Error while sending initial packet: %s.", - strerror(errno)); - return (-1); - } - g_gate_swap2h_cinit(&cinit); - - /* - * Receiving initial packet from server. - */ - g_gate_log(LOG_DEBUG, "Receiving initial packet."); - if (recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) { - g_gate_log(LOG_ERR, "Error while receiving data: %s.", - strerror(errno)); - return (-1); - } - g_gate_swap2h_sinit(&sinit); - if (sinit.gs_error != 0) - g_gate_xlog("Error from server: %s.", strerror(sinit.gs_error)); - - mediasize = sinit.gs_mediasize; - if (sectorsize == 0) - sectorsize = sinit.gs_sectorsize; - return (sfd); -} - -static int -serve(int sfd) +static void * +send_thread(void *arg __unused) { struct g_gate_ctl_io ggio; - size_t bsize; - char *buf; - - bsize = G_GATE_BUFSIZE_START; - buf = malloc(bsize); - if (buf == NULL) { - if (action == CREATE) - g_gate_destroy(unit, 1); - g_gate_xlog("No enough memory"); - } + struct g_gate_hdr hdr; + char buf[MAXPHYS]; + ssize_t data; + int error; + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); ggio.gctl_version = G_GATE_VERSION; ggio.gctl_unit = unit; - bsize = sectorsize; - ggio.gctl_data = malloc(bsize); + ggio.gctl_data = buf; + for (;;) { - struct g_gate_hdr hdr; - int data, error; -once_again: - ggio.gctl_length = bsize; + ggio.gctl_length = sizeof(buf); ggio.gctl_error = 0; g_gate_ioctl(G_GATE_CMD_START, &ggio); error = ggio.gctl_error; @@ -204,11 +110,12 @@ once_again: case 0: break; case ECANCELED: + if (reconnect) + break; /* Exit gracefully. */ - free(ggio.gctl_data); g_gate_close_device(); - close(sfd); exit(EXIT_SUCCESS); +#if 0 case ENOMEM: /* Buffer too small. */ ggio.gctl_data = realloc(ggio.gctl_data, @@ -218,87 +125,232 @@ once_again: goto once_again; } /* FALLTHROUGH */ +#endif case ENXIO: default: g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME, strerror(error)); } - hdr.gh_cmd = ggio.gctl_cmd; + if (reconnect) + break; + + switch (ggio.gctl_cmd) { + case BIO_READ: + hdr.gh_cmd = GGATE_CMD_READ; + break; + case BIO_WRITE: + hdr.gh_cmd = GGATE_CMD_WRITE; + break; + } + hdr.gh_seq = ggio.gctl_seq; hdr.gh_offset = ggio.gctl_offset; hdr.gh_length = ggio.gctl_length; hdr.gh_error = 0; g_gate_swap2n_hdr(&hdr); - data = send(sfd, &hdr, sizeof(hdr), 0); + + data = g_gate_send(sendfd, &hdr, sizeof(hdr), MSG_NOSIGNAL); g_gate_log(LOG_DEBUG, "Sent hdr packet."); g_gate_swap2h_hdr(&hdr); + if (reconnect) + break; if (data != sizeof(hdr)) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 1."); + reconnect = 1; + pthread_kill(recvtd, SIGUSR1); + break; } - if (ggio.gctl_cmd == BIO_DELETE || ggio.gctl_cmd == BIO_WRITE) { - data = send(sfd, ggio.gctl_data, ggio.gctl_length, 0); - g_gate_log(LOG_DEBUG, "Sent data packet."); + + if (hdr.gh_cmd == GGATE_CMD_WRITE) { + data = g_gate_send(sendfd, ggio.gctl_data, + ggio.gctl_length, MSG_NOSIGNAL); + if (reconnect) + break; if (data != ggio.gctl_length) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 2 (%zd != %zd).", data, (ssize_t)ggio.gctl_length); + reconnect = 1; + pthread_kill(recvtd, SIGUSR1); + break; } - g_gate_log(LOG_DEBUG, "Sent %d bytes (offset=%llu, " + g_gate_log(LOG_DEBUG, "Sent %zd bytes (offset=%llu, " "size=%u).", data, hdr.gh_offset, hdr.gh_length); } - data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL); - g_gate_log(LOG_DEBUG, "Received hdr packet."); + } + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + return (NULL); +} + +static void * +recv_thread(void *arg __unused) +{ + struct g_gate_ctl_io ggio; + struct g_gate_hdr hdr; + char buf[MAXPHYS]; + ssize_t data; + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); + + ggio.gctl_version = G_GATE_VERSION; + ggio.gctl_unit = unit; + ggio.gctl_data = buf; + + for (;;) { + data = g_gate_recv(recvfd, &hdr, sizeof(hdr), MSG_WAITALL); + if (reconnect) + break; g_gate_swap2h_hdr(&hdr); if (data != sizeof(hdr)) { - ggio.gctl_error = EIO; - goto done; + if (data == -1 && errno == EAGAIN) + continue; + g_gate_log(LOG_ERR, "Lost connection 3."); + reconnect = 1; + pthread_kill(sendtd, SIGUSR1); + break; } - if (ggio.gctl_cmd == BIO_READ) { - if (bsize < (size_t)ggio.gctl_length) { - ggio.gctl_data = realloc(ggio.gctl_data, - ggio.gctl_length); - if (ggio.gctl_data != NULL) - bsize = ggio.gctl_length; - else - g_gate_xlog("No memory."); - } - data = recv(sfd, ggio.gctl_data, ggio.gctl_length, - MSG_WAITALL); + g_gate_log(LOG_DEBUG, "Received hdr packet."); + + ggio.gctl_seq = hdr.gh_seq; + ggio.gctl_cmd = hdr.gh_cmd; + ggio.gctl_offset = hdr.gh_offset; + ggio.gctl_length = hdr.gh_length; + ggio.gctl_error = hdr.gh_error; + + if (ggio.gctl_error == 0 && ggio.gctl_cmd == GGATE_CMD_READ) { + data = g_gate_recv(recvfd, ggio.gctl_data, + ggio.gctl_length, MSG_WAITALL); + if (reconnect) + break; g_gate_log(LOG_DEBUG, "Received data packet."); if (data != ggio.gctl_length) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 4."); + reconnect = 1; + pthread_kill(sendtd, SIGUSR1); + break; } g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%ju, " "size=%zu).", data, (uintmax_t)hdr.gh_offset, (size_t)hdr.gh_length); } -done: + g_gate_ioctl(G_GATE_CMD_DONE, &ggio); - if (ggio.gctl_error == EAGAIN) - return (ggio.gctl_error); } - /* NOTREACHED */ - return (0); + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + pthread_exit(NULL); } -static void -serve_loop(int sfd) +static int +handshake(int dir) { + struct g_gate_version ver; + struct g_gate_cinit cinit; + struct g_gate_sinit sinit; + struct sockaddr_in serv; + int sfd; - for (;;) { - int error; + /* + * Do the network stuff. + */ + bzero(&serv, sizeof(serv)); + serv.sin_family = AF_INET; + serv.sin_addr.s_addr = g_gate_str2ip(host); + if (serv.sin_addr.s_addr == INADDR_NONE) { + g_gate_log(LOG_DEBUG, "Invalid IP/host name: %s.", host); + return (-1); + } + serv.sin_port = htons(port); + sfd = socket(AF_INET, SOCK_STREAM, 0); + if (sfd == -1) { + g_gate_log(LOG_DEBUG, "Cannot open socket: %s.", + strerror(errno)); + return (-1); + } + + g_gate_socket_settings(sfd); - error = serve(sfd); + if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) { + g_gate_log(LOG_DEBUG, "Cannot connect to server: %s.", + strerror(errno)); close(sfd); - if (error != EAGAIN) - g_gate_xlog("%s.", strerror(error)); - sfd = handshake(); - if (sfd == -1) { - sleep(2); - continue; - } + return (-1); + } + + g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port); + + /* + * Create and send version packet. + */ + g_gate_log(LOG_DEBUG, "Sending version packet."); + assert(strlen(GGATE_MAGIC) == sizeof(ver.gv_magic)); + bcopy(GGATE_MAGIC, ver.gv_magic, sizeof(ver.gv_magic)); + ver.gv_version = GGATE_VERSION; + ver.gv_error = 0; + g_gate_swap2n_version(&ver); + if (g_gate_send(sfd, &ver, sizeof(ver), MSG_NOSIGNAL) == -1) { + g_gate_log(LOG_DEBUG, "Error while sending version packet: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + bzero(&ver, sizeof(ver)); + if (g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL) == -1) { + g_gate_log(LOG_DEBUG, "Error while receiving data: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + if (ver.gv_error != 0) { + g_gate_log(LOG_DEBUG, "Version verification problem: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + + /* + * Create and send initial packet. + */ + g_gate_log(LOG_DEBUG, "Sending initial packet."); + if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >= + sizeof(cinit.gc_path)) { + g_gate_log(LOG_DEBUG, "Path name too long."); + close(sfd); + return (-1); + } + cinit.gc_flags = flags | dir; + cinit.gc_token = token; + cinit.gc_nconn = 2; + g_gate_swap2n_cinit(&cinit); + if (g_gate_send(sfd, &cinit, sizeof(cinit), MSG_NOSIGNAL) == -1) { + g_gate_log(LOG_DEBUG, "Error while sending initial packet: %s.", + strerror(errno)); + close(sfd); + return (-1); } + g_gate_swap2h_cinit(&cinit); + + /* + * Receiving initial packet from server. + */ + g_gate_log(LOG_DEBUG, "Receiving initial packet."); + if (g_gate_recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) { + g_gate_log(LOG_DEBUG, "Error while receiving data: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + g_gate_swap2h_sinit(&sinit); + if (sinit.gs_error != 0) { + g_gate_log(LOG_DEBUG, "Error from server: %s.", + strerror(sinit.gs_error)); + close(sfd); + return (-1); + } + g_gate_log(LOG_DEBUG, "Received initial packet."); + + mediasize = sinit.gs_mediasize; + if (sectorsize == 0) + sectorsize = sinit.gs_sectorsize; + + return (sfd); } static void @@ -314,26 +366,87 @@ mydaemon(void) err(EXIT_FAILURE, "Cannot daemonize"); } +static int +g_gatec_connect(void) +{ + + token = arc4random(); + /* + * Our receive descriptor is connected to the send descriptor on the + * server side. + */ + recvfd = handshake(GGATE_FLAG_SEND); + if (recvfd == -1) + return (0); + /* + * Our send descriptor is connected to the receive descriptor on the + * server side. + */ + sendfd = handshake(GGATE_FLAG_RECV); + if (sendfd == -1) + return (0); + return (1); +} + static void -g_gatec_attach(void) +g_gatec_start(void) { - int sfd; + int error; - sfd = handshake(); - g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid()); - mydaemon(); - serve_loop(sfd); + reconnect = 0; + error = pthread_create(&recvtd, NULL, recv_thread, NULL); + if (error != 0) { + g_gate_destroy(unit, 1); + g_gate_xlog("pthread_create(recv_thread): %s.", + strerror(error)); + } + sendtd = pthread_self(); + send_thread(NULL); + /* Disconnected. */ + close(sendfd); + close(recvfd); +} + +static void +signop(int sig __unused) +{ + + /* Do nothing. */ +} + +static void +g_gatec_loop(void) +{ + struct g_gate_ctl_cancel ggioc; + + signal(SIGUSR1, signop); + for (;;) { + g_gatec_start(); + g_gate_log(LOG_NOTICE, "Disconnected [%s %s]. Connecting...", + host, path); + while (!g_gatec_connect()) { + sleep(2); + g_gate_log(LOG_NOTICE, "Connecting [%s %s]...", host, + path); + } + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + } } static void g_gatec_create(void) { struct g_gate_ctl_create ggioc; - int sfd; - sfd = handshake(); - if (sfd == -1) - exit(EXIT_FAILURE); + if (!g_gatec_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + /* + * Ok, got both sockets, time to create provider. + */ ggioc.gctl_version = G_GATE_VERSION; ggioc.gctl_mediasize = mediasize; ggioc.gctl_sectorsize = sectorsize; @@ -344,12 +457,29 @@ g_gatec_create(void) snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s:%u %s", host, port, path); g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc); - g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid()); if (unit == -1) printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit); unit = ggioc.gctl_unit; + mydaemon(); - serve_loop(sfd); + g_gatec_loop(); +} + +static void +g_gatec_rescue(void) +{ + struct g_gate_ctl_cancel ggioc; + + if (!g_gatec_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + + mydaemon(); + g_gatec_loop(); } int @@ -358,14 +488,14 @@ main(int argc, char *argv[]) if (argc < 2) usage(); - if (strcasecmp(argv[1], "attach") == 0) - action = ATTACH; - else if (strcasecmp(argv[1], "create") == 0) + if (strcasecmp(argv[1], "create") == 0) action = CREATE; else if (strcasecmp(argv[1], "destroy") == 0) action = DESTROY; else if (strcasecmp(argv[1], "list") == 0) action = LIST; + else if (strcasecmp(argv[1], "rescue") == 0) + action = RESCUE; else usage(); argc -= 1; @@ -383,12 +513,12 @@ main(int argc, char *argv[]) force = 1; break; case 'n': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); nagle = 0; break; case 'o': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); if (strcasecmp("ro", optarg) == 0) flags = G_GATE_FLAG_READONLY; @@ -402,7 +532,7 @@ main(int argc, char *argv[]) } break; case 'p': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; port = strtoul(optarg, NULL, 10); @@ -418,7 +548,7 @@ main(int argc, char *argv[]) errx(EXIT_FAILURE, "Invalid queue_size."); break; case 'R': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; rcvbuf = strtoul(optarg, NULL, 10); @@ -426,7 +556,7 @@ main(int argc, char *argv[]) errx(EXIT_FAILURE, "Invalid rcvbuf."); break; case 'S': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; sndbuf = strtoul(optarg, NULL, 10); @@ -468,18 +598,6 @@ main(int argc, char *argv[]) argv += optind; switch (action) { - case ATTACH: - if (argc != 2) - usage(); - if (unit == -1) { - fprintf(stderr, "Required unit number.\n"); - usage(); - } - g_gate_open_device(); - host = argv[0]; - path = argv[1]; - g_gatec_attach(); - break; case CREATE: if (argc != 2) usage(); @@ -501,6 +619,18 @@ main(int argc, char *argv[]) case LIST: g_gate_list(unit, g_gate_verbose); break; + case RESCUE: + if (argc != 2) + usage(); + if (unit == -1) { + fprintf(stderr, "Required unit number.\n"); + usage(); + } + g_gate_open_device(); + host = argv[0]; + path = argv[1]; + g_gatec_rescue(); + break; case UNSET: default: usage(); |