Diffstat (limited to 'sys/rpc/svc_vc.c')
-rw-r--r--  sys/rpc/svc_vc.c  247
1 file changed, 120 insertions, 127 deletions
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index 47530da..e3f0350 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -54,6 +54,7 @@ __FBSDID("$FreeBSD$");
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
+#include <sys/sx.h>
#include <sys/systm.h>
#include <sys/uio.h>
#include <netinet/tcp.h>
@@ -62,16 +63,17 @@ __FBSDID("$FreeBSD$");
#include <rpc/rpc_com.h>
-static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *);
+static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
+ struct sockaddr **, struct mbuf **);
static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
static void svc_vc_rendezvous_destroy(SVCXPRT *);
static bool_t svc_vc_null(void);
static void svc_vc_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
-static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *);
-static bool_t svc_vc_getargs(SVCXPRT *, xdrproc_t, void *);
-static bool_t svc_vc_freeargs(SVCXPRT *, xdrproc_t, void *);
-static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *);
+static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
+ struct sockaddr **, struct mbuf **);
+static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
+ struct sockaddr *, struct mbuf *);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
void *in);
@@ -83,9 +85,8 @@ static void svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
static struct xp_ops svc_vc_rendezvous_ops = {
.xp_recv = svc_vc_rendezvous_recv,
.xp_stat = svc_vc_rendezvous_stat,
- .xp_getargs = (bool_t (*)(SVCXPRT *, xdrproc_t, void *))svc_vc_null,
- .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *))svc_vc_null,
- .xp_freeargs = (bool_t (*)(SVCXPRT *, xdrproc_t, void *))svc_vc_null,
+ .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *,
+ struct sockaddr *, struct mbuf *))svc_vc_null,
.xp_destroy = svc_vc_rendezvous_destroy,
.xp_control = svc_vc_rendezvous_control
};
@@ -93,9 +94,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
static struct xp_ops svc_vc_ops = {
.xp_recv = svc_vc_recv,
.xp_stat = svc_vc_stat,
- .xp_getargs = svc_vc_getargs,
.xp_reply = svc_vc_reply,
- .xp_freeargs = svc_vc_freeargs,
.xp_destroy = svc_vc_destroy,
.xp_control = svc_vc_control
};
@@ -141,28 +140,21 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
return (xprt);
}
- xprt = mem_alloc(sizeof(SVCXPRT));
- mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF);
+ xprt = svc_xprt_alloc();
+ sx_init(&xprt->xp_lock, "xprt->xp_lock");
xprt->xp_pool = pool;
xprt->xp_socket = so;
xprt->xp_p1 = NULL;
xprt->xp_p2 = NULL;
- xprt->xp_p3 = NULL;
- xprt->xp_verf = _null_auth;
xprt->xp_ops = &svc_vc_rendezvous_ops;
error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
if (error)
goto cleanup_svc_vc_create;
- xprt->xp_ltaddr.buf = mem_alloc(sizeof (struct sockaddr_storage));
- xprt->xp_ltaddr.maxlen = sizeof (struct sockaddr_storage);
- xprt->xp_ltaddr.len = sa->sa_len;
- memcpy(xprt->xp_ltaddr.buf, sa, sa->sa_len);
+ memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
free(sa, M_SONAME);
- xprt->xp_rtaddr.maxlen = 0;
-
xprt_register(xprt);
solisten(so, SOMAXCONN, curthread);
@@ -176,7 +168,7 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
return (xprt);
cleanup_svc_vc_create:
if (xprt)
- mem_free(xprt, sizeof(*xprt));
+ svc_xprt_free(xprt);
return (NULL);
}
@@ -218,29 +210,27 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
cd = mem_alloc(sizeof(*cd));
cd->strm_stat = XPRT_IDLE;
- xprt = mem_alloc(sizeof(SVCXPRT));
- mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF);
+ xprt = svc_xprt_alloc();
+ sx_init(&xprt->xp_lock, "xprt->xp_lock");
xprt->xp_pool = pool;
xprt->xp_socket = so;
xprt->xp_p1 = cd;
xprt->xp_p2 = NULL;
- xprt->xp_p3 = NULL;
- xprt->xp_verf = _null_auth;
xprt->xp_ops = &svc_vc_ops;
- xprt->xp_rtaddr.buf = mem_alloc(sizeof (struct sockaddr_storage));
- xprt->xp_rtaddr.maxlen = sizeof (struct sockaddr_storage);
- xprt->xp_rtaddr.len = raddr->sa_len;
- memcpy(xprt->xp_rtaddr.buf, raddr, raddr->sa_len);
+ /*
+ * See http://www.connectathon.org/talks96/nfstcp.pdf - client
+ * has a 5 minute timer, server has a 6 minute timer.
+ */
+ xprt->xp_idletimeout = 6 * 60;
+
+ memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
if (error)
goto cleanup_svc_vc_create;
- xprt->xp_ltaddr.buf = mem_alloc(sizeof (struct sockaddr_storage));
- xprt->xp_ltaddr.maxlen = sizeof (struct sockaddr_storage);
- xprt->xp_ltaddr.len = sa->sa_len;
- memcpy(xprt->xp_ltaddr.buf, sa, sa->sa_len);
+ memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
free(sa, M_SONAME);
xprt_register(xprt);
@@ -255,19 +245,13 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
* Throw the transport into the active list in case it already
* has some data buffered.
*/
- mtx_lock(&xprt->xp_lock);
+ sx_xlock(&xprt->xp_lock);
xprt_active(xprt);
- mtx_unlock(&xprt->xp_lock);
+ sx_xunlock(&xprt->xp_lock);
return (xprt);
cleanup_svc_vc_create:
if (xprt) {
- if (xprt->xp_ltaddr.buf)
- mem_free(xprt->xp_ltaddr.buf,
- sizeof(struct sockaddr_storage));
- if (xprt->xp_rtaddr.buf)
- mem_free(xprt->xp_rtaddr.buf,
- sizeof(struct sockaddr_storage));
mem_free(xprt, sizeof(*xprt));
}
if (cd)
@@ -335,7 +319,8 @@ done:
/*ARGSUSED*/
static bool_t
-svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg)
+svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
+ struct sockaddr **addrp, struct mbuf **mp)
{
struct socket *so = NULL;
struct sockaddr *sa = NULL;
@@ -347,22 +332,27 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg)
* connection from the socket and turn it into a new
* transport. If the accept fails, we have drained all pending
* connections so we call xprt_inactive().
- *
- * The lock protects us in the case where a new connection arrives
- * on the socket after our call to accept fails with
- * EWOULDBLOCK - the call to xprt_active() in the upcall will
- * happen only after our call to xprt_inactive() which ensures
- * that we will remain active. It might be possible to use
- * SOCKBUF_LOCK for this - its not clear to me what locks are
- * held during the upcall.
*/
- mtx_lock(&xprt->xp_lock);
+ sx_xlock(&xprt->xp_lock);
error = svc_vc_accept(xprt->xp_socket, &so);
if (error == EWOULDBLOCK) {
- xprt_inactive(xprt);
- mtx_unlock(&xprt->xp_lock);
+ /*
+ * We must re-test for new connections after taking
+ * the lock to protect us in the case where a new
+ * connection arrives after our call to accept fails
+ * with EWOULDBLOCK. The pool lock protects us from
+ * racing the upcall after our TAILQ_EMPTY() call
+ * returns false.
+ */
+ ACCEPT_LOCK();
+ mtx_lock(&xprt->xp_pool->sp_lock);
+ if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
+ xprt_inactive_locked(xprt);
+ mtx_unlock(&xprt->xp_pool->sp_lock);
+ ACCEPT_UNLOCK();
+ sx_xunlock(&xprt->xp_lock);
return (FALSE);
}
@@ -373,11 +363,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg)
xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
xprt_inactive(xprt);
- mtx_unlock(&xprt->xp_lock);
+ sx_xunlock(&xprt->xp_lock);
return (FALSE);
}
- mtx_unlock(&xprt->xp_lock);
+ sx_xunlock(&xprt->xp_lock);
sa = 0;
error = soaccept(so, &sa);
@@ -420,18 +410,13 @@ svc_vc_destroy_common(SVCXPRT *xprt)
xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
- xprt_unregister(xprt);
-
- mtx_destroy(&xprt->xp_lock);
+ sx_destroy(&xprt->xp_lock);
if (xprt->xp_socket)
(void)soclose(xprt->xp_socket);
- if (xprt->xp_rtaddr.buf)
- (void) mem_free(xprt->xp_rtaddr.buf, xprt->xp_rtaddr.maxlen);
- if (xprt->xp_ltaddr.buf)
- (void) mem_free(xprt->xp_ltaddr.buf, xprt->xp_ltaddr.maxlen);
- (void) mem_free(xprt, sizeof (SVCXPRT));
-
+ if (xprt->xp_netid)
+ (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
+ svc_xprt_free(xprt);
}
static void
@@ -483,32 +468,48 @@ svc_vc_stat(SVCXPRT *xprt)
/*
* Return XPRT_MOREREQS if we have buffered data and we are
- * mid-record or if we have enough data for a record marker.
+ * mid-record or if we have enough data for a record
+ * marker. Since this is only a hint, we read mpending and
+ * resid outside the lock. We do need to take the lock if we
+ * have to traverse the mbuf chain.
*/
if (cd->mpending) {
if (cd->resid)
return (XPRT_MOREREQS);
n = 0;
+ sx_xlock(&xprt->xp_lock);
m = cd->mpending;
while (m && n < sizeof(uint32_t)) {
n += m->m_len;
m = m->m_next;
}
+ sx_xunlock(&xprt->xp_lock);
if (n >= sizeof(uint32_t))
return (XPRT_MOREREQS);
}
+ if (soreadable(xprt->xp_socket))
+ return (XPRT_MOREREQS);
+
return (XPRT_IDLE);
}
static bool_t
-svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
+svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
+ struct sockaddr **addrp, struct mbuf **mp)
{
struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
struct uio uio;
struct mbuf *m;
+ XDR xdrs;
int error, rcvflag;
+ /*
+ * Serialise access to the socket and our own record parsing
+ * state.
+ */
+ sx_xlock(&xprt->xp_lock);
+
for (;;) {
/*
* If we have an mbuf chain in cd->mpending, try to parse a
@@ -539,7 +540,9 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
}
if (n < sizeof(uint32_t))
goto readmore;
- cd->mpending = m_pullup(cd->mpending, sizeof(uint32_t));
+ if (cd->mpending->m_len < sizeof(uint32_t))
+ cd->mpending = m_pullup(cd->mpending,
+ sizeof(uint32_t));
memcpy(&header, mtod(cd->mpending, uint32_t *),
sizeof(header));
header = ntohl(header);
@@ -557,8 +560,12 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
*/
while (cd->mpending && cd->resid) {
m = cd->mpending;
- cd->mpending = m_split(cd->mpending, cd->resid,
- M_WAIT);
+ if (cd->mpending->m_next
+ || cd->mpending->m_len > cd->resid)
+ cd->mpending = m_split(cd->mpending,
+ cd->resid, M_WAIT);
+ else
+ cd->mpending = NULL;
if (cd->mreq)
m_last(cd->mreq)->m_next = m;
else
@@ -582,13 +589,18 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
* Success - we have a complete record in
* cd->mreq.
*/
- xdrmbuf_create(&xprt->xp_xdrreq, cd->mreq, XDR_DECODE);
+ xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
cd->mreq = NULL;
- if (! xdr_callmsg(&xprt->xp_xdrreq, msg)) {
- XDR_DESTROY(&xprt->xp_xdrreq);
+ sx_xunlock(&xprt->xp_lock);
+
+ if (! xdr_callmsg(&xdrs, msg)) {
+ XDR_DESTROY(&xdrs);
return (FALSE);
}
- xprt->xp_xid = msg->rm_xid;
+
+ *addrp = NULL;
+ *mp = xdrmbuf_getall(&xdrs);
+ XDR_DESTROY(&xdrs);
return (TRUE);
}
@@ -602,17 +614,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
* the result in cd->mpending. If the read fails,
* we have drained both cd->mpending and the socket so
* we can call xprt_inactive().
- *
- * The lock protects us in the case where a new packet arrives
- * on the socket after our call to soreceive fails with
- * EWOULDBLOCK - the call to xprt_active() in the upcall will
- * happen only after our call to xprt_inactive() which ensures
- * that we will remain active. It might be possible to use
- * SOCKBUF_LOCK for this - its not clear to me what locks are
- * held during the upcall.
*/
- mtx_lock(&xprt->xp_lock);
-
uio.uio_resid = 1000000000;
uio.uio_td = curthread;
m = NULL;
@@ -621,8 +623,20 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
&rcvflag);
if (error == EWOULDBLOCK) {
- xprt_inactive(xprt);
- mtx_unlock(&xprt->xp_lock);
+ /*
+ * We must re-test for readability after
+ * taking the lock to protect us in the case
+ * where a new packet arrives on the socket
+ * after our call to soreceive fails with
+ * EWOULDBLOCK. The pool lock protects us from
+ * racing the upcall after our soreadable()
+ * call returns false.
+ */
+ mtx_lock(&xprt->xp_pool->sp_lock);
+ if (!soreadable(xprt->xp_socket))
+ xprt_inactive_locked(xprt);
+ mtx_unlock(&xprt->xp_pool->sp_lock);
+ sx_xunlock(&xprt->xp_lock);
return (FALSE);
}
@@ -634,7 +648,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
xprt_inactive(xprt);
cd->strm_stat = XPRT_DIED;
- mtx_unlock(&xprt->xp_lock);
+ sx_xunlock(&xprt->xp_lock);
return (FALSE);
}
@@ -642,8 +656,9 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
/*
* EOF - the other end has closed the socket.
*/
+ xprt_inactive(xprt);
cd->strm_stat = XPRT_DIED;
- mtx_unlock(&xprt->xp_lock);
+ sx_xunlock(&xprt->xp_lock);
return (FALSE);
}
@@ -651,53 +666,38 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg)
m_last(cd->mpending)->m_next = m;
else
cd->mpending = m;
-
- mtx_unlock(&xprt->xp_lock);
}
}
static bool_t
-svc_vc_getargs(SVCXPRT *xprt, xdrproc_t xdr_args, void *args_ptr)
-{
-
- return (xdr_args(&xprt->xp_xdrreq, args_ptr));
-}
-
-static bool_t
-svc_vc_freeargs(SVCXPRT *xprt, xdrproc_t xdr_args, void *args_ptr)
+svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
+ struct sockaddr *addr, struct mbuf *m)
{
XDR xdrs;
-
- /*
- * Free the request mbuf here - this allows us to handle
- * protocols where not all requests have replies
- * (i.e. NLM). Note that xdrmbuf_destroy handles being called
- * twice correctly - the mbuf will only be freed once.
- */
- XDR_DESTROY(&xprt->xp_xdrreq);
-
- xdrs.x_op = XDR_FREE;
- return (xdr_args(&xdrs, args_ptr));
-}
-
-static bool_t
-svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg)
-{
struct mbuf *mrep;
- bool_t stat = FALSE;
+ bool_t stat = TRUE;
int error;
/*
* Leave space for record mark.
*/
MGETHDR(mrep, M_WAIT, MT_DATA);
- MCLGET(mrep, M_WAIT);
mrep->m_len = 0;
mrep->m_data += sizeof(uint32_t);
- xdrmbuf_create(&xprt->xp_xdrrep, mrep, XDR_ENCODE);
- msg->rm_xid = xprt->xp_xid;
- if (xdr_replymsg(&xprt->xp_xdrrep, msg)) {
+ xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
+
+ if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
+ msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
+ if (!xdr_replymsg(&xdrs, msg))
+ stat = FALSE;
+ else
+ xdrmbuf_append(&xdrs, m);
+ } else {
+ stat = xdr_replymsg(&xdrs, msg);
+ }
+
+ if (stat) {
m_fixhdr(mrep);
/*
@@ -716,12 +716,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg)
m_freem(mrep);
}
- /*
- * This frees the request mbuf chain as well. The reply mbuf
- * chain was consumed by sosend.
- */
- XDR_DESTROY(&xprt->xp_xdrreq);
- XDR_DESTROY(&xprt->xp_xdrrep);
+ XDR_DESTROY(&xdrs);
xprt->xp_p2 = NULL;
return (stat);
@@ -739,9 +734,7 @@ svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
SVCXPRT *xprt = (SVCXPRT *) arg;
- mtx_lock(&xprt->xp_lock);
xprt_active(xprt);
- mtx_unlock(&xprt->xp_lock);
}
#if 0
@@ -757,7 +750,7 @@ __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
struct sockaddr *sa;
sock = transp->xp_fd;
- sa = (struct sockaddr *)transp->xp_rtaddr.buf;
+ sa = (struct sockaddr *)transp->xp_rtaddr;
if (sa->sa_family == AF_LOCAL) {
ret = getpeereid(sock, &euid, &egid);
if (ret == 0)
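
As background for the record parsing that svc_vc_recv() does in the hunks above, the following stand-alone sketch (not part of the patch; purely illustrative) decodes an RPC-over-TCP record-marking word: the high bit of the 32-bit mark flags the last fragment of a record and the low 31 bits carry the fragment length (RFC 1831), which is what feeds cd->resid and the end-of-record handling in the connection code.

/*
 * Illustrative only: decode an RPC record-marking word the way
 * svc_vc_recv() does after copying it out of cd->mpending.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>

int
main(void)
{
	/* Example wire value: last fragment, 0x70 (112) bytes of data. */
	uint32_t wire = htonl(0x80000070);
	uint32_t header;
	int last_fragment;
	uint32_t fragment_len;

	/* svc_vc_recv() memcpy()s the mark out of the first mbuf. */
	memcpy(&header, &wire, sizeof(header));
	header = ntohl(header);
	last_fragment = (header & 0x80000000) != 0;
	fragment_len = header & 0x7fffffff;

	printf("last fragment: %d, length: %u bytes\n", last_fragment,
	    fragment_len);
	return (0);
}

In the patched svc_vc_recv() this decoding runs against cd->mpending, with m_pullup() now called only when the first mbuf is too short to hold the whole four-byte mark.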