summaryrefslogtreecommitdiffstats
path: root/contrib/openbsm/bin/auditdistd/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/openbsm/bin/auditdistd/sender.c')
-rw-r--r--contrib/openbsm/bin/auditdistd/sender.c838
1 files changed, 838 insertions, 0 deletions
diff --git a/contrib/openbsm/bin/auditdistd/sender.c b/contrib/openbsm/bin/auditdistd/sender.c
new file mode 100644
index 0000000..256fbb1
--- /dev/null
+++ b/contrib/openbsm/bin/auditdistd/sender.c
@@ -0,0 +1,838 @@
+/*-
+ * Copyright (c) 2012 The FreeBSD Foundation
+ * All rights reserved.
+ *
+ * This software was developed by Pawel Jakub Dawidek under sponsorship from
+ * the FreeBSD Foundation.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
+ */
+
+#include <config/config.h>
+
+#include <sys/param.h>
+#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
+#include <sys/endian.h>
+#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
+#ifdef HAVE_MACHINE_ENDIAN_H
+#include <machine/endian.h>
+#else /* !HAVE_MACHINE_ENDIAN_H */
+#ifdef HAVE_ENDIAN_H
+#include <endian.h>
+#else /* !HAVE_ENDIAN_H */
+#error "No supported endian.h"
+#endif /* !HAVE_ENDIAN_H */
+#endif /* !HAVE_MACHINE_ENDIAN_H */
+#include <compat/endian.h>
+#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
+#include <sys/queue.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <ctype.h>
+#include <dirent.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#ifdef HAVE_LIBUTIL_H
+#include <libutil.h>
+#endif
+#include <signal.h>
+#include <string.h>
+#include <strings.h>
+
+#include <openssl/hmac.h>
+
+#ifndef HAVE_SIGTIMEDWAIT
+#include "sigtimedwait.h"
+#endif
+
+#include "auditdistd.h"
+#include "pjdlog.h"
+#include "proto.h"
+#include "sandbox.h"
+#include "subr.h"
+#include "synch.h"
+#include "trail.h"
+
+static struct adist_config *adcfg;
+static struct adist_host *adhost;
+
+static pthread_rwlock_t adist_remote_lock;
+static pthread_mutex_t adist_remote_mtx;
+static pthread_cond_t adist_remote_cond;
+static struct trail *adist_trail;
+
+static TAILQ_HEAD(, adreq) adist_free_list;
+static pthread_mutex_t adist_free_list_lock;
+static pthread_cond_t adist_free_list_cond;
+static TAILQ_HEAD(, adreq) adist_send_list;
+static pthread_mutex_t adist_send_list_lock;
+static pthread_cond_t adist_send_list_cond;
+static TAILQ_HEAD(, adreq) adist_recv_list;
+static pthread_mutex_t adist_recv_list_lock;
+static pthread_cond_t adist_recv_list_cond;
+
+static void
+init_environment(void)
+{
+ struct adreq *adreq;
+ unsigned int ii;
+
+ rw_init(&adist_remote_lock);
+ mtx_init(&adist_remote_mtx);
+ cv_init(&adist_remote_cond);
+ TAILQ_INIT(&adist_free_list);
+ mtx_init(&adist_free_list_lock);
+ cv_init(&adist_free_list_cond);
+ TAILQ_INIT(&adist_send_list);
+ mtx_init(&adist_send_list_lock);
+ cv_init(&adist_send_list_cond);
+ TAILQ_INIT(&adist_recv_list);
+ mtx_init(&adist_recv_list_lock);
+ cv_init(&adist_recv_list_cond);
+
+ for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
+ adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
+ if (adreq == NULL) {
+ pjdlog_exitx(EX_TEMPFAIL,
+ "Unable to allocate %zu bytes of memory for adreq object.",
+ sizeof(*adreq) + ADIST_BUF_SIZE);
+ }
+ adreq->adr_byteorder = ADIST_BYTEORDER;
+ adreq->adr_cmd = ADIST_CMD_UNDEFINED;
+ adreq->adr_seq = 0;
+ adreq->adr_datasize = 0;
+ TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
+ }
+}
+
+static int
+sender_connect(void)
+{
+ unsigned char rnd[32], hash[32], resp[32];
+ struct proto_conn *conn;
+ char welcome[8];
+ int16_t val;
+
+ val = 1;
+ if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
+ pjdlog_exit(EX_TEMPFAIL,
+ "Unable to send connection request to parent");
+ }
+ if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
+ pjdlog_exit(EX_TEMPFAIL,
+ "Unable to receive reply to connection request from parent");
+ }
+ if (val != 0) {
+ errno = val;
+ pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
+ adhost->adh_remoteaddr);
+ return (-1);
+ }
+ if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
+ pjdlog_exit(EX_TEMPFAIL,
+ "Unable to receive connection from parent");
+ }
+ if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
+ pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
+ /* Error in setting timeout is not critical, but why should it fail? */
+ if (proto_timeout(conn, adcfg->adc_timeout) < 0)
+ pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
+ else
+ pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
+
+ /* Exchange welcome message, which includes version number. */
+ (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
+ if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
+ pjdlog_errno(LOG_WARNING,
+ "Unable to send welcome message to %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Welcome message sent (%s).", welcome);
+ bzero(welcome, sizeof(welcome));
+ if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
+ pjdlog_errno(LOG_WARNING,
+ "Unable to receive welcome message from %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
+ !isdigit(welcome[6]) || welcome[7] != '\0') {
+ pjdlog_warning("Invalid welcome message from %s.",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Welcome message received (%s).", welcome);
+ /*
+ * Receiver can only reply with version number lower or equal to
+ * the one we sent.
+ */
+ adhost->adh_version = atoi(welcome + 5);
+ if (adhost->adh_version > ADIST_VERSION) {
+ pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
+ adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
+ proto_close(conn);
+ return (-1);
+ }
+
+ pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
+ adhost->adh_remoteaddr);
+
+ if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
+ pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
+
+ if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
+ pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Challenge received.");
+
+ if (HMAC(EVP_sha256(), adhost->adh_password,
+ (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
+ NULL) == NULL) {
+ pjdlog_warning("Unable to generate response.");
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Response generated.");
+
+ if (proto_send(conn, hash, sizeof(hash)) == -1) {
+ pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Response sent.");
+
+ if (adist_random(rnd, sizeof(rnd)) == -1) {
+ pjdlog_warning("Unable to generate challenge.");
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Challenge generated.");
+
+ if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
+ pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Challenge sent.");
+
+ if (proto_recv(conn, resp, sizeof(resp)) == -1) {
+ pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Response received.");
+
+ if (HMAC(EVP_sha256(), adhost->adh_password,
+ (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
+ NULL) == NULL) {
+ pjdlog_warning("Unable to generate hash.");
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Hash generated.");
+
+ if (memcmp(resp, hash, sizeof(hash)) != 0) {
+ pjdlog_warning("Invalid response from %s (wrong password?).",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_info("Receiver authenticated.");
+
+ if (proto_recv(conn, &adhost->adh_trail_offset,
+ sizeof(adhost->adh_trail_offset)) == -1) {
+ pjdlog_errno(LOG_WARNING,
+ "Unable to receive size of the most recent trail file from %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
+ if (proto_recv(conn, &adhost->adh_trail_name,
+ sizeof(adhost->adh_trail_name)) == -1) {
+ pjdlog_errno(LOG_WARNING,
+ "Unable to receive name of the most recent trail file from %s",
+ adhost->adh_remoteaddr);
+ proto_close(conn);
+ return (-1);
+ }
+ pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
+ adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
+
+ rw_wlock(&adist_remote_lock);
+ mtx_lock(&adist_remote_mtx);
+ PJDLOG_ASSERT(adhost->adh_remote == NULL);
+ PJDLOG_ASSERT(conn != NULL);
+ adhost->adh_remote = conn;
+ mtx_unlock(&adist_remote_mtx);
+ rw_unlock(&adist_remote_lock);
+ cv_signal(&adist_remote_cond);
+
+ return (0);
+}
+
+static void
+sender_disconnect(void)
+{
+
+ rw_wlock(&adist_remote_lock);
+ /*
+ * Check for a race between dropping rlock and acquiring wlock -
+ * another thread can close connection in-between.
+ */
+ if (adhost->adh_remote == NULL) {
+ rw_unlock(&adist_remote_lock);
+ return;
+ }
+ pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
+ proto_close(adhost->adh_remote);
+ mtx_lock(&adist_remote_mtx);
+ adhost->adh_remote = NULL;
+ adhost->adh_reset = true;
+ adhost->adh_trail_name[0] = '\0';
+ adhost->adh_trail_offset = 0;
+ mtx_unlock(&adist_remote_mtx);
+ rw_unlock(&adist_remote_lock);
+
+ pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
+
+ /* Move all in-flight requests back onto free list. */
+ mtx_lock(&adist_free_list_lock);
+ mtx_lock(&adist_send_list_lock);
+ TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
+ mtx_unlock(&adist_send_list_lock);
+ mtx_lock(&adist_recv_list_lock);
+ TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
+ mtx_unlock(&adist_recv_list_lock);
+ mtx_unlock(&adist_free_list_lock);
+}
+
+static void
+adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
+ size_t size)
+{
+ static uint64_t seq = 1;
+
+ PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
+
+ switch (cmd) {
+ case ADIST_CMD_OPEN:
+ case ADIST_CMD_CLOSE:
+ PJDLOG_ASSERT(data != NULL && size == 0);
+ size = strlen(data) + 1;
+ break;
+ case ADIST_CMD_APPEND:
+ PJDLOG_ASSERT(data != NULL && size > 0);
+ break;
+ case ADIST_CMD_KEEPALIVE:
+ case ADIST_CMD_ERROR:
+ PJDLOG_ASSERT(data == NULL && size == 0);
+ break;
+ default:
+ PJDLOG_ABORT("Invalid command (%hhu).", cmd);
+ }
+
+ adreq->adr_cmd = cmd;
+ adreq->adr_seq = seq++;
+ adreq->adr_datasize = size;
+ /* Don't copy if data is already in out buffer. */
+ if (data != NULL && data != adreq->adr_data)
+ bcopy(data, adreq->adr_data, size);
+}
+
+static bool
+read_thread_wait(void)
+{
+ bool newfile = false;
+
+ mtx_lock(&adist_remote_mtx);
+ if (adhost->adh_reset) {
+ adhost->adh_reset = false;
+ if (trail_filefd(adist_trail) != -1)
+ trail_close(adist_trail);
+ trail_reset(adist_trail);
+ while (adhost->adh_remote == NULL)
+ cv_wait(&adist_remote_cond, &adist_remote_mtx);
+ trail_start(adist_trail, adhost->adh_trail_name,
+ adhost->adh_trail_offset);
+ newfile = true;
+ }
+ mtx_unlock(&adist_remote_mtx);
+ while (trail_filefd(adist_trail) == -1) {
+ newfile = true;
+ wait_for_dir();
+ if (trail_filefd(adist_trail) == -1)
+ trail_next(adist_trail);
+ }
+ if (newfile) {
+ pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
+ adhost->adh_directory,
+ trail_filename(adist_trail));
+ (void)wait_for_file_init(trail_filefd(adist_trail));
+ }
+ return (newfile);
+}
+
+static void *
+read_thread(void *arg __unused)
+{
+ struct adreq *adreq;
+ ssize_t done;
+ bool newfile;
+
+ pjdlog_debug(1, "%s started.", __func__);
+
+ for (;;) {
+ newfile = read_thread_wait();
+ QUEUE_TAKE(adreq, &adist_free_list, 0);
+ if (newfile) {
+ adreq_fill(adreq, ADIST_CMD_OPEN,
+ trail_filename(adist_trail), 0);
+ newfile = false;
+ goto move;
+ }
+
+ done = read(trail_filefd(adist_trail), adreq->adr_data,
+ ADIST_BUF_SIZE);
+ if (done == -1) {
+ off_t offset;
+ int error;
+
+ error = errno;
+ offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
+ errno = error;
+ pjdlog_errno(LOG_ERR,
+ "Error while reading \"%s/%s\" at offset %jd",
+ adhost->adh_directory, trail_filename(adist_trail),
+ offset);
+ trail_close(adist_trail);
+ adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
+ goto move;
+ } else if (done == 0) {
+ /* End of file. */
+ pjdlog_debug(3, "End of \"%s/%s\".",
+ adhost->adh_directory, trail_filename(adist_trail));
+ if (!trail_switch(adist_trail)) {
+ /* More audit records can arrive. */
+ mtx_lock(&adist_free_list_lock);
+ TAILQ_INSERT_TAIL(&adist_free_list, adreq,
+ adr_next);
+ mtx_unlock(&adist_free_list_lock);
+ wait_for_file();
+ continue;
+ }
+ adreq_fill(adreq, ADIST_CMD_CLOSE,
+ trail_filename(adist_trail), 0);
+ trail_close(adist_trail);
+ goto move;
+ }
+
+ adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
+move:
+ pjdlog_debug(3,
+ "read thread: Moving request %p to the send queue (%hhu).",
+ adreq, adreq->adr_cmd);
+ QUEUE_INSERT(adreq, &adist_send_list);
+ }
+ /* NOTREACHED */
+ return (NULL);
+}
+
+static void
+keepalive_send(void)
+{
+ struct adreq *adreq;
+
+ rw_rlock(&adist_remote_lock);
+ if (adhost->adh_remote == NULL) {
+ rw_unlock(&adist_remote_lock);
+ return;
+ }
+ rw_unlock(&adist_remote_lock);
+
+ mtx_lock(&adist_free_list_lock);
+ adreq = TAILQ_FIRST(&adist_free_list);
+ if (adreq != NULL)
+ TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
+ mtx_unlock(&adist_free_list_lock);
+ if (adreq == NULL)
+ return;
+
+ adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
+
+ QUEUE_INSERT(adreq, &adist_send_list);
+
+ pjdlog_debug(3, "keepalive_send: Request sent.");
+}
+
+/*
+ * Thread sends request to secondary node.
+ */
+static void *
+send_thread(void *arg __unused)
+{
+ time_t lastcheck, now;
+ struct adreq *adreq;
+
+ pjdlog_debug(1, "%s started.", __func__);
+
+ lastcheck = time(NULL);
+
+ for (;;) {
+ pjdlog_debug(3, "send thread: Taking request.");
+ for (;;) {
+ QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
+ if (adreq != NULL)
+ break;
+ now = time(NULL);
+ if (lastcheck + ADIST_KEEPALIVE <= now) {
+ keepalive_send();
+ lastcheck = now;
+ }
+ }
+ PJDLOG_ASSERT(adreq != NULL);
+ pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
+ adreq->adr_cmd);
+ /*
+ * Protect connection from disappearing.
+ */
+ rw_rlock(&adist_remote_lock);
+ /*
+ * Move the request to the recv queue first to avoid race
+ * where the recv thread receives the reply before we move
+ * the request to the recv queue.
+ */
+ QUEUE_INSERT(adreq, &adist_recv_list);
+ if (adhost->adh_remote == NULL ||
+ proto_send(adhost->adh_remote, &adreq->adr_packet,
+ ADPKT_SIZE(adreq)) == -1) {
+ rw_unlock(&adist_remote_lock);
+ pjdlog_debug(1,
+ "send thread: (%p) Unable to send request.", adreq);
+ if (adhost->adh_remote != NULL)
+ sender_disconnect();
+ continue;
+ } else {
+ pjdlog_debug(3, "Request %p sent successfully.", adreq);
+ adreq_log(LOG_DEBUG, 2, -1, adreq,
+ "send: (%p) Request sent: ", adreq);
+ rw_unlock(&adist_remote_lock);
+ }
+ }
+ /* NOTREACHED */
+ return (NULL);
+}
+
+static void
+adrep_decode_header(struct adrep *adrep)
+{
+
+ /* Byte-swap only is the receiver is using different byte order. */
+ if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
+ adrep->adrp_byteorder = ADIST_BYTEORDER;
+ adrep->adrp_seq = bswap64(adrep->adrp_seq);
+ adrep->adrp_error = bswap16(adrep->adrp_error);
+ }
+}
+
+/*
+ * Thread receives answer from secondary node and passes it to ggate_send
+ * thread.
+ */
+static void *
+recv_thread(void *arg __unused)
+{
+ struct adrep adrep;
+ struct adreq *adreq;
+
+ pjdlog_debug(1, "%s started.", __func__);
+
+ for (;;) {
+ /* Wait until there is anything to receive. */
+ QUEUE_WAIT(&adist_recv_list);
+ pjdlog_debug(3, "recv thread: Got something.");
+ rw_rlock(&adist_remote_lock);
+ if (adhost->adh_remote == NULL) {
+ /*
+ * Connection is dead.
+ * XXX: We shouldn't be here.
+ */
+ rw_unlock(&adist_remote_lock);
+ continue;
+ }
+ if (proto_recv(adhost->adh_remote, &adrep,
+ sizeof(adrep)) == -1) {
+ rw_unlock(&adist_remote_lock);
+ pjdlog_errno(LOG_ERR, "Unable to receive reply");
+ sender_disconnect();
+ continue;
+ }
+ rw_unlock(&adist_remote_lock);
+ adrep_decode_header(&adrep);
+ /*
+ * Find the request that was just confirmed.
+ */
+ mtx_lock(&adist_recv_list_lock);
+ TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
+ if (adreq->adr_seq == adrep.adrp_seq) {
+ TAILQ_REMOVE(&adist_recv_list, adreq,
+ adr_next);
+ break;
+ }
+ }
+ if (adreq == NULL) {
+ /*
+ * If we disconnected in the meantime, just continue.
+ * On disconnect sender_disconnect() clears the queue,
+ * we can use that.
+ */
+ if (TAILQ_EMPTY(&adist_recv_list)) {
+ rw_unlock(&adist_remote_lock);
+ continue;
+ }
+ mtx_unlock(&adist_recv_list_lock);
+ pjdlog_error("Found no request matching received 'seq' field (%ju).",
+ (uintmax_t)adrep.adrp_seq);
+ sender_disconnect();
+ continue;
+ }
+ mtx_unlock(&adist_recv_list_lock);
+ adreq_log(LOG_DEBUG, 2, -1, adreq,
+ "recv thread: (%p) Request confirmed: ", adreq);
+ pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
+ adreq->adr_cmd);
+ if (adrep.adrp_error != 0) {
+ pjdlog_error("Receiver returned error (%s), disconnecting.",
+ adist_errstr((int)adrep.adrp_error));
+ sender_disconnect();
+ continue;
+ }
+ if (adreq->adr_cmd == ADIST_CMD_CLOSE)
+ trail_unlink(adist_trail, adreq->adr_data);
+ pjdlog_debug(3, "Request received successfully.");
+ QUEUE_INSERT(adreq, &adist_free_list);
+ }
+ /* NOTREACHED */
+ return (NULL);
+}
+
+static void
+guard_check_connection(void)
+{
+
+ PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
+
+ rw_rlock(&adist_remote_lock);
+ if (adhost->adh_remote != NULL) {
+ rw_unlock(&adist_remote_lock);
+ pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
+ adhost->adh_remoteaddr);
+ return;
+ }
+
+ /*
+ * Upgrade the lock. It doesn't have to be atomic as no other thread
+ * can change connection status from disconnected to connected.
+ */
+ rw_unlock(&adist_remote_lock);
+ pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
+ adhost->adh_remoteaddr);
+ if (sender_connect() == 0) {
+ pjdlog_info("Successfully reconnected to %s.",
+ adhost->adh_remoteaddr);
+ } else {
+ pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
+ adhost->adh_remoteaddr);
+ }
+}
+
+/*
+ * Thread guards remote connections and reconnects when needed, handles
+ * signals, etc.
+ */
+static void *
+guard_thread(void *arg __unused)
+{
+ struct timespec timeout;
+ time_t lastcheck, now;
+ sigset_t mask;
+ int signo;
+
+ lastcheck = time(NULL);
+
+ PJDLOG_VERIFY(sigemptyset(&mask) == 0);
+ PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
+ PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
+
+ timeout.tv_sec = ADIST_KEEPALIVE;
+ timeout.tv_nsec = 0;
+ signo = -1;
+
+ for (;;) {
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ sigexit_received = true;
+ pjdlog_exitx(EX_OK,
+ "Termination signal received, exiting.");
+ break;
+ default:
+ break;
+ }
+
+ pjdlog_debug(3, "remote_guard: Checking connections.");
+ now = time(NULL);
+ if (lastcheck + ADIST_KEEPALIVE <= now) {
+ guard_check_connection();
+ lastcheck = now;
+ }
+ signo = sigtimedwait(&mask, NULL, &timeout);
+ }
+ /* NOTREACHED */
+ return (NULL);
+}
+
+void
+adist_sender(struct adist_config *config, struct adist_host *adh)
+{
+ pthread_t td;
+ pid_t pid;
+ int error, mode, debuglevel;
+
+ /*
+ * Create communication channel for sending connection requests from
+ * child to parent.
+ */
+ if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
+ pjdlog_errno(LOG_ERR,
+ "Unable to create connection sockets between child and parent");
+ return;
+ }
+
+ pid = fork();
+ if (pid == -1) {
+ pjdlog_errno(LOG_ERR, "Unable to fork");
+ proto_close(adh->adh_conn);
+ adh->adh_conn = NULL;
+ return;
+ }
+
+ if (pid > 0) {
+ /* This is parent. */
+ adh->adh_worker_pid = pid;
+ /* Declare that we are receiver. */
+ proto_recv(adh->adh_conn, NULL, 0);
+ return;
+ }
+
+ adcfg = config;
+ adhost = adh;
+
+ mode = pjdlog_mode_get();
+ debuglevel = pjdlog_debug_get();
+
+ /* Declare that we are sender. */
+ proto_send(adhost->adh_conn, NULL, 0);
+
+ descriptors_cleanup(adhost);
+
+#ifdef TODO
+ descriptors_assert(adhost, mode);
+#endif
+
+ pjdlog_init(mode);
+ pjdlog_debug_set(debuglevel);
+ pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
+ role2str(adhost->adh_role));
+#ifdef HAVE_SETPROCTITLE
+ setproctitle("[%s] (%s) ", adhost->adh_name,
+ role2str(adhost->adh_role));
+#endif
+
+ /*
+ * The sender process should be able to remove entries from its
+ * trail directory, but it should not be able to write to the
+ * trail files, only read from them.
+ */
+ adist_trail = trail_new(adhost->adh_directory, false);
+ if (adist_trail == NULL)
+ exit(EX_OSFILE);
+
+ if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
+ role2str(adhost->adh_role), adhost->adh_name) != 0) {
+ exit(EX_CONFIG);
+ }
+ pjdlog_info("Privileges successfully dropped.");
+
+ /*
+ * We can ignore wait_for_dir_init() failures. It will fall back to
+ * using sleep(3).
+ */
+ (void)wait_for_dir_init(trail_dirfd(adist_trail));
+
+ init_environment();
+ if (sender_connect() == 0) {
+ pjdlog_info("Successfully connected to %s.",
+ adhost->adh_remoteaddr);
+ }
+ adhost->adh_reset = true;
+
+ /*
+ * Create the guard thread first, so we can handle signals from the
+ * very begining.
+ */
+ error = pthread_create(&td, NULL, guard_thread, NULL);
+ PJDLOG_ASSERT(error == 0);
+ error = pthread_create(&td, NULL, send_thread, NULL);
+ PJDLOG_ASSERT(error == 0);
+ error = pthread_create(&td, NULL, recv_thread, NULL);
+ PJDLOG_ASSERT(error == 0);
+ (void)read_thread(NULL);
+}
OpenPOWER on IntegriCloud