author     dfr <dfr@FreeBSD.org>	2008-06-26 10:21:54 +0000
committer  dfr <dfr@FreeBSD.org>	2008-06-26 10:21:54 +0000
commit     41cea6d5ca71b8cf057f9face8055b218b30e18e (patch)
tree       994a214037913bc4e44eaee5070c65aeadf53485 /sys/rpc/clnt_dg.c
parent     ca3c788812715a263f83dcec4bdabaf6c10eb922 (diff)
Re-implement the client side of rpc.lockd in the kernel. This implementation
provides the correct semantics for flock(2) style locks, which are used by the
lockf(1) command line tool and the pidfile(3) library. It also implements
recovery from server restarts and ensures that dirty cache blocks are written
to the server before obtaining locks (allowing multiple clients to use file
locking to safely share data).

Sponsored by:	Isilon Systems
PR:		94256
MFC after:	2 weeks
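For context, the following is a minimal userland sketch (not part of this
change) of the flock(2)-style locking pattern the new in-kernel rpc.lockd
client is meant to support over NFS: take an exclusive lock, update the
shared file, then release. The path /mnt/nfs/shared.dat is hypothetical.

/*
 * Illustrative only: exclusive flock(2) lock on an NFS-backed file,
 * the pattern used by lockf(1) and pidfile(3).
 */
#include <sys/file.h>

#include <err.h>
#include <fcntl.h>
#include <unistd.h>

int
main(void)
{
	int fd = open("/mnt/nfs/shared.dat", O_RDWR | O_CREAT, 0644);
	if (fd < 0)
		err(1, "open");

	/* The lock request is serviced by the kernel rpc.lockd client. */
	if (flock(fd, LOCK_EX) < 0)
		err(1, "flock");

	/*
	 * Per the commit message, dirty cache blocks are written to the
	 * server when a lock is obtained, so lock holders on different
	 * clients observe each other's completed updates.
	 */
	if (write(fd, "hello\n", 6) != 6)
		err(1, "write");

	flock(fd, LOCK_UN);	/* release the lock */
	close(fd);
	return (0);
}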
Diffstat (limited to 'sys/rpc/clnt_dg.c')
-rw-r--r--	sys/rpc/clnt_dg.c	218
1 file changed, 146 insertions(+), 72 deletions(-)
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c
index c66ac50..f14e1d6 100644
--- a/sys/rpc/clnt_dg.c
+++ b/sys/rpc/clnt_dg.c
@@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
@@ -70,8 +71,8 @@ __FBSDID("$FreeBSD$");
#endif
static bool_t time_not_ok(struct timeval *);
-static enum clnt_stat clnt_dg_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
- xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
@@ -91,10 +92,13 @@ static struct clnt_ops clnt_dg_ops = {
static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
/*
- * A pending RPC request which awaits a reply.
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
*/
struct cu_request {
TAILQ_ENTRY(cu_request) cr_link;
+ CLIENT *cr_client; /* owner */
uint32_t cr_xid; /* XID of request */
struct mbuf *cr_mrep; /* reply received by upcall */
int cr_error; /* any error from upcall */
@@ -123,6 +127,8 @@ struct cu_socket {
* Private data kept per client handle
*/
struct cu_data {
+ int cu_threads; /* # threads in clnt_vc_call */
+ bool_t cu_closing; /* TRUE if we are destroying */
struct socket *cu_socket; /* connection socket */
bool_t cu_closeit; /* opened by library */
struct sockaddr_storage cu_raddr; /* remote address */
@@ -203,10 +209,12 @@ clnt_dg_create(
sendsz = ((sendsz + 3) / 4) * 4;
recvsz = ((recvsz + 3) / 4) * 4;
cu = mem_alloc(sizeof (*cu));
+ cu->cu_threads = 0;
+ cu->cu_closing = FALSE;
(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
cu->cu_rlen = svcaddr->sa_len;
/* Other values can also be set through clnt_control() */
- cu->cu_wait.tv_sec = 15; /* heuristically chosen */
+ cu->cu_wait.tv_sec = 3; /* heuristically chosen */
cu->cu_wait.tv_usec = 0;
cu->cu_total.tv_sec = -1;
cu->cu_total.tv_usec = -1;
@@ -237,6 +245,7 @@ clnt_dg_create(
*/
cu->cu_closeit = FALSE;
cu->cu_socket = so;
+ soreserve(so, 256*1024, 256*1024);
SOCKBUF_LOCK(&so->so_rcv);
recheck_socket:
@@ -274,6 +283,7 @@ recheck_socket:
}
SOCKBUF_UNLOCK(&so->so_rcv);
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_dg_ops;
cl->cl_private = (caddr_t)(void *)cu;
cl->cl_auth = authnone_create();
@@ -291,7 +301,8 @@ err2:
static enum clnt_stat
clnt_dg_call(
- CLIENT *cl, /* client handle */
+ CLIENT *cl, /* client handle */
+ struct rpc_callextra *ext, /* call metadata */
rpcproc_t proc, /* procedure number */
xdrproc_t xargs, /* xdr routine for args */
void *argsp, /* pointer to args */
@@ -301,30 +312,52 @@ clnt_dg_call(
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ AUTH *auth;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
+ int retrans; /* number of re-transmits so far */
int nrefreshes = 2; /* number of times to refresh cred */
- struct timeval timeout;
- struct timeval retransmit_time;
- struct timeval next_sendtime, starttime, time_waited, tv;
+ struct timeval *tvp;
+ int timeout;
+ int retransmit_time;
+ int next_sendtime, starttime, time_waited, tv;
struct sockaddr *sa;
socklen_t salen;
uint32_t xid;
struct mbuf *mreq = NULL;
- struct cu_request cr;
+ struct cu_request *cr;
int error;
+ cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
+
mtx_lock(&cs->cs_lock);
- cr.cr_mrep = NULL;
- cr.cr_error = 0;
+ if (cu->cu_closing) {
+ mtx_unlock(&cs->cs_lock);
+ free(cr, M_RPC);
+ return (RPC_CANTSEND);
+ }
+ cu->cu_threads++;
+
+ if (ext)
+ auth = ext->rc_auth;
+ else
+ auth = cl->cl_auth;
+
+ cr->cr_client = cl;
+ cr->cr_mrep = NULL;
+ cr->cr_error = 0;
if (cu->cu_total.tv_usec == -1) {
- timeout = utimeout; /* use supplied timeout */
+ tvp = &utimeout; /* use supplied timeout */
} else {
- timeout = cu->cu_total; /* use default timeout */
+ tvp = &cu->cu_total; /* use default timeout */
}
+ if (tvp->tv_sec || tvp->tv_usec)
+ timeout = tvtohz(tvp);
+ else
+ timeout = 0;
if (cu->cu_connect && !cu->cu_connected) {
mtx_unlock(&cs->cs_lock);
@@ -345,11 +378,11 @@ clnt_dg_call(
sa = (struct sockaddr *)&cu->cu_raddr;
salen = cu->cu_rlen;
}
- time_waited.tv_sec = 0;
- time_waited.tv_usec = 0;
- retransmit_time = next_sendtime = cu->cu_wait;
+ time_waited = 0;
+ retrans = 0;
+ retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
- getmicrotime(&starttime);
+ starttime = ticks;
call_again:
mtx_assert(&cs->cs_lock, MA_OWNED);
@@ -376,7 +409,7 @@ send_again:
goto get_reply;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
+ (! AUTH_MARSHALL(auth, &xdrs)) ||
(! (*xargs)(&xdrs, argsp))) {
cu->cu_error.re_status = RPC_CANTENCODEARGS;
mtx_lock(&cs->cs_lock);
@@ -384,9 +417,9 @@ send_again:
}
m_fixhdr(mreq);
- cr.cr_xid = xid;
+ cr->cr_xid = xid;
mtx_lock(&cs->cs_lock);
- TAILQ_INSERT_TAIL(&cs->cs_pending, &cr, cr_link);
+ TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
mtx_unlock(&cs->cs_lock);
/*
@@ -406,8 +439,7 @@ send_again:
mtx_lock(&cs->cs_lock);
if (error) {
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
-
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cu->cu_error.re_errno = error;
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
@@ -415,24 +447,24 @@ send_again:
/*
* Check to see if we got an upcall while waiting for the
- * lock. In both these cases, the request has been removed
- * from cs->cs_pending.
+ * lock.
*/
- if (cr.cr_error) {
- cu->cu_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
+ cu->cu_error.re_errno = cr->cr_error;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
- if (cr.cr_mrep) {
+ if (cr->cr_mrep) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
goto got_reply;
}
/*
* Hack to provide rpc-based message passing
*/
- if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (timeout == 0) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
@@ -440,17 +472,23 @@ send_again:
get_reply:
for (;;) {
/* Decide how long to wait. */
- if (timevalcmp(&next_sendtime, &timeout, <)) {
+ if (next_sendtime < timeout)
tv = next_sendtime;
- } else {
+ else
tv = timeout;
+ tv -= time_waited;
+
+ if (tv > 0) {
+ if (cu->cu_closing)
+ error = 0;
+ else
+ error = msleep(cr, &cs->cs_lock,
+ cu->cu_waitflag, cu->cu_waitchan, tv);
+ } else {
+ error = EWOULDBLOCK;
}
- timevalsub(&tv, &time_waited);
- if (tv.tv_sec < 0 || tv.tv_usec < 0)
- tv.tv_sec = tv.tv_usec = 0;
- error = msleep(&cr, &cs->cs_lock, cu->cu_waitflag,
- cu->cu_waitchan, tvtohz(&tv));
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
if (!error) {
/*
@@ -458,8 +496,8 @@ get_reply:
* upcall had a receive error, report that,
* otherwise we have a reply.
*/
- if (cr.cr_error) {
- cu->cu_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ cu->cu_error.re_errno = cr->cr_error;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
@@ -472,8 +510,6 @@ get_reply:
* re-send the request.
*/
if (error != EWOULDBLOCK) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
cu->cu_error.re_errno = error;
if (error == EINTR)
cu->cu_error.re_status = RPC_INTR;
@@ -482,29 +518,40 @@ get_reply:
goto out;
}
- getmicrotime(&tv);
- time_waited = tv;
- timevalsub(&time_waited, &starttime);
+ time_waited = ticks - starttime;
/* Check for timeout. */
- if (timevalcmp(&time_waited, &timeout, >)) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (time_waited > timeout) {
cu->cu_error.re_errno = EWOULDBLOCK;
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
/* Retransmit if necessary. */
- if (timevalcmp(&time_waited, &next_sendtime, >)) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (time_waited >= next_sendtime) {
+ if (ext && ext->rc_feedback) {
+ mtx_unlock(&cs->cs_lock);
+ if (retrans == 0)
+ ext->rc_feedback(FEEDBACK_REXMIT1,
+ proc, ext->rc_feedback_arg);
+ else
+ ext->rc_feedback(FEEDBACK_REXMIT2,
+ proc, ext->rc_feedback_arg);
+ mtx_lock(&cs->cs_lock);
+ }
+ if (cu->cu_closing) {
+ cu->cu_error.re_errno = ESHUTDOWN;
+ cu->cu_error.re_status = RPC_CANTRECV;
+ goto out;
+ }
+ retrans++;
/* update retransmit_time */
- if (retransmit_time.tv_sec < RPC_MAX_BACKOFF)
- timevaladd(&retransmit_time, &retransmit_time);
- timevaladd(&next_sendtime, &retransmit_time);
+ if (retransmit_time < RPC_MAX_BACKOFF * hz)
+ retransmit_time = 2 * retransmit_time;
+ next_sendtime += retransmit_time;
goto send_again;
}
+ TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
}
got_reply:
@@ -514,10 +561,13 @@ got_reply:
*/
mtx_unlock(&cs->cs_lock);
- xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
+ if (ext && ext->rc_feedback)
+ ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
+
+ xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
XDR_DESTROY(&xdrs);
- cr.cr_mrep = NULL;
+ cr->cr_mrep = NULL;
mtx_lock(&cs->cs_lock);
@@ -562,10 +612,17 @@ out:
if (mreq)
m_freem(mreq);
- if (cr.cr_mrep)
- m_freem(cr.cr_mrep);
+ if (cr->cr_mrep)
+ m_freem(cr->cr_mrep);
+ cu->cu_threads--;
+ if (cu->cu_closing)
+ wakeup(cu);
+
mtx_unlock(&cs->cs_lock);
+
+ free(cr, M_RPC);
+
return (cu->cu_error.re_status);
}
@@ -732,30 +789,44 @@ clnt_dg_destroy(CLIENT *cl)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_request *cr;
struct socket *so = NULL;
bool_t lastsocketref;
- SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
-
mtx_lock(&cs->cs_lock);
+
+ /*
+ * Abort any pending requests and wait until everyone
+ * has finished with clnt_vc_call.
+ */
+ cu->cu_closing = TRUE;
+ TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
+ if (cr->cr_client == cl) {
+ cr->cr_xid = 0;
+ cr->cr_error = ESHUTDOWN;
+ wakeup(cr);
+ }
+ }
+
+ while (cu->cu_threads)
+ msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
+
cs->cs_refs--;
if (cs->cs_refs == 0) {
+ mtx_destroy(&cs->cs_lock);
+ SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
cu->cu_socket->so_upcallarg = NULL;
cu->cu_socket->so_upcall = NULL;
cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
- mtx_destroy(&cs->cs_lock);
SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
mem_free(cs, sizeof(*cs));
lastsocketref = TRUE;
} else {
mtx_unlock(&cs->cs_lock);
- SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
lastsocketref = FALSE;
}
- if (cu->cu_closeit) {
- KASSERT(lastsocketref, ("clnt_dg_destroy(): closing a socket "
- "shared with other clients"));
+ if (cu->cu_closeit && lastsocketref) {
so = cu->cu_socket;
cu->cu_socket = NULL;
}
@@ -812,10 +883,10 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
if (error) {
mtx_lock(&cs->cs_lock);
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
+ cr->cr_xid = 0;
cr->cr_error = error;
wakeup(cr);
}
- TAILQ_INIT(&cs->cs_pending);
mtx_unlock(&cs->cs_lock);
break;
}
@@ -825,7 +896,11 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
*/
m = m_pullup(m, sizeof(xid));
if (!m)
- break;
+ /*
+ * Should never happen.
+ */
+ continue;
+
xid = ntohl(*mtod(m, uint32_t *));
/*
@@ -836,14 +911,13 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
if (cr->cr_xid == xid) {
/*
- * This one matches. We snip it out of
- * the pending list and leave the
+ * This one matches. We leave the
* reply mbuf in cr->cr_mrep. Set the
- * XID to zero so that clnt_dg_call
- * can know not to repeat the
- * TAILQ_REMOVE.
+ * XID to zero so that we will ignore
+ * any duplicated replies that arrive
+ * before clnt_dg_call removes it from
+ * the queue.
*/
- TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cr->cr_xid = 0;
cr->cr_mrep = m;
cr->cr_error = 0;