diff options
author | pjd <pjd@FreeBSD.org> | 2005-07-08 21:28:26 +0000 |
---|---|---|
committer | pjd <pjd@FreeBSD.org> | 2005-07-08 21:28:26 +0000 |
commit | 48406acfafebf9020783e0c2d8a8b79be9e465d6 (patch) | |
tree | 36a5f3303e7dcfeb9f092f43b6c42199acf5ea18 /sbin/ggate/ggated/ggated.c | |
parent | 9ef3d97ebe1d220afd368af05a870dd6fb7896b4 (diff) | |
download | FreeBSD-src-48406acfafebf9020783e0c2d8a8b79be9e465d6.zip FreeBSD-src-48406acfafebf9020783e0c2d8a8b79be9e465d6.tar.gz |
Reimplement ggatec/ggated applications.
Change communication protocol to be much more resistant on network
problems and to allow for much better performance.
Better performance is achieved by creating two connections between
ggatec and ggated one for sending the data and one for receiving it.
Every connection is handled by separeted thread, so there is no more
synchronous data flow (send and wait for response), now one threads
sends all requests and another receives the data.
Use two threads in ggatec(8):
- sendtd, which takes I/O requests from the kernel and sends them to the
ggated daemon on the other end;
- recvtd, which waits for ggated responses and forwards them to the kernel.
Use three threads in ggated(8):
- recvtd, which waits for I/O requests and puts them onto incoming queue;
- disktd, which takes requests from the incoming queue, does disk operations
and puts finished requests onto outgoing queue;
- sendtd, which takes finished requests from the outgoing queue and sends
responses back to ggatec.
Because there were major changes in communication protocol, there is no
backward compatibility, from now on, both client and server has to run
on 5.x or 6.x (or at least ggated should be from the same FreeBSD version
on which ggatec is running).
For Gbit networks some buffers need to be increased. I use those settings:
kern.ipc.maxsockbuf=16777216
net.inet.tcp.sendspace=8388608
net.inet.tcp.recvspace=8388608
and I use '-S 4194304 -R 4194304' options for both, ggatec and ggated.
Approved by: re (scottl)
Diffstat (limited to 'sbin/ggate/ggated/ggated.c')
-rw-r--r-- | sbin/ggate/ggated/ggated.c | 866 |
1 files changed, 630 insertions, 236 deletions
diff --git a/sbin/ggate/ggated/ggated.c b/sbin/ggate/ggated/ggated.c index a0628ea..82d66e1 100644 --- a/sbin/ggate/ggated/ggated.c +++ b/sbin/ggate/ggated/ggated.c @@ -31,6 +31,7 @@ #include <stdint.h> #include <unistd.h> #include <fcntl.h> +#include <pthread.h> #include <sys/param.h> #include <sys/queue.h> #include <sys/endian.h> @@ -44,6 +45,7 @@ #include <netinet/tcp.h> #include <arpa/inet.h> #include <signal.h> +#include <assert.h> #include <err.h> #include <errno.h> #include <string.h> @@ -51,32 +53,58 @@ #include <syslog.h> #include <stdarg.h> -#include <geom/gate/g_gate.h> #include "ggate.h" -#define G_GATED_EXPORT_FILE "/etc/gg.exports" -#define G_GATED_DEBUG(...) \ - if (g_gate_verbose) { \ - printf(__VA_ARGS__); \ - printf("\n"); \ - } +#define GGATED_EXPORT_FILE "/etc/gg.exports" + +struct ggd_connection { + off_t c_mediasize; + off_t c_sectorsize; + unsigned c_flags; /* flags (RO/RW) */ + int c_diskfd; + int c_sendfd; + int c_recvfd; + time_t c_birthtime; + char *c_path; + uint64_t c_token; + in_addr_t c_srcip; + LIST_ENTRY(ggd_connection) c_next; +}; -static const char *exports = G_GATED_EXPORT_FILE; -static int got_sighup = 0; -static int nagle = 1; -static unsigned rcvbuf = G_GATE_RCVBUF; -static unsigned sndbuf = G_GATE_SNDBUF; +struct ggd_request { + struct g_gate_hdr r_hdr; + char *r_data; + TAILQ_ENTRY(ggd_request) r_next; +}; +#define r_cmd r_hdr.gh_cmd +#define r_offset r_hdr.gh_offset +#define r_length r_hdr.gh_length +#define r_error r_hdr.gh_error -struct export { +struct ggd_export { char *e_path; /* path to device/file */ in_addr_t e_ip; /* remote IP address */ in_addr_t e_mask; /* IP mask */ unsigned e_flags; /* flags (RO/RW) */ - SLIST_ENTRY(export) e_next; + SLIST_ENTRY(ggd_export) e_next; }; -static SLIST_HEAD(, export) exports_list = - SLIST_HEAD_INITIALIZER(&exports_list); + +static const char *exports_file = GGATED_EXPORT_FILE; +static int got_sighup = 0; +in_addr_t bindaddr; + +static TAILQ_HEAD(, ggd_request) inqueue = TAILQ_HEAD_INITIALIZER(inqueue); +static TAILQ_HEAD(, ggd_request) outqueue = TAILQ_HEAD_INITIALIZER(outqueue); +pthread_mutex_t inqueue_mtx, outqueue_mtx; +pthread_cond_t inqueue_cond, outqueue_cond; + +static SLIST_HEAD(, ggd_export) exports = SLIST_HEAD_INITIALIZER(&exports); +static LIST_HEAD(, ggd_connection) connections = LIST_HEAD_INITIALIZER(&connection); + +static void *recv_thread(void *arg); +static void *disk_thread(void *arg); +static void *send_thread(void *arg); static void usage(void) @@ -118,7 +146,7 @@ countmask(unsigned m) static void line_parse(char *line, unsigned lineno) { - struct export *ex; + struct ggd_export *ex; char *word, *path, *sflags; unsigned flags, i, vmask; in_addr_t ip, mask; @@ -193,7 +221,7 @@ line_parse(char *line, unsigned lineno) ex->e_mask = mask; ex->e_flags = flags; - SLIST_INSERT_HEAD(&exports_list, ex, e_next); + SLIST_INSERT_HEAD(&exports, ex, e_next); g_gate_log(LOG_DEBUG, "Added %s/%u %s %s to exports list.", ip2str(ex->e_ip), vmask, path, sflags); @@ -202,11 +230,11 @@ line_parse(char *line, unsigned lineno) static void exports_clear(void) { - struct export *ex; + struct ggd_export *ex; - while (!SLIST_EMPTY(&exports_list)) { - ex = SLIST_FIRST(&exports_list); - SLIST_REMOVE_HEAD(&exports_list, e_next); + while (!SLIST_EMPTY(&exports)) { + ex = SLIST_FIRST(&exports); + SLIST_REMOVE_HEAD(&exports, e_next); free(ex); } } @@ -221,13 +249,13 @@ exports_get(void) exports_clear(); - fd = fopen(exports, "r"); + fd = fopen(exports_file, "r"); if (fd == NULL) { - g_gate_xlog("Cannot open exports file (%s): %s.", exports, + g_gate_xlog("Cannot open exports file (%s): %s.", exports_file, strerror(errno)); } - g_gate_log(LOG_INFO, "Reading exports file (%s).", exports); + g_gate_log(LOG_INFO, "Reading exports file (%s).", exports_file); for (;;) { if (fgets(buf, sizeof(buf), fd) == NULL) { @@ -270,236 +298,636 @@ exports_get(void) g_gate_log(LOG_INFO, "Exporting %u object(s).", objs); } -static struct export * -exports_find(struct sockaddr *s, const char *path) +static int +exports_check(struct ggd_export *ex, struct g_gate_cinit *cinit, + struct ggd_connection *conn) +{ + char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ + int error = 0, flags; + + strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); + strlcat(ipmask, "/", sizeof(ipmask)); + strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); + if ((cinit->gc_flags & GGATE_FLAG_RDONLY) != 0) { + if (ex->e_flags == O_WRONLY) { + g_gate_log(LOG_WARNING, "Read-only access requested, " + "but %s (%s) is exported write-only.", ex->e_path, + ipmask); + return (EPERM); + } else { + conn->c_flags |= GGATE_FLAG_RDONLY; + } + } else if ((cinit->gc_flags & GGATE_FLAG_WRONLY) != 0) { + if (ex->e_flags == O_RDONLY) { + g_gate_log(LOG_WARNING, "Write-only access requested, " + "but %s (%s) is exported read-only.", ex->e_path, + ipmask); + return (EPERM); + } else { + conn->c_flags |= GGATE_FLAG_WRONLY; + } + } else { + if (ex->e_flags == O_RDONLY) { + g_gate_log(LOG_WARNING, "Read-write access requested, " + "but %s (%s) is exported read-only.", ex->e_path, + ipmask); + return (EPERM); + } else if (ex->e_flags == O_WRONLY) { + g_gate_log(LOG_WARNING, "Read-write access requested, " + "but %s (%s) is exported write-only.", ex->e_path, + ipmask); + return (EPERM); + } + } + if ((conn->c_flags & GGATE_FLAG_RDONLY) != 0) + flags = O_RDONLY; + else if ((conn->c_flags & GGATE_FLAG_WRONLY) != 0) + flags = O_WRONLY; + else + flags = O_RDWR; + conn->c_diskfd = open(ex->e_path, flags); + if (conn->c_diskfd == -1) { + error = errno; + g_gate_log(LOG_ERR, "Cannot open %s: %s.", ex->e_path, + strerror(error)); + return (error); + } + return (0); +} + +static struct ggd_export * +exports_find(struct sockaddr *s, struct g_gate_cinit *cinit, + struct ggd_connection *conn) { - struct export *ex; + struct ggd_export *ex; in_addr_t ip; + int error; ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); - SLIST_FOREACH(ex, &exports_list, e_next) { - if ((ip & ex->e_mask) != ex->e_ip) + SLIST_FOREACH(ex, &exports, e_next) { + if ((ip & ex->e_mask) != ex->e_ip) { + g_gate_log(LOG_DEBUG, "exports[%s]: IP mismatch.", + ex->e_path); continue; - if (path != NULL && strcmp(path, ex->e_path) != 0) + } + if (strcmp(cinit->gc_path, ex->e_path) != 0) { + g_gate_log(LOG_DEBUG, "exports[%s]: Path mismatch.", + ex->e_path); continue; - - g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); - return (ex); + } + error = exports_check(ex, cinit, conn); + if (error == 0) + return (ex); + else { + errno = error; + return (NULL); + } } - g_gate_log(LOG_INFO, "Unauthorized connection from: %s.", ip2str(ip)); - + g_gate_log(LOG_WARNING, "Unauthorized connection from: %s.", + ip2str(ip)); + errno = EPERM; return (NULL); } +/* + * Remove timed out connections. + */ static void -sendfail(int sfd, int error, const char *fmt, ...) +connection_cleanups(void) { - struct g_gate_sinit sinit; - va_list ap; - int data; - - sinit.gs_error = error; - g_gate_swap2n_sinit(&sinit); - data = send(sfd, &sinit, sizeof(sinit), 0); - g_gate_swap2h_sinit(&sinit); - if (data == -1) { - g_gate_xlog("Error while sending initial packet: %s.", - strerror(errno)); - } - if (fmt != NULL) { - va_start(ap, fmt); - g_gate_xvlog(fmt, ap); - /* NOTREACHED */ - va_end(ap); + struct ggd_connection *conn, *tconn; + time_t now; + + time(&now); + LIST_FOREACH_SAFE(conn, &connections, c_next, tconn) { + if (now - conn->c_birthtime > 10) { + LIST_REMOVE(conn, c_next); + g_gate_log(LOG_NOTICE, + "Connection from %s [%s] removed.", + ip2str(conn->c_srcip), conn->c_path); + close(conn->c_diskfd); + close(conn->c_sendfd); + close(conn->c_recvfd); + free(conn->c_path); + free(conn); + } } - exit(EXIT_FAILURE); } -static void -serve(int sfd, struct sockaddr *s) +static struct ggd_connection * +connection_find(struct g_gate_cinit *cinit) { - struct g_gate_cinit cinit; - struct g_gate_sinit sinit; - struct g_gate_hdr hdr; - struct export *ex; - char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ - size_t bufsize; - int32_t error; - int fd, flags; - ssize_t data; - char *buf; + struct ggd_connection *conn; - g_gate_log(LOG_DEBUG, "Receiving initial packet."); - data = recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); - g_gate_swap2h_cinit(&cinit); - if (data == -1) { - g_gate_xlog("Error while receiving initial packet: %s.", - strerror(errno)); + LIST_FOREACH(conn, &connections, c_next) { + if (conn->c_token == cinit->gc_token) + break; } + return (conn); +} - ex = exports_find(s, cinit.gc_path); - if (ex == NULL) { - sendfail(sfd, EINVAL, "Requested path isn't exported: %s.", - strerror(errno)); +static struct ggd_connection * +connection_new(struct g_gate_cinit *cinit, struct sockaddr *s, int sfd) +{ + struct ggd_connection *conn; + in_addr_t ip; + + /* + * First, look for old connections. + * We probably should do it every X seconds, but what for? + * It is only dangerous if an attacker wants to overload connections + * queue, so here is a good place to do the cleanups. + */ + connection_cleanups(); + + conn = malloc(sizeof(*conn)); + if (conn == NULL) + return (NULL); + conn->c_path = strdup(cinit->gc_path); + if (conn->c_path == NULL) { + free(conn); + return (NULL); } + conn->c_token = cinit->gc_token; + ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); + conn->c_srcip = ip; + conn->c_sendfd = conn->c_recvfd = -1; + if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) + conn->c_sendfd = sfd; + else + conn->c_recvfd = sfd; + conn->c_mediasize = 0; + conn->c_sectorsize = 0; + time(&conn->c_birthtime); + conn->c_flags = cinit->gc_flags; + LIST_INSERT_HEAD(&connections, conn, c_next); + g_gate_log(LOG_DEBUG, "Connection created [%s, %s].", ip2str(ip), + conn->c_path); + return (conn); +} - error = 0; - strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); - strlcat(ipmask, "/", sizeof(ipmask)); - strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); - if ((cinit.gc_flags & G_GATE_FLAG_READONLY) != 0) { - if (ex->e_flags == O_WRONLY) { - g_gate_log(LOG_ERR, "Read-only access requested, but " - "%s (%s) is exported write-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = G_GATE_FLAG_READONLY; - } - } else if ((cinit.gc_flags & G_GATE_FLAG_WRITEONLY) != 0) { - if (ex->e_flags == O_RDONLY) { - g_gate_log(LOG_ERR, "Write-only access requested, but " - "%s (%s) is exported read-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = G_GATE_FLAG_WRITEONLY; +static int +connection_add(struct ggd_connection *conn, struct g_gate_cinit *cinit, + struct sockaddr *s, int sfd) +{ + in_addr_t ip; + + ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); + if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) { + if (conn->c_sendfd != -1) { + g_gate_log(LOG_WARNING, + "Send socket already exists [%s, %s].", ip2str(ip), + conn->c_path); + return (EEXIST); } + conn->c_sendfd = sfd; } else { - if (ex->e_flags == O_RDONLY) { - g_gate_log(LOG_ERR, "Read-write access requested, but " - "%s (%s) is exported read-only.", ex->e_path, - ipmask); - error = EPERM; - } else if (ex->e_flags == O_WRONLY) { - g_gate_log(LOG_ERR, "Read-write access requested, but " - "%s (%s) is exported write-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = 0; + if (conn->c_recvfd != -1) { + g_gate_log(LOG_WARNING, + "Receive socket already exists [%s, %s].", + ip2str(ip), conn->c_path); + return (EEXIST); } + conn->c_recvfd = sfd; } - if (error != 0) - sendfail(sfd, error, NULL); - flags = g_gate_openflags(sinit.gs_flags); - fd = open(ex->e_path, flags); - if (fd < 0) { - sendfail(sfd, errno, "Error while opening %s: %s.", ex->e_path, - strerror(errno)); + g_gate_log(LOG_DEBUG, "Connection added [%s, %s].", ip2str(ip), + conn->c_path); + return (0); +} + +/* + * Remove one socket from the given connection or the whole + * connection if sfd == -1. + */ +static void +connection_remove(struct ggd_connection *conn) +{ + + LIST_REMOVE(conn, c_next); + g_gate_log(LOG_DEBUG, "Connection removed [%s %s].", + ip2str(conn->c_srcip), conn->c_path); + if (conn->c_sendfd != -1) + close(conn->c_sendfd); + if (conn->c_recvfd != -1) + close(conn->c_recvfd); + free(conn->c_path); + free(conn); +} + +static int +connection_ready(struct ggd_connection *conn) +{ + + return (conn->c_sendfd != -1 && conn->c_recvfd != -1); +} + +static void +connection_launch(struct ggd_connection *conn) +{ + pthread_t td; + int error, pid; + + pid = fork(); + if (pid > 0) + return; + else if (pid == -1) { + g_gate_log(LOG_ERR, "Cannot fork: %s.", strerror(errno)); + return; } + g_gate_log(LOG_DEBUG, "Process created [%s].", conn->c_path); - g_gate_log(LOG_DEBUG, "Sending initial packet."); /* - * This field isn't used by ggc(8) for now. - * It should be used in future when user don't give device size. + * Create condition variables and mutexes for in-queue and out-queue + * synchronization. */ - sinit.gs_mediasize = g_gate_mediasize(fd); - sinit.gs_sectorsize = g_gate_sectorsize(fd); - sinit.gs_error = 0; + error = pthread_mutex_init(&inqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", + strerror(error)); + } + error = pthread_cond_init(&inqueue_cond, NULL); + if (error != 0) { + g_gate_xlog("pthread_cond_init(inqueue_cond): %s.", + strerror(error)); + } + error = pthread_mutex_init(&outqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(outqueue_mtx): %s.", + strerror(error)); + } + error = pthread_cond_init(&outqueue_cond, NULL); + if (error != 0) { + g_gate_xlog("pthread_cond_init(outqueue_cond): %s.", + strerror(error)); + } + + /* + * Create threads: + * recvtd - thread for receiving I/O request + * diskio - thread for doing I/O request + * sendtd - thread for sending I/O requests back + */ + error = pthread_create(&td, NULL, send_thread, conn); + if (error != 0) { + g_gate_xlog("pthread_create(send_thread): %s.", + strerror(error)); + } + error = pthread_create(&td, NULL, recv_thread, conn); + if (error != 0) { + g_gate_xlog("pthread_create(recv_thread): %s.", + strerror(error)); + } + disk_thread(conn); +} + +static void +sendfail(int sfd, int error, const char *fmt, ...) +{ + struct g_gate_sinit sinit; + va_list ap; + ssize_t data; + + sinit.gs_error = error; g_gate_swap2n_sinit(&sinit); - data = send(sfd, &sinit, sizeof(sinit), 0); + data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); g_gate_swap2h_sinit(&sinit); - if (data == -1) { - sendfail(sfd, errno, "Error while sending initial packet: %s.", + if (data != sizeof(sinit)) { + g_gate_log(LOG_WARNING, "Cannot send initial packet: %s.", strerror(errno)); + return; + } + if (fmt != NULL) { + va_start(ap, fmt); + g_gate_vlog(LOG_WARNING, fmt, ap); + va_end(ap); } +} - bufsize = G_GATE_BUFSIZE_START; - buf = malloc(bufsize); - if (buf == NULL) - g_gate_xlog("No enough memory."); +static void * +malloc_waitok(size_t size) +{ + void *p; - g_gate_log(LOG_DEBUG, "New process: %u.", getpid()); + while ((p = malloc(size)) == NULL) { + g_gate_log(LOG_DEBUG, "Cannot allocate %zu bytes.", size); + sleep(1); + } + return (p); +} + +static void * +recv_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_recvfd; for (;;) { /* - * Receive request. + * Get header packet. */ - data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL); + req = malloc_waitok(sizeof(*req)); + data = g_gate_recv(fd, &req->r_hdr, sizeof(req->r_hdr), + MSG_WAITALL); if (data == 0) { g_gate_log(LOG_DEBUG, "Process %u exiting.", getpid()); exit(EXIT_SUCCESS); } else if (data == -1) { g_gate_xlog("Error while receiving hdr packet: %s.", strerror(errno)); - } else if (data != sizeof(hdr)) { + } else if (data != sizeof(req->r_hdr)) { g_gate_xlog("Malformed hdr packet received."); } g_gate_log(LOG_DEBUG, "Received hdr packet."); - g_gate_swap2h_hdr(&hdr); + g_gate_swap2h_hdr(&req->r_hdr); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); /* - * Increase buffer if there is need to. + * Allocate memory for data. */ - if (hdr.gh_length > bufsize) { - bufsize = hdr.gh_length; - g_gate_log(LOG_DEBUG, "Increasing buffer to %u.", - bufsize); - buf = realloc(buf, bufsize); - if (buf == NULL) - g_gate_xlog("No enough memory."); - } + req->r_data = malloc_waitok(req->r_length); - if (hdr.gh_cmd == BIO_READ) { - if (pread(fd, buf, hdr.gh_length, - hdr.gh_offset) == -1) { - error = errno; - g_gate_log(LOG_ERR, "Error while reading data " - "(offset=%ju, size=%zu): %s.", - (uintmax_t)hdr.gh_offset, - (size_t)hdr.gh_length, strerror(error)); - } else { - error = 0; - } - hdr.gh_error = error; - g_gate_swap2n_hdr(&hdr); - if (send(sfd, &hdr, sizeof(hdr), 0) == -1) { - g_gate_xlog("Error while sending status: %s.", - strerror(errno)); - } - g_gate_swap2h_hdr(&hdr); - /* Send data only if there was no error while pread(). */ - if (error == 0) { - data = send(sfd, buf, hdr.gh_length, 0); - if (data == -1) { - g_gate_xlog("Error while sending data: " - "%s.", strerror(errno)); - } - g_gate_log(LOG_DEBUG, "Sent %d bytes " - "(offset=%ju, size=%zu).", data, - (uintmax_t)hdr.gh_offset, - (size_t)hdr.gh_length); - } - } else /* if (hdr.gh_cmd == BIO_WRITE) */ { + /* + * Receive data to write for WRITE request. + */ + if (req->r_cmd == GGATE_CMD_WRITE) { g_gate_log(LOG_DEBUG, "Waiting for %u bytes of data...", - hdr.gh_length); - data = recv(sfd, buf, hdr.gh_length, MSG_WAITALL); + req->r_length); + data = g_gate_recv(fd, req->r_data, req->r_length, + MSG_WAITALL); if (data == -1) { g_gate_xlog("Error while receiving data: %s.", strerror(errno)); } - if (pwrite(fd, buf, hdr.gh_length, hdr.gh_offset) == -1) { - error = errno; - g_gate_log(LOG_ERR, "Error while writing data " - "(offset=%llu, size=%u): %s.", - hdr.gh_offset, hdr.gh_length, - strerror(error)); - } else { - error = 0; + } + + /* + * Put the request onto the incoming queue. + */ + error = pthread_mutex_lock(&inqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&inqueue, req, r_next); + error = pthread_cond_signal(&inqueue_cond); + assert(error == 0); + error = pthread_mutex_unlock(&inqueue_mtx); + assert(error == 0); + } +} + +static void * +disk_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_diskfd; + for (;;) { + /* + * Get a request from the incoming queue. + */ + error = pthread_mutex_lock(&inqueue_mtx); + assert(error == 0); + while ((req = TAILQ_FIRST(&inqueue)) == NULL) { + error = pthread_cond_wait(&inqueue_cond, &inqueue_mtx); + assert(error == 0); + } + TAILQ_REMOVE(&inqueue, req, r_next); + error = pthread_mutex_unlock(&inqueue_mtx); + assert(error == 0); + + /* + * Check the request. + */ + assert(req->r_cmd == GGATE_CMD_READ || req->r_cmd == GGATE_CMD_WRITE); + assert(req->r_offset + req->r_length <= (uintmax_t)conn->c_mediasize); + assert((req->r_offset % conn->c_sectorsize) == 0); + assert((req->r_length % conn->c_sectorsize) == 0); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); + + /* + * Do the request. + */ + data = 0; + switch (req->r_cmd) { + case GGATE_CMD_READ: + data = pread(fd, req->r_data, req->r_length, + req->r_offset); + break; + case GGATE_CMD_WRITE: + data = pwrite(fd, req->r_data, req->r_length, + req->r_offset); + /* Free data memory here - better sooner. */ + free(req->r_data); + req->r_data = NULL; + break; + } + if (data != (ssize_t)req->r_length) { + /* Report short reads/writes as I/O errors. */ + if (errno == 0) + errno = EIO; + g_gate_log(LOG_ERR, "Disk error: %s", strerror(errno)); + req->r_error = errno; + if (req->r_data != NULL) { + free(req->r_data); + req->r_data = NULL; } - hdr.gh_error = error; - g_gate_swap2n_hdr(&hdr); - if (send(sfd, &hdr, sizeof(hdr), 0) == -1) { - g_gate_xlog("Error while sending status: %s.", + } + + /* + * Put the request onto the outgoing queue. + */ + error = pthread_mutex_lock(&outqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&outqueue, req, r_next); + error = pthread_cond_signal(&outqueue_cond); + assert(error == 0); + error = pthread_mutex_unlock(&outqueue_mtx); + assert(error == 0); + } +} + +static void * +send_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_sendfd; + for (;;) { + /* + * Get a request from the outgoing queue. + */ + error = pthread_mutex_lock(&outqueue_mtx); + assert(error == 0); + while ((req = TAILQ_FIRST(&outqueue)) == NULL) { + error = pthread_cond_wait(&outqueue_cond, + &outqueue_mtx); + assert(error == 0); + } + TAILQ_REMOVE(&outqueue, req, r_next); + error = pthread_mutex_unlock(&outqueue_mtx); + assert(error == 0); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); + + /* + * Send the request. + */ + g_gate_swap2n_hdr(&req->r_hdr); + if (g_gate_send(fd, &req->r_hdr, sizeof(req->r_hdr), 0) == -1) { + g_gate_xlog("Error while sending hdr packet: %s.", + strerror(errno)); + } + g_gate_log(LOG_DEBUG, "Sent hdr packet."); + g_gate_swap2h_hdr(&req->r_hdr); + if (req->r_data != NULL) { + data = g_gate_send(fd, req->r_data, req->r_length, 0); + if (data != (ssize_t)req->r_length) { + g_gate_xlog("Error while sending data: %s.", strerror(errno)); } - g_gate_swap2h_hdr(&hdr); - g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%llu, " - "size=%u).", data, hdr.gh_offset, hdr.gh_length); + g_gate_log(LOG_DEBUG, + "Sent %zd bytes (offset=%ju, size=%zu).", data, + (uintmax_t)req->r_offset, (size_t)req->r_length); + free(req->r_data); + } + free(req); + } +} + +static void +log_connection(struct sockaddr *from) +{ + in_addr_t ip; + + ip = htonl(((struct sockaddr_in *)(void *)from)->sin_addr.s_addr); + g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); +} + +static int +handshake(struct sockaddr *from, int sfd) +{ + struct g_gate_version ver; + struct g_gate_cinit cinit; + struct g_gate_sinit sinit; + struct ggd_connection *conn; + struct ggd_export *ex; + ssize_t data; + + log_connection(from); + /* + * Phase 1: Version verification. + */ + g_gate_log(LOG_DEBUG, "Receiving version packet."); + data = g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL); + g_gate_swap2h_version(&ver); + if (data != sizeof(ver)) { + g_gate_log(LOG_WARNING, "Malformed version packet."); + return (0); + } + g_gate_log(LOG_DEBUG, "Version packet received."); + if (memcmp(ver.gv_magic, GGATE_MAGIC, strlen(GGATE_MAGIC)) != 0) { + g_gate_log(LOG_WARNING, "Invalid magic field."); + return (0); + } + if (ver.gv_version != GGATE_VERSION) { + g_gate_log(LOG_WARNING, "Version %u is not supported.", + ver.gv_version); + return (0); + } + ver.gv_error = 0; + g_gate_swap2n_version(&ver); + data = g_gate_send(sfd, &ver, sizeof(ver), 0); + g_gate_swap2h_version(&ver); + if (data == -1) { + sendfail(sfd, errno, "Error while sending version packet: %s.", + strerror(errno)); + return (0); + } + + /* + * Phase 2: Request verification. + */ + g_gate_log(LOG_DEBUG, "Receiving initial packet."); + data = g_gate_recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); + g_gate_swap2h_cinit(&cinit); + if (data != sizeof(cinit)) { + g_gate_log(LOG_WARNING, "Malformed initial packet."); + return (0); + } + g_gate_log(LOG_DEBUG, "Initial packet received."); + conn = connection_find(&cinit); + if (conn != NULL) { + /* + * Connection should already exists. + */ + g_gate_log(LOG_DEBUG, "Found existing connection (token=%lu).", + (unsigned long)conn->c_token); + if (connection_add(conn, &cinit, from, sfd) == -1) { + connection_remove(conn); + return (0); + } + } else { + /* + * New connection, allocate space. + */ + conn = connection_new(&cinit, from, sfd); + if (conn == NULL) { + sendfail(sfd, ENOMEM, + "Cannot allocate new connection."); + return (0); } - g_gate_log(LOG_DEBUG, "Tick."); + g_gate_log(LOG_DEBUG, "New connection created (token=%lu).", + (unsigned long)conn->c_token); + } + + ex = exports_find(from, &cinit, conn); + if (ex == NULL) { + connection_remove(conn); + sendfail(sfd, errno, NULL); + return (0); + } + if (conn->c_mediasize == 0) { + conn->c_mediasize = g_gate_mediasize(conn->c_diskfd); + conn->c_sectorsize = g_gate_sectorsize(conn->c_diskfd); + } + sinit.gs_mediasize = conn->c_mediasize; + sinit.gs_sectorsize = conn->c_sectorsize; + sinit.gs_error = 0; + + g_gate_log(LOG_DEBUG, "Sending initial packet."); + + g_gate_swap2n_sinit(&sinit); + data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); + g_gate_swap2h_sinit(&sinit); + if (data == -1) { + sendfail(sfd, errno, "Error while sending initial packet: %s.", + strerror(errno)); + return (0); } + + if (connection_ready(conn)) { + connection_launch(conn); + connection_remove(conn); + } + return (1); } static void @@ -514,12 +942,9 @@ main(int argc, char *argv[]) { struct sockaddr_in serv; struct sockaddr from; - in_addr_t bindaddr; socklen_t fromlen; - struct timeval tv; - int on, sfd, tmpsfd; - pid_t childpid; - unsigned bsize, port; + int sfd, tmpsfd; + unsigned port; bindaddr = htonl(INADDR_ANY); port = G_GATE_PORT; @@ -570,45 +995,27 @@ main(int argc, char *argv[]) argv += optind; if (argv[0] != NULL) - exports = argv[0]; + exports_file = argv[0]; exports_get(); if (!g_gate_verbose) { /* Run in daemon mode. */ if (daemon(0, 0) == -1) - g_gate_xlog("Can't daemonize: %s", strerror(errno)); + g_gate_xlog("Cannot daemonize: %s", strerror(errno)); } signal(SIGCHLD, SIG_IGN); sfd = socket(AF_INET, SOCK_STREAM, 0); if (sfd == -1) - g_gate_xlog("Can't open stream socket: %s.", strerror(errno)); + g_gate_xlog("Cannot open stream socket: %s.", strerror(errno)); bzero(&serv, sizeof(serv)); serv.sin_family = AF_INET; serv.sin_addr.s_addr = bindaddr; serv.sin_port = htons(port); - on = 1; - if (nagle) { - if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, - sizeof(on)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - } - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - bsize = rcvbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - bsize = sndbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - tv.tv_sec = 10; - tv.tv_usec = 0; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 || - setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } + + g_gate_socket_settings(sfd); + if (bind(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) g_gate_xlog("bind(): %s.", strerror(errno)); if (listen(sfd, 5) == -1) @@ -629,21 +1036,8 @@ main(int argc, char *argv[]) exports_get(); } - if (exports_find(&from, NULL) == NULL) { + if (!handshake(&from, tmpsfd)) close(tmpsfd); - continue; - } - - childpid = fork(); - if (childpid < 0) { - g_gate_xlog("Cannot create child process: %s.", - strerror(errno)); - } else if (childpid == 0) { - close(sfd); - serve(tmpsfd, &from); - /* NOTREACHED */ - } - close(tmpsfd); } close(sfd); exit(EXIT_SUCCESS); |