author      jhb <jhb@FreeBSD.org>    2009-06-01 21:17:03 +0000
committer   jhb <jhb@FreeBSD.org>    2009-06-01 21:17:03 +0000
commit      a1af9ecca44f362b24fe3a8342ca6ed8676a399c (patch)
tree        13628b6be10af95db7dc7d8ef88b3291d48583ab /sys/rpc
parent      9956d85f164d16d3c1db67cc01f521c1c09d5fdb (diff)
download    FreeBSD-src-a1af9ecca44f362b24fe3a8342ca6ed8676a399c.zip
            FreeBSD-src-a1af9ecca44f362b24fe3a8342ca6ed8676a399c.tar.gz
Rework socket upcalls to close some races with setup/teardown of upcalls.
- Each socket upcall is now invoked with the appropriate socket buffer
  locked. It is not permissible to call soisconnected() with this lock
  held, however, so socket upcalls now return an integer value. The two
  possible values are SU_OK and SU_ISCONNECTED. If an upcall returns
  SU_ISCONNECTED, then soisconnected() will be invoked on the socket
  after the socket buffer lock is dropped.
- A new API is provided for setting and clearing socket upcalls. The API
  consists of soupcall_set() and soupcall_clear() (a minimal usage
  sketch follows this message).
- To simplify locking, each socket buffer now has a separate upcall.
- When a socket upcall returns SU_ISCONNECTED, the upcall is cleared from
  the receive socket buffer automatically. Note that a SO_SND upcall
  should never return SU_ISCONNECTED.
- All this means that accept filters should now return SU_ISCONNECTED
  instead of calling soisconnected() directly. They also no longer need
  to explicitly clear the upcall on the new socket.
- The HTTP accept filter still uses soupcall_set() to manage its internal
  state machine, but other accept filters no longer have any explicit
  knowledge of socket upcall internals aside from their return value.
- The various RPC client upcalls currently drop the socket buffer lock
  while invoking soreceive() as a temporary band-aid. The plan for the
  future is to add a new flag to allow soreceive() to be called with the
  socket buffer locked.
- The AIO callback for socket I/O is now also invoked with the socket
  buffer locked. Previously sowakeup() would drop the socket buffer lock
  only to call aio_swake(), which immediately re-acquired the socket
  buffer lock for the duration of the function call.

Discussed with:	rwatson, rmacklem
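As a rough illustration of the API changes described above, the sketch below shows how a consumer might implement, install, and tear down a receive-buffer upcall after this commit. Only soupcall_set(), soupcall_clear(), SO_RCV, SOCKBUF_LOCK()/SOCKBUF_UNLOCK(), and the SU_OK/SU_ISCONNECTED return values come from this change; the example_* names and the example_softc structure are hypothetical and exist purely for illustration.

/*
 * Minimal sketch of a consumer of the reworked upcall API.  The
 * example_* identifiers are hypothetical; only the soupcall_*()
 * functions, SO_RCV, and the SU_* return values come from the commit.
 */
#include <sys/param.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

struct example_softc {
	int	es_ready;		/* set when data is waiting */
};

/*
 * Receive upcall: invoked by sowakeup() with the so_rcv socket buffer
 * lock already held, so it must not sleep and must not call
 * soisconnected() directly.
 */
static int
example_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct example_softc *sc = arg;

	sc->es_ready = 1;		/* just note that work is pending */
	return (SU_OK);
}

/*
 * Listen-socket style upcall (the accept-filter case from the commit
 * message): report readiness by returning SU_ISCONNECTED instead of
 * calling soisconnected(); the caller then completes the connection
 * and clears this upcall after dropping the socket buffer lock.
 */
static int
example_accf_upcall(struct socket *so, void *arg, int waitflag)
{

	if (so->so_rcv.sb_cc > 0)
		return (SU_ISCONNECTED);
	return (SU_OK);
}

static void
example_attach(struct socket *so, struct example_softc *sc)
{

	SOCKBUF_LOCK(&so->so_rcv);
	soupcall_set(so, SO_RCV, example_soupcall, sc);
	SOCKBUF_UNLOCK(&so->so_rcv);
}

static void
example_detach(struct socket *so)
{

	SOCKBUF_LOCK(&so->so_rcv);
	soupcall_clear(so, SO_RCV);
	SOCKBUF_UNLOCK(&so->so_rcv);
}

The clnt_dg_create(), clnt_vc_create(), svc_dg_create(), and svc_vc_create() hunks in the diff below follow the same attach pattern, and the corresponding close/destroy paths call soupcall_clear() under the socket buffer lock just as example_detach() does.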
Diffstat (limited to 'sys/rpc')
-rw-r--r--  sys/rpc/clnt_dg.c | 43
-rw-r--r--  sys/rpc/clnt_vc.c | 30
-rw-r--r--  sys/rpc/svc_dg.c  | 17
-rw-r--r--  sys/rpc/svc_vc.c  | 25
4 files changed, 53 insertions, 62 deletions
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c
index e6d101d..880a16f 100644
--- a/sys/rpc/clnt_dg.c
+++ b/sys/rpc/clnt_dg.c
@@ -79,7 +79,7 @@ static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
static void clnt_dg_close(CLIENT *);
static void clnt_dg_destroy(CLIENT *);
-static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
+static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
static struct clnt_ops clnt_dg_ops = {
.cl_call = clnt_dg_call,
@@ -112,7 +112,7 @@ TAILQ_HEAD(cu_request_list, cu_request);
#define MCALL_MSG_SIZE 24
/*
- * This structure is pointed to by the socket's so_upcallarg
+ * This structure is pointed to by the socket buffer's sb_upcallarg
* member. It is separate from the client private data to facilitate
* multiple clients sharing the same socket. The cs_lock mutex is used
* to protect all fields of this structure, the socket's receive
@@ -183,6 +183,7 @@ clnt_dg_create(
CLIENT *cl = NULL; /* client handle */
struct cu_data *cu = NULL; /* private data */
struct cu_socket *cs = NULL;
+ struct sockbuf *sb;
struct timeval now;
struct rpc_msg call_msg;
struct __rpc_sockinfo si;
@@ -260,15 +261,16 @@ clnt_dg_create(
cu->cu_socket = so;
soreserve(so, 256*1024, 256*1024);
+ sb = &so->so_rcv;
SOCKBUF_LOCK(&so->so_rcv);
recheck_socket:
- if (so->so_upcall) {
- if (so->so_upcall != clnt_dg_soupcall) {
+ if (sb->sb_upcall) {
+ if (sb->sb_upcall != clnt_dg_soupcall) {
SOCKBUF_UNLOCK(&so->so_rcv);
printf("clnt_dg_create(): socket already has an incompatible upcall\n");
goto err2;
}
- cs = (struct cu_socket *) so->so_upcallarg;
+ cs = (struct cu_socket *) sb->sb_upcallarg;
mtx_lock(&cs->cs_lock);
cs->cs_refs++;
mtx_unlock(&cs->cs_lock);
@@ -277,10 +279,10 @@ recheck_socket:
* We are the first on this socket - allocate the
* structure and install it in the socket.
*/
- SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
+ SOCKBUF_UNLOCK(&so->so_rcv);
cs = mem_alloc(sizeof(*cs));
- SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
- if (so->so_upcall) {
+ SOCKBUF_LOCK(&so->so_rcv);
+ if (sb->sb_upcall) {
/*
* We have lost a race with some other client.
*/
@@ -290,9 +292,7 @@ recheck_socket:
mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
cs->cs_refs = 1;
TAILQ_INIT(&cs->cs_pending);
- so->so_upcallarg = cs;
- so->so_upcall = clnt_dg_soupcall;
- so->so_rcv.sb_flags |= SB_UPCALL;
+ soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
}
SOCKBUF_UNLOCK(&so->so_rcv);
@@ -322,7 +322,7 @@ clnt_dg_call(
struct timeval utimeout) /* seconds to wait before giving up */
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
- struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_socket *cs;
struct rpc_timers *rt;
AUTH *auth;
struct rpc_err *errp;
@@ -343,6 +343,7 @@ clnt_dg_call(
struct cu_request *cr;
int error;
+ cs = cu->cu_socket->so_rcv.sb_upcallarg;
cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
mtx_lock(&cs->cs_lock);
@@ -797,9 +798,10 @@ static bool_t
clnt_dg_control(CLIENT *cl, u_int request, void *info)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
- struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_socket *cs;
struct sockaddr *addr;
+ cs = cu->cu_socket->so_rcv.sb_upcallarg;
mtx_lock(&cs->cs_lock);
switch (request) {
@@ -929,9 +931,10 @@ static void
clnt_dg_close(CLIENT *cl)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
- struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_socket *cs;
struct cu_request *cr;
+ cs = cu->cu_socket->so_rcv.sb_upcallarg;
mtx_lock(&cs->cs_lock);
if (cu->cu_closed) {
@@ -974,10 +977,11 @@ static void
clnt_dg_destroy(CLIENT *cl)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
- struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_socket *cs;
struct socket *so = NULL;
bool_t lastsocketref;
+ cs = cu->cu_socket->so_rcv.sb_upcallarg;
clnt_dg_close(cl);
mtx_lock(&cs->cs_lock);
@@ -986,9 +990,7 @@ clnt_dg_destroy(CLIENT *cl)
if (cs->cs_refs == 0) {
mtx_destroy(&cs->cs_lock);
SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
- cu->cu_socket->so_upcallarg = NULL;
- cu->cu_socket->so_upcall = NULL;
- cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(cu->cu_socket, SO_RCV);
SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
mem_free(cs, sizeof(*cs));
lastsocketref = TRUE;
@@ -1023,7 +1025,7 @@ time_not_ok(struct timeval *t)
t->tv_usec < -1 || t->tv_usec > 1000000);
}
-void
+int
clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
{
struct cu_socket *cs = (struct cu_socket *) arg;
@@ -1037,12 +1039,14 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
uio.uio_resid = 1000000000;
uio.uio_td = curthread;
do {
+ SOCKBUF_UNLOCK(&so->so_rcv);
m = NULL;
control = NULL;
rcvflag = MSG_DONTWAIT;
error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
if (control)
m_freem(control);
+ SOCKBUF_LOCK(&so->so_rcv);
if (error == EWOULDBLOCK)
break;
@@ -1107,5 +1111,6 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
if (!foundreq)
m_freem(m);
} while (m);
+ return (SU_OK);
}
diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c
index 8054a2a..f094945 100644
--- a/sys/rpc/clnt_vc.c
+++ b/sys/rpc/clnt_vc.c
@@ -91,7 +91,7 @@ static bool_t clnt_vc_control(CLIENT *, u_int, void *);
static void clnt_vc_close(CLIENT *);
static void clnt_vc_destroy(CLIENT *);
static bool_t time_not_ok(struct timeval *);
-static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
+static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
static struct clnt_ops clnt_vc_ops = {
.cl_call = clnt_vc_call,
@@ -286,9 +286,7 @@ clnt_vc_create(
soreserve(ct->ct_socket, sendsz, recvsz);
SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
- ct->ct_socket->so_upcallarg = ct;
- ct->ct_socket->so_upcall = clnt_vc_soupcall;
- ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
+ soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
ct->ct_record = NULL;
@@ -750,17 +748,18 @@ clnt_vc_close(CLIENT *cl)
}
if (ct->ct_socket) {
+ ct->ct_closing = TRUE;
+ mtx_unlock(&ct->ct_lock);
+
SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
- ct->ct_socket->so_upcallarg = NULL;
- ct->ct_socket->so_upcall = NULL;
- ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(ct->ct_socket, SO_RCV);
SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
/*
* Abort any pending requests and wait until everyone
* has finished with clnt_vc_call.
*/
- ct->ct_closing = TRUE;
+ mtx_lock(&ct->ct_lock);
TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
cr->cr_xid = 0;
cr->cr_error = ESHUTDOWN;
@@ -815,7 +814,7 @@ time_not_ok(struct timeval *t)
t->tv_usec <= -1 || t->tv_usec > 1000000);
}
-void
+int
clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
struct ct_data *ct = (struct ct_data *) arg;
@@ -840,20 +839,20 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
* error condition
*/
do_read = FALSE;
- SOCKBUF_LOCK(&so->so_rcv);
if (so->so_rcv.sb_cc >= sizeof(uint32_t)
|| (so->so_rcv.sb_state & SBS_CANTRCVMORE)
|| so->so_error)
do_read = TRUE;
- SOCKBUF_UNLOCK(&so->so_rcv);
if (!do_read)
- return;
+ return (SU_OK);
+ SOCKBUF_UNLOCK(&so->so_rcv);
uio.uio_resid = sizeof(uint32_t);
m = NULL;
rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
+ SOCKBUF_LOCK(&so->so_rcv);
if (error == EWOULDBLOCK)
break;
@@ -893,25 +892,25 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
* buffered.
*/
do_read = FALSE;
- SOCKBUF_LOCK(&so->so_rcv);
if (so->so_rcv.sb_cc >= ct->ct_record_resid
|| (so->so_rcv.sb_state & SBS_CANTRCVMORE)
|| so->so_error)
do_read = TRUE;
- SOCKBUF_UNLOCK(&so->so_rcv);
if (!do_read)
- return;
+ return (SU_OK);
/*
* We have the record mark. Read as much as
* the socket has buffered up to the end of
* this record.
*/
+ SOCKBUF_UNLOCK(&so->so_rcv);
uio.uio_resid = ct->ct_record_resid;
m = NULL;
rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
+ SOCKBUF_LOCK(&so->so_rcv);
if (error == EWOULDBLOCK)
break;
@@ -980,4 +979,5 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
}
}
} while (m);
+ return (SU_OK);
}
diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c
index 72721b0..0747d1d 100644
--- a/sys/rpc/svc_dg.c
+++ b/sys/rpc/svc_dg.c
@@ -68,7 +68,7 @@ static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *,
struct sockaddr *, struct mbuf *);
static void svc_dg_destroy(SVCXPRT *);
static bool_t svc_dg_control(SVCXPRT *, const u_int, void *);
-static void svc_dg_soupcall(struct socket *so, void *arg, int waitflag);
+static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag);
static struct xp_ops svc_dg_ops = {
.xp_recv = svc_dg_recv,
@@ -133,9 +133,7 @@ svc_dg_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
xprt_register(xprt);
SOCKBUF_LOCK(&so->so_rcv);
- so->so_upcallarg = xprt;
- so->so_upcall = svc_dg_soupcall;
- so->so_rcv.sb_flags |= SB_UPCALL;
+ soupcall_set(so, SO_RCV, svc_dg_soupcall, xprt);
SOCKBUF_UNLOCK(&so->so_rcv);
return (xprt);
@@ -205,9 +203,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg,
if (error) {
SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
- xprt->xp_socket->so_upcallarg = NULL;
- xprt->xp_socket->so_upcall = NULL;
- xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(xprt->xp_socket, SO_RCV);
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
xprt_inactive(xprt);
sx_xunlock(&xprt->xp_lock);
@@ -275,9 +271,7 @@ svc_dg_destroy(SVCXPRT *xprt)
{
SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
- xprt->xp_socket->so_upcallarg = NULL;
- xprt->xp_socket->so_upcall = NULL;
- xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(xprt->xp_socket, SO_RCV);
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
sx_destroy(&xprt->xp_lock);
@@ -300,10 +294,11 @@ svc_dg_control(xprt, rq, in)
return (FALSE);
}
-static void
+static int
svc_dg_soupcall(struct socket *so, void *arg, int waitflag)
{
SVCXPRT *xprt = (SVCXPRT *) arg;
xprt_active(xprt);
+ return (SU_OK);
}
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index e3f0350..9f6a4a4 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -80,7 +80,7 @@ static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
struct sockaddr *raddr);
static int svc_vc_accept(struct socket *head, struct socket **sop);
-static void svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
+static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
static struct xp_ops svc_vc_rendezvous_ops = {
.xp_recv = svc_vc_rendezvous_recv,
@@ -160,9 +160,7 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
solisten(so, SOMAXCONN, curthread);
SOCKBUF_LOCK(&so->so_rcv);
- so->so_upcallarg = xprt;
- so->so_upcall = svc_vc_soupcall;
- so->so_rcv.sb_flags |= SB_UPCALL;
+ soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
SOCKBUF_UNLOCK(&so->so_rcv);
return (xprt);
@@ -236,9 +234,7 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
xprt_register(xprt);
SOCKBUF_LOCK(&so->so_rcv);
- so->so_upcallarg = xprt;
- so->so_upcall = svc_vc_soupcall;
- so->so_rcv.sb_flags |= SB_UPCALL;
+ soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
SOCKBUF_UNLOCK(&so->so_rcv);
/*
@@ -358,9 +354,7 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
if (error) {
SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
- xprt->xp_socket->so_upcallarg = NULL;
- xprt->xp_socket->so_upcall = NULL;
- xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(xprt->xp_socket, SO_RCV);
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
xprt_inactive(xprt);
sx_xunlock(&xprt->xp_lock);
@@ -405,9 +399,7 @@ static void
svc_vc_destroy_common(SVCXPRT *xprt)
{
SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
- xprt->xp_socket->so_upcallarg = NULL;
- xprt->xp_socket->so_upcall = NULL;
- xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(xprt->xp_socket, SO_RCV);
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
sx_destroy(&xprt->xp_lock);
@@ -642,9 +634,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
if (error) {
SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
- xprt->xp_socket->so_upcallarg = NULL;
- xprt->xp_socket->so_upcall = NULL;
- xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
+ soupcall_clear(xprt->xp_socket, SO_RCV);
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
xprt_inactive(xprt);
cd->strm_stat = XPRT_DIED;
@@ -729,12 +719,13 @@ svc_vc_null()
return (FALSE);
}
-static void
+static int
svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
SVCXPRT *xprt = (SVCXPRT *) arg;
xprt_active(xprt);
+ return (SU_OK);
}
#if 0