summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authorluigi <luigi@FreeBSD.org>2012-12-23 23:03:45 +0000
committerluigi <luigi@FreeBSD.org>2012-12-23 23:03:45 +0000
commitda7a62683030c2747662ae8b6bda3acac95e794d (patch)
tree4021a7f1af602c4e1840965e65487fd83984f3ab /tools
parentc6bad3bef79216ab7313dba42827d02cecb65b47 (diff)
downloadFreeBSD-src-da7a62683030c2747662ae8b6bda3acac95e794d.zip
FreeBSD-src-da7a62683030c2747662ae8b6bda3acac95e794d.tar.gz
small cleanup of the code, and add support for running multiple
threads on each socket.
Diffstat (limited to 'tools')
-rw-r--r--tools/tools/netrate/netreceive/netreceive.c165
1 files changed, 144 insertions, 21 deletions
diff --git a/tools/tools/netrate/netreceive/netreceive.c b/tools/tools/netrate/netreceive/netreceive.c
index 9300109..80be693 100644
--- a/tools/tools/netrate/netreceive/netreceive.c
+++ b/tools/tools/netrate/netreceive/netreceive.c
@@ -43,27 +43,158 @@
#define MAXSOCK 20
+#include <pthread.h>
+#include <fcntl.h>
+#include <time.h> /* clock_getres() */
+
+static int round_to(int n, int l)
+{
+ return ((n + l - 1)/l)*l;
+}
+
+/*
+ * Each socket uses multiple threads so the receiver is
+ * more efficient. A collector thread runs the stats.
+ */
+struct td_desc {
+ pthread_t td_id;
+ uint64_t count; /* rx counter */
+ int fd;
+ char *buf;
+ int buflen;
+};
+
static void
usage(void)
{
- fprintf(stderr, "netreceive [port]\n");
+ fprintf(stderr, "netreceive port [nthreads]\n");
exit(-1);
}
+static __inline void
+timespec_add(struct timespec *tsa, struct timespec *tsb)
+{
+
+ tsa->tv_sec += tsb->tv_sec;
+ tsa->tv_nsec += tsb->tv_nsec;
+ if (tsa->tv_nsec >= 1000000000) {
+ tsa->tv_sec++;
+ tsa->tv_nsec -= 1000000000;
+ }
+}
+
+static __inline void
+timespec_sub(struct timespec *tsa, struct timespec *tsb)
+{
+
+ tsa->tv_sec -= tsb->tv_sec;
+ tsa->tv_nsec -= tsb->tv_nsec;
+ if (tsa->tv_nsec < 0) {
+ tsa->tv_sec--;
+ tsa->tv_nsec += 1000000000;
+ }
+}
+
+static void *
+rx_body(void *data)
+{
+ struct td_desc *t = data;
+ struct pollfd fds;
+ int y;
+
+ fds.fd = t->fd;
+ fds.events = POLLIN;
+
+ for (;;) {
+ if (poll(&fds, 1, -1) < 0)
+ perror("poll on thread");
+ if (!(fds.revents & POLLIN))
+ continue;
+ for (;;) {
+ y = recv(t->fd, t->buf, t->buflen, MSG_DONTWAIT);
+ if (y < 0)
+ break;
+ t->count++;
+ }
+ }
+ return NULL;
+}
+
+int
+make_threads(struct td_desc **tp, int *s, int nsock, int nthreads)
+{
+ int i, si, nt = nsock * nthreads;
+ int lb = round_to(nt * sizeof (struct td_desc *), 64);
+ int td_len = round_to(sizeof(struct td_desc), 64); // cache align
+ char *m = calloc(1, lb + td_len * nt);
+
+ printf("td len %d -> %d\n", (int)sizeof(struct td_desc) , td_len);
+ /* pointers plus the structs */
+ if (m == NULL) {
+ perror("no room for pointers!");
+ exit(1);
+ }
+ tp = (struct td_desc **)m;
+ m += lb; /* skip the pointers */
+ for (si = i = 0; i < nt; i++, m += td_len) {
+ tp[i] = (struct td_desc *)m;
+ tp[i]->fd = s[si];
+ if (++si == nsock)
+ si = 0;
+ if (pthread_create(&tp[i]->td_id, NULL, rx_body, tp[i])) {
+ perror("unable to create thread");
+ exit(1);
+ }
+ }
+}
+
+int
+main_thread(struct td_desc **tp, int nsock, int nthreads)
+{
+ uint64_t c0, c1;
+ struct timespec now, then, delta;
+ /* now the parent collects and prints results */
+ c0 = c1 = 0;
+ clock_gettime(CLOCK_REALTIME, &then);
+ fprintf(stderr, "start at %ld.%09ld\n", then.tv_sec, then.tv_nsec);
+ while (1) {
+ int i, nt = nsock * nthreads;
+ int64_t dn;
+ uint64_t pps;
+
+ if (poll(NULL, 0, 500) < 0)
+ perror("poll");
+ c0 = 0;
+ for (i = 0; i < nt; i++) {
+ c0 += tp[i]->count;
+ }
+ dn = c0 - c1;
+ clock_gettime(CLOCK_REALTIME, &now);
+ delta = now;
+ timespec_sub(&delta, &then);
+ then = now;
+ pps = dn;
+ pps = (pps * 1000000000) / (delta.tv_sec*1000000000 + delta.tv_nsec + 1);
+ fprintf(stderr, "%d pkts in %ld.%09ld ns %ld pps\n",
+ (int)dn, delta.tv_sec, delta.tv_nsec, (long)pps);
+ c1 = c0;
+ }
+}
+
int
main(int argc, char *argv[])
{
struct addrinfo hints, *res, *res0;
char *dummy, *packet;
int port;
- int error, v, i;
+ int error, v, nthreads = 1;
+ struct td_desc **tp;
const char *cause = NULL;
int s[MAXSOCK];
- struct pollfd fds[MAXSOCK];
int nsock;
- if (argc != 2)
+ if (argc < 2)
usage();
memset(&hints, 0, sizeof(hints));
@@ -74,6 +205,10 @@ main(int argc, char *argv[])
port = strtoul(argv[1], &dummy, 10);
if (port < 1 || port > 65535 || *dummy != '\0')
usage();
+ if (argc > 2)
+ nthreads = strtoul(argv[2], &dummy, 10);
+ if (nthreads < 1 || nthreads > 64)
+ usage();
packet = malloc(65536);
if (packet == NULL) {
@@ -110,9 +245,6 @@ main(int argc, char *argv[])
continue;
}
(void) listen(s[nsock], 5);
- fds[nsock].fd = s[nsock];
- fds[nsock].events = POLLIN;
-
nsock++;
}
if (nsock == 0) {
@@ -121,21 +253,12 @@ main(int argc, char *argv[])
/*NOTREACHED*/
}
- printf("netreceive listening on UDP port %d\n", (u_short)port);
+ printf("netreceive %d sockets x %d threads listening on UDP port %d\n",
+ nsock, nthreads, (u_short)port);
+
+ make_threads(tp, s, nsock, nthreads);
+ main_thread(tp, nsock, nthreads);
- while (1) {
- if (poll(fds, nsock, -1) < 0)
- perror("poll");
- for (i = 0; i < nsock; i++) {
- if (fds[i].revents & POLLIN) {
- if (recv(s[i], packet, 65536, 0) < 0)
- perror("recv");
- }
- if ((fds[i].revents &~ POLLIN) != 0)
- perror("poll");
- }
- }
-
/*NOTREACHED*/
freeaddrinfo(res0);
}
OpenPOWER on IntegriCloud