author		rwatson <rwatson@FreeBSD.org>	2008-07-02 23:23:27 +0000
committer	rwatson <rwatson@FreeBSD.org>	2008-07-02 23:23:27 +0000
commit		0c50a62527ee21e24fb9e93a0ebf67df0043d601 (patch)
tree		f93fb0d7ca2098d63cd9cb5c25ccd3e6b4f9b4d6
parent		a2caa98b9568eea606d112d9042d80125e5adbb4 (diff)
Add soreceive_dgram(9), an optimized socket receive function for use by
datagram-only protocols, such as UDP.  This version removes the use of
sblock(), which is not required because data cannot be improperly
interleaved across datagrams, and it avoids some of the larger loops and
state management that don't apply to datagram sockets.

This is experimental code, so hook it up only for UDPv4 for testing; if
there are problems we may need to revise it or turn it off by default,
but it offers *significant* performance improvements for threaded UDP
applications such as BIND9, nsd, and memcached using UDP.

Tested by:	kris, ps
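
As context for the performance claim, here is a minimal userspace sketch
(not part of this commit) of the kind of threaded datagram workload that
benefits: several threads blocked in recvfrom() on one UDP socket, where
every call lands in the protocol's pru_soreceive hook, now
soreceive_dgram() for UDPv4.  The port number, thread count, and
function names are illustrative only.

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

#define	RX_THREADS	4

static void *
rx_loop(void *arg)
{
	int fd = *(int *)arg;
	char buf[1500];
	struct sockaddr_in from;
	socklen_t fromlen;
	ssize_t n;

	for (;;) {
		fromlen = sizeof(from);
		/* One datagram per call; blocks in the kernel receive path. */
		n = recvfrom(fd, buf, sizeof(buf), 0,
		    (struct sockaddr *)&from, &fromlen);
		if (n < 0)
			break;
		/* ... process one datagram ... */
	}
	return (NULL);
}

int
main(void)
{
	struct sockaddr_in sin;
	pthread_t tid[RX_THREADS];
	int fd, i;

	fd = socket(AF_INET, SOCK_DGRAM, 0);
	if (fd < 0)
		return (1);
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(9999);	/* illustrative port */
	if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) != 0)
		return (1);
	for (i = 0; i < RX_THREADS; i++)
		pthread_create(&tid[i], NULL, rx_loop, &fd);
	for (i = 0; i < RX_THREADS; i++)
		pthread_join(tid[i], NULL);
	close(fd);
	return (0);
}
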
-rw-r--r--	sys/kern/uipc_socket.c		| 234
-rw-r--r--	sys/netinet/udp_usrreq.c	|   1
-rw-r--r--	sys/sys/socketvar.h		|   3
3 files changed, 238 insertions(+), 0 deletions(-)
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
index c9b6076..bfe01ee 100644
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -1845,6 +1845,240 @@ release:
return (error);
}
+/*
+ * Optimized version of soreceive() for simple datagram cases from userspace;
+ * this is experimental, and while heavily tested, may contain errors.
+ */
+int
+soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
+ struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
+{
+ struct mbuf *m, *m2;
+ int flags, len, error, offset;
+ struct protosw *pr = so->so_proto;
+ struct mbuf *nextrecord;
+ int orig_resid = uio->uio_resid;
+
+ if (psa != NULL)
+ *psa = NULL;
+ if (controlp != NULL)
+ *controlp = NULL;
+ if (flagsp != NULL)
+ flags = *flagsp &~ MSG_EOR;
+ else
+ flags = 0;
+
+ /*
+ * For any complicated cases, fall back to the full
+ * soreceive_generic().
+ */
+ if (mp0 != NULL || (flags & MSG_PEEK) || (flags & MSG_OOB))
+ return (soreceive_generic(so, psa, uio, mp0, controlp,
+ flagsp));
+
+ /*
+ * Enforce restrictions on use.
+ */
+ KASSERT((pr->pr_flags & PR_WANTRCVD) == 0,
+ ("soreceive_dgram: wantrcvd"));
+ KASSERT(pr->pr_flags & PR_ATOMIC, ("soreceive_dgram: !atomic"));
+ KASSERT((so->so_rcv.sb_state & SBS_RCVATMARK) == 0,
+ ("soreceive_dgram: SBS_RCVATMARK"));
+ KASSERT((so->so_proto->pr_flags & PR_CONNREQUIRED) == 0,
+ ("soreceive_dgram: P_CONNREQUIRED"));
+
+restart:
+ SOCKBUF_LOCK(&so->so_rcv);
+ m = so->so_rcv.sb_mb;
+
+ /*
+ * If we have less data than requested, block awaiting more (subject
+ * to any timeout) if:
+ * 1. the current count is less than the low water mark, or
+ * 2. MSG_WAITALL is set, and it is possible to do the entire
+ * receive operation at once if we block (resid <= hiwat).
+ * 3. MSG_DONTWAIT is not set
+ * If MSG_WAITALL is set but resid is larger than the receive buffer,
+ * we have to do the receive in sections, and thus risk returning a
+ * short count if a timeout or signal occurs after we start.
+ */
+ if (m == NULL) {
+ KASSERT(m != NULL || !so->so_rcv.sb_cc,
+ ("receive: m == %p so->so_rcv.sb_cc == %u",
+ m, so->so_rcv.sb_cc));
+ if (so->so_error) {
+ if (m != NULL)
+ goto dontblock;
+ error = so->so_error;
+ so->so_error = 0;
+ SOCKBUF_UNLOCK(&so->so_rcv);
+ return (error);
+ }
+ SOCKBUF_LOCK_ASSERT(&so->so_rcv);
+ if (so->so_rcv.sb_state & SBS_CANTRCVMORE) {
+ if (m == NULL) {
+ SOCKBUF_UNLOCK(&so->so_rcv);
+ return (0);
+ } else
+ goto dontblock;
+ }
+ if (uio->uio_resid == 0) {
+ SOCKBUF_UNLOCK(&so->so_rcv);
+ return (0);
+ }
+ if ((so->so_state & SS_NBIO) ||
+ (flags & (MSG_DONTWAIT|MSG_NBIO))) {
+ SOCKBUF_UNLOCK(&so->so_rcv);
+ error = EWOULDBLOCK;
+ return (error);
+ }
+ SBLASTRECORDCHK(&so->so_rcv);
+ SBLASTMBUFCHK(&so->so_rcv);
+
+ /* XXXRW: sbwait() may not be as happy without sblock(). */
+ error = sbwait(&so->so_rcv);
+ SOCKBUF_UNLOCK(&so->so_rcv);
+ if (error)
+ return (error);
+ goto restart;
+ }
+dontblock:
+ /*
+ * From this point onward, we maintain 'nextrecord' as a cache of the
+ * pointer to the next record in the socket buffer. We must keep the
+ * various socket buffer pointers and local stack versions of the
+ * pointers in sync, pushing out modifications before dropping the
+ * socket buffer mutex, and re-reading them when picking it up.
+ *
+ * Otherwise, we will race with the network stack appending new data
+ * or records onto the socket buffer by using inconsistent/stale
+ * versions of the field, possibly resulting in socket buffer
+ * corruption.
+ *
+ * By holding the high-level sblock(), we prevent simultaneous
+ * readers from pulling off the front of the socket buffer.
+ */
+ SOCKBUF_LOCK_ASSERT(&so->so_rcv);
+ if (uio->uio_td)
+ uio->uio_td->td_ru.ru_msgrcv++;
+ KASSERT(m == so->so_rcv.sb_mb, ("soreceive: m != so->so_rcv.sb_mb"));
+ SBLASTRECORDCHK(&so->so_rcv);
+ SBLASTMBUFCHK(&so->so_rcv);
+ nextrecord = m->m_nextpkt;
+ if (pr->pr_flags & PR_ADDR) {
+ KASSERT(m->m_type == MT_SONAME,
+ ("m->m_type == %d", m->m_type));
+ orig_resid = 0;
+ if (psa != NULL)
+ *psa = sodupsockaddr(mtod(m, struct sockaddr *),
+ M_NOWAIT);
+ sbfree(&so->so_rcv, m);
+ so->so_rcv.sb_mb = m_free(m);
+ m = so->so_rcv.sb_mb;
+ sockbuf_pushsync(&so->so_rcv, nextrecord);
+ }
+ if (m == NULL) {
+ /* XXXRW: Can this happen? */
+ SOCKBUF_UNLOCK(&so->so_rcv);
+ return (0);
+ }
+ KASSERT(m->m_nextpkt == nextrecord,
+ ("soreceive: post-control, nextrecord !sync"));
+ if (nextrecord == NULL) {
+ KASSERT(so->so_rcv.sb_mb == m,
+ ("soreceive: post-control, sb_mb!=m"));
+ KASSERT(so->so_rcv.sb_lastrecord == m,
+ ("soreceive: post-control, lastrecord!=m"));
+ }
+
+ SOCKBUF_LOCK_ASSERT(&so->so_rcv);
+ SBLASTRECORDCHK(&so->so_rcv);
+ SBLASTMBUFCHK(&so->so_rcv);
+ KASSERT(m == so->so_rcv.sb_mb, ("soreceive_dgram: m not sb_mb"));
+ KASSERT(so->so_rcv.sb_mb->m_nextpkt == nextrecord,
+ ("soreceive_dgram: m_nextpkt != nextrecord"));
+
+ /*
+ * Pull 'm' and its chain off the front of the packet queue.
+ */
+ so->so_rcv.sb_mb = NULL;
+ sockbuf_pushsync(&so->so_rcv, nextrecord);
+
+ /*
+ * Walk 'm's chain and free that many bytes from the socket buffer.
+ */
+ for (m2 = m; m2 != NULL; m2 = m2->m_next)
+ sbfree(&so->so_rcv, m2);
+
+ /*
+ * Do a few last checks before we let go of the lock.
+ */
+ SBLASTRECORDCHK(&so->so_rcv);
+ SBLASTMBUFCHK(&so->so_rcv);
+ SOCKBUF_UNLOCK(&so->so_rcv);
+
+ /*
+ * Packet to copyout() is now in 'm' and it is disconnected from the
+ * queue.
+ *
+ * Process one or more MT_CONTROL mbufs present before any data mbufs
+ * in the first mbuf chain on the socket buffer.  MSG_PEEK was handed
+ * off to soreceive_generic() above, so here we always call into the
+ * protocol to perform externalization (or free if controlp == NULL).
+ */
+ if (m->m_type == MT_CONTROL) {
+ struct mbuf *cm = NULL, *cmn;
+ struct mbuf **cme = &cm;
+
+ do {
+ m2 = m->m_next;
+ m->m_next = NULL;
+ *cme = m;
+ cme = &(*cme)->m_next;
+ m = m2;
+ } while (m != NULL && m->m_type == MT_CONTROL);
+ while (cm != NULL) {
+ cmn = cm->m_next;
+ cm->m_next = NULL;
+ if (pr->pr_domain->dom_externalize != NULL) {
+ error = (*pr->pr_domain->dom_externalize)
+ (cm, controlp);
+ } else if (controlp != NULL)
+ *controlp = cm;
+ else
+ m_freem(cm);
+ if (controlp != NULL) {
+ orig_resid = 0;
+ while (*controlp != NULL)
+ controlp = &(*controlp)->m_next;
+ }
+ cm = cmn;
+ }
+ orig_resid = 0; /* XXXRW: why this? */
+ }
+
+ KASSERT(m == NULL || m->m_type == MT_DATA, ("soreceive_dgram: !data"));
+
+ offset = 0;
+ while (m != NULL && uio->uio_resid > 0) {
+ len = uio->uio_resid;
+ if (len > m->m_len)
+ len = m->m_len;
+ error = uiomove(mtod(m, char *), (int)len, uio);
+ if (error) {
+ m_freem(m);
+ return (error);
+ }
+ m = m_free(m);
+ }
+ if (m != NULL && pr->pr_flags & PR_ATOMIC)
+ flags |= MSG_TRUNC;
+ m_freem(m);
+ if (flagsp != NULL)
+ *flagsp |= flags;
+ return (0);
+}
+
int
soreceive(struct socket *so, struct sockaddr **psa, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
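
The structural trick in the function above is that the first record is
unlinked whole from the socket buffer while the sockbuf mutex is held,
and only afterwards, with the lock dropped, copied out via uiomove().
Because PR_ATOMIC datagrams enter and leave the buffer as complete
records, concurrent readers cannot interleave partial reads, which is
why the long-term sblock() reader lock can be omitted.  A simplified
model of that detach-then-copy pattern, using a toy record queue and
pthreads rather than the real sockbuf API:

#include <pthread.h>
#include <stddef.h>

struct rec {
	struct rec	*next;	/* next record; cf. mbuf m_nextpkt */
	size_t		 len;
	char		 data[1500];
};

struct rxq {
	pthread_mutex_t	 lock;	/* cf. SOCKBUF_LOCK() */
	struct rec	*head;
};

/*
 * Unlink the first whole record under the mutex and return it; the
 * caller copies it out after the lock is dropped.  Each reader gets
 * an entire record or nothing, so no long-held reader lock (the role
 * sblock() plays for stream sockets) is needed for correctness.
 */
static struct rec *
rxq_take(struct rxq *q)
{
	struct rec *r;

	pthread_mutex_lock(&q->lock);
	r = q->head;
	if (r != NULL)
		q->head = r->next;
	pthread_mutex_unlock(&q->lock);
	return (r);
}

In the real function, the copy loop then frees each mbuf as it is
consumed, and any mbufs still remaining once uio_resid reaches zero
cause MSG_TRUNC to be set before the remainder of the datagram is
discarded.
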
diff --git a/sys/netinet/udp_usrreq.c b/sys/netinet/udp_usrreq.c
index 99f40aa..4479a50 100644
--- a/sys/netinet/udp_usrreq.c
+++ b/sys/netinet/udp_usrreq.c
@@ -1164,6 +1164,7 @@ struct pr_usrreqs udp_usrreqs = {
.pru_disconnect = udp_disconnect,
.pru_peeraddr = in_getpeeraddr,
.pru_send = udp_send,
+ .pru_soreceive = soreceive_dgram,
.pru_sosend = sosend_dgram,
.pru_shutdown = udp_shutdown,
.pru_sockaddr = in_getsockaddr,
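
Only UDPv4 is hooked up by this commit, but the opt-in mechanism is
generic: a datagram-only protocol points its pr_usrreqs entries at the
shared fast paths.  A hypothetical sketch (the foo_* names are
placeholders, not a real protocol in the tree):

#include <sys/param.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/protosw.h>

/* Placeholder handlers a real protocol would supply. */
static int	foo_attach(struct socket *, int, struct thread *);
static int	foo_send(struct socket *, int, struct mbuf *,
		    struct sockaddr *, struct mbuf *, struct thread *);
static void	foo_detach(struct socket *);

struct pr_usrreqs foo_usrreqs = {
	.pru_attach =		foo_attach,
	.pru_send =		foo_send,
	.pru_soreceive =	soreceive_dgram,	/* datagram fast path */
	.pru_sosend =		sosend_dgram,		/* cf. existing UDP hook */
	.pru_detach =		foo_detach,
};
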
diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h
index 8fb1a70..408f07b 100644
--- a/sys/sys/socketvar.h
+++ b/sys/sys/socketvar.h
@@ -548,6 +548,9 @@ int sopoll_generic(struct socket *so, int events,
struct ucred *active_cred, struct thread *td);
int soreceive(struct socket *so, struct sockaddr **paddr, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp);
+int soreceive_dgram(struct socket *so, struct sockaddr **paddr,
+ struct uio *uio, struct mbuf **mp0, struct mbuf **controlp,
+ int *flagsp);
int soreceive_generic(struct socket *so, struct sockaddr **paddr,
struct uio *uio, struct mbuf **mp0, struct mbuf **controlp,
int *flagsp);
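
Callers are unaffected by the new declaration: generic code continues to
call soreceive(), which in this era of the tree simply dispatches
through the protocol switch, roughly:

int
soreceive(struct socket *so, struct sockaddr **psa, struct uio *uio,
    struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
{

	return (so->so_proto->pr_usrreqs->pru_soreceive(so, psa, uio, mp0,
	    controlp, flagsp));
}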