diff options
Diffstat (limited to 'sys/rpc/replay.c')
-rw-r--r-- | sys/rpc/replay.c | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/sys/rpc/replay.c b/sys/rpc/replay.c new file mode 100644 index 0000000..d82fc20 --- /dev/null +++ b/sys/rpc/replay.c @@ -0,0 +1,248 @@ +/*- + * Copyright (c) 2008 Isilon Inc http://www.isilon.com/ + * Authors: Doug Rabson <dfr@rabson.org> + * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <sys/cdefs.h> +__FBSDID("$FreeBSD$"); + +#include <sys/param.h> +#include <sys/hash.h> +#include <sys/kernel.h> +#include <sys/lock.h> +#include <sys/mbuf.h> +#include <sys/mutex.h> +#include <sys/queue.h> + +#include <rpc/rpc.h> +#include <rpc/replay.h> + +struct replay_cache_entry { + int rce_hash; + struct rpc_msg rce_msg; + struct sockaddr_storage rce_addr; + struct rpc_msg rce_repmsg; + struct mbuf *rce_repbody; + + TAILQ_ENTRY(replay_cache_entry) rce_link; + TAILQ_ENTRY(replay_cache_entry) rce_alllink; +}; +TAILQ_HEAD(replay_cache_list, replay_cache_entry); + +static struct replay_cache_entry * + replay_alloc(struct replay_cache *rc, struct rpc_msg *msg, + struct sockaddr *addr, int h); +static void replay_free(struct replay_cache *rc, + struct replay_cache_entry *rce); +static void replay_prune(struct replay_cache *rc); + +#define REPLAY_HASH_SIZE 256 +#define REPLAY_MAX 1024 + +struct replay_cache { + struct replay_cache_list rc_cache[REPLAY_HASH_SIZE]; + struct replay_cache_list rc_all; + struct mtx rc_lock; + int rc_count; + size_t rc_size; + size_t rc_maxsize; +}; + +struct replay_cache * +replay_newcache(size_t maxsize) +{ + struct replay_cache *rc; + int i; + + rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO); + for (i = 0; i < REPLAY_HASH_SIZE; i++) + TAILQ_INIT(&rc->rc_cache[i]); + TAILQ_INIT(&rc->rc_all); + mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF); + rc->rc_maxsize = maxsize; + + return (rc); +} + +void +replay_setsize(struct replay_cache *rc, size_t newmaxsize) +{ + + rc->rc_maxsize = newmaxsize; + replay_prune(rc); +} + +void +replay_freecache(struct replay_cache *rc) +{ + + mtx_lock(&rc->rc_lock); + while (TAILQ_FIRST(&rc->rc_all)) + replay_free(rc, TAILQ_FIRST(&rc->rc_all)); + mtx_destroy(&rc->rc_lock); + free(rc, M_RPC); +} + +static struct replay_cache_entry * +replay_alloc(struct replay_cache *rc, + struct rpc_msg *msg, struct sockaddr *addr, int h) +{ + struct replay_cache_entry *rce; + + rc->rc_count++; + rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO); + rce->rce_hash = h; + rce->rce_msg = *msg; + bcopy(addr, &rce->rce_addr, addr->sa_len); + + TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link); + TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink); + + return (rce); +} + +static void +replay_free(struct replay_cache *rc, struct replay_cache_entry *rce) +{ + + rc->rc_count--; + TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link); + TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink); + if (rce->rce_repbody) { + rc->rc_size -= m_length(rce->rce_repbody, NULL); + m_freem(rce->rce_repbody); + } + free(rce, M_RPC); +} + +static void +replay_prune(struct replay_cache *rc) +{ + struct replay_cache_entry *rce; + bool_t freed_one; + + if (rc->rc_count >= REPLAY_MAX || rc->rc_size > rc->rc_maxsize) { + freed_one = FALSE; + do { + /* + * Try to free an entry. Don't free in-progress entries + */ + TAILQ_FOREACH_REVERSE(rce, &rc->rc_all, + replay_cache_list, rce_alllink) { + if (rce->rce_repmsg.rm_xid) { + replay_free(rc, rce); + freed_one = TRUE; + break; + } + } + } while (freed_one + && (rc->rc_count >= REPLAY_MAX + || rc->rc_size > rc->rc_maxsize)); + } +} + +enum replay_state +replay_find(struct replay_cache *rc, struct rpc_msg *msg, + struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp) +{ + int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE; + struct replay_cache_entry *rce; + + mtx_lock(&rc->rc_lock); + TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) { + if (rce->rce_msg.rm_xid == msg->rm_xid + && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog + && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers + && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc + && rce->rce_addr.ss_len == addr->sa_len + && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) { + if (rce->rce_repmsg.rm_xid) { + /* + * We have a reply for this + * message. Copy it and return. Keep + * replay_all LRU sorted + */ + TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink); + TAILQ_INSERT_HEAD(&rc->rc_all, rce, + rce_alllink); + *repmsg = rce->rce_repmsg; + if (rce->rce_repbody) { + *mp = m_copym(rce->rce_repbody, + 0, M_COPYALL, M_NOWAIT); + mtx_unlock(&rc->rc_lock); + if (!*mp) + return (RS_ERROR); + } else { + mtx_unlock(&rc->rc_lock); + } + return (RS_DONE); + } else { + mtx_unlock(&rc->rc_lock); + return (RS_INPROGRESS); + } + } + } + + replay_prune(rc); + + rce = replay_alloc(rc, msg, addr, h); + + mtx_unlock(&rc->rc_lock); + + if (!rce) + return (RS_ERROR); + else + return (RS_NEW); +} + +void +replay_setreply(struct replay_cache *rc, + struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m) +{ + int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE; + struct replay_cache_entry *rce; + + /* + * Copy the reply before the lock so we can sleep. + */ + if (m) + m = m_copym(m, 0, M_COPYALL, M_WAITOK); + + mtx_lock(&rc->rc_lock); + TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) { + if (rce->rce_msg.rm_xid == repmsg->rm_xid + && rce->rce_addr.ss_len == addr->sa_len + && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) { + break; + } + } + if (rce) { + rce->rce_repmsg = *repmsg; + rce->rce_repbody = m; + if (m) + rc->rc_size += m_length(m, NULL); + } + mtx_unlock(&rc->rc_lock); +} |