diff options
author | jhb <jhb@FreeBSD.org> | 2009-06-01 21:17:03 +0000 |
---|---|---|
committer | jhb <jhb@FreeBSD.org> | 2009-06-01 21:17:03 +0000 |
commit | a1af9ecca44f362b24fe3a8342ca6ed8676a399c (patch) | |
tree | 13628b6be10af95db7dc7d8ef88b3291d48583ab /sys/rpc | |
parent | 9956d85f164d16d3c1db67cc01f521c1c09d5fdb (diff) | |
download | FreeBSD-src-a1af9ecca44f362b24fe3a8342ca6ed8676a399c.zip FreeBSD-src-a1af9ecca44f362b24fe3a8342ca6ed8676a399c.tar.gz |
Rework socket upcalls to close some races with setup/teardown of upcalls.
- Each socket upcall is now invoked with the appropriate socket buffer
locked. It is not permissible to call soisconnected() with this lock
held, however, so socket upcalls now return an integer value. The two
possible values are SU_OK and SU_ISCONNECTED. If an upcall returns
SU_ISCONNECTED, then soisconnected() will be invoked on the
socket after the socket buffer lock is dropped.
- A new API is provided for setting and clearing socket upcalls. The
API consists of soupcall_set() and soupcall_clear().
- To simplify locking, each socket buffer now has a separate upcall.
- When a socket upcall returns SU_ISCONNECTED, the upcall is cleared from
the receive socket buffer automatically. Note that a SO_SND upcall
should never return SU_ISCONNECTED.
- All this means that accept filters should now return SU_ISCONNECTED
instead of calling soisconnected() directly. They also no longer need
to explicitly clear the upcall on the new socket.
- The HTTP accept filter still uses soupcall_set() to manage its internal
state machine, but other accept filters no longer have any explicit
knowledge of socket upcall internals aside from their return value.
- The various RPC client upcalls currently drop the socket buffer lock
while invoking soreceive() as a temporary band-aid. The plan for
the future is to add a new flag to allow soreceive() to be called with
the socket buffer locked.
- The AIO callback for socket I/O is now also invoked with the socket
buffer locked. Previously sowakeup() would drop the socket buffer
lock only to call aio_swake() which immediately re-acquired the socket
buffer lock for the duration of the function call.
Discussed with: rwatson, rmacklem
Diffstat (limited to 'sys/rpc')
-rw-r--r-- | sys/rpc/clnt_dg.c | 43 | ||||
-rw-r--r-- | sys/rpc/clnt_vc.c | 30 | ||||
-rw-r--r-- | sys/rpc/svc_dg.c | 17 | ||||
-rw-r--r-- | sys/rpc/svc_vc.c | 25 |
4 files changed, 53 insertions, 62 deletions
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c index e6d101d..880a16f 100644 --- a/sys/rpc/clnt_dg.c +++ b/sys/rpc/clnt_dg.c @@ -79,7 +79,7 @@ static void clnt_dg_abort(CLIENT *); static bool_t clnt_dg_control(CLIENT *, u_int, void *); static void clnt_dg_close(CLIENT *); static void clnt_dg_destroy(CLIENT *); -static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); +static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); static struct clnt_ops clnt_dg_ops = { .cl_call = clnt_dg_call, @@ -112,7 +112,7 @@ TAILQ_HEAD(cu_request_list, cu_request); #define MCALL_MSG_SIZE 24 /* - * This structure is pointed to by the socket's so_upcallarg + * This structure is pointed to by the socket buffer's sb_upcallarg * member. It is separate from the client private data to facilitate * multiple clients sharing the same socket. The cs_lock mutex is used * to protect all fields of this structure, the socket's receive @@ -183,6 +183,7 @@ clnt_dg_create( CLIENT *cl = NULL; /* client handle */ struct cu_data *cu = NULL; /* private data */ struct cu_socket *cs = NULL; + struct sockbuf *sb; struct timeval now; struct rpc_msg call_msg; struct __rpc_sockinfo si; @@ -260,15 +261,16 @@ clnt_dg_create( cu->cu_socket = so; soreserve(so, 256*1024, 256*1024); + sb = &so->so_rcv; SOCKBUF_LOCK(&so->so_rcv); recheck_socket: - if (so->so_upcall) { - if (so->so_upcall != clnt_dg_soupcall) { + if (sb->sb_upcall) { + if (sb->sb_upcall != clnt_dg_soupcall) { SOCKBUF_UNLOCK(&so->so_rcv); printf("clnt_dg_create(): socket already has an incompatible upcall\n"); goto err2; } - cs = (struct cu_socket *) so->so_upcallarg; + cs = (struct cu_socket *) sb->sb_upcallarg; mtx_lock(&cs->cs_lock); cs->cs_refs++; mtx_unlock(&cs->cs_lock); @@ -277,10 +279,10 @@ recheck_socket: * We are the first on this socket - allocate the * structure and install it in the socket. 
*/ - SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); + SOCKBUF_UNLOCK(&so->so_rcv); cs = mem_alloc(sizeof(*cs)); - SOCKBUF_LOCK(&cu->cu_socket->so_rcv); - if (so->so_upcall) { + SOCKBUF_LOCK(&so->so_rcv); + if (sb->sb_upcall) { /* * We have lost a race with some other client. */ @@ -290,9 +292,7 @@ recheck_socket: mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); cs->cs_refs = 1; TAILQ_INIT(&cs->cs_pending); - so->so_upcallarg = cs; - so->so_upcall = clnt_dg_soupcall; - so->so_rcv.sb_flags |= SB_UPCALL; + soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); } SOCKBUF_UNLOCK(&so->so_rcv); @@ -322,7 +322,7 @@ clnt_dg_call( struct timeval utimeout) /* seconds to wait before giving up */ { struct cu_data *cu = (struct cu_data *)cl->cl_private; - struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct cu_socket *cs; struct rpc_timers *rt; AUTH *auth; struct rpc_err *errp; @@ -343,6 +343,7 @@ clnt_dg_call( struct cu_request *cr; int error; + cs = cu->cu_socket->so_rcv.sb_upcallarg; cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); mtx_lock(&cs->cs_lock); @@ -797,9 +798,10 @@ static bool_t clnt_dg_control(CLIENT *cl, u_int request, void *info) { struct cu_data *cu = (struct cu_data *)cl->cl_private; - struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct cu_socket *cs; struct sockaddr *addr; + cs = cu->cu_socket->so_rcv.sb_upcallarg; mtx_lock(&cs->cs_lock); switch (request) { @@ -929,9 +931,10 @@ static void clnt_dg_close(CLIENT *cl) { struct cu_data *cu = (struct cu_data *)cl->cl_private; - struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct cu_socket *cs; struct cu_request *cr; + cs = cu->cu_socket->so_rcv.sb_upcallarg; mtx_lock(&cs->cs_lock); if (cu->cu_closed) { @@ -974,10 +977,11 @@ static void clnt_dg_destroy(CLIENT *cl) { struct cu_data *cu = (struct cu_data *)cl->cl_private; - struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct cu_socket *cs; struct 
socket *so = NULL; bool_t lastsocketref; + cs = cu->cu_socket->so_rcv.sb_upcallarg; clnt_dg_close(cl); mtx_lock(&cs->cs_lock); @@ -986,9 +990,7 @@ clnt_dg_destroy(CLIENT *cl) if (cs->cs_refs == 0) { mtx_destroy(&cs->cs_lock); SOCKBUF_LOCK(&cu->cu_socket->so_rcv); - cu->cu_socket->so_upcallarg = NULL; - cu->cu_socket->so_upcall = NULL; - cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(cu->cu_socket, SO_RCV); SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); mem_free(cs, sizeof(*cs)); lastsocketref = TRUE; @@ -1023,7 +1025,7 @@ time_not_ok(struct timeval *t) t->tv_usec < -1 || t->tv_usec > 1000000); } -void +int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) { struct cu_socket *cs = (struct cu_socket *) arg; @@ -1037,12 +1039,14 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) uio.uio_resid = 1000000000; uio.uio_td = curthread; do { + SOCKBUF_UNLOCK(&so->so_rcv); m = NULL; control = NULL; rcvflag = MSG_DONTWAIT; error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); if (control) m_freem(control); + SOCKBUF_LOCK(&so->so_rcv); if (error == EWOULDBLOCK) break; @@ -1107,5 +1111,6 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) if (!foundreq) m_freem(m); } while (m); + return (SU_OK); } diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c index 8054a2a..f094945 100644 --- a/sys/rpc/clnt_vc.c +++ b/sys/rpc/clnt_vc.c @@ -91,7 +91,7 @@ static bool_t clnt_vc_control(CLIENT *, u_int, void *); static void clnt_vc_close(CLIENT *); static void clnt_vc_destroy(CLIENT *); static bool_t time_not_ok(struct timeval *); -static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag); +static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag); static struct clnt_ops clnt_vc_ops = { .cl_call = clnt_vc_call, @@ -286,9 +286,7 @@ clnt_vc_create( soreserve(ct->ct_socket, sendsz, recvsz); SOCKBUF_LOCK(&ct->ct_socket->so_rcv); - ct->ct_socket->so_upcallarg = ct; - ct->ct_socket->so_upcall = clnt_vc_soupcall; - 
ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL; + soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct); SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); ct->ct_record = NULL; @@ -750,17 +748,18 @@ clnt_vc_close(CLIENT *cl) } if (ct->ct_socket) { + ct->ct_closing = TRUE; + mtx_unlock(&ct->ct_lock); + SOCKBUF_LOCK(&ct->ct_socket->so_rcv); - ct->ct_socket->so_upcallarg = NULL; - ct->ct_socket->so_upcall = NULL; - ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(ct->ct_socket, SO_RCV); SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); /* * Abort any pending requests and wait until everyone * has finished with clnt_vc_call. */ - ct->ct_closing = TRUE; + mtx_lock(&ct->ct_lock); TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { cr->cr_xid = 0; cr->cr_error = ESHUTDOWN; @@ -815,7 +814,7 @@ time_not_ok(struct timeval *t) t->tv_usec <= -1 || t->tv_usec > 1000000); } -void +int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) { struct ct_data *ct = (struct ct_data *) arg; @@ -840,20 +839,20 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) * error condition */ do_read = FALSE; - SOCKBUF_LOCK(&so->so_rcv); if (so->so_rcv.sb_cc >= sizeof(uint32_t) || (so->so_rcv.sb_state & SBS_CANTRCVMORE) || so->so_error) do_read = TRUE; - SOCKBUF_UNLOCK(&so->so_rcv); if (!do_read) - return; + return (SU_OK); + SOCKBUF_UNLOCK(&so->so_rcv); uio.uio_resid = sizeof(uint32_t); m = NULL; rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); + SOCKBUF_LOCK(&so->so_rcv); if (error == EWOULDBLOCK) break; @@ -893,25 +892,25 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) * buffered. */ do_read = FALSE; - SOCKBUF_LOCK(&so->so_rcv); if (so->so_rcv.sb_cc >= ct->ct_record_resid || (so->so_rcv.sb_state & SBS_CANTRCVMORE) || so->so_error) do_read = TRUE; - SOCKBUF_UNLOCK(&so->so_rcv); if (!do_read) - return; + return (SU_OK); /* * We have the record mark. Read as much as * the socket has buffered up to the end of * this record. 
*/ + SOCKBUF_UNLOCK(&so->so_rcv); uio.uio_resid = ct->ct_record_resid; m = NULL; rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); + SOCKBUF_LOCK(&so->so_rcv); if (error == EWOULDBLOCK) break; @@ -980,4 +979,5 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) } } } while (m); + return (SU_OK); } diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c index 72721b0..0747d1d 100644 --- a/sys/rpc/svc_dg.c +++ b/sys/rpc/svc_dg.c @@ -68,7 +68,7 @@ static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *, struct sockaddr *, struct mbuf *); static void svc_dg_destroy(SVCXPRT *); static bool_t svc_dg_control(SVCXPRT *, const u_int, void *); -static void svc_dg_soupcall(struct socket *so, void *arg, int waitflag); +static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag); static struct xp_ops svc_dg_ops = { .xp_recv = svc_dg_recv, @@ -133,9 +133,7 @@ svc_dg_create(SVCPOOL *pool, struct socket *so, size_t sendsize, xprt_register(xprt); SOCKBUF_LOCK(&so->so_rcv); - so->so_upcallarg = xprt; - so->so_upcall = svc_dg_soupcall; - so->so_rcv.sb_flags |= SB_UPCALL; + soupcall_set(so, SO_RCV, svc_dg_soupcall, xprt); SOCKBUF_UNLOCK(&so->so_rcv); return (xprt); @@ -205,9 +203,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg, if (error) { SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); - xprt->xp_socket->so_upcallarg = NULL; - xprt->xp_socket->so_upcall = NULL; - xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(xprt->xp_socket, SO_RCV); SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); xprt_inactive(xprt); sx_xunlock(&xprt->xp_lock); @@ -275,9 +271,7 @@ svc_dg_destroy(SVCXPRT *xprt) { SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); - xprt->xp_socket->so_upcallarg = NULL; - xprt->xp_socket->so_upcall = NULL; - xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(xprt->xp_socket, SO_RCV); SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); sx_destroy(&xprt->xp_lock); @@ -300,10 +294,11 @@ svc_dg_control(xprt, rq, in) return 
(FALSE); } -static void +static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag) { SVCXPRT *xprt = (SVCXPRT *) arg; xprt_active(xprt); + return (SU_OK); } diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c index e3f0350..9f6a4a4 100644 --- a/sys/rpc/svc_vc.c +++ b/sys/rpc/svc_vc.c @@ -80,7 +80,7 @@ static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr); static int svc_vc_accept(struct socket *head, struct socket **sop); -static void svc_vc_soupcall(struct socket *so, void *arg, int waitflag); +static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag); static struct xp_ops svc_vc_rendezvous_ops = { .xp_recv = svc_vc_rendezvous_recv, @@ -160,9 +160,7 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, solisten(so, SOMAXCONN, curthread); SOCKBUF_LOCK(&so->so_rcv); - so->so_upcallarg = xprt; - so->so_upcall = svc_vc_soupcall; - so->so_rcv.sb_flags |= SB_UPCALL; + soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); SOCKBUF_UNLOCK(&so->so_rcv); return (xprt); @@ -236,9 +234,7 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) xprt_register(xprt); SOCKBUF_LOCK(&so->so_rcv); - so->so_upcallarg = xprt; - so->so_upcall = svc_vc_soupcall; - so->so_rcv.sb_flags |= SB_UPCALL; + soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); SOCKBUF_UNLOCK(&so->so_rcv); /* @@ -358,9 +354,7 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, if (error) { SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); - xprt->xp_socket->so_upcallarg = NULL; - xprt->xp_socket->so_upcall = NULL; - xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(xprt->xp_socket, SO_RCV); SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); xprt_inactive(xprt); sx_xunlock(&xprt->xp_lock); @@ -405,9 +399,7 @@ static void svc_vc_destroy_common(SVCXPRT *xprt) { SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); - xprt->xp_socket->so_upcallarg = NULL; - 
xprt->xp_socket->so_upcall = NULL; - xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(xprt->xp_socket, SO_RCV); SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); sx_destroy(&xprt->xp_lock); @@ -642,9 +634,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, if (error) { SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); - xprt->xp_socket->so_upcallarg = NULL; - xprt->xp_socket->so_upcall = NULL; - xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL; + soupcall_clear(xprt->xp_socket, SO_RCV); SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); xprt_inactive(xprt); cd->strm_stat = XPRT_DIED; @@ -729,12 +719,13 @@ svc_vc_null() return (FALSE); } -static void +static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag) { SVCXPRT *xprt = (SVCXPRT *) arg; xprt_active(xprt); + return (SU_OK); } #if 0 |