summaryrefslogtreecommitdiffstats
path: root/sbin/ggate
diff options
context:
space:
mode:
Diffstat (limited to 'sbin/ggate')
-rw-r--r--sbin/ggate/ggatec/Makefile4
-rw-r--r--sbin/ggate/ggatec/ggatec.c516
-rw-r--r--sbin/ggate/ggated/Makefile3
-rw-r--r--sbin/ggate/ggated/ggated.c866
-rw-r--r--sbin/ggate/shared/ggate.c82
-rw-r--r--sbin/ggate/shared/ggate.h78
6 files changed, 1094 insertions, 455 deletions
diff --git a/sbin/ggate/ggatec/Makefile b/sbin/ggate/ggatec/Makefile
index 5674aa1..c49cfe8 100644
--- a/sbin/ggate/ggatec/Makefile
+++ b/sbin/ggate/ggatec/Makefile
@@ -9,7 +9,7 @@ SRCS= ggatec.c ggate.c
CFLAGS+= -DLIBGEOM
CFLAGS+= -I${.CURDIR}/../shared
-DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL}
-LDADD= -lgeom -lsbuf -lbsdxml -lutil
+DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL} ${LIBPTHREAD}
+LDADD= -lgeom -lsbuf -lbsdxml -lutil -lpthread
.include <bsd.prog.mk>
diff --git a/sbin/ggate/ggatec/ggatec.c b/sbin/ggate/ggatec/ggatec.c
index 84aa60e..70f667d 100644
--- a/sbin/ggate/ggatec/ggatec.c
+++ b/sbin/ggate/ggatec/ggatec.c
@@ -34,8 +34,12 @@
#include <string.h>
#include <ctype.h>
#include <libgen.h>
+#include <pthread.h>
+#include <signal.h>
#include <err.h>
#include <errno.h>
+#include <assert.h>
+
#include <sys/param.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
@@ -51,21 +55,22 @@
#include "ggate.h"
-enum { UNSET, ATTACH, CREATE, DESTROY, LIST } action = UNSET;
+enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET;
static const char *path = NULL;
static const char *host = NULL;
static int unit = -1;
static unsigned flags = 0;
static int force = 0;
-static int nagle = 1;
static unsigned queue_size = G_GATE_QUEUE_SIZE;
static unsigned port = G_GATE_PORT;
static off_t mediasize;
static unsigned sectorsize = 0;
static unsigned timeout = G_GATE_TIMEOUT;
-static unsigned rcvbuf = G_GATE_RCVBUF;
-static unsigned sndbuf = G_GATE_SNDBUF;
+static int sendfd, recvfd;
+static uint32_t token;
+static pthread_t sendtd, recvtd;
+static int reconnect;
static void
usage(void)
@@ -74,129 +79,30 @@ usage(void)
fprintf(stderr, "usage: %s create [-nv] [-o <ro|wo|rw>] [-p port] "
"[-q queue_size] [-R rcvbuf] [-S sndbuf] [-s sectorsize] "
"[-t timeout] [-u unit] <host> <path>\n", getprogname());
- fprintf(stderr, " %s attach [-nv] [-o <ro|wo|rw>] [-p port] "
+ fprintf(stderr, " %s rescue [-nv] [-o <ro|wo|rw>] [-p port] "
"[-R rcvbuf] [-S sndbuf] <-u unit> <host> <path>\n", getprogname());
fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname());
fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname());
exit(EXIT_FAILURE);
}
-static int
-handshake(void)
-{
- struct g_gate_cinit cinit;
- struct g_gate_sinit sinit;
- struct sockaddr_in serv;
- struct timeval tv;
- size_t bsize;
- int sfd;
-
- /*
- * Do the network stuff.
- */
- bzero(&serv, sizeof(serv));
- serv.sin_family = AF_INET;
- serv.sin_addr.s_addr = g_gate_str2ip(host);
- if (serv.sin_addr.s_addr == INADDR_NONE) {
- g_gate_log(LOG_ERR, "Invalid IP/host name: %s.", host);
- return (-1);
- }
- serv.sin_port = htons(port);
- sfd = socket(AF_INET, SOCK_STREAM, 0);
- if (sfd == -1)
- g_gate_xlog("Can't open socket: %s.", strerror(errno));
- /*
- * Some trivial network optimalization.
- * This should be much more advanced.
- */
- if (nagle) {
- int on = 1;
-
- if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on,
- sizeof(on)) == -1) {
- g_gate_xlog("setsockopt() error: %s.", strerror(errno));
- }
- }
- bsize = rcvbuf;
- if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1)
- g_gate_xlog("setsockopt() error: %s.", strerror(errno));
- bsize = sndbuf;
- if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1)
- g_gate_xlog("setsockopt() error: %s.", strerror(errno));
- tv.tv_sec = timeout;
- tv.tv_usec = 0;
- if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 ||
- setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
- g_gate_xlog("setsockopt() error: %s.", strerror(errno));
- }
- if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) {
- g_gate_log(LOG_ERR, "Can't connect to server: %s.",
- strerror(errno));
- return (-1);
- }
-
- g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port);
-
- /*
- * Creating and sending initial packet.
- */
- if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >=
- sizeof(cinit.gc_path)) {
- g_gate_xlog("Path name too long.");
- }
- cinit.gc_flags = flags;
- g_gate_log(LOG_DEBUG, "Sending initial packet.");
- g_gate_swap2n_cinit(&cinit);
- if (send(sfd, &cinit, sizeof(cinit), 0) == -1) {
- g_gate_log(LOG_ERR, "Error while sending initial packet: %s.",
- strerror(errno));
- return (-1);
- }
- g_gate_swap2h_cinit(&cinit);
-
- /*
- * Receiving initial packet from server.
- */
- g_gate_log(LOG_DEBUG, "Receiving initial packet.");
- if (recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) {
- g_gate_log(LOG_ERR, "Error while receiving data: %s.",
- strerror(errno));
- return (-1);
- }
- g_gate_swap2h_sinit(&sinit);
- if (sinit.gs_error != 0)
- g_gate_xlog("Error from server: %s.", strerror(sinit.gs_error));
-
- mediasize = sinit.gs_mediasize;
- if (sectorsize == 0)
- sectorsize = sinit.gs_sectorsize;
- return (sfd);
-}
-
-static int
-serve(int sfd)
+static void *
+send_thread(void *arg __unused)
{
struct g_gate_ctl_io ggio;
- size_t bsize;
- char *buf;
-
- bsize = G_GATE_BUFSIZE_START;
- buf = malloc(bsize);
- if (buf == NULL) {
- if (action == CREATE)
- g_gate_destroy(unit, 1);
- g_gate_xlog("No enough memory");
- }
+ struct g_gate_hdr hdr;
+ char buf[MAXPHYS];
+ ssize_t data;
+ int error;
+
+ g_gate_log(LOG_NOTICE, "%s: started!", __func__);
ggio.gctl_version = G_GATE_VERSION;
ggio.gctl_unit = unit;
- bsize = sectorsize;
- ggio.gctl_data = malloc(bsize);
+ ggio.gctl_data = buf;
+
for (;;) {
- struct g_gate_hdr hdr;
- int data, error;
-once_again:
- ggio.gctl_length = bsize;
+ ggio.gctl_length = sizeof(buf);
ggio.gctl_error = 0;
g_gate_ioctl(G_GATE_CMD_START, &ggio);
error = ggio.gctl_error;
@@ -204,11 +110,12 @@ once_again:
case 0:
break;
case ECANCELED:
+ if (reconnect)
+ break;
/* Exit gracefully. */
- free(ggio.gctl_data);
g_gate_close_device();
- close(sfd);
exit(EXIT_SUCCESS);
+#if 0
case ENOMEM:
/* Buffer too small. */
ggio.gctl_data = realloc(ggio.gctl_data,
@@ -218,87 +125,232 @@ once_again:
goto once_again;
}
/* FALLTHROUGH */
+#endif
case ENXIO:
default:
g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
strerror(error));
}
- hdr.gh_cmd = ggio.gctl_cmd;
+ if (reconnect)
+ break;
+
+ switch (ggio.gctl_cmd) {
+ case BIO_READ:
+ hdr.gh_cmd = GGATE_CMD_READ;
+ break;
+ case BIO_WRITE:
+ hdr.gh_cmd = GGATE_CMD_WRITE;
+ break;
+ }
+ hdr.gh_seq = ggio.gctl_seq;
hdr.gh_offset = ggio.gctl_offset;
hdr.gh_length = ggio.gctl_length;
hdr.gh_error = 0;
g_gate_swap2n_hdr(&hdr);
- data = send(sfd, &hdr, sizeof(hdr), 0);
+
+ data = g_gate_send(sendfd, &hdr, sizeof(hdr), MSG_NOSIGNAL);
g_gate_log(LOG_DEBUG, "Sent hdr packet.");
g_gate_swap2h_hdr(&hdr);
+ if (reconnect)
+ break;
if (data != sizeof(hdr)) {
- ggio.gctl_error = EAGAIN;
- goto done;
+ g_gate_log(LOG_ERR, "Lost connection 1.");
+ reconnect = 1;
+ pthread_kill(recvtd, SIGUSR1);
+ break;
}
- if (ggio.gctl_cmd == BIO_DELETE || ggio.gctl_cmd == BIO_WRITE) {
- data = send(sfd, ggio.gctl_data, ggio.gctl_length, 0);
- g_gate_log(LOG_DEBUG, "Sent data packet.");
+
+ if (hdr.gh_cmd == GGATE_CMD_WRITE) {
+ data = g_gate_send(sendfd, ggio.gctl_data,
+ ggio.gctl_length, MSG_NOSIGNAL);
+ if (reconnect)
+ break;
if (data != ggio.gctl_length) {
- ggio.gctl_error = EAGAIN;
- goto done;
+ g_gate_log(LOG_ERR, "Lost connection 2 (%zd != %zd).", data, (ssize_t)ggio.gctl_length);
+ reconnect = 1;
+ pthread_kill(recvtd, SIGUSR1);
+ break;
}
- g_gate_log(LOG_DEBUG, "Sent %d bytes (offset=%llu, "
+ g_gate_log(LOG_DEBUG, "Sent %zd bytes (offset=%llu, "
"size=%u).", data, hdr.gh_offset, hdr.gh_length);
}
- data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL);
- g_gate_log(LOG_DEBUG, "Received hdr packet.");
+ }
+ g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
+ return (NULL);
+}
+
+static void *
+recv_thread(void *arg __unused)
+{
+ struct g_gate_ctl_io ggio;
+ struct g_gate_hdr hdr;
+ char buf[MAXPHYS];
+ ssize_t data;
+
+ g_gate_log(LOG_NOTICE, "%s: started!", __func__);
+
+ ggio.gctl_version = G_GATE_VERSION;
+ ggio.gctl_unit = unit;
+ ggio.gctl_data = buf;
+
+ for (;;) {
+ data = g_gate_recv(recvfd, &hdr, sizeof(hdr), MSG_WAITALL);
+ if (reconnect)
+ break;
g_gate_swap2h_hdr(&hdr);
if (data != sizeof(hdr)) {
- ggio.gctl_error = EIO;
- goto done;
+ if (data == -1 && errno == EAGAIN)
+ continue;
+ g_gate_log(LOG_ERR, "Lost connection 3.");
+ reconnect = 1;
+ pthread_kill(sendtd, SIGUSR1);
+ break;
}
- if (ggio.gctl_cmd == BIO_READ) {
- if (bsize < (size_t)ggio.gctl_length) {
- ggio.gctl_data = realloc(ggio.gctl_data,
- ggio.gctl_length);
- if (ggio.gctl_data != NULL)
- bsize = ggio.gctl_length;
- else
- g_gate_xlog("No memory.");
- }
- data = recv(sfd, ggio.gctl_data, ggio.gctl_length,
- MSG_WAITALL);
+ g_gate_log(LOG_DEBUG, "Received hdr packet.");
+
+ ggio.gctl_seq = hdr.gh_seq;
+ ggio.gctl_cmd = hdr.gh_cmd;
+ ggio.gctl_offset = hdr.gh_offset;
+ ggio.gctl_length = hdr.gh_length;
+ ggio.gctl_error = hdr.gh_error;
+
+ if (ggio.gctl_error == 0 && ggio.gctl_cmd == GGATE_CMD_READ) {
+ data = g_gate_recv(recvfd, ggio.gctl_data,
+ ggio.gctl_length, MSG_WAITALL);
+ if (reconnect)
+ break;
g_gate_log(LOG_DEBUG, "Received data packet.");
if (data != ggio.gctl_length) {
- ggio.gctl_error = EAGAIN;
- goto done;
+ g_gate_log(LOG_ERR, "Lost connection 4.");
+ reconnect = 1;
+ pthread_kill(sendtd, SIGUSR1);
+ break;
}
g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%ju, "
"size=%zu).", data, (uintmax_t)hdr.gh_offset,
(size_t)hdr.gh_length);
}
-done:
+
g_gate_ioctl(G_GATE_CMD_DONE, &ggio);
- if (ggio.gctl_error == EAGAIN)
- return (ggio.gctl_error);
}
- /* NOTREACHED */
- return (0);
+ g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
+ pthread_exit(NULL);
}
-static void
-serve_loop(int sfd)
+static int
+handshake(int dir)
{
+ struct g_gate_version ver;
+ struct g_gate_cinit cinit;
+ struct g_gate_sinit sinit;
+ struct sockaddr_in serv;
+ int sfd;
- for (;;) {
- int error;
+ /*
+ * Do the network stuff.
+ */
+ bzero(&serv, sizeof(serv));
+ serv.sin_family = AF_INET;
+ serv.sin_addr.s_addr = g_gate_str2ip(host);
+ if (serv.sin_addr.s_addr == INADDR_NONE) {
+ g_gate_log(LOG_DEBUG, "Invalid IP/host name: %s.", host);
+ return (-1);
+ }
+ serv.sin_port = htons(port);
+ sfd = socket(AF_INET, SOCK_STREAM, 0);
+ if (sfd == -1) {
+ g_gate_log(LOG_DEBUG, "Cannot open socket: %s.",
+ strerror(errno));
+ return (-1);
+ }
+
+ g_gate_socket_settings(sfd);
- error = serve(sfd);
+ if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) {
+ g_gate_log(LOG_DEBUG, "Cannot connect to server: %s.",
+ strerror(errno));
close(sfd);
- if (error != EAGAIN)
- g_gate_xlog("%s.", strerror(error));
- sfd = handshake();
- if (sfd == -1) {
- sleep(2);
- continue;
- }
+ return (-1);
+ }
+
+ g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port);
+
+ /*
+ * Create and send version packet.
+ */
+ g_gate_log(LOG_DEBUG, "Sending version packet.");
+ assert(strlen(GGATE_MAGIC) == sizeof(ver.gv_magic));
+ bcopy(GGATE_MAGIC, ver.gv_magic, sizeof(ver.gv_magic));
+ ver.gv_version = GGATE_VERSION;
+ ver.gv_error = 0;
+ g_gate_swap2n_version(&ver);
+ if (g_gate_send(sfd, &ver, sizeof(ver), MSG_NOSIGNAL) == -1) {
+ g_gate_log(LOG_DEBUG, "Error while sending version packet: %s.",
+ strerror(errno));
+ close(sfd);
+ return (-1);
+ }
+ bzero(&ver, sizeof(ver));
+ if (g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL) == -1) {
+ g_gate_log(LOG_DEBUG, "Error while receiving data: %s.",
+ strerror(errno));
+ close(sfd);
+ return (-1);
+ }
+ if (ver.gv_error != 0) {
+ g_gate_log(LOG_DEBUG, "Version verification problem: %s.",
+ strerror(errno));
+ close(sfd);
+ return (-1);
+ }
+
+ /*
+ * Create and send initial packet.
+ */
+ g_gate_log(LOG_DEBUG, "Sending initial packet.");
+ if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >=
+ sizeof(cinit.gc_path)) {
+ g_gate_log(LOG_DEBUG, "Path name too long.");
+ close(sfd);
+ return (-1);
+ }
+ cinit.gc_flags = flags | dir;
+ cinit.gc_token = token;
+ cinit.gc_nconn = 2;
+ g_gate_swap2n_cinit(&cinit);
+ if (g_gate_send(sfd, &cinit, sizeof(cinit), MSG_NOSIGNAL) == -1) {
+ g_gate_log(LOG_DEBUG, "Error while sending initial packet: %s.",
+ strerror(errno));
+ close(sfd);
+ return (-1);
}
+ g_gate_swap2h_cinit(&cinit);
+
+ /*
+ * Receiving initial packet from server.
+ */
+ g_gate_log(LOG_DEBUG, "Receiving initial packet.");
+ if (g_gate_recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) {
+ g_gate_log(LOG_DEBUG, "Error while receiving data: %s.",
+ strerror(errno));
+ close(sfd);
+ return (-1);
+ }
+ g_gate_swap2h_sinit(&sinit);
+ if (sinit.gs_error != 0) {
+ g_gate_log(LOG_DEBUG, "Error from server: %s.",
+ strerror(sinit.gs_error));
+ close(sfd);
+ return (-1);
+ }
+ g_gate_log(LOG_DEBUG, "Received initial packet.");
+
+ mediasize = sinit.gs_mediasize;
+ if (sectorsize == 0)
+ sectorsize = sinit.gs_sectorsize;
+
+ return (sfd);
}
static void
@@ -314,26 +366,87 @@ mydaemon(void)
err(EXIT_FAILURE, "Cannot daemonize");
}
+static int
+g_gatec_connect(void)
+{
+
+ token = arc4random();
+ /*
+ * Our receive descriptor is connected to the send descriptor on the
+ * server side.
+ */
+ recvfd = handshake(GGATE_FLAG_SEND);
+ if (recvfd == -1)
+ return (0);
+ /*
+ * Our send descriptor is connected to the receive descriptor on the
+ * server side.
+ */
+ sendfd = handshake(GGATE_FLAG_RECV);
+ if (sendfd == -1)
+ return (0);
+ return (1);
+}
+
static void
-g_gatec_attach(void)
+g_gatec_start(void)
{
- int sfd;
+ int error;
- sfd = handshake();
- g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid());
- mydaemon();
- serve_loop(sfd);
+ reconnect = 0;
+ error = pthread_create(&recvtd, NULL, recv_thread, NULL);
+ if (error != 0) {
+ g_gate_destroy(unit, 1);
+ g_gate_xlog("pthread_create(recv_thread): %s.",
+ strerror(error));
+ }
+ sendtd = pthread_self();
+ send_thread(NULL);
+ /* Disconnected. */
+ close(sendfd);
+ close(recvfd);
+}
+
+static void
+signop(int sig __unused)
+{
+
+ /* Do nothing. */
+}
+
+static void
+g_gatec_loop(void)
+{
+ struct g_gate_ctl_cancel ggioc;
+
+ signal(SIGUSR1, signop);
+ for (;;) {
+ g_gatec_start();
+ g_gate_log(LOG_NOTICE, "Disconnected [%s %s]. Connecting...",
+ host, path);
+ while (!g_gatec_connect()) {
+ sleep(2);
+ g_gate_log(LOG_NOTICE, "Connecting [%s %s]...", host,
+ path);
+ }
+ ggioc.gctl_version = G_GATE_VERSION;
+ ggioc.gctl_unit = unit;
+ ggioc.gctl_seq = 0;
+ g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
+ }
}
static void
g_gatec_create(void)
{
struct g_gate_ctl_create ggioc;
- int sfd;
- sfd = handshake();
- if (sfd == -1)
- exit(EXIT_FAILURE);
+ if (!g_gatec_connect())
+ g_gate_xlog("Cannot connect: %s.", strerror(errno));
+
+ /*
+ * Ok, got both sockets, time to create provider.
+ */
ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_mediasize = mediasize;
ggioc.gctl_sectorsize = sectorsize;
@@ -344,12 +457,29 @@ g_gatec_create(void)
snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s:%u %s", host,
port, path);
g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
- g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid());
if (unit == -1)
printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
unit = ggioc.gctl_unit;
+
mydaemon();
- serve_loop(sfd);
+ g_gatec_loop();
+}
+
+static void
+g_gatec_rescue(void)
+{
+ struct g_gate_ctl_cancel ggioc;
+
+ if (!g_gatec_connect())
+ g_gate_xlog("Cannot connect: %s.", strerror(errno));
+
+ ggioc.gctl_version = G_GATE_VERSION;
+ ggioc.gctl_unit = unit;
+ ggioc.gctl_seq = 0;
+ g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
+
+ mydaemon();
+ g_gatec_loop();
}
int
@@ -358,14 +488,14 @@ main(int argc, char *argv[])
if (argc < 2)
usage();
- if (strcasecmp(argv[1], "attach") == 0)
- action = ATTACH;
- else if (strcasecmp(argv[1], "create") == 0)
+ if (strcasecmp(argv[1], "create") == 0)
action = CREATE;
else if (strcasecmp(argv[1], "destroy") == 0)
action = DESTROY;
else if (strcasecmp(argv[1], "list") == 0)
action = LIST;
+ else if (strcasecmp(argv[1], "rescue") == 0)
+ action = RESCUE;
else
usage();
argc -= 1;
@@ -383,12 +513,12 @@ main(int argc, char *argv[])
force = 1;
break;
case 'n':
- if (action != ATTACH && action != CREATE)
+ if (action != CREATE && action != RESCUE)
usage();
nagle = 0;
break;
case 'o':
- if (action != ATTACH && action != CREATE)
+ if (action != CREATE && action != RESCUE)
usage();
if (strcasecmp("ro", optarg) == 0)
flags = G_GATE_FLAG_READONLY;
@@ -402,7 +532,7 @@ main(int argc, char *argv[])
}
break;
case 'p':
- if (action != ATTACH && action != CREATE)
+ if (action != CREATE && action != RESCUE)
usage();
errno = 0;
port = strtoul(optarg, NULL, 10);
@@ -418,7 +548,7 @@ main(int argc, char *argv[])
errx(EXIT_FAILURE, "Invalid queue_size.");
break;
case 'R':
- if (action != ATTACH && action != CREATE)
+ if (action != CREATE && action != RESCUE)
usage();
errno = 0;
rcvbuf = strtoul(optarg, NULL, 10);
@@ -426,7 +556,7 @@ main(int argc, char *argv[])
errx(EXIT_FAILURE, "Invalid rcvbuf.");
break;
case 'S':
- if (action != ATTACH && action != CREATE)
+ if (action != CREATE && action != RESCUE)
usage();
errno = 0;
sndbuf = strtoul(optarg, NULL, 10);
@@ -468,18 +598,6 @@ main(int argc, char *argv[])
argv += optind;
switch (action) {
- case ATTACH:
- if (argc != 2)
- usage();
- if (unit == -1) {
- fprintf(stderr, "Required unit number.\n");
- usage();
- }
- g_gate_open_device();
- host = argv[0];
- path = argv[1];
- g_gatec_attach();
- break;
case CREATE:
if (argc != 2)
usage();
@@ -501,6 +619,18 @@ main(int argc, char *argv[])
case LIST:
g_gate_list(unit, g_gate_verbose);
break;
+ case RESCUE:
+ if (argc != 2)
+ usage();
+ if (unit == -1) {
+ fprintf(stderr, "Required unit number.\n");
+ usage();
+ }
+ g_gate_open_device();
+ host = argv[0];
+ path = argv[1];
+ g_gatec_rescue();
+ break;
case UNSET:
default:
usage();
diff --git a/sbin/ggate/ggated/Makefile b/sbin/ggate/ggated/Makefile
index fa5c0fb..4e7708e 100644
--- a/sbin/ggate/ggated/Makefile
+++ b/sbin/ggate/ggated/Makefile
@@ -6,6 +6,9 @@ PROG= ggated
MAN= ggated.8
SRCS= ggated.c ggate.c
+DPADD= ${LIBPTHREAD}
+LDADD= -lpthread
+
CFLAGS+= -I${.CURDIR}/../shared
.include <bsd.prog.mk>
diff --git a/sbin/ggate/ggated/ggated.c b/sbin/ggate/ggated/ggated.c
index a0628ea..82d66e1 100644
--- a/sbin/ggate/ggated/ggated.c
+++ b/sbin/ggate/ggated/ggated.c
@@ -31,6 +31,7 @@
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
+#include <pthread.h>
#include <sys/param.h>
#include <sys/queue.h>
#include <sys/endian.h>
@@ -44,6 +45,7 @@
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <signal.h>
+#include <assert.h>
#include <err.h>
#include <errno.h>
#include <string.h>
@@ -51,32 +53,58 @@
#include <syslog.h>
#include <stdarg.h>
-#include <geom/gate/g_gate.h>
#include "ggate.h"
-#define G_GATED_EXPORT_FILE "/etc/gg.exports"
-#define G_GATED_DEBUG(...) \
- if (g_gate_verbose) { \
- printf(__VA_ARGS__); \
- printf("\n"); \
- }
+#define GGATED_EXPORT_FILE "/etc/gg.exports"
+
+struct ggd_connection {
+ off_t c_mediasize;
+ off_t c_sectorsize;
+ unsigned c_flags; /* flags (RO/RW) */
+ int c_diskfd;
+ int c_sendfd;
+ int c_recvfd;
+ time_t c_birthtime;
+ char *c_path;
+ uint64_t c_token;
+ in_addr_t c_srcip;
+ LIST_ENTRY(ggd_connection) c_next;
+};
-static const char *exports = G_GATED_EXPORT_FILE;
-static int got_sighup = 0;
-static int nagle = 1;
-static unsigned rcvbuf = G_GATE_RCVBUF;
-static unsigned sndbuf = G_GATE_SNDBUF;
+struct ggd_request {
+ struct g_gate_hdr r_hdr;
+ char *r_data;
+ TAILQ_ENTRY(ggd_request) r_next;
+};
+#define r_cmd r_hdr.gh_cmd
+#define r_offset r_hdr.gh_offset
+#define r_length r_hdr.gh_length
+#define r_error r_hdr.gh_error
-struct export {
+struct ggd_export {
char *e_path; /* path to device/file */
in_addr_t e_ip; /* remote IP address */
in_addr_t e_mask; /* IP mask */
unsigned e_flags; /* flags (RO/RW) */
- SLIST_ENTRY(export) e_next;
+ SLIST_ENTRY(ggd_export) e_next;
};
-static SLIST_HEAD(, export) exports_list =
- SLIST_HEAD_INITIALIZER(&exports_list);
+
+static const char *exports_file = GGATED_EXPORT_FILE;
+static int got_sighup = 0;
+in_addr_t bindaddr;
+
+static TAILQ_HEAD(, ggd_request) inqueue = TAILQ_HEAD_INITIALIZER(inqueue);
+static TAILQ_HEAD(, ggd_request) outqueue = TAILQ_HEAD_INITIALIZER(outqueue);
+pthread_mutex_t inqueue_mtx, outqueue_mtx;
+pthread_cond_t inqueue_cond, outqueue_cond;
+
+static SLIST_HEAD(, ggd_export) exports = SLIST_HEAD_INITIALIZER(&exports);
+static LIST_HEAD(, ggd_connection) connections = LIST_HEAD_INITIALIZER(&connection);
+
+static void *recv_thread(void *arg);
+static void *disk_thread(void *arg);
+static void *send_thread(void *arg);
static void
usage(void)
@@ -118,7 +146,7 @@ countmask(unsigned m)
static void
line_parse(char *line, unsigned lineno)
{
- struct export *ex;
+ struct ggd_export *ex;
char *word, *path, *sflags;
unsigned flags, i, vmask;
in_addr_t ip, mask;
@@ -193,7 +221,7 @@ line_parse(char *line, unsigned lineno)
ex->e_mask = mask;
ex->e_flags = flags;
- SLIST_INSERT_HEAD(&exports_list, ex, e_next);
+ SLIST_INSERT_HEAD(&exports, ex, e_next);
g_gate_log(LOG_DEBUG, "Added %s/%u %s %s to exports list.",
ip2str(ex->e_ip), vmask, path, sflags);
@@ -202,11 +230,11 @@ line_parse(char *line, unsigned lineno)
static void
exports_clear(void)
{
- struct export *ex;
+ struct ggd_export *ex;
- while (!SLIST_EMPTY(&exports_list)) {
- ex = SLIST_FIRST(&exports_list);
- SLIST_REMOVE_HEAD(&exports_list, e_next);
+ while (!SLIST_EMPTY(&exports)) {
+ ex = SLIST_FIRST(&exports);
+ SLIST_REMOVE_HEAD(&exports, e_next);
free(ex);
}
}
@@ -221,13 +249,13 @@ exports_get(void)
exports_clear();
- fd = fopen(exports, "r");
+ fd = fopen(exports_file, "r");
if (fd == NULL) {
- g_gate_xlog("Cannot open exports file (%s): %s.", exports,
+ g_gate_xlog("Cannot open exports file (%s): %s.", exports_file,
strerror(errno));
}
- g_gate_log(LOG_INFO, "Reading exports file (%s).", exports);
+ g_gate_log(LOG_INFO, "Reading exports file (%s).", exports_file);
for (;;) {
if (fgets(buf, sizeof(buf), fd) == NULL) {
@@ -270,236 +298,636 @@ exports_get(void)
g_gate_log(LOG_INFO, "Exporting %u object(s).", objs);
}
-static struct export *
-exports_find(struct sockaddr *s, const char *path)
+static int
+exports_check(struct ggd_export *ex, struct g_gate_cinit *cinit,
+ struct ggd_connection *conn)
+{
+ char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */
+ int error = 0, flags;
+
+ strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask));
+ strlcat(ipmask, "/", sizeof(ipmask));
+ strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask));
+ if ((cinit->gc_flags & GGATE_FLAG_RDONLY) != 0) {
+ if (ex->e_flags == O_WRONLY) {
+ g_gate_log(LOG_WARNING, "Read-only access requested, "
+ "but %s (%s) is exported write-only.", ex->e_path,
+ ipmask);
+ return (EPERM);
+ } else {
+ conn->c_flags |= GGATE_FLAG_RDONLY;
+ }
+ } else if ((cinit->gc_flags & GGATE_FLAG_WRONLY) != 0) {
+ if (ex->e_flags == O_RDONLY) {
+ g_gate_log(LOG_WARNING, "Write-only access requested, "
+ "but %s (%s) is exported read-only.", ex->e_path,
+ ipmask);
+ return (EPERM);
+ } else {
+ conn->c_flags |= GGATE_FLAG_WRONLY;
+ }
+ } else {
+ if (ex->e_flags == O_RDONLY) {
+ g_gate_log(LOG_WARNING, "Read-write access requested, "
+ "but %s (%s) is exported read-only.", ex->e_path,
+ ipmask);
+ return (EPERM);
+ } else if (ex->e_flags == O_WRONLY) {
+ g_gate_log(LOG_WARNING, "Read-write access requested, "
+ "but %s (%s) is exported write-only.", ex->e_path,
+ ipmask);
+ return (EPERM);
+ }
+ }
+ if ((conn->c_flags & GGATE_FLAG_RDONLY) != 0)
+ flags = O_RDONLY;
+ else if ((conn->c_flags & GGATE_FLAG_WRONLY) != 0)
+ flags = O_WRONLY;
+ else
+ flags = O_RDWR;
+ conn->c_diskfd = open(ex->e_path, flags);
+ if (conn->c_diskfd == -1) {
+ error = errno;
+ g_gate_log(LOG_ERR, "Cannot open %s: %s.", ex->e_path,
+ strerror(error));
+ return (error);
+ }
+ return (0);
+}
+
+static struct ggd_export *
+exports_find(struct sockaddr *s, struct g_gate_cinit *cinit,
+ struct ggd_connection *conn)
{
- struct export *ex;
+ struct ggd_export *ex;
in_addr_t ip;
+ int error;
ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr);
- SLIST_FOREACH(ex, &exports_list, e_next) {
- if ((ip & ex->e_mask) != ex->e_ip)
+ SLIST_FOREACH(ex, &exports, e_next) {
+ if ((ip & ex->e_mask) != ex->e_ip) {
+ g_gate_log(LOG_DEBUG, "exports[%s]: IP mismatch.",
+ ex->e_path);
continue;
- if (path != NULL && strcmp(path, ex->e_path) != 0)
+ }
+ if (strcmp(cinit->gc_path, ex->e_path) != 0) {
+ g_gate_log(LOG_DEBUG, "exports[%s]: Path mismatch.",
+ ex->e_path);
continue;
-
- g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip));
- return (ex);
+ }
+ error = exports_check(ex, cinit, conn);
+ if (error == 0)
+ return (ex);
+ else {
+ errno = error;
+ return (NULL);
+ }
}
- g_gate_log(LOG_INFO, "Unauthorized connection from: %s.", ip2str(ip));
-
+ g_gate_log(LOG_WARNING, "Unauthorized connection from: %s.",
+ ip2str(ip));
+ errno = EPERM;
return (NULL);
}
+/*
+ * Remove timed out connections.
+ */
static void
-sendfail(int sfd, int error, const char *fmt, ...)
+connection_cleanups(void)
{
- struct g_gate_sinit sinit;
- va_list ap;
- int data;
-
- sinit.gs_error = error;
- g_gate_swap2n_sinit(&sinit);
- data = send(sfd, &sinit, sizeof(sinit), 0);
- g_gate_swap2h_sinit(&sinit);
- if (data == -1) {
- g_gate_xlog("Error while sending initial packet: %s.",
- strerror(errno));
- }
- if (fmt != NULL) {
- va_start(ap, fmt);
- g_gate_xvlog(fmt, ap);
- /* NOTREACHED */
- va_end(ap);
+ struct ggd_connection *conn, *tconn;
+ time_t now;
+
+ time(&now);
+ LIST_FOREACH_SAFE(conn, &connections, c_next, tconn) {
+ if (now - conn->c_birthtime > 10) {
+ LIST_REMOVE(conn, c_next);
+ g_gate_log(LOG_NOTICE,
+ "Connection from %s [%s] removed.",
+ ip2str(conn->c_srcip), conn->c_path);
+ close(conn->c_diskfd);
+ close(conn->c_sendfd);
+ close(conn->c_recvfd);
+ free(conn->c_path);
+ free(conn);
+ }
}
- exit(EXIT_FAILURE);
}
-static void
-serve(int sfd, struct sockaddr *s)
+static struct ggd_connection *
+connection_find(struct g_gate_cinit *cinit)
{
- struct g_gate_cinit cinit;
- struct g_gate_sinit sinit;
- struct g_gate_hdr hdr;
- struct export *ex;
- char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */
- size_t bufsize;
- int32_t error;
- int fd, flags;
- ssize_t data;
- char *buf;
+ struct ggd_connection *conn;
- g_gate_log(LOG_DEBUG, "Receiving initial packet.");
- data = recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL);
- g_gate_swap2h_cinit(&cinit);
- if (data == -1) {
- g_gate_xlog("Error while receiving initial packet: %s.",
- strerror(errno));
+ LIST_FOREACH(conn, &connections, c_next) {
+ if (conn->c_token == cinit->gc_token)
+ break;
}
+ return (conn);
+}
- ex = exports_find(s, cinit.gc_path);
- if (ex == NULL) {
- sendfail(sfd, EINVAL, "Requested path isn't exported: %s.",
- strerror(errno));
+static struct ggd_connection *
+connection_new(struct g_gate_cinit *cinit, struct sockaddr *s, int sfd)
+{
+ struct ggd_connection *conn;
+ in_addr_t ip;
+
+ /*
+ * First, look for old connections.
+ * We probably should do it every X seconds, but what for?
+ * It is only dangerous if an attacker wants to overload connections
+ * queue, so here is a good place to do the cleanups.
+ */
+ connection_cleanups();
+
+ conn = malloc(sizeof(*conn));
+ if (conn == NULL)
+ return (NULL);
+ conn->c_path = strdup(cinit->gc_path);
+ if (conn->c_path == NULL) {
+ free(conn);
+ return (NULL);
}
+ conn->c_token = cinit->gc_token;
+ ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr);
+ conn->c_srcip = ip;
+ conn->c_sendfd = conn->c_recvfd = -1;
+ if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0)
+ conn->c_sendfd = sfd;
+ else
+ conn->c_recvfd = sfd;
+ conn->c_mediasize = 0;
+ conn->c_sectorsize = 0;
+ time(&conn->c_birthtime);
+ conn->c_flags = cinit->gc_flags;
+ LIST_INSERT_HEAD(&connections, conn, c_next);
+ g_gate_log(LOG_DEBUG, "Connection created [%s, %s].", ip2str(ip),
+ conn->c_path);
+ return (conn);
+}
- error = 0;
- strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask));
- strlcat(ipmask, "/", sizeof(ipmask));
- strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask));
- if ((cinit.gc_flags & G_GATE_FLAG_READONLY) != 0) {
- if (ex->e_flags == O_WRONLY) {
- g_gate_log(LOG_ERR, "Read-only access requested, but "
- "%s (%s) is exported write-only.", ex->e_path,
- ipmask);
- error = EPERM;
- } else {
- sinit.gs_flags = G_GATE_FLAG_READONLY;
- }
- } else if ((cinit.gc_flags & G_GATE_FLAG_WRITEONLY) != 0) {
- if (ex->e_flags == O_RDONLY) {
- g_gate_log(LOG_ERR, "Write-only access requested, but "
- "%s (%s) is exported read-only.", ex->e_path,
- ipmask);
- error = EPERM;
- } else {
- sinit.gs_flags = G_GATE_FLAG_WRITEONLY;
+static int
+connection_add(struct ggd_connection *conn, struct g_gate_cinit *cinit,
+ struct sockaddr *s, int sfd)
+{
+ in_addr_t ip;
+
+ ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr);
+ if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) {
+ if (conn->c_sendfd != -1) {
+ g_gate_log(LOG_WARNING,
+ "Send socket already exists [%s, %s].", ip2str(ip),
+ conn->c_path);
+ return (EEXIST);
}
+ conn->c_sendfd = sfd;
} else {
- if (ex->e_flags == O_RDONLY) {
- g_gate_log(LOG_ERR, "Read-write access requested, but "
- "%s (%s) is exported read-only.", ex->e_path,
- ipmask);
- error = EPERM;
- } else if (ex->e_flags == O_WRONLY) {
- g_gate_log(LOG_ERR, "Read-write access requested, but "
- "%s (%s) is exported write-only.", ex->e_path,
- ipmask);
- error = EPERM;
- } else {
- sinit.gs_flags = 0;
+ if (conn->c_recvfd != -1) {
+ g_gate_log(LOG_WARNING,
+ "Receive socket already exists [%s, %s].",
+ ip2str(ip), conn->c_path);
+ return (EEXIST);
}
+ conn->c_recvfd = sfd;
}
- if (error != 0)
- sendfail(sfd, error, NULL);
- flags = g_gate_openflags(sinit.gs_flags);
- fd = open(ex->e_path, flags);
- if (fd < 0) {
- sendfail(sfd, errno, "Error while opening %s: %s.", ex->e_path,
- strerror(errno));
+ g_gate_log(LOG_DEBUG, "Connection added [%s, %s].", ip2str(ip),
+ conn->c_path);
+ return (0);
+}
+
+/*
+ * Remove one socket from the given connection or the whole
+ * connection if sfd == -1.
+ */
+static void
+connection_remove(struct ggd_connection *conn)
+{
+
+ LIST_REMOVE(conn, c_next);
+ g_gate_log(LOG_DEBUG, "Connection removed [%s %s].",
+ ip2str(conn->c_srcip), conn->c_path);
+ if (conn->c_sendfd != -1)
+ close(conn->c_sendfd);
+ if (conn->c_recvfd != -1)
+ close(conn->c_recvfd);
+ free(conn->c_path);
+ free(conn);
+}
+
+static int
+connection_ready(struct ggd_connection *conn)
+{
+
+ return (conn->c_sendfd != -1 && conn->c_recvfd != -1);
+}
+
+static void
+connection_launch(struct ggd_connection *conn)
+{
+ pthread_t td;
+ int error, pid;
+
+ pid = fork();
+ if (pid > 0)
+ return;
+ else if (pid == -1) {
+ g_gate_log(LOG_ERR, "Cannot fork: %s.", strerror(errno));
+ return;
}
+ g_gate_log(LOG_DEBUG, "Process created [%s].", conn->c_path);
- g_gate_log(LOG_DEBUG, "Sending initial packet.");
/*
- * This field isn't used by ggc(8) for now.
- * It should be used in future when user don't give device size.
+ * Create condition variables and mutexes for in-queue and out-queue
+ * synchronization.
*/
- sinit.gs_mediasize = g_gate_mediasize(fd);
- sinit.gs_sectorsize = g_gate_sectorsize(fd);
- sinit.gs_error = 0;
+ error = pthread_mutex_init(&inqueue_mtx, NULL);
+ if (error != 0) {
+ g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
+ strerror(error));
+ }
+ error = pthread_cond_init(&inqueue_cond, NULL);
+ if (error != 0) {
+ g_gate_xlog("pthread_cond_init(inqueue_cond): %s.",
+ strerror(error));
+ }
+ error = pthread_mutex_init(&outqueue_mtx, NULL);
+ if (error != 0) {
+ g_gate_xlog("pthread_mutex_init(outqueue_mtx): %s.",
+ strerror(error));
+ }
+ error = pthread_cond_init(&outqueue_cond, NULL);
+ if (error != 0) {
+ g_gate_xlog("pthread_cond_init(outqueue_cond): %s.",
+ strerror(error));
+ }
+
+ /*
+ * Create threads:
+ * recvtd - thread for receiving I/O request
+ * diskio - thread for doing I/O request
+ * sendtd - thread for sending I/O requests back
+ */
+ error = pthread_create(&td, NULL, send_thread, conn);
+ if (error != 0) {
+ g_gate_xlog("pthread_create(send_thread): %s.",
+ strerror(error));
+ }
+ error = pthread_create(&td, NULL, recv_thread, conn);
+ if (error != 0) {
+ g_gate_xlog("pthread_create(recv_thread): %s.",
+ strerror(error));
+ }
+ disk_thread(conn);
+}
+
+static void
+sendfail(int sfd, int error, const char *fmt, ...)
+{
+ struct g_gate_sinit sinit;
+ va_list ap;
+ ssize_t data;
+
+ sinit.gs_error = error;
g_gate_swap2n_sinit(&sinit);
- data = send(sfd, &sinit, sizeof(sinit), 0);
+ data = g_gate_send(sfd, &sinit, sizeof(sinit), 0);
g_gate_swap2h_sinit(&sinit);
- if (data == -1) {
- sendfail(sfd, errno, "Error while sending initial packet: %s.",
+ if (data != sizeof(sinit)) {
+ g_gate_log(LOG_WARNING, "Cannot send initial packet: %s.",
strerror(errno));
+ return;
+ }
+ if (fmt != NULL) {
+ va_start(ap, fmt);
+ g_gate_vlog(LOG_WARNING, fmt, ap);
+ va_end(ap);
}
+}
- bufsize = G_GATE_BUFSIZE_START;
- buf = malloc(bufsize);
- if (buf == NULL)
- g_gate_xlog("No enough memory.");
+static void *
+malloc_waitok(size_t size)
+{
+ void *p;
- g_gate_log(LOG_DEBUG, "New process: %u.", getpid());
+ while ((p = malloc(size)) == NULL) {
+ g_gate_log(LOG_DEBUG, "Cannot allocate %zu bytes.", size);
+ sleep(1);
+ }
+ return (p);
+}
+
+static void *
+recv_thread(void *arg)
+{
+ struct ggd_connection *conn;
+ struct ggd_request *req;
+ ssize_t data;
+ int error, fd;
+ conn = arg;
+ g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path);
+ fd = conn->c_recvfd;
for (;;) {
/*
- * Receive request.
+ * Get header packet.
*/
- data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL);
+ req = malloc_waitok(sizeof(*req));
+ data = g_gate_recv(fd, &req->r_hdr, sizeof(req->r_hdr),
+ MSG_WAITALL);
if (data == 0) {
g_gate_log(LOG_DEBUG, "Process %u exiting.", getpid());
exit(EXIT_SUCCESS);
} else if (data == -1) {
g_gate_xlog("Error while receiving hdr packet: %s.",
strerror(errno));
- } else if (data != sizeof(hdr)) {
+ } else if (data != sizeof(req->r_hdr)) {
g_gate_xlog("Malformed hdr packet received.");
}
g_gate_log(LOG_DEBUG, "Received hdr packet.");
- g_gate_swap2h_hdr(&hdr);
+ g_gate_swap2h_hdr(&req->r_hdr);
+
+ g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__,
+ (intmax_t)req->r_offset, (unsigned)req->r_length);
/*
- * Increase buffer if there is need to.
+ * Allocate memory for data.
*/
- if (hdr.gh_length > bufsize) {
- bufsize = hdr.gh_length;
- g_gate_log(LOG_DEBUG, "Increasing buffer to %u.",
- bufsize);
- buf = realloc(buf, bufsize);
- if (buf == NULL)
- g_gate_xlog("No enough memory.");
- }
+ req->r_data = malloc_waitok(req->r_length);
- if (hdr.gh_cmd == BIO_READ) {
- if (pread(fd, buf, hdr.gh_length,
- hdr.gh_offset) == -1) {
- error = errno;
- g_gate_log(LOG_ERR, "Error while reading data "
- "(offset=%ju, size=%zu): %s.",
- (uintmax_t)hdr.gh_offset,
- (size_t)hdr.gh_length, strerror(error));
- } else {
- error = 0;
- }
- hdr.gh_error = error;
- g_gate_swap2n_hdr(&hdr);
- if (send(sfd, &hdr, sizeof(hdr), 0) == -1) {
- g_gate_xlog("Error while sending status: %s.",
- strerror(errno));
- }
- g_gate_swap2h_hdr(&hdr);
- /* Send data only if there was no error while pread(). */
- if (error == 0) {
- data = send(sfd, buf, hdr.gh_length, 0);
- if (data == -1) {
- g_gate_xlog("Error while sending data: "
- "%s.", strerror(errno));
- }
- g_gate_log(LOG_DEBUG, "Sent %d bytes "
- "(offset=%ju, size=%zu).", data,
- (uintmax_t)hdr.gh_offset,
- (size_t)hdr.gh_length);
- }
- } else /* if (hdr.gh_cmd == BIO_WRITE) */ {
+ /*
+ * Receive data to write for WRITE request.
+ */
+ if (req->r_cmd == GGATE_CMD_WRITE) {
g_gate_log(LOG_DEBUG, "Waiting for %u bytes of data...",
- hdr.gh_length);
- data = recv(sfd, buf, hdr.gh_length, MSG_WAITALL);
+ req->r_length);
+ data = g_gate_recv(fd, req->r_data, req->r_length,
+ MSG_WAITALL);
if (data == -1) {
g_gate_xlog("Error while receiving data: %s.",
strerror(errno));
}
- if (pwrite(fd, buf, hdr.gh_length, hdr.gh_offset) == -1) {
- error = errno;
- g_gate_log(LOG_ERR, "Error while writing data "
- "(offset=%llu, size=%u): %s.",
- hdr.gh_offset, hdr.gh_length,
- strerror(error));
- } else {
- error = 0;
+ }
+
+ /*
+ * Put the request onto the incoming queue.
+ */
+ error = pthread_mutex_lock(&inqueue_mtx);
+ assert(error == 0);
+ TAILQ_INSERT_TAIL(&inqueue, req, r_next);
+ error = pthread_cond_signal(&inqueue_cond);
+ assert(error == 0);
+ error = pthread_mutex_unlock(&inqueue_mtx);
+ assert(error == 0);
+ }
+}
+
+static void *
+disk_thread(void *arg)
+{
+ struct ggd_connection *conn;
+ struct ggd_request *req;
+ ssize_t data;
+ int error, fd;
+
+ conn = arg;
+ g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path);
+ fd = conn->c_diskfd;
+ for (;;) {
+ /*
+ * Get a request from the incoming queue.
+ */
+ error = pthread_mutex_lock(&inqueue_mtx);
+ assert(error == 0);
+ while ((req = TAILQ_FIRST(&inqueue)) == NULL) {
+ error = pthread_cond_wait(&inqueue_cond, &inqueue_mtx);
+ assert(error == 0);
+ }
+ TAILQ_REMOVE(&inqueue, req, r_next);
+ error = pthread_mutex_unlock(&inqueue_mtx);
+ assert(error == 0);
+
+ /*
+ * Check the request.
+ */
+ assert(req->r_cmd == GGATE_CMD_READ || req->r_cmd == GGATE_CMD_WRITE);
+ assert(req->r_offset + req->r_length <= (uintmax_t)conn->c_mediasize);
+ assert((req->r_offset % conn->c_sectorsize) == 0);
+ assert((req->r_length % conn->c_sectorsize) == 0);
+
+ g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__,
+ (intmax_t)req->r_offset, (unsigned)req->r_length);
+
+ /*
+ * Do the request.
+ */
+ data = 0;
+ switch (req->r_cmd) {
+ case GGATE_CMD_READ:
+ data = pread(fd, req->r_data, req->r_length,
+ req->r_offset);
+ break;
+ case GGATE_CMD_WRITE:
+ data = pwrite(fd, req->r_data, req->r_length,
+ req->r_offset);
+ /* Free data memory here - better sooner. */
+ free(req->r_data);
+ req->r_data = NULL;
+ break;
+ }
+ if (data != (ssize_t)req->r_length) {
+ /* Report short reads/writes as I/O errors. */
+ if (errno == 0)
+ errno = EIO;
+ g_gate_log(LOG_ERR, "Disk error: %s", strerror(errno));
+ req->r_error = errno;
+ if (req->r_data != NULL) {
+ free(req->r_data);
+ req->r_data = NULL;
}
- hdr.gh_error = error;
- g_gate_swap2n_hdr(&hdr);
- if (send(sfd, &hdr, sizeof(hdr), 0) == -1) {
- g_gate_xlog("Error while sending status: %s.",
+ }
+
+ /*
+ * Put the request onto the outgoing queue.
+ */
+ error = pthread_mutex_lock(&outqueue_mtx);
+ assert(error == 0);
+ TAILQ_INSERT_TAIL(&outqueue, req, r_next);
+ error = pthread_cond_signal(&outqueue_cond);
+ assert(error == 0);
+ error = pthread_mutex_unlock(&outqueue_mtx);
+ assert(error == 0);
+ }
+}
+
+static void *
+send_thread(void *arg)
+{
+ struct ggd_connection *conn;
+ struct ggd_request *req;
+ ssize_t data;
+ int error, fd;
+
+ conn = arg;
+ g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path);
+ fd = conn->c_sendfd;
+ for (;;) {
+ /*
+ * Get a request from the outgoing queue.
+ */
+ error = pthread_mutex_lock(&outqueue_mtx);
+ assert(error == 0);
+ while ((req = TAILQ_FIRST(&outqueue)) == NULL) {
+ error = pthread_cond_wait(&outqueue_cond,
+ &outqueue_mtx);
+ assert(error == 0);
+ }
+ TAILQ_REMOVE(&outqueue, req, r_next);
+ error = pthread_mutex_unlock(&outqueue_mtx);
+ assert(error == 0);
+
+ g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__,
+ (intmax_t)req->r_offset, (unsigned)req->r_length);
+
+ /*
+ * Send the request.
+ */
+ g_gate_swap2n_hdr(&req->r_hdr);
+ if (g_gate_send(fd, &req->r_hdr, sizeof(req->r_hdr), 0) == -1) {
+ g_gate_xlog("Error while sending hdr packet: %s.",
+ strerror(errno));
+ }
+ g_gate_log(LOG_DEBUG, "Sent hdr packet.");
+ g_gate_swap2h_hdr(&req->r_hdr);
+ if (req->r_data != NULL) {
+ data = g_gate_send(fd, req->r_data, req->r_length, 0);
+ if (data != (ssize_t)req->r_length) {
+ g_gate_xlog("Error while sending data: %s.",
strerror(errno));
}
- g_gate_swap2h_hdr(&hdr);
- g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%llu, "
- "size=%u).", data, hdr.gh_offset, hdr.gh_length);
+ g_gate_log(LOG_DEBUG,
+ "Sent %zd bytes (offset=%ju, size=%zu).", data,
+ (uintmax_t)req->r_offset, (size_t)req->r_length);
+ free(req->r_data);
+ }
+ free(req);
+ }
+}
+
+static void
+log_connection(struct sockaddr *from)
+{
+ in_addr_t ip;
+
+ ip = htonl(((struct sockaddr_in *)(void *)from)->sin_addr.s_addr);
+ g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip));
+}
+
+static int
+handshake(struct sockaddr *from, int sfd)
+{
+ struct g_gate_version ver;
+ struct g_gate_cinit cinit;
+ struct g_gate_sinit sinit;
+ struct ggd_connection *conn;
+ struct ggd_export *ex;
+ ssize_t data;
+
+ log_connection(from);
+ /*
+ * Phase 1: Version verification.
+ */
+ g_gate_log(LOG_DEBUG, "Receiving version packet.");
+ data = g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL);
+ g_gate_swap2h_version(&ver);
+ if (data != sizeof(ver)) {
+ g_gate_log(LOG_WARNING, "Malformed version packet.");
+ return (0);
+ }
+ g_gate_log(LOG_DEBUG, "Version packet received.");
+ if (memcmp(ver.gv_magic, GGATE_MAGIC, strlen(GGATE_MAGIC)) != 0) {
+ g_gate_log(LOG_WARNING, "Invalid magic field.");
+ return (0);
+ }
+ if (ver.gv_version != GGATE_VERSION) {
+ g_gate_log(LOG_WARNING, "Version %u is not supported.",
+ ver.gv_version);
+ return (0);
+ }
+ ver.gv_error = 0;
+ g_gate_swap2n_version(&ver);
+ data = g_gate_send(sfd, &ver, sizeof(ver), 0);
+ g_gate_swap2h_version(&ver);
+ if (data == -1) {
+ sendfail(sfd, errno, "Error while sending version packet: %s.",
+ strerror(errno));
+ return (0);
+ }
+
+ /*
+ * Phase 2: Request verification.
+ */
+ g_gate_log(LOG_DEBUG, "Receiving initial packet.");
+ data = g_gate_recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL);
+ g_gate_swap2h_cinit(&cinit);
+ if (data != sizeof(cinit)) {
+ g_gate_log(LOG_WARNING, "Malformed initial packet.");
+ return (0);
+ }
+ g_gate_log(LOG_DEBUG, "Initial packet received.");
+ conn = connection_find(&cinit);
+ if (conn != NULL) {
+ /*
+ * Connection should already exists.
+ */
+ g_gate_log(LOG_DEBUG, "Found existing connection (token=%lu).",
+ (unsigned long)conn->c_token);
+ if (connection_add(conn, &cinit, from, sfd) == -1) {
+ connection_remove(conn);
+ return (0);
+ }
+ } else {
+ /*
+ * New connection, allocate space.
+ */
+ conn = connection_new(&cinit, from, sfd);
+ if (conn == NULL) {
+ sendfail(sfd, ENOMEM,
+ "Cannot allocate new connection.");
+ return (0);
}
- g_gate_log(LOG_DEBUG, "Tick.");
+ g_gate_log(LOG_DEBUG, "New connection created (token=%lu).",
+ (unsigned long)conn->c_token);
+ }
+
+ ex = exports_find(from, &cinit, conn);
+ if (ex == NULL) {
+ connection_remove(conn);
+ sendfail(sfd, errno, NULL);
+ return (0);
+ }
+ if (conn->c_mediasize == 0) {
+ conn->c_mediasize = g_gate_mediasize(conn->c_diskfd);
+ conn->c_sectorsize = g_gate_sectorsize(conn->c_diskfd);
+ }
+ sinit.gs_mediasize = conn->c_mediasize;
+ sinit.gs_sectorsize = conn->c_sectorsize;
+ sinit.gs_error = 0;
+
+ g_gate_log(LOG_DEBUG, "Sending initial packet.");
+
+ g_gate_swap2n_sinit(&sinit);
+ data = g_gate_send(sfd, &sinit, sizeof(sinit), 0);
+ g_gate_swap2h_sinit(&sinit);
+ if (data == -1) {
+ sendfail(sfd, errno, "Error while sending initial packet: %s.",
+ strerror(errno));
+ return (0);
}
+
+ if (connection_ready(conn)) {
+ connection_launch(conn);
+ connection_remove(conn);
+ }
+ return (1);
}
static void
@@ -514,12 +942,9 @@ main(int argc, char *argv[])
{
struct sockaddr_in serv;
struct sockaddr from;
- in_addr_t bindaddr;
socklen_t fromlen;
- struct timeval tv;
- int on, sfd, tmpsfd;
- pid_t childpid;
- unsigned bsize, port;
+ int sfd, tmpsfd;
+ unsigned port;
bindaddr = htonl(INADDR_ANY);
port = G_GATE_PORT;
@@ -570,45 +995,27 @@ main(int argc, char *argv[])
argv += optind;
if (argv[0] != NULL)
- exports = argv[0];
+ exports_file = argv[0];
exports_get();
if (!g_gate_verbose) {
/* Run in daemon mode. */
if (daemon(0, 0) == -1)
- g_gate_xlog("Can't daemonize: %s", strerror(errno));
+ g_gate_xlog("Cannot daemonize: %s", strerror(errno));
}
signal(SIGCHLD, SIG_IGN);
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
- g_gate_xlog("Can't open stream socket: %s.", strerror(errno));
+ g_gate_xlog("Cannot open stream socket: %s.", strerror(errno));
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_addr.s_addr = bindaddr;
serv.sin_port = htons(port);
- on = 1;
- if (nagle) {
- if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on,
- sizeof(on)) == -1) {
- g_gate_xlog("setsockopt() error: %s.", strerror(errno));
- }
- }
- if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
- g_gate_xlog("setsockopt(): %s.", strerror(errno));
- bsize = rcvbuf;
- if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1)
- g_gate_xlog("setsockopt(): %s.", strerror(errno));
- bsize = sndbuf;
- if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1)
- g_gate_xlog("setsockopt(): %s.", strerror(errno));
- tv.tv_sec = 10;
- tv.tv_usec = 0;
- if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 ||
- setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
- g_gate_xlog("setsockopt() error: %s.", strerror(errno));
- }
+
+ g_gate_socket_settings(sfd);
+
if (bind(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1)
g_gate_xlog("bind(): %s.", strerror(errno));
if (listen(sfd, 5) == -1)
@@ -629,21 +1036,8 @@ main(int argc, char *argv[])
exports_get();
}
- if (exports_find(&from, NULL) == NULL) {
+ if (!handshake(&from, tmpsfd))
close(tmpsfd);
- continue;
- }
-
- childpid = fork();
- if (childpid < 0) {
- g_gate_xlog("Cannot create child process: %s.",
- strerror(errno));
- } else if (childpid == 0) {
- close(sfd);
- serve(tmpsfd, &from);
- /* NOTREACHED */
- }
- close(tmpsfd);
}
close(sfd);
exit(EXIT_SUCCESS);
diff --git a/sbin/ggate/shared/ggate.c b/sbin/ggate/shared/ggate.c
index 08f13d7..c8428a6 100644
--- a/sbin/ggate/shared/ggate.c
+++ b/sbin/ggate/shared/ggate.c
@@ -38,6 +38,7 @@
#include <sys/linker.h>
#include <sys/module.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <signal.h>
#include <err.h>
@@ -206,17 +207,6 @@ g_gate_destroy(int unit, int force)
g_gate_ioctl(G_GATE_CMD_DESTROY, &ggio);
}
-int
-g_gate_openflags(unsigned ggflags)
-{
-
- if ((ggflags & G_GATE_FLAG_READONLY) != 0)
- return (O_RDONLY);
- else if ((ggflags & G_GATE_FLAG_WRITEONLY) != 0)
- return (O_WRONLY);
- return (O_RDWR);
-}
-
void
g_gate_load_module(void)
{
@@ -232,6 +222,76 @@ g_gate_load_module(void)
}
}
+ssize_t
+g_gate_send(int s, const void *buf, size_t len, int flags)
+{
+ ssize_t done = 0, done2;
+ const unsigned char *p = buf;
+
+ while (len > 0) {
+ done2 = send(s, p, len, flags);
+ if (done2 == 0)
+ break;
+ else if (done2 == -1) {
+ if (errno == EAGAIN) {
+ printf("%s: EAGAIN\n", __func__);
+ continue;
+ }
+ done = -1;
+ break;
+ }
+ done += done2;
+ p += done2;
+ len -= done2;
+ }
+ return (done);
+}
+
+ssize_t
+g_gate_recv(int s, void *buf, size_t len, int flags)
+{
+
+ return (recv(s, buf, len, flags));
+}
+
+int nagle = 1;
+unsigned rcvbuf = G_GATE_RCVBUF;
+unsigned sndbuf = G_GATE_SNDBUF;
+
+void
+g_gate_socket_settings(int sfd)
+{
+ struct timeval tv;
+ int bsize, on;
+
+ /* Socket settings. */
+ on = 1;
+ if (nagle) {
+ if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on,
+ sizeof(on)) == -1) {
+ g_gate_xlog("setsockopt() error: %s.", strerror(errno));
+ }
+ }
+ if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
+ g_gate_xlog("setsockopt(SO_REUSEADDR): %s.", strerror(errno));
+ bsize = rcvbuf;
+ if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1)
+ g_gate_xlog("setsockopt(SO_RCVBUF): %s.", strerror(errno));
+ bsize = sndbuf;
+ if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1)
+ g_gate_xlog("setsockopt(SO_SNDBUF): %s.", strerror(errno));
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
+ g_gate_log(LOG_ERR, "setsockopt(SO_SNDTIMEO) error: %s.",
+ strerror(errno));
+ }
+ if (setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
+ g_gate_log(LOG_ERR, "setsockopt(SO_RCVTIMEO) error: %s.",
+ strerror(errno));
+ }
+}
+
#ifdef LIBGEOM
static struct gclass *
find_class(struct gmesh *mesh, const char *name)
diff --git a/sbin/ggate/shared/ggate.h b/sbin/ggate/shared/ggate.h
index 12dfe6d..acbdaaa 100644
--- a/sbin/ggate/shared/ggate.h
+++ b/sbin/ggate/shared/ggate.h
@@ -32,22 +32,49 @@
#include <sys/endian.h>
#include <stdarg.h>
-#define G_GATE_BUFSIZE_START 65536
#define G_GATE_PORT 3080
#define G_GATE_RCVBUF 131072
#define G_GATE_SNDBUF 131072
#define G_GATE_QUEUE_SIZE 1024
-#define G_GATE_TIMEOUT 30
+#define G_GATE_TIMEOUT 0
+
+#define GGATE_MAGIC "GEOM_GATE "
+#define GGATE_VERSION 0
+
+#define GGATE_FLAG_RDONLY 0x0001
+#define GGATE_FLAG_WRONLY 0x0002
+/*
+ * If GGATE_FLAG_SEND not GGATE_FLAG_RECV flag is set, this is initial
+ * connection.
+ * If GGATE_FLAG_SEND flag is set - this is socket to send data.
+ * If GGATE_FLAG_RECV flag is set - this is socket to receive data.
+ */
+#define GGATE_FLAG_SEND 0x0004
+#define GGATE_FLAG_RECV 0x0008
+
+#define GGATE_CMD_READ 0
+#define GGATE_CMD_WRITE 1
extern int g_gate_devfd;
extern int g_gate_verbose;
+extern int nagle;
+extern unsigned rcvbuf, sndbuf;
+
+struct g_gate_version {
+ char gv_magic[16];
+ uint16_t gv_version;
+ uint16_t gv_error;
+} __packed;
+
/* Client's initial packet. */
struct g_gate_cinit {
- char gc_path[PATH_MAX + 1];
- uint8_t gc_flags;
-};
+ char gc_path[PATH_MAX + 1];
+ uint64_t gc_flags;
+ uint16_t gc_nconn;
+ uint32_t gc_token;
+} __packed;
/* Server's initial packet. */
struct g_gate_sinit {
@@ -55,15 +82,16 @@ struct g_gate_sinit {
uint64_t gs_mediasize;
uint32_t gs_sectorsize;
uint16_t gs_error;
-};
+} __packed;
/* Control struct. */
struct g_gate_hdr {
uint8_t gh_cmd; /* command */
uint64_t gh_offset; /* device offset */
uint32_t gh_length; /* size of block */
- int16_t gh_error; /* error value (0 if ok) */
-};
+ uint64_t gh_seq; /* request number */
+ uint16_t gh_error; /* error value (0 if ok) */
+} __packed;
void g_gate_vlog(int priority, const char *message, va_list ap);
void g_gate_log(int priority, const char *message, ...);
@@ -75,8 +103,10 @@ void g_gate_open_device(void);
void g_gate_close_device(void);
void g_gate_ioctl(unsigned long req, void *data);
void g_gate_destroy(int unit, int force);
-int g_gate_openflags(unsigned ggflags);
void g_gate_load_module(void);
+ssize_t g_gate_recv(int s, void *buf, size_t len, int flags);
+ssize_t g_gate_send(int s, const void *buf, size_t len, int flags);
+void g_gate_socket_settings(int sfd);
#ifdef LIBGEOM
void g_gate_list(int unit, int verbose);
#endif
@@ -89,17 +119,37 @@ in_addr_t g_gate_str2ip(const char *str);
*/
static __inline void
-g_gate_swap2h_cinit(struct g_gate_cinit *cinit __unused)
+g_gate_swap2h_version(struct g_gate_version *ver)
+{
+
+ ver->gv_version = be16toh(ver->gv_version);
+ ver->gv_error = be16toh(ver->gv_error);
+}
+
+static __inline void
+g_gate_swap2n_version(struct g_gate_version *ver)
+{
+
+ ver->gv_version = htobe16(ver->gv_version);
+ ver->gv_error = htobe16(ver->gv_error);
+}
+
+static __inline void
+g_gate_swap2h_cinit(struct g_gate_cinit *cinit)
{
- /* Nothing here for now. */
+ cinit->gc_flags = be64toh(cinit->gc_flags);
+ cinit->gc_nconn = be16toh(cinit->gc_nconn);
+ cinit->gc_token = be32toh(cinit->gc_token);
}
static __inline void
-g_gate_swap2n_cinit(struct g_gate_cinit *cinit __unused)
+g_gate_swap2n_cinit(struct g_gate_cinit *cinit)
{
- /* Nothing here for now. */
+ cinit->gc_flags = htobe64(cinit->gc_flags);
+ cinit->gc_nconn = htobe16(cinit->gc_nconn);
+ cinit->gc_token = htobe32(cinit->gc_token);
}
static __inline void
@@ -129,6 +179,7 @@ g_gate_swap2h_hdr(struct g_gate_hdr *hdr)
/* Swap only used fields. */
hdr->gh_offset = be64toh(hdr->gh_offset);
hdr->gh_length = be32toh(hdr->gh_length);
+ hdr->gh_seq = be64toh(hdr->gh_seq);
hdr->gh_error = be16toh(hdr->gh_error);
}
@@ -139,6 +190,7 @@ g_gate_swap2n_hdr(struct g_gate_hdr *hdr)
/* Swap only used fields. */
hdr->gh_offset = htobe64(hdr->gh_offset);
hdr->gh_length = htobe32(hdr->gh_length);
+ hdr->gh_seq = htobe64(hdr->gh_seq);
hdr->gh_error = htobe16(hdr->gh_error);
}
#endif /* _GGATE_H_ */
OpenPOWER on IntegriCloud