summaryrefslogtreecommitdiffstats
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/Makefile3
-rw-r--r--net/sunrpc/auth_gss/svcauth_gss.c93
-rw-r--r--net/sunrpc/cache.c152
-rw-r--r--net/sunrpc/stats.c7
-rw-r--r--net/sunrpc/sunrpc_syms.c52
-rw-r--r--net/sunrpc/svc.c90
-rw-r--r--net/sunrpc/svc_xprt.c1055
-rw-r--r--net/sunrpc/svcauth.c6
-rw-r--r--net/sunrpc/svcauth_unix.c59
-rw-r--r--net/sunrpc/svcsock.c1311
-rw-r--r--net/sunrpc/sysctl.c31
-rw-r--r--net/sunrpc/xdr.c8
-rw-r--r--net/sunrpc/xprtrdma/Makefile5
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma.c266
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_marshal.c412
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c586
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c520
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c1080
18 files changed, 4530 insertions, 1206 deletions
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 5c69a72..92e1dbe 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -11,6 +11,7 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
auth.o auth_null.o auth_unix.o \
svc.o svcsock.o svcauth.o svcauth_unix.o \
rpcb_clnt.o timer.o xdr.o \
- sunrpc_syms.o cache.o rpc_pipe.o
+ sunrpc_syms.o cache.o rpc_pipe.o \
+ svc_xprt.o
sunrpc-$(CONFIG_PROC_FS) += stats.o
sunrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 73940df..481f984 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -224,38 +224,34 @@ static int rsi_parse(struct cache_detail *cd,
/* major/minor */
len = qword_get(&mesg, buf, mlen);
- if (len < 0)
+ if (len <= 0)
goto out;
- if (len == 0) {
+ rsii.major_status = simple_strtoul(buf, &ep, 10);
+ if (*ep)
+ goto out;
+ len = qword_get(&mesg, buf, mlen);
+ if (len <= 0)
+ goto out;
+ rsii.minor_status = simple_strtoul(buf, &ep, 10);
+ if (*ep)
goto out;
- } else {
- rsii.major_status = simple_strtoul(buf, &ep, 10);
- if (*ep)
- goto out;
- len = qword_get(&mesg, buf, mlen);
- if (len <= 0)
- goto out;
- rsii.minor_status = simple_strtoul(buf, &ep, 10);
- if (*ep)
- goto out;
- /* out_handle */
- len = qword_get(&mesg, buf, mlen);
- if (len < 0)
- goto out;
- status = -ENOMEM;
- if (dup_to_netobj(&rsii.out_handle, buf, len))
- goto out;
+ /* out_handle */
+ len = qword_get(&mesg, buf, mlen);
+ if (len < 0)
+ goto out;
+ status = -ENOMEM;
+ if (dup_to_netobj(&rsii.out_handle, buf, len))
+ goto out;
- /* out_token */
- len = qword_get(&mesg, buf, mlen);
- status = -EINVAL;
- if (len < 0)
- goto out;
- status = -ENOMEM;
- if (dup_to_netobj(&rsii.out_token, buf, len))
- goto out;
- }
+ /* out_token */
+ len = qword_get(&mesg, buf, mlen);
+ status = -EINVAL;
+ if (len < 0)
+ goto out;
+ status = -ENOMEM;
+ if (dup_to_netobj(&rsii.out_token, buf, len))
+ goto out;
rsii.h.expiry_time = expiry;
rsip = rsi_update(&rsii, rsip);
status = 0;
@@ -975,6 +971,7 @@ static int svcauth_gss_handle_init(struct svc_rqst *rqstp,
struct kvec *resv = &rqstp->rq_res.head[0];
struct xdr_netobj tmpobj;
struct rsi *rsip, rsikey;
+ int ret;
/* Read the verifier; should be NULL: */
*authp = rpc_autherr_badverf;
@@ -1014,23 +1011,27 @@ static int svcauth_gss_handle_init(struct svc_rqst *rqstp,
/* No upcall result: */
return SVC_DROP;
case 0:
+ ret = SVC_DROP;
/* Got an answer to the upcall; use it: */
if (gss_write_init_verf(rqstp, rsip))
- return SVC_DROP;
+ goto out;
if (resv->iov_len + 4 > PAGE_SIZE)
- return SVC_DROP;
+ goto out;
svc_putnl(resv, RPC_SUCCESS);
if (svc_safe_putnetobj(resv, &rsip->out_handle))
- return SVC_DROP;
+ goto out;
if (resv->iov_len + 3 * 4 > PAGE_SIZE)
- return SVC_DROP;
+ goto out;
svc_putnl(resv, rsip->major_status);
svc_putnl(resv, rsip->minor_status);
svc_putnl(resv, GSS_SEQ_WIN);
if (svc_safe_putnetobj(resv, &rsip->out_token))
- return SVC_DROP;
+ goto out;
}
- return SVC_COMPLETE;
+ ret = SVC_COMPLETE;
+out:
+ cache_put(&rsip->h, &rsi_cache);
+ return ret;
}
/*
@@ -1125,6 +1126,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
case RPC_GSS_PROC_DESTROY:
if (gss_write_verf(rqstp, rsci->mechctx, gc->gc_seq))
goto auth_err;
+ rsci->h.expiry_time = get_seconds();
set_bit(CACHE_NEGATIVE, &rsci->h.flags);
if (resv->iov_len + 4 > PAGE_SIZE)
goto drop;
@@ -1386,19 +1388,26 @@ int
gss_svc_init(void)
{
int rv = svc_auth_register(RPC_AUTH_GSS, &svcauthops_gss);
- if (rv == 0) {
- cache_register(&rsc_cache);
- cache_register(&rsi_cache);
- }
+ if (rv)
+ return rv;
+ rv = cache_register(&rsc_cache);
+ if (rv)
+ goto out1;
+ rv = cache_register(&rsi_cache);
+ if (rv)
+ goto out2;
+ return 0;
+out2:
+ cache_unregister(&rsc_cache);
+out1:
+ svc_auth_unregister(RPC_AUTH_GSS);
return rv;
}
void
gss_svc_shutdown(void)
{
- if (cache_unregister(&rsc_cache))
- printk(KERN_ERR "auth_rpcgss: failed to unregister rsc cache\n");
- if (cache_unregister(&rsi_cache))
- printk(KERN_ERR "auth_rpcgss: failed to unregister rsi cache\n");
+ cache_unregister(&rsc_cache);
+ cache_unregister(&rsi_cache);
svc_auth_unregister(RPC_AUTH_GSS);
}
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 73f053d..636c8e0 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -245,6 +245,7 @@ int cache_check(struct cache_detail *detail,
cache_put(h, detail);
return rv;
}
+EXPORT_SYMBOL(cache_check);
/*
* caches need to be periodically cleaned.
@@ -290,44 +291,78 @@ static const struct file_operations cache_flush_operations;
static void do_cache_clean(struct work_struct *work);
static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
-void cache_register(struct cache_detail *cd)
+static void remove_cache_proc_entries(struct cache_detail *cd)
{
- cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
- if (cd->proc_ent) {
- struct proc_dir_entry *p;
- cd->proc_ent->owner = cd->owner;
- cd->channel_ent = cd->content_ent = NULL;
+ if (cd->proc_ent == NULL)
+ return;
+ if (cd->flush_ent)
+ remove_proc_entry("flush", cd->proc_ent);
+ if (cd->channel_ent)
+ remove_proc_entry("channel", cd->proc_ent);
+ if (cd->content_ent)
+ remove_proc_entry("content", cd->proc_ent);
+ cd->proc_ent = NULL;
+ remove_proc_entry(cd->name, proc_net_rpc);
+}
- p = create_proc_entry("flush", S_IFREG|S_IRUSR|S_IWUSR,
- cd->proc_ent);
- cd->flush_ent = p;
- if (p) {
- p->proc_fops = &cache_flush_operations;
- p->owner = cd->owner;
- p->data = cd;
- }
+#ifdef CONFIG_PROC_FS
+static int create_cache_proc_entries(struct cache_detail *cd)
+{
+ struct proc_dir_entry *p;
- if (cd->cache_request || cd->cache_parse) {
- p = create_proc_entry("channel", S_IFREG|S_IRUSR|S_IWUSR,
- cd->proc_ent);
- cd->channel_ent = p;
- if (p) {
- p->proc_fops = &cache_file_operations;
- p->owner = cd->owner;
- p->data = cd;
- }
- }
- if (cd->cache_show) {
- p = create_proc_entry("content", S_IFREG|S_IRUSR|S_IWUSR,
- cd->proc_ent);
- cd->content_ent = p;
- if (p) {
- p->proc_fops = &content_file_operations;
- p->owner = cd->owner;
- p->data = cd;
- }
- }
+ cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
+ if (cd->proc_ent == NULL)
+ goto out_nomem;
+ cd->proc_ent->owner = cd->owner;
+ cd->channel_ent = cd->content_ent = NULL;
+
+ p = create_proc_entry("flush", S_IFREG|S_IRUSR|S_IWUSR, cd->proc_ent);
+ cd->flush_ent = p;
+ if (p == NULL)
+ goto out_nomem;
+ p->proc_fops = &cache_flush_operations;
+ p->owner = cd->owner;
+ p->data = cd;
+
+ if (cd->cache_request || cd->cache_parse) {
+ p = create_proc_entry("channel", S_IFREG|S_IRUSR|S_IWUSR,
+ cd->proc_ent);
+ cd->channel_ent = p;
+ if (p == NULL)
+ goto out_nomem;
+ p->proc_fops = &cache_file_operations;
+ p->owner = cd->owner;
+ p->data = cd;
}
+ if (cd->cache_show) {
+ p = create_proc_entry("content", S_IFREG|S_IRUSR|S_IWUSR,
+ cd->proc_ent);
+ cd->content_ent = p;
+ if (p == NULL)
+ goto out_nomem;
+ p->proc_fops = &content_file_operations;
+ p->owner = cd->owner;
+ p->data = cd;
+ }
+ return 0;
+out_nomem:
+ remove_cache_proc_entries(cd);
+ return -ENOMEM;
+}
+#else /* CONFIG_PROC_FS */
+static int create_cache_proc_entries(struct cache_detail *cd)
+{
+ return 0;
+}
+#endif
+
+int cache_register(struct cache_detail *cd)
+{
+ int ret;
+
+ ret = create_cache_proc_entries(cd);
+ if (ret)
+ return ret;
rwlock_init(&cd->hash_lock);
INIT_LIST_HEAD(&cd->queue);
spin_lock(&cache_list_lock);
@@ -341,9 +376,11 @@ void cache_register(struct cache_detail *cd)
/* start the cleaning process */
schedule_delayed_work(&cache_cleaner, 0);
+ return 0;
}
+EXPORT_SYMBOL(cache_register);
-int cache_unregister(struct cache_detail *cd)
+void cache_unregister(struct cache_detail *cd)
{
cache_purge(cd);
spin_lock(&cache_list_lock);
@@ -351,30 +388,23 @@ int cache_unregister(struct cache_detail *cd)
if (cd->entries || atomic_read(&cd->inuse)) {
write_unlock(&cd->hash_lock);
spin_unlock(&cache_list_lock);
- return -EBUSY;
+ goto out;
}
if (current_detail == cd)
current_detail = NULL;
list_del_init(&cd->others);
write_unlock(&cd->hash_lock);
spin_unlock(&cache_list_lock);
- if (cd->proc_ent) {
- if (cd->flush_ent)
- remove_proc_entry("flush", cd->proc_ent);
- if (cd->channel_ent)
- remove_proc_entry("channel", cd->proc_ent);
- if (cd->content_ent)
- remove_proc_entry("content", cd->proc_ent);
-
- cd->proc_ent = NULL;
- remove_proc_entry(cd->name, proc_net_rpc);
- }
+ remove_cache_proc_entries(cd);
if (list_empty(&cache_list)) {
/* module must be being unloaded so its safe to kill the worker */
cancel_delayed_work_sync(&cache_cleaner);
}
- return 0;
+ return;
+out:
+ printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
}
+EXPORT_SYMBOL(cache_unregister);
/* clean cache tries to find something to clean
* and cleans it.
@@ -489,6 +519,7 @@ void cache_flush(void)
while (cache_clean() != -1)
cond_resched();
}
+EXPORT_SYMBOL(cache_flush);
void cache_purge(struct cache_detail *detail)
{
@@ -497,7 +528,7 @@ void cache_purge(struct cache_detail *detail)
cache_flush();
detail->flush_time = 1;
}
-
+EXPORT_SYMBOL(cache_purge);
/*
@@ -634,13 +665,13 @@ void cache_clean_deferred(void *owner)
/*
* communicate with user-space
*
- * We have a magic /proc file - /proc/sunrpc/cache
- * On read, you get a full request, or block
- * On write, an update request is processed
- * Poll works if anything to read, and always allows write
+ * We have a magic /proc file - /proc/sunrpc/<cachename>/channel.
+ * On read, you get a full request, or block.
+ * On write, an update request is processed.
+ * Poll works if anything to read, and always allows write.
*
* Implemented by linked list of requests. Each open file has
- * a ->private that also exists in this list. New request are added
+ * a ->private that also exists in this list. New requests are added
* to the end and may wakeup and preceding readers.
* New readers are added to the head. If, on read, an item is found with
* CACHE_UPCALLING clear, we free it from the list.
@@ -963,6 +994,7 @@ void qword_add(char **bpp, int *lp, char *str)
*bpp = bp;
*lp = len;
}
+EXPORT_SYMBOL(qword_add);
void qword_addhex(char **bpp, int *lp, char *buf, int blen)
{
@@ -991,6 +1023,7 @@ void qword_addhex(char **bpp, int *lp, char *buf, int blen)
*bpp = bp;
*lp = len;
}
+EXPORT_SYMBOL(qword_addhex);
static void warn_no_listener(struct cache_detail *detail)
{
@@ -1113,6 +1146,7 @@ int qword_get(char **bpp, char *dest, int bufsize)
*dest = '\0';
return len;
}
+EXPORT_SYMBOL(qword_get);
/*
@@ -1244,18 +1278,18 @@ static ssize_t read_flush(struct file *file, char __user *buf,
struct cache_detail *cd = PDE(file->f_path.dentry->d_inode)->data;
char tbuf[20];
unsigned long p = *ppos;
- int len;
+ size_t len;
sprintf(tbuf, "%lu\n", cd->flush_time);
len = strlen(tbuf);
if (p >= len)
return 0;
len -= p;
- if (len > count) len = count;
+ if (len > count)
+ len = count;
if (copy_to_user(buf, (void*)(tbuf+p), len))
- len = -EFAULT;
- else
- *ppos += len;
+ return -EFAULT;
+ *ppos += len;
return len;
}
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index 74df2d3..5a16875 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -33,7 +33,7 @@ struct proc_dir_entry *proc_net_rpc = NULL;
static int rpc_proc_show(struct seq_file *seq, void *v) {
const struct rpc_stat *statp = seq->private;
const struct rpc_program *prog = statp->program;
- int i, j;
+ unsigned int i, j;
seq_printf(seq,
"net %u %u %u %u\n",
@@ -81,7 +81,7 @@ void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
const struct svc_program *prog = statp->program;
const struct svc_procedure *proc;
const struct svc_version *vers;
- int i, j;
+ unsigned int i, j;
seq_printf(seq,
"net %u %u %u %u\n",
@@ -106,6 +106,7 @@ void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
seq_putc(seq, '\n');
}
}
+EXPORT_SYMBOL(svc_seq_show);
/**
* rpc_alloc_iostats - allocate an rpc_iostats structure
@@ -255,12 +256,14 @@ svc_proc_register(struct svc_stat *statp, const struct file_operations *fops)
{
return do_register(statp->program->pg_name, statp, fops);
}
+EXPORT_SYMBOL(svc_proc_register);
void
svc_proc_unregister(const char *name)
{
remove_proc_entry(name, proc_net_rpc);
}
+EXPORT_SYMBOL(svc_proc_unregister);
void
rpc_proc_init(void)
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index 1a7e309..843629f 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -22,48 +22,6 @@
#include <linux/sunrpc/rpc_pipe_fs.h>
#include <linux/sunrpc/xprtsock.h>
-/* RPC server stuff */
-EXPORT_SYMBOL(svc_create);
-EXPORT_SYMBOL(svc_create_thread);
-EXPORT_SYMBOL(svc_create_pooled);
-EXPORT_SYMBOL(svc_set_num_threads);
-EXPORT_SYMBOL(svc_exit_thread);
-EXPORT_SYMBOL(svc_destroy);
-EXPORT_SYMBOL(svc_drop);
-EXPORT_SYMBOL(svc_process);
-EXPORT_SYMBOL(svc_recv);
-EXPORT_SYMBOL(svc_wake_up);
-EXPORT_SYMBOL(svc_makesock);
-EXPORT_SYMBOL(svc_reserve);
-EXPORT_SYMBOL(svc_auth_register);
-EXPORT_SYMBOL(auth_domain_lookup);
-EXPORT_SYMBOL(svc_authenticate);
-EXPORT_SYMBOL(svc_set_client);
-
-/* RPC statistics */
-#ifdef CONFIG_PROC_FS
-EXPORT_SYMBOL(svc_proc_register);
-EXPORT_SYMBOL(svc_proc_unregister);
-EXPORT_SYMBOL(svc_seq_show);
-#endif
-
-/* caching... */
-EXPORT_SYMBOL(auth_domain_find);
-EXPORT_SYMBOL(auth_domain_put);
-EXPORT_SYMBOL(auth_unix_add_addr);
-EXPORT_SYMBOL(auth_unix_forget_old);
-EXPORT_SYMBOL(auth_unix_lookup);
-EXPORT_SYMBOL(cache_check);
-EXPORT_SYMBOL(cache_flush);
-EXPORT_SYMBOL(cache_purge);
-EXPORT_SYMBOL(cache_register);
-EXPORT_SYMBOL(cache_unregister);
-EXPORT_SYMBOL(qword_add);
-EXPORT_SYMBOL(qword_addhex);
-EXPORT_SYMBOL(qword_get);
-EXPORT_SYMBOL(svcauth_unix_purge);
-EXPORT_SYMBOL(unix_domain_find);
-
extern struct cache_detail ip_map_cache, unix_gid_cache;
static int __init
@@ -85,7 +43,8 @@ init_sunrpc(void)
#endif
cache_register(&ip_map_cache);
cache_register(&unix_gid_cache);
- init_socket_xprt();
+ svc_init_xprt_sock(); /* svc sock transport */
+ init_socket_xprt(); /* clnt sock transport */
rpcauth_init_module();
out:
return err;
@@ -96,12 +55,11 @@ cleanup_sunrpc(void)
{
rpcauth_remove_module();
cleanup_socket_xprt();
+ svc_cleanup_xprt_sock();
unregister_rpc_pipefs();
rpc_destroy_mempool();
- if (cache_unregister(&ip_map_cache))
- printk(KERN_ERR "sunrpc: failed to unregister ip_map cache\n");
- if (cache_unregister(&unix_gid_cache))
- printk(KERN_ERR "sunrpc: failed to unregister unix_gid cache\n");
+ cache_unregister(&ip_map_cache);
+ cache_unregister(&unix_gid_cache);
#ifdef RPC_DEBUG
rpc_unregister_sysctl();
#endif
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 4ad5fbb..a290e15 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -364,7 +364,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
void (*shutdown)(struct svc_serv *serv))
{
struct svc_serv *serv;
- int vers;
+ unsigned int vers;
unsigned int xdrsize;
unsigned int i;
@@ -433,6 +433,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize,
{
return __svc_create(prog, bufsize, /*npools*/1, shutdown);
}
+EXPORT_SYMBOL(svc_create);
struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
@@ -452,6 +453,7 @@ svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
return serv;
}
+EXPORT_SYMBOL(svc_create_pooled);
/*
* Destroy an RPC service. Should be called with the BKL held
@@ -459,9 +461,6 @@ svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
void
svc_destroy(struct svc_serv *serv)
{
- struct svc_sock *svsk;
- struct svc_sock *tmp;
-
dprintk("svc: svc_destroy(%s, %d)\n",
serv->sv_program->pg_name,
serv->sv_nrthreads);
@@ -476,14 +475,12 @@ svc_destroy(struct svc_serv *serv)
del_timer_sync(&serv->sv_temptimer);
- list_for_each_entry_safe(svsk, tmp, &serv->sv_tempsocks, sk_list)
- svc_force_close_socket(svsk);
+ svc_close_all(&serv->sv_tempsocks);
if (serv->sv_shutdown)
serv->sv_shutdown(serv);
- list_for_each_entry_safe(svsk, tmp, &serv->sv_permsocks, sk_list)
- svc_force_close_socket(svsk);
+ svc_close_all(&serv->sv_permsocks);
BUG_ON(!list_empty(&serv->sv_permsocks));
BUG_ON(!list_empty(&serv->sv_tempsocks));
@@ -498,6 +495,7 @@ svc_destroy(struct svc_serv *serv)
kfree(serv->sv_pools);
kfree(serv);
}
+EXPORT_SYMBOL(svc_destroy);
/*
* Allocate an RPC server's buffer space.
@@ -536,31 +534,17 @@ svc_release_buffer(struct svc_rqst *rqstp)
put_page(rqstp->rq_pages[i]);
}
-/*
- * Create a thread in the given pool. Caller must hold BKL.
- * On a NUMA or SMP machine, with a multi-pool serv, the thread
- * will be restricted to run on the cpus belonging to the pool.
- */
-static int
-__svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
- struct svc_pool *pool)
+struct svc_rqst *
+svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool)
{
struct svc_rqst *rqstp;
- int error = -ENOMEM;
- int have_oldmask = 0;
- cpumask_t oldmask;
rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL);
if (!rqstp)
- goto out;
+ goto out_enomem;
init_waitqueue_head(&rqstp->rq_wait);
- if (!(rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL))
- || !(rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL))
- || !svc_init_buffer(rqstp, serv->sv_max_mesg))
- goto out_thread;
-
serv->sv_nrthreads++;
spin_lock_bh(&pool->sp_lock);
pool->sp_nrthreads++;
@@ -569,6 +553,45 @@ __svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
rqstp->rq_server = serv;
rqstp->rq_pool = pool;
+ rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
+ if (!rqstp->rq_argp)
+ goto out_thread;
+
+ rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
+ if (!rqstp->rq_resp)
+ goto out_thread;
+
+ if (!svc_init_buffer(rqstp, serv->sv_max_mesg))
+ goto out_thread;
+
+ return rqstp;
+out_thread:
+ svc_exit_thread(rqstp);
+out_enomem:
+ return ERR_PTR(-ENOMEM);
+}
+EXPORT_SYMBOL(svc_prepare_thread);
+
+/*
+ * Create a thread in the given pool. Caller must hold BKL.
+ * On a NUMA or SMP machine, with a multi-pool serv, the thread
+ * will be restricted to run on the cpus belonging to the pool.
+ */
+static int
+__svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
+ struct svc_pool *pool)
+{
+ struct svc_rqst *rqstp;
+ int error = -ENOMEM;
+ int have_oldmask = 0;
+ cpumask_t oldmask;
+
+ rqstp = svc_prepare_thread(serv, pool);
+ if (IS_ERR(rqstp)) {
+ error = PTR_ERR(rqstp);
+ goto out;
+ }
+
if (serv->sv_nrpools > 1)
have_oldmask = svc_pool_map_set_cpumask(pool->sp_id, &oldmask);
@@ -597,6 +620,7 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
{
return __svc_create_thread(func, serv, &serv->sv_pools[0]);
}
+EXPORT_SYMBOL(svc_create_thread);
/*
* Choose a pool in which to create a new thread, for svc_set_num_threads
@@ -700,6 +724,7 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
return error;
}
+EXPORT_SYMBOL(svc_set_num_threads);
/*
* Called from a server thread as it's exiting. Caller must hold BKL.
@@ -726,6 +751,7 @@ svc_exit_thread(struct svc_rqst *rqstp)
if (serv)
svc_destroy(serv);
}
+EXPORT_SYMBOL(svc_exit_thread);
/*
* Register an RPC service with the local portmapper.
@@ -737,7 +763,8 @@ svc_register(struct svc_serv *serv, int proto, unsigned short port)
{
struct svc_program *progp;
unsigned long flags;
- int i, error = 0, dummy;
+ unsigned int i;
+ int error = 0, dummy;
if (!port)
clear_thread_flag(TIF_SIGPENDING);
@@ -840,9 +867,9 @@ svc_process(struct svc_rqst *rqstp)
rqstp->rq_res.tail[0].iov_len = 0;
/* Will be turned off only in gss privacy case: */
rqstp->rq_splice_ok = 1;
- /* tcp needs a space for the record length... */
- if (rqstp->rq_prot == IPPROTO_TCP)
- svc_putnl(resv, 0);
+
+ /* Setup reply header */
+ rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
rqstp->rq_xid = svc_getu32(argv);
svc_putu32(resv, rqstp->rq_xid);
@@ -1049,16 +1076,15 @@ err_bad:
svc_putnl(resv, ntohl(rpc_stat));
goto sendit;
}
+EXPORT_SYMBOL(svc_process);
/*
* Return (transport-specific) limit on the rpc payload.
*/
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
- int max = RPCSVC_MAXPAYLOAD_TCP;
+ u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
- if (rqstp->rq_sock->sk_sock->type == SOCK_DGRAM)
- max = RPCSVC_MAXPAYLOAD_UDP;
if (rqstp->rq_server->sv_max_payload < max)
max = rqstp->rq_server->sv_max_payload;
return max;
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
new file mode 100644
index 0000000..ea377e0
--- /dev/null
+++ b/net/sunrpc/svc_xprt.c
@@ -0,0 +1,1055 @@
+/*
+ * linux/net/sunrpc/svc_xprt.c
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sched.h>
+#include <linux/errno.h>
+#include <linux/fcntl.h>
+#include <linux/net.h>
+#include <linux/in.h>
+#include <linux/inet.h>
+#include <linux/udp.h>
+#include <linux/tcp.h>
+#include <linux/unistd.h>
+#include <linux/slab.h>
+#include <linux/netdevice.h>
+#include <linux/skbuff.h>
+#include <linux/file.h>
+#include <linux/freezer.h>
+#include <net/sock.h>
+#include <net/checksum.h>
+#include <net/ip.h>
+#include <net/ipv6.h>
+#include <net/tcp_states.h>
+#include <linux/uaccess.h>
+#include <asm/ioctls.h>
+
+#include <linux/sunrpc/types.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/svc_xprt.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
+static int svc_deferred_recv(struct svc_rqst *rqstp);
+static struct cache_deferred_req *svc_defer(struct cache_req *req);
+static void svc_age_temp_xprts(unsigned long closure);
+
+/* apparently the "standard" is that clients close
+ * idle connections after 5 minutes, servers after
+ * 6 minutes
+ * http://www.connectathon.org/talks96/nfstcp.pdf
+ */
+static int svc_conn_age_period = 6*60;
+
+/* List of registered transport classes */
+static DEFINE_SPINLOCK(svc_xprt_class_lock);
+static LIST_HEAD(svc_xprt_class_list);
+
+/* SMP locking strategy:
+ *
+ * svc_pool->sp_lock protects most of the fields of that pool.
+ * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
+ * when both need to be taken (rare), svc_serv->sv_lock is first.
+ * BKL protects svc_serv->sv_nrthread.
+ * svc_sock->sk_lock protects the svc_sock->sk_deferred list
+ * and the ->sk_info_authunix cache.
+ *
+ * The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
+ * enqueued multiply. During normal transport processing this bit
+ * is set by svc_xprt_enqueue and cleared by svc_xprt_received.
+ * Providers should not manipulate this bit directly.
+ *
+ * Some flags can be set to certain values at any time
+ * providing that certain rules are followed:
+ *
+ * XPT_CONN, XPT_DATA:
+ * - Can be set or cleared at any time.
+ * - After a set, svc_xprt_enqueue must be called to enqueue
+ * the transport for processing.
+ * - After a clear, the transport must be read/accepted.
+ * If this succeeds, it must be set again.
+ * XPT_CLOSE:
+ * - Can set at any time. It is never cleared.
+ * XPT_DEAD:
+ * - Can only be set while XPT_BUSY is held which ensures
+ * that no other thread will be using the transport or will
+ * try to set XPT_DEAD.
+ */
+
+int svc_reg_xprt_class(struct svc_xprt_class *xcl)
+{
+ struct svc_xprt_class *cl;
+ int res = -EEXIST;
+
+ dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);
+
+ INIT_LIST_HEAD(&xcl->xcl_list);
+ spin_lock(&svc_xprt_class_lock);
+ /* Make sure there isn't already a class with the same name */
+ list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
+ if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
+ goto out;
+ }
+ list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
+ res = 0;
+out:
+ spin_unlock(&svc_xprt_class_lock);
+ return res;
+}
+EXPORT_SYMBOL_GPL(svc_reg_xprt_class);
+
+void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
+{
+ dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
+ spin_lock(&svc_xprt_class_lock);
+ list_del_init(&xcl->xcl_list);
+ spin_unlock(&svc_xprt_class_lock);
+}
+EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
+
+/*
+ * Format the transport list for printing
+ */
+int svc_print_xprts(char *buf, int maxlen)
+{
+ struct list_head *le;
+ char tmpstr[80];
+ int len = 0;
+ buf[0] = '\0';
+
+ spin_lock(&svc_xprt_class_lock);
+ list_for_each(le, &svc_xprt_class_list) {
+ int slen;
+ struct svc_xprt_class *xcl =
+ list_entry(le, struct svc_xprt_class, xcl_list);
+
+ sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
+ slen = strlen(tmpstr);
+ if (len + slen > maxlen)
+ break;
+ len += slen;
+ strcat(buf, tmpstr);
+ }
+ spin_unlock(&svc_xprt_class_lock);
+
+ return len;
+}
+
+static void svc_xprt_free(struct kref *kref)
+{
+ struct svc_xprt *xprt =
+ container_of(kref, struct svc_xprt, xpt_ref);
+ struct module *owner = xprt->xpt_class->xcl_owner;
+ if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
+ && xprt->xpt_auth_cache != NULL)
+ svcauth_unix_info_release(xprt->xpt_auth_cache);
+ xprt->xpt_ops->xpo_free(xprt);
+ module_put(owner);
+}
+
+void svc_xprt_put(struct svc_xprt *xprt)
+{
+ kref_put(&xprt->xpt_ref, svc_xprt_free);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_put);
+
+/*
+ * Called by transport drivers to initialize the transport independent
+ * portion of the transport instance.
+ */
+void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
+ struct svc_serv *serv)
+{
+ memset(xprt, 0, sizeof(*xprt));
+ xprt->xpt_class = xcl;
+ xprt->xpt_ops = xcl->xcl_ops;
+ kref_init(&xprt->xpt_ref);
+ xprt->xpt_server = serv;
+ INIT_LIST_HEAD(&xprt->xpt_list);
+ INIT_LIST_HEAD(&xprt->xpt_ready);
+ INIT_LIST_HEAD(&xprt->xpt_deferred);
+ mutex_init(&xprt->xpt_mutex);
+ spin_lock_init(&xprt->xpt_lock);
+ set_bit(XPT_BUSY, &xprt->xpt_flags);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_init);
+
+int svc_create_xprt(struct svc_serv *serv, char *xprt_name, unsigned short port,
+ int flags)
+{
+ struct svc_xprt_class *xcl;
+ struct sockaddr_in sin = {
+ .sin_family = AF_INET,
+ .sin_addr.s_addr = INADDR_ANY,
+ .sin_port = htons(port),
+ };
+ dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
+ spin_lock(&svc_xprt_class_lock);
+ list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
+ struct svc_xprt *newxprt;
+
+ if (strcmp(xprt_name, xcl->xcl_name))
+ continue;
+
+ if (!try_module_get(xcl->xcl_owner))
+ goto err;
+
+ spin_unlock(&svc_xprt_class_lock);
+ newxprt = xcl->xcl_ops->
+ xpo_create(serv, (struct sockaddr *)&sin, sizeof(sin),
+ flags);
+ if (IS_ERR(newxprt)) {
+ module_put(xcl->xcl_owner);
+ return PTR_ERR(newxprt);
+ }
+
+ clear_bit(XPT_TEMP, &newxprt->xpt_flags);
+ spin_lock_bh(&serv->sv_lock);
+ list_add(&newxprt->xpt_list, &serv->sv_permsocks);
+ spin_unlock_bh(&serv->sv_lock);
+ clear_bit(XPT_BUSY, &newxprt->xpt_flags);
+ return svc_xprt_local_port(newxprt);
+ }
+ err:
+ spin_unlock(&svc_xprt_class_lock);
+ dprintk("svc: transport %s not found\n", xprt_name);
+ return -ENOENT;
+}
+EXPORT_SYMBOL_GPL(svc_create_xprt);
+
+/*
+ * Copy the local and remote xprt addresses to the rqstp structure
+ */
+void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+ struct sockaddr *sin;
+
+ memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
+ rqstp->rq_addrlen = xprt->xpt_remotelen;
+
+ /*
+ * Destination address in request is needed for binding the
+ * source address in RPC replies/callbacks later.
+ */
+ sin = (struct sockaddr *)&xprt->xpt_local;
+ switch (sin->sa_family) {
+ case AF_INET:
+ rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
+ break;
+ case AF_INET6:
+ rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
+ break;
+ }
+}
+EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);
+
+/**
+ * svc_print_addr - Format rq_addr field for printing
+ * @rqstp: svc_rqst struct containing address to print
+ * @buf: target buffer for formatted address
+ * @len: length of target buffer
+ *
+ */
+char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
+{
+ return __svc_print_addr(svc_addr(rqstp), buf, len);
+}
+EXPORT_SYMBOL_GPL(svc_print_addr);
+
+/*
+ * Queue up an idle server thread. Must have pool->sp_lock held.
+ * Note: this is really a stack rather than a queue, so that we only
+ * use as many different threads as we need, and the rest don't pollute
+ * the cache.
+ */
+static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
+{
+ list_add(&rqstp->rq_list, &pool->sp_threads);
+}
+
+/*
+ * Dequeue an nfsd thread. Must have pool->sp_lock held.
+ */
+static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
+{
+ list_del(&rqstp->rq_list);
+}
+
+/*
+ * Queue up a transport with data pending. If there are idle nfsd
+ * processes, wake 'em up.
+ *
+ */
+void svc_xprt_enqueue(struct svc_xprt *xprt)
+{
+ struct svc_serv *serv = xprt->xpt_server;
+ struct svc_pool *pool;
+ struct svc_rqst *rqstp;
+ int cpu;
+
+ if (!(xprt->xpt_flags &
+ ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
+ return;
+ if (test_bit(XPT_DEAD, &xprt->xpt_flags))
+ return;
+
+ cpu = get_cpu();
+ pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
+ put_cpu();
+
+ spin_lock_bh(&pool->sp_lock);
+
+ if (!list_empty(&pool->sp_threads) &&
+ !list_empty(&pool->sp_sockets))
+ printk(KERN_ERR
+ "svc_xprt_enqueue: "
+ "threads and transports both waiting??\n");
+
+ if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
+ /* Don't enqueue dead transports */
+ dprintk("svc: transport %p is dead, not enqueued\n", xprt);
+ goto out_unlock;
+ }
+
+ /* Mark transport as busy. It will remain in this state until
+ * the provider calls svc_xprt_received. We update XPT_BUSY
+ * atomically because it also guards against trying to enqueue
+ * the transport twice.
+ */
+ if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
+ /* Don't enqueue transport while already enqueued */
+ dprintk("svc: transport %p busy, not enqueued\n", xprt);
+ goto out_unlock;
+ }
+ BUG_ON(xprt->xpt_pool != NULL);
+ xprt->xpt_pool = pool;
+
+ /* Handle pending connection */
+ if (test_bit(XPT_CONN, &xprt->xpt_flags))
+ goto process;
+
+ /* Handle close in-progress */
+ if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+ goto process;
+
+ /* Check if we have space to reply to a request */
+ if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
+ /* Don't enqueue while not enough space for reply */
+ dprintk("svc: no write space, transport %p not enqueued\n",
+ xprt);
+ xprt->xpt_pool = NULL;
+ clear_bit(XPT_BUSY, &xprt->xpt_flags);
+ goto out_unlock;
+ }
+
+ process:
+ if (!list_empty(&pool->sp_threads)) {
+ rqstp = list_entry(pool->sp_threads.next,
+ struct svc_rqst,
+ rq_list);
+ dprintk("svc: transport %p served by daemon %p\n",
+ xprt, rqstp);
+ svc_thread_dequeue(pool, rqstp);
+ if (rqstp->rq_xprt)
+ printk(KERN_ERR
+ "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
+ rqstp, rqstp->rq_xprt);
+ rqstp->rq_xprt = xprt;
+ svc_xprt_get(xprt);
+ rqstp->rq_reserved = serv->sv_max_mesg;
+ atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
+ BUG_ON(xprt->xpt_pool != pool);
+ wake_up(&rqstp->rq_wait);
+ } else {
+ dprintk("svc: transport %p put into queue\n", xprt);
+ list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
+ BUG_ON(xprt->xpt_pool != pool);
+ }
+
+out_unlock:
+ spin_unlock_bh(&pool->sp_lock);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
+
+/*
+ * Dequeue the first transport. Must be called with the pool->sp_lock held.
+ */
+static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
+{
+ struct svc_xprt *xprt;
+
+ if (list_empty(&pool->sp_sockets))
+ return NULL;
+
+ xprt = list_entry(pool->sp_sockets.next,
+ struct svc_xprt, xpt_ready);
+ list_del_init(&xprt->xpt_ready);
+
+ dprintk("svc: transport %p dequeued, inuse=%d\n",
+ xprt, atomic_read(&xprt->xpt_ref.refcount));
+
+ return xprt;
+}
+
+/*
+ * svc_xprt_received conditionally queues the transport for processing
+ * by another thread. The caller must hold the XPT_BUSY bit and must
+ * not thereafter touch transport data.
+ *
+ * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
+ * insufficient) data.
+ */
+void svc_xprt_received(struct svc_xprt *xprt)
+{
+ BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
+ xprt->xpt_pool = NULL;
+ clear_bit(XPT_BUSY, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_received);
+
+/**
+ * svc_reserve - change the space reserved for the reply to a request.
+ * @rqstp: The request in question
+ * @space: new max space to reserve
+ *
+ * Each request reserves some space on the output queue of the transport
+ * to make sure the reply fits. This function reduces that reserved
+ * space to be the amount of space used already, plus @space.
+ *
+ */
+void svc_reserve(struct svc_rqst *rqstp, int space)
+{
+ space += rqstp->rq_res.head[0].iov_len;
+
+ if (space < rqstp->rq_reserved) {
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
+ rqstp->rq_reserved = space;
+
+ svc_xprt_enqueue(xprt);
+ }
+}
+EXPORT_SYMBOL(svc_reserve);
+
+static void svc_xprt_release(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+
+ rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
+
+ svc_free_res_pages(rqstp);
+ rqstp->rq_res.page_len = 0;
+ rqstp->rq_res.page_base = 0;
+
+ /* Reset response buffer and release
+ * the reservation.
+ * But first, check that enough space was reserved
+ * for the reply, otherwise we have a bug!
+ */
+ if ((rqstp->rq_res.len) > rqstp->rq_reserved)
+ printk(KERN_ERR "RPC request reserved %d but used %d\n",
+ rqstp->rq_reserved,
+ rqstp->rq_res.len);
+
+ rqstp->rq_res.head[0].iov_len = 0;
+ svc_reserve(rqstp, 0);
+ rqstp->rq_xprt = NULL;
+
+ svc_xprt_put(xprt);
+}
+
+/*
+ * External function to wake up a server waiting for data
+ * This really only makes sense for services like lockd
+ * which have exactly one thread anyway.
+ */
+void svc_wake_up(struct svc_serv *serv)
+{
+ struct svc_rqst *rqstp;
+ unsigned int i;
+ struct svc_pool *pool;
+
+ for (i = 0; i < serv->sv_nrpools; i++) {
+ pool = &serv->sv_pools[i];
+
+ spin_lock_bh(&pool->sp_lock);
+ if (!list_empty(&pool->sp_threads)) {
+ rqstp = list_entry(pool->sp_threads.next,
+ struct svc_rqst,
+ rq_list);
+ dprintk("svc: daemon %p woken up.\n", rqstp);
+ /*
+ svc_thread_dequeue(pool, rqstp);
+ rqstp->rq_xprt = NULL;
+ */
+ wake_up(&rqstp->rq_wait);
+ }
+ spin_unlock_bh(&pool->sp_lock);
+ }
+}
+EXPORT_SYMBOL(svc_wake_up);
+
+int svc_port_is_privileged(struct sockaddr *sin)
+{
+ switch (sin->sa_family) {
+ case AF_INET:
+ return ntohs(((struct sockaddr_in *)sin)->sin_port)
+ < PROT_SOCK;
+ case AF_INET6:
+ return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
+ < PROT_SOCK;
+ default:
+ return 0;
+ }
+}
+
+/*
+ * Make sure that we don't have too many active connections. If we
+ * have, something must be dropped.
+ *
+ * There's no point in trying to do random drop here for DoS
+ * prevention. The NFS clients does 1 reconnect in 15 seconds. An
+ * attacker can easily beat that.
+ *
+ * The only somewhat efficient mechanism would be if drop old
+ * connections from the same IP first. But right now we don't even
+ * record the client IP in svc_sock.
+ */
+static void svc_check_conn_limits(struct svc_serv *serv)
+{
+ if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
+ struct svc_xprt *xprt = NULL;
+ spin_lock_bh(&serv->sv_lock);
+ if (!list_empty(&serv->sv_tempsocks)) {
+ if (net_ratelimit()) {
+ /* Try to help the admin */
+ printk(KERN_NOTICE "%s: too many open "
+ "connections, consider increasing the "
+ "number of nfsd threads\n",
+ serv->sv_name);
+ }
+ /*
+ * Always select the oldest connection. It's not fair,
+ * but so is life
+ */
+ xprt = list_entry(serv->sv_tempsocks.prev,
+ struct svc_xprt,
+ xpt_list);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_get(xprt);
+ }
+ spin_unlock_bh(&serv->sv_lock);
+
+ if (xprt) {
+ svc_xprt_enqueue(xprt);
+ svc_xprt_put(xprt);
+ }
+ }
+}
+
+/*
+ * Receive the next request on any transport. This code is carefully
+ * organised not to touch any cachelines in the shared svc_serv
+ * structure, only cachelines in the local svc_pool.
+ */
+int svc_recv(struct svc_rqst *rqstp, long timeout)
+{
+ struct svc_xprt *xprt = NULL;
+ struct svc_serv *serv = rqstp->rq_server;
+ struct svc_pool *pool = rqstp->rq_pool;
+ int len, i;
+ int pages;
+ struct xdr_buf *arg;
+ DECLARE_WAITQUEUE(wait, current);
+
+ dprintk("svc: server %p waiting for data (to = %ld)\n",
+ rqstp, timeout);
+
+ if (rqstp->rq_xprt)
+ printk(KERN_ERR
+ "svc_recv: service %p, transport not NULL!\n",
+ rqstp);
+ if (waitqueue_active(&rqstp->rq_wait))
+ printk(KERN_ERR
+ "svc_recv: service %p, wait queue active!\n",
+ rqstp);
+
+ /* now allocate needed pages. If we get a failure, sleep briefly */
+ pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
+ for (i = 0; i < pages ; i++)
+ while (rqstp->rq_pages[i] == NULL) {
+ struct page *p = alloc_page(GFP_KERNEL);
+ if (!p) {
+ int j = msecs_to_jiffies(500);
+ schedule_timeout_uninterruptible(j);
+ }
+ rqstp->rq_pages[i] = p;
+ }
+ rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
+ BUG_ON(pages >= RPCSVC_MAXPAGES);
+
+ /* Make arg->head point to first page and arg->pages point to rest */
+ arg = &rqstp->rq_arg;
+ arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
+ arg->head[0].iov_len = PAGE_SIZE;
+ arg->pages = rqstp->rq_pages + 1;
+ arg->page_base = 0;
+ /* save at least one page for response */
+ arg->page_len = (pages-2)*PAGE_SIZE;
+ arg->len = (pages-1)*PAGE_SIZE;
+ arg->tail[0].iov_len = 0;
+
+ try_to_freeze();
+ cond_resched();
+ if (signalled())
+ return -EINTR;
+
+ spin_lock_bh(&pool->sp_lock);
+ xprt = svc_xprt_dequeue(pool);
+ if (xprt) {
+ rqstp->rq_xprt = xprt;
+ svc_xprt_get(xprt);
+ rqstp->rq_reserved = serv->sv_max_mesg;
+ atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
+ } else {
+ /* No data pending. Go to sleep */
+ svc_thread_enqueue(pool, rqstp);
+
+ /*
+ * We have to be able to interrupt this wait
+ * to bring down the daemons ...
+ */
+ set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&rqstp->rq_wait, &wait);
+ spin_unlock_bh(&pool->sp_lock);
+
+ schedule_timeout(timeout);
+
+ try_to_freeze();
+
+ spin_lock_bh(&pool->sp_lock);
+ remove_wait_queue(&rqstp->rq_wait, &wait);
+
+ xprt = rqstp->rq_xprt;
+ if (!xprt) {
+ svc_thread_dequeue(pool, rqstp);
+ spin_unlock_bh(&pool->sp_lock);
+ dprintk("svc: server %p, no data yet\n", rqstp);
+ return signalled()? -EINTR : -EAGAIN;
+ }
+ }
+ spin_unlock_bh(&pool->sp_lock);
+
+ len = 0;
+ if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
+ dprintk("svc_recv: found XPT_CLOSE\n");
+ svc_delete_xprt(xprt);
+ } else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
+ struct svc_xprt *newxpt;
+ newxpt = xprt->xpt_ops->xpo_accept(xprt);
+ if (newxpt) {
+ /*
+ * We know this module_get will succeed because the
+ * listener holds a reference too
+ */
+ __module_get(newxpt->xpt_class->xcl_owner);
+ svc_check_conn_limits(xprt->xpt_server);
+ spin_lock_bh(&serv->sv_lock);
+ set_bit(XPT_TEMP, &newxpt->xpt_flags);
+ list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
+ serv->sv_tmpcnt++;
+ if (serv->sv_temptimer.function == NULL) {
+ /* setup timer to age temp transports */
+ setup_timer(&serv->sv_temptimer,
+ svc_age_temp_xprts,
+ (unsigned long)serv);
+ mod_timer(&serv->sv_temptimer,
+ jiffies + svc_conn_age_period * HZ);
+ }
+ spin_unlock_bh(&serv->sv_lock);
+ svc_xprt_received(newxpt);
+ }
+ svc_xprt_received(xprt);
+ } else {
+ dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
+ rqstp, pool->sp_id, xprt,
+ atomic_read(&xprt->xpt_ref.refcount));
+ rqstp->rq_deferred = svc_deferred_dequeue(xprt);
+ if (rqstp->rq_deferred) {
+ svc_xprt_received(xprt);
+ len = svc_deferred_recv(rqstp);
+ } else
+ len = xprt->xpt_ops->xpo_recvfrom(rqstp);
+ dprintk("svc: got len=%d\n", len);
+ }
+
+ /* No data, incomplete (TCP) read, or accept() */
+ if (len == 0 || len == -EAGAIN) {
+ rqstp->rq_res.len = 0;
+ svc_xprt_release(rqstp);
+ return -EAGAIN;
+ }
+ clear_bit(XPT_OLD, &xprt->xpt_flags);
+
+ rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
+ rqstp->rq_chandle.defer = svc_defer;
+
+ if (serv->sv_stats)
+ serv->sv_stats->netcnt++;
+ return len;
+}
+EXPORT_SYMBOL(svc_recv);
+
+/*
+ * Drop request
+ */
+void svc_drop(struct svc_rqst *rqstp)
+{
+ dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
+ svc_xprt_release(rqstp);
+}
+EXPORT_SYMBOL(svc_drop);
+
+/*
+ * Return reply to client.
+ */
+int svc_send(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt;
+ int len;
+ struct xdr_buf *xb;
+
+ xprt = rqstp->rq_xprt;
+ if (!xprt)
+ return -EFAULT;
+
+ /* release the receive skb before sending the reply */
+ rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
+
+ /* calculate over-all length */
+ xb = &rqstp->rq_res;
+ xb->len = xb->head[0].iov_len +
+ xb->page_len +
+ xb->tail[0].iov_len;
+
+ /* Grab mutex to serialize outgoing data. */
+ mutex_lock(&xprt->xpt_mutex);
+ if (test_bit(XPT_DEAD, &xprt->xpt_flags))
+ len = -ENOTCONN;
+ else
+ len = xprt->xpt_ops->xpo_sendto(rqstp);
+ mutex_unlock(&xprt->xpt_mutex);
+ svc_xprt_release(rqstp);
+
+ if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
+ return 0;
+ return len;
+}
+
+/*
+ * Timer function to close old temporary transports, using
+ * a mark-and-sweep algorithm.
+ */
+static void svc_age_temp_xprts(unsigned long closure)
+{
+ struct svc_serv *serv = (struct svc_serv *)closure;
+ struct svc_xprt *xprt;
+ struct list_head *le, *next;
+ LIST_HEAD(to_be_aged);
+
+ dprintk("svc_age_temp_xprts\n");
+
+ if (!spin_trylock_bh(&serv->sv_lock)) {
+ /* busy, try again 1 sec later */
+ dprintk("svc_age_temp_xprts: busy\n");
+ mod_timer(&serv->sv_temptimer, jiffies + HZ);
+ return;
+ }
+
+ list_for_each_safe(le, next, &serv->sv_tempsocks) {
+ xprt = list_entry(le, struct svc_xprt, xpt_list);
+
+ /* First time through, just mark it OLD. Second time
+ * through, close it. */
+ if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
+ continue;
+ if (atomic_read(&xprt->xpt_ref.refcount) > 1
+ || test_bit(XPT_BUSY, &xprt->xpt_flags))
+ continue;
+ svc_xprt_get(xprt);
+ list_move(le, &to_be_aged);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ set_bit(XPT_DETACHED, &xprt->xpt_flags);
+ }
+ spin_unlock_bh(&serv->sv_lock);
+
+ while (!list_empty(&to_be_aged)) {
+ le = to_be_aged.next;
+ /* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
+ list_del_init(le);
+ xprt = list_entry(le, struct svc_xprt, xpt_list);
+
+ dprintk("queuing xprt %p for closing\n", xprt);
+
+ /* a thread will dequeue and close it soon */
+ svc_xprt_enqueue(xprt);
+ svc_xprt_put(xprt);
+ }
+
+ mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
+}
+
+/*
+ * Remove a dead transport
+ */
+void svc_delete_xprt(struct svc_xprt *xprt)
+{
+ struct svc_serv *serv = xprt->xpt_server;
+
+ dprintk("svc: svc_delete_xprt(%p)\n", xprt);
+ xprt->xpt_ops->xpo_detach(xprt);
+
+ spin_lock_bh(&serv->sv_lock);
+ if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
+ list_del_init(&xprt->xpt_list);
+ /*
+ * We used to delete the transport from whichever list
+ * it's sk_xprt.xpt_ready node was on, but we don't actually
+ * need to. This is because the only time we're called
+ * while still attached to a queue, the queue itself
+ * is about to be destroyed (in svc_destroy).
+ */
+ if (!test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) {
+ BUG_ON(atomic_read(&xprt->xpt_ref.refcount) < 2);
+ if (test_bit(XPT_TEMP, &xprt->xpt_flags))
+ serv->sv_tmpcnt--;
+ svc_xprt_put(xprt);
+ }
+ spin_unlock_bh(&serv->sv_lock);
+}
+
+void svc_close_xprt(struct svc_xprt *xprt)
+{
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
+ /* someone else will have to effect the close */
+ return;
+
+ svc_xprt_get(xprt);
+ svc_delete_xprt(xprt);
+ clear_bit(XPT_BUSY, &xprt->xpt_flags);
+ svc_xprt_put(xprt);
+}
+EXPORT_SYMBOL_GPL(svc_close_xprt);
+
+void svc_close_all(struct list_head *xprt_list)
+{
+ struct svc_xprt *xprt;
+ struct svc_xprt *tmp;
+
+ list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
+ /* Waiting to be processed, but no threads left,
+ * So just remove it from the waiting list
+ */
+ list_del_init(&xprt->xpt_ready);
+ clear_bit(XPT_BUSY, &xprt->xpt_flags);
+ }
+ svc_close_xprt(xprt);
+ }
+}
+
+/*
+ * Handle defer and revisit of requests
+ */
+
+static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
+{
+ struct svc_deferred_req *dr =
+ container_of(dreq, struct svc_deferred_req, handle);
+ struct svc_xprt *xprt = dr->xprt;
+
+ if (too_many) {
+ svc_xprt_put(xprt);
+ kfree(dr);
+ return;
+ }
+ dprintk("revisit queued\n");
+ dr->xprt = NULL;
+ spin_lock(&xprt->xpt_lock);
+ list_add(&dr->handle.recent, &xprt->xpt_deferred);
+ spin_unlock(&xprt->xpt_lock);
+ set_bit(XPT_DEFERRED, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ svc_xprt_put(xprt);
+}
+
+/*
+ * Save the request off for later processing. The request buffer looks
+ * like this:
+ *
+ * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
+ *
+ * This code can only handle requests that consist of an xprt-header
+ * and rpc-header.
+ */
+static struct cache_deferred_req *svc_defer(struct cache_req *req)
+{
+ struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
+ struct svc_deferred_req *dr;
+
+ if (rqstp->rq_arg.page_len)
+ return NULL; /* if more than a page, give up FIXME */
+ if (rqstp->rq_deferred) {
+ dr = rqstp->rq_deferred;
+ rqstp->rq_deferred = NULL;
+ } else {
+ size_t skip;
+ size_t size;
+ /* FIXME maybe discard if size too large */
+ size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
+ dr = kmalloc(size, GFP_KERNEL);
+ if (dr == NULL)
+ return NULL;
+
+ dr->handle.owner = rqstp->rq_server;
+ dr->prot = rqstp->rq_prot;
+ memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
+ dr->addrlen = rqstp->rq_addrlen;
+ dr->daddr = rqstp->rq_daddr;
+ dr->argslen = rqstp->rq_arg.len >> 2;
+ dr->xprt_hlen = rqstp->rq_xprt_hlen;
+
+ /* back up head to the start of the buffer and copy */
+ skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
+ memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
+ dr->argslen << 2);
+ }
+ svc_xprt_get(rqstp->rq_xprt);
+ dr->xprt = rqstp->rq_xprt;
+
+ dr->handle.revisit = svc_revisit;
+ return &dr->handle;
+}
+
+/*
+ * recv data from a deferred request into an active one
+ */
+static int svc_deferred_recv(struct svc_rqst *rqstp)
+{
+ struct svc_deferred_req *dr = rqstp->rq_deferred;
+
+ /* setup iov_base past transport header */
+ rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
+ /* The iov_len does not include the transport header bytes */
+ rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
+ rqstp->rq_arg.page_len = 0;
+ /* The rq_arg.len includes the transport header bytes */
+ rqstp->rq_arg.len = dr->argslen<<2;
+ rqstp->rq_prot = dr->prot;
+ memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
+ rqstp->rq_addrlen = dr->addrlen;
+ /* Save off transport header len in case we get deferred again */
+ rqstp->rq_xprt_hlen = dr->xprt_hlen;
+ rqstp->rq_daddr = dr->daddr;
+ rqstp->rq_respages = rqstp->rq_pages;
+ return (dr->argslen<<2) - dr->xprt_hlen;
+}
+
+
+static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
+{
+ struct svc_deferred_req *dr = NULL;
+
+ if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
+ return NULL;
+ spin_lock(&xprt->xpt_lock);
+ clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
+ if (!list_empty(&xprt->xpt_deferred)) {
+ dr = list_entry(xprt->xpt_deferred.next,
+ struct svc_deferred_req,
+ handle.recent);
+ list_del_init(&dr->handle.recent);
+ set_bit(XPT_DEFERRED, &xprt->xpt_flags);
+ }
+ spin_unlock(&xprt->xpt_lock);
+ return dr;
+}
+
+/*
+ * Return the transport instance pointer for the endpoint accepting
+ * connections/peer traffic from the specified transport class,
+ * address family and port.
+ *
+ * Specifying 0 for the address family or port is effectively a
+ * wild-card, and will result in matching the first transport in the
+ * service's list that has a matching class name.
+ */
+struct svc_xprt *svc_find_xprt(struct svc_serv *serv, char *xcl_name,
+ int af, int port)
+{
+ struct svc_xprt *xprt;
+ struct svc_xprt *found = NULL;
+
+ /* Sanity check the args */
+ if (!serv || !xcl_name)
+ return found;
+
+ spin_lock_bh(&serv->sv_lock);
+ list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
+ if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
+ continue;
+ if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
+ continue;
+ if (port && port != svc_xprt_local_port(xprt))
+ continue;
+ found = xprt;
+ svc_xprt_get(xprt);
+ break;
+ }
+ spin_unlock_bh(&serv->sv_lock);
+ return found;
+}
+EXPORT_SYMBOL_GPL(svc_find_xprt);
+
+/*
+ * Format a buffer with a list of the active transports. A zero for
+ * the buflen parameter disables target buffer overflow checking.
+ */
+int svc_xprt_names(struct svc_serv *serv, char *buf, int buflen)
+{
+ struct svc_xprt *xprt;
+ char xprt_str[64];
+ int totlen = 0;
+ int len;
+
+ /* Sanity check args */
+ if (!serv)
+ return 0;
+
+ spin_lock_bh(&serv->sv_lock);
+ list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
+ len = snprintf(xprt_str, sizeof(xprt_str),
+ "%s %d\n", xprt->xpt_class->xcl_name,
+ svc_xprt_local_port(xprt));
+ /* If the string was truncated, replace with error string */
+ if (len >= sizeof(xprt_str))
+ strcpy(xprt_str, "name-too-long\n");
+ /* Don't overflow buffer */
+ len = strlen(xprt_str);
+ if (buflen && (len + totlen >= buflen))
+ break;
+ strcpy(buf+totlen, xprt_str);
+ totlen += len;
+ }
+ spin_unlock_bh(&serv->sv_lock);
+ return totlen;
+}
+EXPORT_SYMBOL_GPL(svc_xprt_names);
diff --git a/net/sunrpc/svcauth.c b/net/sunrpc/svcauth.c
index af7c5f0..8a73cbb 100644
--- a/net/sunrpc/svcauth.c
+++ b/net/sunrpc/svcauth.c
@@ -57,11 +57,13 @@ svc_authenticate(struct svc_rqst *rqstp, __be32 *authp)
rqstp->rq_authop = aops;
return aops->accept(rqstp, authp);
}
+EXPORT_SYMBOL(svc_authenticate);
int svc_set_client(struct svc_rqst *rqstp)
{
return rqstp->rq_authop->set_client(rqstp);
}
+EXPORT_SYMBOL(svc_set_client);
/* A request, which was authenticated, has now executed.
* Time to finalise the credentials and verifier
@@ -93,6 +95,7 @@ svc_auth_register(rpc_authflavor_t flavor, struct auth_ops *aops)
spin_unlock(&authtab_lock);
return rv;
}
+EXPORT_SYMBOL(svc_auth_register);
void
svc_auth_unregister(rpc_authflavor_t flavor)
@@ -129,6 +132,7 @@ void auth_domain_put(struct auth_domain *dom)
spin_unlock(&auth_domain_lock);
}
}
+EXPORT_SYMBOL(auth_domain_put);
struct auth_domain *
auth_domain_lookup(char *name, struct auth_domain *new)
@@ -153,8 +157,10 @@ auth_domain_lookup(char *name, struct auth_domain *new)
spin_unlock(&auth_domain_lock);
return new;
}
+EXPORT_SYMBOL(auth_domain_lookup);
struct auth_domain *auth_domain_find(char *name)
{
return auth_domain_lookup(name, NULL);
}
+EXPORT_SYMBOL(auth_domain_find);
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 41147941..3c64051 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -63,6 +63,7 @@ struct auth_domain *unix_domain_find(char *name)
rv = auth_domain_lookup(name, &new->h);
}
}
+EXPORT_SYMBOL(unix_domain_find);
static void svcauth_unix_domain_release(struct auth_domain *dom)
{
@@ -340,6 +341,7 @@ int auth_unix_add_addr(struct in_addr addr, struct auth_domain *dom)
else
return -ENOMEM;
}
+EXPORT_SYMBOL(auth_unix_add_addr);
int auth_unix_forget_old(struct auth_domain *dom)
{
@@ -351,6 +353,7 @@ int auth_unix_forget_old(struct auth_domain *dom)
udom->addr_changes++;
return 0;
}
+EXPORT_SYMBOL(auth_unix_forget_old);
struct auth_domain *auth_unix_lookup(struct in_addr addr)
{
@@ -375,50 +378,56 @@ struct auth_domain *auth_unix_lookup(struct in_addr addr)
cache_put(&ipm->h, &ip_map_cache);
return rv;
}
+EXPORT_SYMBOL(auth_unix_lookup);
void svcauth_unix_purge(void)
{
cache_purge(&ip_map_cache);
}
+EXPORT_SYMBOL(svcauth_unix_purge);
static inline struct ip_map *
ip_map_cached_get(struct svc_rqst *rqstp)
{
- struct ip_map *ipm;
- struct svc_sock *svsk = rqstp->rq_sock;
- spin_lock(&svsk->sk_lock);
- ipm = svsk->sk_info_authunix;
- if (ipm != NULL) {
- if (!cache_valid(&ipm->h)) {
- /*
- * The entry has been invalidated since it was
- * remembered, e.g. by a second mount from the
- * same IP address.
- */
- svsk->sk_info_authunix = NULL;
- spin_unlock(&svsk->sk_lock);
- cache_put(&ipm->h, &ip_map_cache);
- return NULL;
+ struct ip_map *ipm = NULL;
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+
+ if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)) {
+ spin_lock(&xprt->xpt_lock);
+ ipm = xprt->xpt_auth_cache;
+ if (ipm != NULL) {
+ if (!cache_valid(&ipm->h)) {
+ /*
+ * The entry has been invalidated since it was
+ * remembered, e.g. by a second mount from the
+ * same IP address.
+ */
+ xprt->xpt_auth_cache = NULL;
+ spin_unlock(&xprt->xpt_lock);
+ cache_put(&ipm->h, &ip_map_cache);
+ return NULL;
+ }
+ cache_get(&ipm->h);
}
- cache_get(&ipm->h);
+ spin_unlock(&xprt->xpt_lock);
}
- spin_unlock(&svsk->sk_lock);
return ipm;
}
static inline void
ip_map_cached_put(struct svc_rqst *rqstp, struct ip_map *ipm)
{
- struct svc_sock *svsk = rqstp->rq_sock;
+ struct svc_xprt *xprt = rqstp->rq_xprt;
- spin_lock(&svsk->sk_lock);
- if (svsk->sk_sock->type == SOCK_STREAM &&
- svsk->sk_info_authunix == NULL) {
- /* newly cached, keep the reference */
- svsk->sk_info_authunix = ipm;
- ipm = NULL;
+ if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)) {
+ spin_lock(&xprt->xpt_lock);
+ if (xprt->xpt_auth_cache == NULL) {
+ /* newly cached, keep the reference */
+ xprt->xpt_auth_cache = ipm;
+ ipm = NULL;
+ }
+ spin_unlock(&xprt->xpt_lock);
}
- spin_unlock(&svsk->sk_lock);
if (ipm)
cache_put(&ipm->h, &ip_map_cache);
}
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index c75bffe..1d3e5fc 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -5,7 +5,7 @@
*
* The server scheduling algorithm does not always distribute the load
* evenly when servicing a single client. May need to modify the
- * svc_sock_enqueue procedure...
+ * svc_xprt_enqueue procedure...
*
* TCP support is largely untested and may be a little slow. The problem
* is that we currently do two separate recvfrom's, one for the 4-byte
@@ -48,72 +48,40 @@
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/stats.h>
-/* SMP locking strategy:
- *
- * svc_pool->sp_lock protects most of the fields of that pool.
- * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
- * when both need to be taken (rare), svc_serv->sv_lock is first.
- * BKL protects svc_serv->sv_nrthread.
- * svc_sock->sk_lock protects the svc_sock->sk_deferred list
- * and the ->sk_info_authunix cache.
- * svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply.
- *
- * Some flags can be set to certain values at any time
- * providing that certain rules are followed:
- *
- * SK_CONN, SK_DATA, can be set or cleared at any time.
- * after a set, svc_sock_enqueue must be called.
- * after a clear, the socket must be read/accepted
- * if this succeeds, it must be set again.
- * SK_CLOSE can set at any time. It is never cleared.
- * sk_inuse contains a bias of '1' until SK_DEAD is set.
- * so when sk_inuse hits zero, we know the socket is dead
- * and no-one is using it.
- * SK_DEAD can only be set while SK_BUSY is held which ensures
- * no other thread will be using the socket or will try to
- * set SK_DEAD.
- *
- */
-
-#define RPCDBG_FACILITY RPCDBG_SVCSOCK
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,
int *errp, int flags);
-static void svc_delete_socket(struct svc_sock *svsk);
static void svc_udp_data_ready(struct sock *, int);
static int svc_udp_recvfrom(struct svc_rqst *);
static int svc_udp_sendto(struct svc_rqst *);
-static void svc_close_socket(struct svc_sock *svsk);
-
-static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk);
-static int svc_deferred_recv(struct svc_rqst *rqstp);
-static struct cache_deferred_req *svc_defer(struct cache_req *req);
-
-/* apparently the "standard" is that clients close
- * idle connections after 5 minutes, servers after
- * 6 minutes
- * http://www.connectathon.org/talks96/nfstcp.pdf
- */
-static int svc_conn_age_period = 6*60;
+static void svc_sock_detach(struct svc_xprt *);
+static void svc_sock_free(struct svc_xprt *);
+static struct svc_xprt *svc_create_socket(struct svc_serv *, int,
+ struct sockaddr *, int, int);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key svc_key[2];
static struct lock_class_key svc_slock_key[2];
-static inline void svc_reclassify_socket(struct socket *sock)
+static void svc_reclassify_socket(struct socket *sock)
{
struct sock *sk = sock->sk;
BUG_ON(sock_owned_by_user(sk));
switch (sk->sk_family) {
case AF_INET:
sock_lock_init_class_and_name(sk, "slock-AF_INET-NFSD",
- &svc_slock_key[0], "sk_lock-AF_INET-NFSD", &svc_key[0]);
+ &svc_slock_key[0],
+ "sk_xprt.xpt_lock-AF_INET-NFSD",
+ &svc_key[0]);
break;
case AF_INET6:
sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFSD",
- &svc_slock_key[1], "sk_lock-AF_INET6-NFSD", &svc_key[1]);
+ &svc_slock_key[1],
+ "sk_xprt.xpt_lock-AF_INET6-NFSD",
+ &svc_key[1]);
break;
default:
@@ -121,81 +89,26 @@ static inline void svc_reclassify_socket(struct socket *sock)
}
}
#else
-static inline void svc_reclassify_socket(struct socket *sock)
+static void svc_reclassify_socket(struct socket *sock)
{
}
#endif
-static char *__svc_print_addr(struct sockaddr *addr, char *buf, size_t len)
-{
- switch (addr->sa_family) {
- case AF_INET:
- snprintf(buf, len, "%u.%u.%u.%u, port=%u",
- NIPQUAD(((struct sockaddr_in *) addr)->sin_addr),
- ntohs(((struct sockaddr_in *) addr)->sin_port));
- break;
-
- case AF_INET6:
- snprintf(buf, len, "%x:%x:%x:%x:%x:%x:%x:%x, port=%u",
- NIP6(((struct sockaddr_in6 *) addr)->sin6_addr),
- ntohs(((struct sockaddr_in6 *) addr)->sin6_port));
- break;
-
- default:
- snprintf(buf, len, "unknown address type: %d", addr->sa_family);
- break;
- }
- return buf;
-}
-
-/**
- * svc_print_addr - Format rq_addr field for printing
- * @rqstp: svc_rqst struct containing address to print
- * @buf: target buffer for formatted address
- * @len: length of target buffer
- *
- */
-char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
-{
- return __svc_print_addr(svc_addr(rqstp), buf, len);
-}
-EXPORT_SYMBOL_GPL(svc_print_addr);
-
-/*
- * Queue up an idle server thread. Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static inline void
-svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
- list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread. Must have pool->sp_lock held.
- */
-static inline void
-svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
- list_del(&rqstp->rq_list);
-}
-
/*
* Release an skbuff after use
*/
-static inline void
-svc_release_skb(struct svc_rqst *rqstp)
+static void svc_release_skb(struct svc_rqst *rqstp)
{
- struct sk_buff *skb = rqstp->rq_skbuff;
+ struct sk_buff *skb = rqstp->rq_xprt_ctxt;
struct svc_deferred_req *dr = rqstp->rq_deferred;
if (skb) {
- rqstp->rq_skbuff = NULL;
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+ rqstp->rq_xprt_ctxt = NULL;
dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
- skb_free_datagram(rqstp->rq_sock->sk_sk, skb);
+ skb_free_datagram(svsk->sk_sk, skb);
}
if (dr) {
rqstp->rq_deferred = NULL;
@@ -203,253 +116,6 @@ svc_release_skb(struct svc_rqst *rqstp)
}
}
-/*
- * Any space to write?
- */
-static inline unsigned long
-svc_sock_wspace(struct svc_sock *svsk)
-{
- int wspace;
-
- if (svsk->sk_sock->type == SOCK_STREAM)
- wspace = sk_stream_wspace(svsk->sk_sk);
- else
- wspace = sock_wspace(svsk->sk_sk);
-
- return wspace;
-}
-
-/*
- * Queue up a socket with data pending. If there are idle nfsd
- * processes, wake 'em up.
- *
- */
-static void
-svc_sock_enqueue(struct svc_sock *svsk)
-{
- struct svc_serv *serv = svsk->sk_server;
- struct svc_pool *pool;
- struct svc_rqst *rqstp;
- int cpu;
-
- if (!(svsk->sk_flags &
- ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) ))
- return;
- if (test_bit(SK_DEAD, &svsk->sk_flags))
- return;
-
- cpu = get_cpu();
- pool = svc_pool_for_cpu(svsk->sk_server, cpu);
- put_cpu();
-
- spin_lock_bh(&pool->sp_lock);
-
- if (!list_empty(&pool->sp_threads) &&
- !list_empty(&pool->sp_sockets))
- printk(KERN_ERR
- "svc_sock_enqueue: threads and sockets both waiting??\n");
-
- if (test_bit(SK_DEAD, &svsk->sk_flags)) {
- /* Don't enqueue dead sockets */
- dprintk("svc: socket %p is dead, not enqueued\n", svsk->sk_sk);
- goto out_unlock;
- }
-
- /* Mark socket as busy. It will remain in this state until the
- * server has processed all pending data and put the socket back
- * on the idle list. We update SK_BUSY atomically because
- * it also guards against trying to enqueue the svc_sock twice.
- */
- if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) {
- /* Don't enqueue socket while already enqueued */
- dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
- goto out_unlock;
- }
- BUG_ON(svsk->sk_pool != NULL);
- svsk->sk_pool = pool;
-
- set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
- if (((atomic_read(&svsk->sk_reserved) + serv->sv_max_mesg)*2
- > svc_sock_wspace(svsk))
- && !test_bit(SK_CLOSE, &svsk->sk_flags)
- && !test_bit(SK_CONN, &svsk->sk_flags)) {
- /* Don't enqueue while not enough space for reply */
- dprintk("svc: socket %p no space, %d*2 > %ld, not enqueued\n",
- svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_max_mesg,
- svc_sock_wspace(svsk));
- svsk->sk_pool = NULL;
- clear_bit(SK_BUSY, &svsk->sk_flags);
- goto out_unlock;
- }
- clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
-
-
- if (!list_empty(&pool->sp_threads)) {
- rqstp = list_entry(pool->sp_threads.next,
- struct svc_rqst,
- rq_list);
- dprintk("svc: socket %p served by daemon %p\n",
- svsk->sk_sk, rqstp);
- svc_thread_dequeue(pool, rqstp);
- if (rqstp->rq_sock)
- printk(KERN_ERR
- "svc_sock_enqueue: server %p, rq_sock=%p!\n",
- rqstp, rqstp->rq_sock);
- rqstp->rq_sock = svsk;
- atomic_inc(&svsk->sk_inuse);
- rqstp->rq_reserved = serv->sv_max_mesg;
- atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
- BUG_ON(svsk->sk_pool != pool);
- wake_up(&rqstp->rq_wait);
- } else {
- dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
- list_add_tail(&svsk->sk_ready, &pool->sp_sockets);
- BUG_ON(svsk->sk_pool != pool);
- }
-
-out_unlock:
- spin_unlock_bh(&pool->sp_lock);
-}
-
-/*
- * Dequeue the first socket. Must be called with the pool->sp_lock held.
- */
-static inline struct svc_sock *
-svc_sock_dequeue(struct svc_pool *pool)
-{
- struct svc_sock *svsk;
-
- if (list_empty(&pool->sp_sockets))
- return NULL;
-
- svsk = list_entry(pool->sp_sockets.next,
- struct svc_sock, sk_ready);
- list_del_init(&svsk->sk_ready);
-
- dprintk("svc: socket %p dequeued, inuse=%d\n",
- svsk->sk_sk, atomic_read(&svsk->sk_inuse));
-
- return svsk;
-}
-
-/*
- * Having read something from a socket, check whether it
- * needs to be re-enqueued.
- * Note: SK_DATA only gets cleared when a read-attempt finds
- * no (or insufficient) data.
- */
-static inline void
-svc_sock_received(struct svc_sock *svsk)
-{
- svsk->sk_pool = NULL;
- clear_bit(SK_BUSY, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
-}
-
-
-/**
- * svc_reserve - change the space reserved for the reply to a request.
- * @rqstp: The request in question
- * @space: new max space to reserve
- *
- * Each request reserves some space on the output queue of the socket
- * to make sure the reply fits. This function reduces that reserved
- * space to be the amount of space used already, plus @space.
- *
- */
-void svc_reserve(struct svc_rqst *rqstp, int space)
-{
- space += rqstp->rq_res.head[0].iov_len;
-
- if (space < rqstp->rq_reserved) {
- struct svc_sock *svsk = rqstp->rq_sock;
- atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved);
- rqstp->rq_reserved = space;
-
- svc_sock_enqueue(svsk);
- }
-}
-
-/*
- * Release a socket after use.
- */
-static inline void
-svc_sock_put(struct svc_sock *svsk)
-{
- if (atomic_dec_and_test(&svsk->sk_inuse)) {
- BUG_ON(! test_bit(SK_DEAD, &svsk->sk_flags));
-
- dprintk("svc: releasing dead socket\n");
- if (svsk->sk_sock->file)
- sockfd_put(svsk->sk_sock);
- else
- sock_release(svsk->sk_sock);
- if (svsk->sk_info_authunix != NULL)
- svcauth_unix_info_release(svsk->sk_info_authunix);
- kfree(svsk);
- }
-}
-
-static void
-svc_sock_release(struct svc_rqst *rqstp)
-{
- struct svc_sock *svsk = rqstp->rq_sock;
-
- svc_release_skb(rqstp);
-
- svc_free_res_pages(rqstp);
- rqstp->rq_res.page_len = 0;
- rqstp->rq_res.page_base = 0;
-
-
- /* Reset response buffer and release
- * the reservation.
- * But first, check that enough space was reserved
- * for the reply, otherwise we have a bug!
- */
- if ((rqstp->rq_res.len) > rqstp->rq_reserved)
- printk(KERN_ERR "RPC request reserved %d but used %d\n",
- rqstp->rq_reserved,
- rqstp->rq_res.len);
-
- rqstp->rq_res.head[0].iov_len = 0;
- svc_reserve(rqstp, 0);
- rqstp->rq_sock = NULL;
-
- svc_sock_put(svsk);
-}
-
-/*
- * External function to wake up a server waiting for data
- * This really only makes sense for services like lockd
- * which have exactly one thread anyway.
- */
-void
-svc_wake_up(struct svc_serv *serv)
-{
- struct svc_rqst *rqstp;
- unsigned int i;
- struct svc_pool *pool;
-
- for (i = 0; i < serv->sv_nrpools; i++) {
- pool = &serv->sv_pools[i];
-
- spin_lock_bh(&pool->sp_lock);
- if (!list_empty(&pool->sp_threads)) {
- rqstp = list_entry(pool->sp_threads.next,
- struct svc_rqst,
- rq_list);
- dprintk("svc: daemon %p woken up.\n", rqstp);
- /*
- svc_thread_dequeue(pool, rqstp);
- rqstp->rq_sock = NULL;
- */
- wake_up(&rqstp->rq_wait);
- }
- spin_unlock_bh(&pool->sp_lock);
- }
-}
-
union svc_pktinfo_u {
struct in_pktinfo pkti;
struct in6_pktinfo pkti6;
@@ -459,7 +125,9 @@ union svc_pktinfo_u {
static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
{
- switch (rqstp->rq_sock->sk_sk->sk_family) {
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+ switch (svsk->sk_sk->sk_family) {
case AF_INET: {
struct in_pktinfo *pki = CMSG_DATA(cmh);
@@ -489,10 +157,10 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
/*
* Generic sendto routine
*/
-static int
-svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
{
- struct svc_sock *svsk = rqstp->rq_sock;
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
struct socket *sock = svsk->sk_sock;
int slen;
union {
@@ -565,7 +233,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
}
out:
dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n",
- rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len,
+ svsk, xdr->head[0].iov_base, xdr->head[0].iov_len,
xdr->len, len, svc_print_addr(rqstp, buf, sizeof(buf)));
return len;
@@ -602,7 +270,7 @@ svc_sock_names(char *buf, struct svc_serv *serv, char *toclose)
if (!serv)
return 0;
spin_lock_bh(&serv->sv_lock);
- list_for_each_entry(svsk, &serv->sv_permsocks, sk_list) {
+ list_for_each_entry(svsk, &serv->sv_permsocks, sk_xprt.xpt_list) {
int onelen = one_sock_name(buf+len, svsk);
if (toclose && strcmp(toclose, buf+len) == 0)
closesk = svsk;
@@ -614,7 +282,7 @@ svc_sock_names(char *buf, struct svc_serv *serv, char *toclose)
/* Should unregister with portmap, but you cannot
* unregister just one protocol...
*/
- svc_close_socket(closesk);
+ svc_close_xprt(&closesk->sk_xprt);
else if (toclose)
return -ENOENT;
return len;
@@ -624,8 +292,7 @@ EXPORT_SYMBOL(svc_sock_names);
/*
* Check input queue length
*/
-static int
-svc_recv_available(struct svc_sock *svsk)
+static int svc_recv_available(struct svc_sock *svsk)
{
struct socket *sock = svsk->sk_sock;
int avail, err;
@@ -638,48 +305,31 @@ svc_recv_available(struct svc_sock *svsk)
/*
* Generic recvfrom routine.
*/
-static int
-svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen)
+static int svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr,
+ int buflen)
{
- struct svc_sock *svsk = rqstp->rq_sock;
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
struct msghdr msg = {
.msg_flags = MSG_DONTWAIT,
};
- struct sockaddr *sin;
int len;
+ rqstp->rq_xprt_hlen = 0;
+
len = kernel_recvmsg(svsk->sk_sock, &msg, iov, nr, buflen,
msg.msg_flags);
- /* sock_recvmsg doesn't fill in the name/namelen, so we must..
- */
- memcpy(&rqstp->rq_addr, &svsk->sk_remote, svsk->sk_remotelen);
- rqstp->rq_addrlen = svsk->sk_remotelen;
-
- /* Destination address in request is needed for binding the
- * source address in RPC callbacks later.
- */
- sin = (struct sockaddr *)&svsk->sk_local;
- switch (sin->sa_family) {
- case AF_INET:
- rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
- break;
- case AF_INET6:
- rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
- break;
- }
-
dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
svsk, iov[0].iov_base, iov[0].iov_len, len);
-
return len;
}
/*
* Set socket snd and rcv buffer lengths
*/
-static inline void
-svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
+static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
+ unsigned int rcv)
{
#if 0
mm_segment_t oldfs;
@@ -704,16 +354,16 @@ svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
/*
* INET callback when data has been received on the socket.
*/
-static void
-svc_udp_data_ready(struct sock *sk, int count)
+static void svc_udp_data_ready(struct sock *sk, int count)
{
struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
if (svsk) {
dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
- svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
- set_bit(SK_DATA, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
+ svsk, sk, count,
+ test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+ svc_xprt_enqueue(&svsk->sk_xprt);
}
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible(sk->sk_sleep);
@@ -722,15 +372,14 @@ svc_udp_data_ready(struct sock *sk, int count)
/*
* INET callback when space is newly available on the socket.
*/
-static void
-svc_write_space(struct sock *sk)
+static void svc_write_space(struct sock *sk)
{
struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
if (svsk) {
dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
- svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
- svc_sock_enqueue(svsk);
+ svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
+ svc_xprt_enqueue(&svsk->sk_xprt);
}
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
@@ -740,10 +389,19 @@ svc_write_space(struct sock *sk)
}
}
-static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp,
- struct cmsghdr *cmh)
+/*
+ * Copy the UDP datagram's destination address to the rqstp structure.
+ * The 'destination' address in this case is the address to which the
+ * peer sent the datagram, i.e. our local address. For multihomed
+ * hosts, this can change from msg to msg. Note that only the IP
+ * address changes, the port number should remain the same.
+ */
+static void svc_udp_get_dest_address(struct svc_rqst *rqstp,
+ struct cmsghdr *cmh)
{
- switch (rqstp->rq_sock->sk_sk->sk_family) {
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+ switch (svsk->sk_sk->sk_family) {
case AF_INET: {
struct in_pktinfo *pki = CMSG_DATA(cmh);
rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
@@ -760,11 +418,11 @@ static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp,
/*
* Receive a datagram from a UDP socket.
*/
-static int
-svc_udp_recvfrom(struct svc_rqst *rqstp)
+static int svc_udp_recvfrom(struct svc_rqst *rqstp)
{
- struct svc_sock *svsk = rqstp->rq_sock;
- struct svc_serv *serv = svsk->sk_server;
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+ struct svc_serv *serv = svsk->sk_xprt.xpt_server;
struct sk_buff *skb;
union {
struct cmsghdr hdr;
@@ -779,7 +437,7 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
.msg_flags = MSG_DONTWAIT,
};
- if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
+ if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
/* udp sockets need large rcvbuf as all pending
* requests are still in that buffer. sndbuf must
* also be large enough that there is enough space
@@ -792,17 +450,7 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
(serv->sv_nrthreads+3) * serv->sv_max_mesg,
(serv->sv_nrthreads+3) * serv->sv_max_mesg);
- if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
- svc_sock_received(svsk);
- return svc_deferred_recv(rqstp);
- }
-
- if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
- svc_delete_socket(svsk);
- return 0;
- }
-
- clear_bit(SK_DATA, &svsk->sk_flags);
+ clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
skb = NULL;
err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
0, 0, MSG_PEEK | MSG_DONTWAIT);
@@ -813,24 +461,27 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
if (err != -EAGAIN) {
/* possibly an icmp error */
dprintk("svc: recvfrom returned error %d\n", -err);
- set_bit(SK_DATA, &svsk->sk_flags);
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
}
- svc_sock_received(svsk);
+ svc_xprt_received(&svsk->sk_xprt);
return -EAGAIN;
}
- rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+ len = svc_addr_len(svc_addr(rqstp));
+ if (len < 0)
+ return len;
+ rqstp->rq_addrlen = len;
if (skb->tstamp.tv64 == 0) {
skb->tstamp = ktime_get_real();
/* Don't enable netstamp, sunrpc doesn't
need that much accuracy */
}
svsk->sk_sk->sk_stamp = skb->tstamp;
- set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* there may be more data... */
/*
* Maybe more packets - kick another thread ASAP.
*/
- svc_sock_received(svsk);
+ svc_xprt_received(&svsk->sk_xprt);
len = skb->len - sizeof(struct udphdr);
rqstp->rq_arg.len = len;
@@ -861,13 +512,14 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
skb_free_datagram(svsk->sk_sk, skb);
} else {
/* we can use it in-place */
- rqstp->rq_arg.head[0].iov_base = skb->data + sizeof(struct udphdr);
+ rqstp->rq_arg.head[0].iov_base = skb->data +
+ sizeof(struct udphdr);
rqstp->rq_arg.head[0].iov_len = len;
if (skb_checksum_complete(skb)) {
skb_free_datagram(svsk->sk_sk, skb);
return 0;
}
- rqstp->rq_skbuff = skb;
+ rqstp->rq_xprt_ctxt = skb;
}
rqstp->rq_arg.page_base = 0;
@@ -900,27 +552,81 @@ svc_udp_sendto(struct svc_rqst *rqstp)
return error;
}
-static void
-svc_udp_init(struct svc_sock *svsk)
+static void svc_udp_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+}
+
+static int svc_udp_has_wspace(struct svc_xprt *xprt)
+{
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ struct svc_serv *serv = xprt->xpt_server;
+ unsigned long required;
+
+ /*
+ * Set the SOCK_NOSPACE flag before checking the available
+ * sock space.
+ */
+ set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+ required = atomic_read(&svsk->sk_xprt.xpt_reserved) + serv->sv_max_mesg;
+ if (required*2 > sock_wspace(svsk->sk_sk))
+ return 0;
+ clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+ return 1;
+}
+
+static struct svc_xprt *svc_udp_accept(struct svc_xprt *xprt)
+{
+ BUG();
+ return NULL;
+}
+
+static struct svc_xprt *svc_udp_create(struct svc_serv *serv,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ return svc_create_socket(serv, IPPROTO_UDP, sa, salen, flags);
+}
+
+static struct svc_xprt_ops svc_udp_ops = {
+ .xpo_create = svc_udp_create,
+ .xpo_recvfrom = svc_udp_recvfrom,
+ .xpo_sendto = svc_udp_sendto,
+ .xpo_release_rqst = svc_release_skb,
+ .xpo_detach = svc_sock_detach,
+ .xpo_free = svc_sock_free,
+ .xpo_prep_reply_hdr = svc_udp_prep_reply_hdr,
+ .xpo_has_wspace = svc_udp_has_wspace,
+ .xpo_accept = svc_udp_accept,
+};
+
+static struct svc_xprt_class svc_udp_class = {
+ .xcl_name = "udp",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_udp_ops,
+ .xcl_max_payload = RPCSVC_MAXPAYLOAD_UDP,
+};
+
+static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
{
int one = 1;
mm_segment_t oldfs;
+ svc_xprt_init(&svc_udp_class, &svsk->sk_xprt, serv);
+ clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
svsk->sk_sk->sk_data_ready = svc_udp_data_ready;
svsk->sk_sk->sk_write_space = svc_write_space;
- svsk->sk_recvfrom = svc_udp_recvfrom;
- svsk->sk_sendto = svc_udp_sendto;
/* initialise setting must have enough space to
* receive and respond to one request.
* svc_udp_recvfrom will re-adjust if necessary
*/
svc_sock_setbufsize(svsk->sk_sock,
- 3 * svsk->sk_server->sv_max_mesg,
- 3 * svsk->sk_server->sv_max_mesg);
+ 3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
+ 3 * svsk->sk_xprt.xpt_server->sv_max_mesg);
- set_bit(SK_DATA, &svsk->sk_flags); /* might have come in before data_ready set up */
- set_bit(SK_CHNGBUF, &svsk->sk_flags);
+ /* data might have come in before data_ready set up */
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+ set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
oldfs = get_fs();
set_fs(KERNEL_DS);
@@ -934,8 +640,7 @@ svc_udp_init(struct svc_sock *svsk)
* A data_ready event on a listening socket means there's a connection
* pending. Do not use state_change as a substitute for it.
*/
-static void
-svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
+static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
{
struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
@@ -954,8 +659,8 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
*/
if (sk->sk_state == TCP_LISTEN) {
if (svsk) {
- set_bit(SK_CONN, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
+ set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
+ svc_xprt_enqueue(&svsk->sk_xprt);
} else
printk("svc: socket %p: no user data\n", sk);
}
@@ -967,8 +672,7 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
/*
* A state change on a connected socket means it's dying or dead.
*/
-static void
-svc_tcp_state_change(struct sock *sk)
+static void svc_tcp_state_change(struct sock *sk)
{
struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
@@ -978,51 +682,36 @@ svc_tcp_state_change(struct sock *sk)
if (!svsk)
printk("svc: socket %p: no user data\n", sk);
else {
- set_bit(SK_CLOSE, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
+ set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+ svc_xprt_enqueue(&svsk->sk_xprt);
}
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible_all(sk->sk_sleep);
}
-static void
-svc_tcp_data_ready(struct sock *sk, int count)
+static void svc_tcp_data_ready(struct sock *sk, int count)
{
struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
dprintk("svc: socket %p TCP data ready (svsk %p)\n",
sk, sk->sk_user_data);
if (svsk) {
- set_bit(SK_DATA, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+ svc_xprt_enqueue(&svsk->sk_xprt);
}
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible(sk->sk_sleep);
}
-static inline int svc_port_is_privileged(struct sockaddr *sin)
-{
- switch (sin->sa_family) {
- case AF_INET:
- return ntohs(((struct sockaddr_in *)sin)->sin_port)
- < PROT_SOCK;
- case AF_INET6:
- return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
- < PROT_SOCK;
- default:
- return 0;
- }
-}
-
/*
* Accept a TCP connection
*/
-static void
-svc_tcp_accept(struct svc_sock *svsk)
+static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
{
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
struct sockaddr_storage addr;
struct sockaddr *sin = (struct sockaddr *) &addr;
- struct svc_serv *serv = svsk->sk_server;
+ struct svc_serv *serv = svsk->sk_xprt.xpt_server;
struct socket *sock = svsk->sk_sock;
struct socket *newsock;
struct svc_sock *newsvsk;
@@ -1031,9 +720,9 @@ svc_tcp_accept(struct svc_sock *svsk)
dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
if (!sock)
- return;
+ return NULL;
- clear_bit(SK_CONN, &svsk->sk_flags);
+ clear_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
err = kernel_accept(sock, &newsock, O_NONBLOCK);
if (err < 0) {
if (err == -ENOMEM)
@@ -1042,11 +731,9 @@ svc_tcp_accept(struct svc_sock *svsk)
else if (err != -EAGAIN && net_ratelimit())
printk(KERN_WARNING "%s: accept failed (err %d)!\n",
serv->sv_name, -err);
- return;
+ return NULL;
}
-
- set_bit(SK_CONN, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
+ set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
err = kernel_getpeername(newsock, sin, &slen);
if (err < 0) {
@@ -1077,106 +764,42 @@ svc_tcp_accept(struct svc_sock *svsk)
if (!(newsvsk = svc_setup_socket(serv, newsock, &err,
(SVC_SOCK_ANONYMOUS | SVC_SOCK_TEMPORARY))))
goto failed;
- memcpy(&newsvsk->sk_remote, sin, slen);
- newsvsk->sk_remotelen = slen;
+ svc_xprt_set_remote(&newsvsk->sk_xprt, sin, slen);
err = kernel_getsockname(newsock, sin, &slen);
if (unlikely(err < 0)) {
dprintk("svc_tcp_accept: kernel_getsockname error %d\n", -err);
slen = offsetof(struct sockaddr, sa_data);
}
- memcpy(&newsvsk->sk_local, sin, slen);
-
- svc_sock_received(newsvsk);
-
- /* make sure that we don't have too many active connections.
- * If we have, something must be dropped.
- *
- * There's no point in trying to do random drop here for
- * DoS prevention. The NFS clients does 1 reconnect in 15
- * seconds. An attacker can easily beat that.
- *
- * The only somewhat efficient mechanism would be if drop
- * old connections from the same IP first. But right now
- * we don't even record the client IP in svc_sock.
- */
- if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
- struct svc_sock *svsk = NULL;
- spin_lock_bh(&serv->sv_lock);
- if (!list_empty(&serv->sv_tempsocks)) {
- if (net_ratelimit()) {
- /* Try to help the admin */
- printk(KERN_NOTICE "%s: too many open TCP "
- "sockets, consider increasing the "
- "number of nfsd threads\n",
- serv->sv_name);
- printk(KERN_NOTICE
- "%s: last TCP connect from %s\n",
- serv->sv_name, __svc_print_addr(sin,
- buf, sizeof(buf)));
- }
- /*
- * Always select the oldest socket. It's not fair,
- * but so is life
- */
- svsk = list_entry(serv->sv_tempsocks.prev,
- struct svc_sock,
- sk_list);
- set_bit(SK_CLOSE, &svsk->sk_flags);
- atomic_inc(&svsk->sk_inuse);
- }
- spin_unlock_bh(&serv->sv_lock);
-
- if (svsk) {
- svc_sock_enqueue(svsk);
- svc_sock_put(svsk);
- }
-
- }
+ svc_xprt_set_local(&newsvsk->sk_xprt, sin, slen);
if (serv->sv_stats)
serv->sv_stats->nettcpconn++;
- return;
+ return &newsvsk->sk_xprt;
failed:
sock_release(newsock);
- return;
+ return NULL;
}
/*
* Receive data from a TCP socket.
*/
-static int
-svc_tcp_recvfrom(struct svc_rqst *rqstp)
+static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
{
- struct svc_sock *svsk = rqstp->rq_sock;
- struct svc_serv *serv = svsk->sk_server;
+ struct svc_sock *svsk =
+ container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+ struct svc_serv *serv = svsk->sk_xprt.xpt_server;
int len;
struct kvec *vec;
int pnum, vlen;
dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
- svsk, test_bit(SK_DATA, &svsk->sk_flags),
- test_bit(SK_CONN, &svsk->sk_flags),
- test_bit(SK_CLOSE, &svsk->sk_flags));
+ svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
+ test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
+ test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
- if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
- svc_sock_received(svsk);
- return svc_deferred_recv(rqstp);
- }
-
- if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
- svc_delete_socket(svsk);
- return 0;
- }
-
- if (svsk->sk_sk->sk_state == TCP_LISTEN) {
- svc_tcp_accept(svsk);
- svc_sock_received(svsk);
- return 0;
- }
-
- if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
+ if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
/* sndbuf needs to have room for one request
* per thread, otherwise we can stall even when the
* network isn't a bottleneck.
@@ -1193,7 +816,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
(serv->sv_nrthreads+3) * serv->sv_max_mesg,
3 * serv->sv_max_mesg);
- clear_bit(SK_DATA, &svsk->sk_flags);
+ clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
/* Receive data. If we haven't got the record length yet, get
* the next four bytes. Otherwise try to gobble up as much as
@@ -1212,7 +835,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
if (len < want) {
dprintk("svc: short recvfrom while reading record length (%d of %lu)\n",
len, want);
- svc_sock_received(svsk);
+ svc_xprt_received(&svsk->sk_xprt);
return -EAGAIN; /* record header not complete */
}
@@ -1248,11 +871,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
if (len < svsk->sk_reclen) {
dprintk("svc: incomplete TCP record (%d of %d)\n",
len, svsk->sk_reclen);
- svc_sock_received(svsk);
+ svc_xprt_received(&svsk->sk_xprt);
return -EAGAIN; /* record not complete */
}
len = svsk->sk_reclen;
- set_bit(SK_DATA, &svsk->sk_flags);
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
vec = rqstp->rq_vec;
vec[0] = rqstp->rq_arg.head[0];
@@ -1281,30 +904,31 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
}
- rqstp->rq_skbuff = NULL;
+ rqstp->rq_xprt_ctxt = NULL;
rqstp->rq_prot = IPPROTO_TCP;
/* Reset TCP read info */
svsk->sk_reclen = 0;
svsk->sk_tcplen = 0;
- svc_sock_received(svsk);
+ svc_xprt_copy_addrs(rqstp, &svsk->sk_xprt);
+ svc_xprt_received(&svsk->sk_xprt);
if (serv->sv_stats)
serv->sv_stats->nettcpcnt++;
return len;
err_delete:
- svc_delete_socket(svsk);
+ set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
return -EAGAIN;
error:
if (len == -EAGAIN) {
dprintk("RPC: TCP recvfrom got EAGAIN\n");
- svc_sock_received(svsk);
+ svc_xprt_received(&svsk->sk_xprt);
} else {
printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
- svsk->sk_server->sv_name, -len);
+ svsk->sk_xprt.xpt_server->sv_name, -len);
goto err_delete;
}
@@ -1314,8 +938,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
/*
* Send out data on TCP socket.
*/
-static int
-svc_tcp_sendto(struct svc_rqst *rqstp)
+static int svc_tcp_sendto(struct svc_rqst *rqstp)
{
struct xdr_buf *xbufp = &rqstp->rq_res;
int sent;
@@ -1328,35 +951,109 @@ svc_tcp_sendto(struct svc_rqst *rqstp)
reclen = htonl(0x80000000|((xbufp->len ) - 4));
memcpy(xbufp->head[0].iov_base, &reclen, 4);
- if (test_bit(SK_DEAD, &rqstp->rq_sock->sk_flags))
+ if (test_bit(XPT_DEAD, &rqstp->rq_xprt->xpt_flags))
return -ENOTCONN;
sent = svc_sendto(rqstp, &rqstp->rq_res);
if (sent != xbufp->len) {
- printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
- rqstp->rq_sock->sk_server->sv_name,
+ printk(KERN_NOTICE
+ "rpc-srv/tcp: %s: %s %d when sending %d bytes "
+ "- shutting down socket\n",
+ rqstp->rq_xprt->xpt_server->sv_name,
(sent<0)?"got error":"sent only",
sent, xbufp->len);
- set_bit(SK_CLOSE, &rqstp->rq_sock->sk_flags);
- svc_sock_enqueue(rqstp->rq_sock);
+ set_bit(XPT_CLOSE, &rqstp->rq_xprt->xpt_flags);
+ svc_xprt_enqueue(rqstp->rq_xprt);
sent = -EAGAIN;
}
return sent;
}
-static void
-svc_tcp_init(struct svc_sock *svsk)
+/*
+ * Setup response header. TCP has a 4B record length field.
+ */
+static void svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+ struct kvec *resv = &rqstp->rq_res.head[0];
+
+ /* tcp needs a space for the record length... */
+ svc_putnl(resv, 0);
+}
+
+static int svc_tcp_has_wspace(struct svc_xprt *xprt)
+{
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ struct svc_serv *serv = svsk->sk_xprt.xpt_server;
+ int required;
+ int wspace;
+
+ /*
+ * Set the SOCK_NOSPACE flag before checking the available
+ * sock space.
+ */
+ set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+ required = atomic_read(&svsk->sk_xprt.xpt_reserved) + serv->sv_max_mesg;
+ wspace = sk_stream_wspace(svsk->sk_sk);
+
+ if (wspace < sk_stream_min_wspace(svsk->sk_sk))
+ return 0;
+ if (required * 2 > wspace)
+ return 0;
+
+ clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+ return 1;
+}
+
+static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ return svc_create_socket(serv, IPPROTO_TCP, sa, salen, flags);
+}
+
+static struct svc_xprt_ops svc_tcp_ops = {
+ .xpo_create = svc_tcp_create,
+ .xpo_recvfrom = svc_tcp_recvfrom,
+ .xpo_sendto = svc_tcp_sendto,
+ .xpo_release_rqst = svc_release_skb,
+ .xpo_detach = svc_sock_detach,
+ .xpo_free = svc_sock_free,
+ .xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,
+ .xpo_has_wspace = svc_tcp_has_wspace,
+ .xpo_accept = svc_tcp_accept,
+};
+
+static struct svc_xprt_class svc_tcp_class = {
+ .xcl_name = "tcp",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_tcp_ops,
+ .xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+};
+
+void svc_init_xprt_sock(void)
+{
+ svc_reg_xprt_class(&svc_tcp_class);
+ svc_reg_xprt_class(&svc_udp_class);
+}
+
+void svc_cleanup_xprt_sock(void)
+{
+ svc_unreg_xprt_class(&svc_tcp_class);
+ svc_unreg_xprt_class(&svc_udp_class);
+}
+
+static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
{
struct sock *sk = svsk->sk_sk;
struct tcp_sock *tp = tcp_sk(sk);
- svsk->sk_recvfrom = svc_tcp_recvfrom;
- svsk->sk_sendto = svc_tcp_sendto;
-
+ svc_xprt_init(&svc_tcp_class, &svsk->sk_xprt, serv);
+ set_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
if (sk->sk_state == TCP_LISTEN) {
dprintk("setting up TCP socket for listening\n");
+ set_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags);
sk->sk_data_ready = svc_tcp_listen_data_ready;
- set_bit(SK_CONN, &svsk->sk_flags);
+ set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
} else {
dprintk("setting up TCP socket for reading\n");
sk->sk_state_change = svc_tcp_state_change;
@@ -1373,18 +1070,17 @@ svc_tcp_init(struct svc_sock *svsk)
* svc_tcp_recvfrom will re-adjust if necessary
*/
svc_sock_setbufsize(svsk->sk_sock,
- 3 * svsk->sk_server->sv_max_mesg,
- 3 * svsk->sk_server->sv_max_mesg);
+ 3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
+ 3 * svsk->sk_xprt.xpt_server->sv_max_mesg);
- set_bit(SK_CHNGBUF, &svsk->sk_flags);
- set_bit(SK_DATA, &svsk->sk_flags);
+ set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
if (sk->sk_state != TCP_ESTABLISHED)
- set_bit(SK_CLOSE, &svsk->sk_flags);
+ set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
}
}
-void
-svc_sock_update_bufs(struct svc_serv *serv)
+void svc_sock_update_bufs(struct svc_serv *serv)
{
/*
* The number of server threads has changed. Update
@@ -1395,232 +1091,18 @@ svc_sock_update_bufs(struct svc_serv *serv)
spin_lock_bh(&serv->sv_lock);
list_for_each(le, &serv->sv_permsocks) {
struct svc_sock *svsk =
- list_entry(le, struct svc_sock, sk_list);
- set_bit(SK_CHNGBUF, &svsk->sk_flags);
+ list_entry(le, struct svc_sock, sk_xprt.xpt_list);
+ set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
}
list_for_each(le, &serv->sv_tempsocks) {
struct svc_sock *svsk =
- list_entry(le, struct svc_sock, sk_list);
- set_bit(SK_CHNGBUF, &svsk->sk_flags);
+ list_entry(le, struct svc_sock, sk_xprt.xpt_list);
+ set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
}
spin_unlock_bh(&serv->sv_lock);
}
/*
- * Receive the next request on any socket. This code is carefully
- * organised not to touch any cachelines in the shared svc_serv
- * structure, only cachelines in the local svc_pool.
- */
-int
-svc_recv(struct svc_rqst *rqstp, long timeout)
-{
- struct svc_sock *svsk = NULL;
- struct svc_serv *serv = rqstp->rq_server;
- struct svc_pool *pool = rqstp->rq_pool;
- int len, i;
- int pages;
- struct xdr_buf *arg;
- DECLARE_WAITQUEUE(wait, current);
-
- dprintk("svc: server %p waiting for data (to = %ld)\n",
- rqstp, timeout);
-
- if (rqstp->rq_sock)
- printk(KERN_ERR
- "svc_recv: service %p, socket not NULL!\n",
- rqstp);
- if (waitqueue_active(&rqstp->rq_wait))
- printk(KERN_ERR
- "svc_recv: service %p, wait queue active!\n",
- rqstp);
-
-
- /* now allocate needed pages. If we get a failure, sleep briefly */
- pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
- for (i=0; i < pages ; i++)
- while (rqstp->rq_pages[i] == NULL) {
- struct page *p = alloc_page(GFP_KERNEL);
- if (!p)
- schedule_timeout_uninterruptible(msecs_to_jiffies(500));
- rqstp->rq_pages[i] = p;
- }
- rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
- BUG_ON(pages >= RPCSVC_MAXPAGES);
-
- /* Make arg->head point to first page and arg->pages point to rest */
- arg = &rqstp->rq_arg;
- arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
- arg->head[0].iov_len = PAGE_SIZE;
- arg->pages = rqstp->rq_pages + 1;
- arg->page_base = 0;
- /* save at least one page for response */
- arg->page_len = (pages-2)*PAGE_SIZE;
- arg->len = (pages-1)*PAGE_SIZE;
- arg->tail[0].iov_len = 0;
-
- try_to_freeze();
- cond_resched();
- if (signalled())
- return -EINTR;
-
- spin_lock_bh(&pool->sp_lock);
- if ((svsk = svc_sock_dequeue(pool)) != NULL) {
- rqstp->rq_sock = svsk;
- atomic_inc(&svsk->sk_inuse);
- rqstp->rq_reserved = serv->sv_max_mesg;
- atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
- } else {
- /* No data pending. Go to sleep */
- svc_thread_enqueue(pool, rqstp);
-
- /*
- * We have to be able to interrupt this wait
- * to bring down the daemons ...
- */
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&rqstp->rq_wait, &wait);
- spin_unlock_bh(&pool->sp_lock);
-
- schedule_timeout(timeout);
-
- try_to_freeze();
-
- spin_lock_bh(&pool->sp_lock);
- remove_wait_queue(&rqstp->rq_wait, &wait);
-
- if (!(svsk = rqstp->rq_sock)) {
- svc_thread_dequeue(pool, rqstp);
- spin_unlock_bh(&pool->sp_lock);
- dprintk("svc: server %p, no data yet\n", rqstp);
- return signalled()? -EINTR : -EAGAIN;
- }
- }
- spin_unlock_bh(&pool->sp_lock);
-
- dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
- rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse));
- len = svsk->sk_recvfrom(rqstp);
- dprintk("svc: got len=%d\n", len);
-
- /* No data, incomplete (TCP) read, or accept() */
- if (len == 0 || len == -EAGAIN) {
- rqstp->rq_res.len = 0;
- svc_sock_release(rqstp);
- return -EAGAIN;
- }
- svsk->sk_lastrecv = get_seconds();
- clear_bit(SK_OLD, &svsk->sk_flags);
-
- rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
- rqstp->rq_chandle.defer = svc_defer;
-
- if (serv->sv_stats)
- serv->sv_stats->netcnt++;
- return len;
-}
-
-/*
- * Drop request
- */
-void
-svc_drop(struct svc_rqst *rqstp)
-{
- dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
- svc_sock_release(rqstp);
-}
-
-/*
- * Return reply to client.
- */
-int
-svc_send(struct svc_rqst *rqstp)
-{
- struct svc_sock *svsk;
- int len;
- struct xdr_buf *xb;
-
- if ((svsk = rqstp->rq_sock) == NULL) {
- printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
- __FILE__, __LINE__);
- return -EFAULT;
- }
-
- /* release the receive skb before sending the reply */
- svc_release_skb(rqstp);
-
- /* calculate over-all length */
- xb = & rqstp->rq_res;
- xb->len = xb->head[0].iov_len +
- xb->page_len +
- xb->tail[0].iov_len;
-
- /* Grab svsk->sk_mutex to serialize outgoing data. */
- mutex_lock(&svsk->sk_mutex);
- if (test_bit(SK_DEAD, &svsk->sk_flags))
- len = -ENOTCONN;
- else
- len = svsk->sk_sendto(rqstp);
- mutex_unlock(&svsk->sk_mutex);
- svc_sock_release(rqstp);
-
- if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
- return 0;
- return len;
-}
-
-/*
- * Timer function to close old temporary sockets, using
- * a mark-and-sweep algorithm.
- */
-static void
-svc_age_temp_sockets(unsigned long closure)
-{
- struct svc_serv *serv = (struct svc_serv *)closure;
- struct svc_sock *svsk;
- struct list_head *le, *next;
- LIST_HEAD(to_be_aged);
-
- dprintk("svc_age_temp_sockets\n");
-
- if (!spin_trylock_bh(&serv->sv_lock)) {
- /* busy, try again 1 sec later */
- dprintk("svc_age_temp_sockets: busy\n");
- mod_timer(&serv->sv_temptimer, jiffies + HZ);
- return;
- }
-
- list_for_each_safe(le, next, &serv->sv_tempsocks) {
- svsk = list_entry(le, struct svc_sock, sk_list);
-
- if (!test_and_set_bit(SK_OLD, &svsk->sk_flags))
- continue;
- if (atomic_read(&svsk->sk_inuse) > 1 || test_bit(SK_BUSY, &svsk->sk_flags))
- continue;
- atomic_inc(&svsk->sk_inuse);
- list_move(le, &to_be_aged);
- set_bit(SK_CLOSE, &svsk->sk_flags);
- set_bit(SK_DETACHED, &svsk->sk_flags);
- }
- spin_unlock_bh(&serv->sv_lock);
-
- while (!list_empty(&to_be_aged)) {
- le = to_be_aged.next;
- /* fiddling the sk_list node is safe 'cos we're SK_DETACHED */
- list_del_init(le);
- svsk = list_entry(le, struct svc_sock, sk_list);
-
- dprintk("queuing svsk %p for closing, %lu seconds old\n",
- svsk, get_seconds() - svsk->sk_lastrecv);
-
- /* a thread will dequeue and close it soon */
- svc_sock_enqueue(svsk);
- svc_sock_put(svsk);
- }
-
- mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
-}
-
-/*
* Initialize socket for RPC use and create svc_sock struct
* XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
*/
@@ -1631,7 +1113,6 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
struct svc_sock *svsk;
struct sock *inet;
int pmap_register = !(flags & SVC_SOCK_ANONYMOUS);
- int is_temporary = flags & SVC_SOCK_TEMPORARY;
dprintk("svc: svc_setup_socket %p\n", sock);
if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) {
@@ -1651,44 +1132,18 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
return NULL;
}
- set_bit(SK_BUSY, &svsk->sk_flags);
inet->sk_user_data = svsk;
svsk->sk_sock = sock;
svsk->sk_sk = inet;
svsk->sk_ostate = inet->sk_state_change;
svsk->sk_odata = inet->sk_data_ready;
svsk->sk_owspace = inet->sk_write_space;
- svsk->sk_server = serv;
- atomic_set(&svsk->sk_inuse, 1);
- svsk->sk_lastrecv = get_seconds();
- spin_lock_init(&svsk->sk_lock);
- INIT_LIST_HEAD(&svsk->sk_deferred);
- INIT_LIST_HEAD(&svsk->sk_ready);
- mutex_init(&svsk->sk_mutex);
/* Initialize the socket */
if (sock->type == SOCK_DGRAM)
- svc_udp_init(svsk);
+ svc_udp_init(svsk, serv);
else
- svc_tcp_init(svsk);
-
- spin_lock_bh(&serv->sv_lock);
- if (is_temporary) {
- set_bit(SK_TEMP, &svsk->sk_flags);
- list_add(&svsk->sk_list, &serv->sv_tempsocks);
- serv->sv_tmpcnt++;
- if (serv->sv_temptimer.function == NULL) {
- /* setup timer to age temp sockets */
- setup_timer(&serv->sv_temptimer, svc_age_temp_sockets,
- (unsigned long)serv);
- mod_timer(&serv->sv_temptimer,
- jiffies + svc_conn_age_period * HZ);
- }
- } else {
- clear_bit(SK_TEMP, &svsk->sk_flags);
- list_add(&svsk->sk_list, &serv->sv_permsocks);
- }
- spin_unlock_bh(&serv->sv_lock);
+ svc_tcp_init(svsk, serv);
dprintk("svc: svc_setup_socket created %p (inet %p)\n",
svsk, svsk->sk_sk);
@@ -1717,7 +1172,16 @@ int svc_addsock(struct svc_serv *serv,
else {
svsk = svc_setup_socket(serv, so, &err, SVC_SOCK_DEFAULTS);
if (svsk) {
- svc_sock_received(svsk);
+ struct sockaddr_storage addr;
+ struct sockaddr *sin = (struct sockaddr *)&addr;
+ int salen;
+ if (kernel_getsockname(svsk->sk_sock, sin, &salen) == 0)
+ svc_xprt_set_local(&svsk->sk_xprt, sin, salen);
+ clear_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags);
+ spin_lock_bh(&serv->sv_lock);
+ list_add(&svsk->sk_xprt.xpt_list, &serv->sv_permsocks);
+ spin_unlock_bh(&serv->sv_lock);
+ svc_xprt_received(&svsk->sk_xprt);
err = 0;
}
}
@@ -1733,14 +1197,19 @@ EXPORT_SYMBOL_GPL(svc_addsock);
/*
* Create socket for RPC service.
*/
-static int svc_create_socket(struct svc_serv *serv, int protocol,
- struct sockaddr *sin, int len, int flags)
+static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
+ int protocol,
+ struct sockaddr *sin, int len,
+ int flags)
{
struct svc_sock *svsk;
struct socket *sock;
int error;
int type;
char buf[RPC_MAX_ADDRBUFLEN];
+ struct sockaddr_storage addr;
+ struct sockaddr *newsin = (struct sockaddr *)&addr;
+ int newlen;
dprintk("svc: svc_create_socket(%s, %d, %s)\n",
serv->sv_program->pg_name, protocol,
@@ -1749,13 +1218,13 @@ static int svc_create_socket(struct svc_serv *serv, int protocol,
if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
printk(KERN_WARNING "svc: only UDP and TCP "
"sockets supported\n");
- return -EINVAL;
+ return ERR_PTR(-EINVAL);
}
type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
error = sock_create_kern(sin->sa_family, type, protocol, &sock);
if (error < 0)
- return error;
+ return ERR_PTR(error);
svc_reclassify_socket(sock);
@@ -1765,203 +1234,55 @@ static int svc_create_socket(struct svc_serv *serv, int protocol,
if (error < 0)
goto bummer;
+ newlen = len;
+ error = kernel_getsockname(sock, newsin, &newlen);
+ if (error < 0)
+ goto bummer;
+
if (protocol == IPPROTO_TCP) {
if ((error = kernel_listen(sock, 64)) < 0)
goto bummer;
}
if ((svsk = svc_setup_socket(serv, sock, &error, flags)) != NULL) {
- svc_sock_received(svsk);
- return ntohs(inet_sk(svsk->sk_sk)->sport);
+ svc_xprt_set_local(&svsk->sk_xprt, newsin, newlen);
+ return (struct svc_xprt *)svsk;
}
bummer:
dprintk("svc: svc_create_socket error = %d\n", -error);
sock_release(sock);
- return error;
+ return ERR_PTR(error);
}
/*
- * Remove a dead socket
+ * Detach the svc_sock from the socket so that no
+ * more callbacks occur.
*/
-static void
-svc_delete_socket(struct svc_sock *svsk)
+static void svc_sock_detach(struct svc_xprt *xprt)
{
- struct svc_serv *serv;
- struct sock *sk;
-
- dprintk("svc: svc_delete_socket(%p)\n", svsk);
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ struct sock *sk = svsk->sk_sk;
- serv = svsk->sk_server;
- sk = svsk->sk_sk;
+ dprintk("svc: svc_sock_detach(%p)\n", svsk);
+ /* put back the old socket callbacks */
sk->sk_state_change = svsk->sk_ostate;
sk->sk_data_ready = svsk->sk_odata;
sk->sk_write_space = svsk->sk_owspace;
-
- spin_lock_bh(&serv->sv_lock);
-
- if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
- list_del_init(&svsk->sk_list);
- /*
- * We used to delete the svc_sock from whichever list
- * it's sk_ready node was on, but we don't actually
- * need to. This is because the only time we're called
- * while still attached to a queue, the queue itself
- * is about to be destroyed (in svc_destroy).
- */
- if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) {
- BUG_ON(atomic_read(&svsk->sk_inuse)<2);
- atomic_dec(&svsk->sk_inuse);
- if (test_bit(SK_TEMP, &svsk->sk_flags))
- serv->sv_tmpcnt--;
- }
-
- spin_unlock_bh(&serv->sv_lock);
-}
-
-static void svc_close_socket(struct svc_sock *svsk)
-{
- set_bit(SK_CLOSE, &svsk->sk_flags);
- if (test_and_set_bit(SK_BUSY, &svsk->sk_flags))
- /* someone else will have to effect the close */
- return;
-
- atomic_inc(&svsk->sk_inuse);
- svc_delete_socket(svsk);
- clear_bit(SK_BUSY, &svsk->sk_flags);
- svc_sock_put(svsk);
-}
-
-void svc_force_close_socket(struct svc_sock *svsk)
-{
- set_bit(SK_CLOSE, &svsk->sk_flags);
- if (test_bit(SK_BUSY, &svsk->sk_flags)) {
- /* Waiting to be processed, but no threads left,
- * So just remove it from the waiting list
- */
- list_del_init(&svsk->sk_ready);
- clear_bit(SK_BUSY, &svsk->sk_flags);
- }
- svc_close_socket(svsk);
-}
-
-/**
- * svc_makesock - Make a socket for nfsd and lockd
- * @serv: RPC server structure
- * @protocol: transport protocol to use
- * @port: port to use
- * @flags: requested socket characteristics
- *
- */
-int svc_makesock(struct svc_serv *serv, int protocol, unsigned short port,
- int flags)
-{
- struct sockaddr_in sin = {
- .sin_family = AF_INET,
- .sin_addr.s_addr = INADDR_ANY,
- .sin_port = htons(port),
- };
-
- dprintk("svc: creating socket proto = %d\n", protocol);
- return svc_create_socket(serv, protocol, (struct sockaddr *) &sin,
- sizeof(sin), flags);
}
/*
- * Handle defer and revisit of requests
+ * Free the svc_sock's socket resources and the svc_sock itself.
*/
-
-static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
+static void svc_sock_free(struct svc_xprt *xprt)
{
- struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
- struct svc_sock *svsk;
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ dprintk("svc: svc_sock_free(%p)\n", svsk);
- if (too_many) {
- svc_sock_put(dr->svsk);
- kfree(dr);
- return;
- }
- dprintk("revisit queued\n");
- svsk = dr->svsk;
- dr->svsk = NULL;
- spin_lock(&svsk->sk_lock);
- list_add(&dr->handle.recent, &svsk->sk_deferred);
- spin_unlock(&svsk->sk_lock);
- set_bit(SK_DEFERRED, &svsk->sk_flags);
- svc_sock_enqueue(svsk);
- svc_sock_put(svsk);
-}
-
-static struct cache_deferred_req *
-svc_defer(struct cache_req *req)
-{
- struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
- int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
- struct svc_deferred_req *dr;
-
- if (rqstp->rq_arg.page_len)
- return NULL; /* if more than a page, give up FIXME */
- if (rqstp->rq_deferred) {
- dr = rqstp->rq_deferred;
- rqstp->rq_deferred = NULL;
- } else {
- int skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
- /* FIXME maybe discard if size too large */
- dr = kmalloc(size, GFP_KERNEL);
- if (dr == NULL)
- return NULL;
-
- dr->handle.owner = rqstp->rq_server;
- dr->prot = rqstp->rq_prot;
- memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
- dr->addrlen = rqstp->rq_addrlen;
- dr->daddr = rqstp->rq_daddr;
- dr->argslen = rqstp->rq_arg.len >> 2;
- memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2);
- }
- atomic_inc(&rqstp->rq_sock->sk_inuse);
- dr->svsk = rqstp->rq_sock;
-
- dr->handle.revisit = svc_revisit;
- return &dr->handle;
-}
-
-/*
- * recv data from a deferred request into an active one
- */
-static int svc_deferred_recv(struct svc_rqst *rqstp)
-{
- struct svc_deferred_req *dr = rqstp->rq_deferred;
-
- rqstp->rq_arg.head[0].iov_base = dr->args;
- rqstp->rq_arg.head[0].iov_len = dr->argslen<<2;
- rqstp->rq_arg.page_len = 0;
- rqstp->rq_arg.len = dr->argslen<<2;
- rqstp->rq_prot = dr->prot;
- memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
- rqstp->rq_addrlen = dr->addrlen;
- rqstp->rq_daddr = dr->daddr;
- rqstp->rq_respages = rqstp->rq_pages;
- return dr->argslen<<2;
-}
-
-
-static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk)
-{
- struct svc_deferred_req *dr = NULL;
-
- if (!test_bit(SK_DEFERRED, &svsk->sk_flags))
- return NULL;
- spin_lock(&svsk->sk_lock);
- clear_bit(SK_DEFERRED, &svsk->sk_flags);
- if (!list_empty(&svsk->sk_deferred)) {
- dr = list_entry(svsk->sk_deferred.next,
- struct svc_deferred_req,
- handle.recent);
- list_del_init(&dr->handle.recent);
- set_bit(SK_DEFERRED, &svsk->sk_flags);
- }
- spin_unlock(&svsk->sk_lock);
- return dr;
+ if (svsk->sk_sock->file)
+ sockfd_put(svsk->sk_sock);
+ else
+ sock_release(svsk->sk_sock);
+ kfree(svsk);
}
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c
index bada7de..0f8c439 100644
--- a/net/sunrpc/sysctl.c
+++ b/net/sunrpc/sysctl.c
@@ -18,6 +18,7 @@
#include <linux/sunrpc/types.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/svc_xprt.h>
/*
* Declare the debug flags here
@@ -55,6 +56,30 @@ rpc_unregister_sysctl(void)
}
}
+static int proc_do_xprt(ctl_table *table, int write, struct file *file,
+ void __user *buffer, size_t *lenp, loff_t *ppos)
+{
+ char tmpbuf[256];
+ int len;
+ if ((*ppos && !write) || !*lenp) {
+ *lenp = 0;
+ return 0;
+ }
+ if (write)
+ return -EINVAL;
+ else {
+ len = svc_print_xprts(tmpbuf, sizeof(tmpbuf));
+ if (!access_ok(VERIFY_WRITE, buffer, len))
+ return -EFAULT;
+
+ if (__copy_to_user(buffer, tmpbuf, len))
+ return -EFAULT;
+ }
+ *lenp -= len;
+ *ppos += len;
+ return 0;
+}
+
static int
proc_dodebug(ctl_table *table, int write, struct file *file,
void __user *buffer, size_t *lenp, loff_t *ppos)
@@ -147,6 +172,12 @@ static ctl_table debug_table[] = {
.mode = 0644,
.proc_handler = &proc_dodebug
},
+ {
+ .procname = "transports",
+ .maxlen = 256,
+ .mode = 0444,
+ .proc_handler = &proc_do_xprt,
+ },
{ .ctl_name = 0 }
};
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 5426406..995c3fd 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -96,11 +96,13 @@ xdr_encode_string(__be32 *p, const char *string)
EXPORT_SYMBOL(xdr_encode_string);
__be32 *
-xdr_decode_string_inplace(__be32 *p, char **sp, int *lenp, int maxlen)
+xdr_decode_string_inplace(__be32 *p, char **sp,
+ unsigned int *lenp, unsigned int maxlen)
{
- unsigned int len;
+ u32 len;
- if ((len = ntohl(*p++)) > maxlen)
+ len = ntohl(*p++);
+ if (len > maxlen)
return NULL;
*lenp = len;
*sp = (char *) p;
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 264f0fe..5a8f268 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,3 +1,8 @@
obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o
xprtrdma-y := transport.o rpc_rdma.o verbs.o
+
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += svcrdma.o
+
+svcrdma-y := svc_rdma.o svc_rdma_transport.o \
+ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
new file mode 100644
index 0000000..88c0ca2
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/sysctl.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/* RPC/RDMA parameters */
+unsigned int svcrdma_ord = RPCRDMA_ORD;
+static unsigned int min_ord = 1;
+static unsigned int max_ord = 4096;
+unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
+static unsigned int min_max_requests = 4;
+static unsigned int max_max_requests = 16384;
+unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
+static unsigned int min_max_inline = 4096;
+static unsigned int max_max_inline = 65536;
+
+atomic_t rdma_stat_recv;
+atomic_t rdma_stat_read;
+atomic_t rdma_stat_write;
+atomic_t rdma_stat_sq_starve;
+atomic_t rdma_stat_rq_starve;
+atomic_t rdma_stat_rq_poll;
+atomic_t rdma_stat_rq_prod;
+atomic_t rdma_stat_sq_poll;
+atomic_t rdma_stat_sq_prod;
+
+/*
+ * This function implements reading and resetting an atomic_t stat
+ * variable through read/write to a proc file. Any write to the file
+ * resets the associated statistic to zero. Any read returns it's
+ * current value.
+ */
+static int read_reset_stat(ctl_table *table, int write,
+ struct file *filp, void __user *buffer, size_t *lenp,
+ loff_t *ppos)
+{
+ atomic_t *stat = (atomic_t *)table->data;
+
+ if (!stat)
+ return -EINVAL;
+
+ if (write)
+ atomic_set(stat, 0);
+ else {
+ char str_buf[32];
+ char *data;
+ int len = snprintf(str_buf, 32, "%d\n", atomic_read(stat));
+ if (len >= 32)
+ return -EFAULT;
+ len = strlen(str_buf);
+ if (*ppos > len) {
+ *lenp = 0;
+ return 0;
+ }
+ data = &str_buf[*ppos];
+ len -= *ppos;
+ if (len > *lenp)
+ len = *lenp;
+ if (len && copy_to_user(buffer, str_buf, len))
+ return -EFAULT;
+ *lenp = len;
+ *ppos += len;
+ }
+ return 0;
+}
+
+static struct ctl_table_header *svcrdma_table_header;
+static ctl_table svcrdma_parm_table[] = {
+ {
+ .procname = "max_requests",
+ .data = &svcrdma_max_requests,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_max_requests,
+ .extra2 = &max_max_requests
+ },
+ {
+ .procname = "max_req_size",
+ .data = &svcrdma_max_req_size,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_max_inline,
+ .extra2 = &max_max_inline
+ },
+ {
+ .procname = "max_outbound_read_requests",
+ .data = &svcrdma_ord,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_ord,
+ .extra2 = &max_ord,
+ },
+
+ {
+ .procname = "rdma_stat_read",
+ .data = &rdma_stat_read,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_recv",
+ .data = &rdma_stat_recv,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_write",
+ .data = &rdma_stat_write,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_starve",
+ .data = &rdma_stat_sq_starve,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_starve",
+ .data = &rdma_stat_rq_starve,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_poll",
+ .data = &rdma_stat_rq_poll,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_prod",
+ .data = &rdma_stat_rq_prod,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_poll",
+ .data = &rdma_stat_sq_poll,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_prod",
+ .data = &rdma_stat_sq_prod,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table svcrdma_table[] = {
+ {
+ .procname = "svc_rdma",
+ .mode = 0555,
+ .child = svcrdma_parm_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table svcrdma_root_table[] = {
+ {
+ .ctl_name = CTL_SUNRPC,
+ .procname = "sunrpc",
+ .mode = 0555,
+ .child = svcrdma_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+void svc_rdma_cleanup(void)
+{
+ dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+ if (svcrdma_table_header) {
+ unregister_sysctl_table(svcrdma_table_header);
+ svcrdma_table_header = NULL;
+ }
+ svc_unreg_xprt_class(&svc_rdma_class);
+}
+
+int svc_rdma_init(void)
+{
+ dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
+ dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord);
+ dprintk("\tmax_requests : %d\n", svcrdma_max_requests);
+ dprintk("\tsq_depth : %d\n",
+ svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
+ dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
+ if (!svcrdma_table_header)
+ svcrdma_table_header =
+ register_sysctl_table(svcrdma_root_table);
+
+ /* Register RDMA with the SVC transport switch */
+ svc_reg_xprt_class(&svc_rdma_class);
+ return 0;
+}
+MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
+MODULE_DESCRIPTION("SVC RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+module_init(svc_rdma_init);
+module_exit(svc_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
new file mode 100644
index 0000000..9530ef2
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -0,0 +1,412 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <asm/unaligned.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/*
+ * Decodes a read chunk list. The expected format is as follows:
+ * descrim : xdr_one
+ * position : u32 offset into XDR stream
+ * handle : u32 RKEY
+ * . . .
+ * end-of-list: xdr_zero
+ */
+static u32 *decode_read_list(u32 *va, u32 *vaend)
+{
+ struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;
+
+ while (ch->rc_discrim != xdr_zero) {
+ u64 ch_offset;
+
+ if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
+ return NULL;
+ }
+
+ ch->rc_discrim = ntohl(ch->rc_discrim);
+ ch->rc_position = ntohl(ch->rc_position);
+ ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle);
+ ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length);
+ va = (u32 *)&ch->rc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64 *)va);
+ ch++;
+ }
+ return (u32 *)&ch->rc_position;
+}
+
+/*
+ * Determine number of chunks and total bytes in chunk list. The chunk
+ * list has already been verified to fit within the RPCRDMA header.
+ */
+void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
+ int *ch_count, int *byte_count)
+{
+ /* compute the number of bytes represented by read chunks */
+ *byte_count = 0;
+ *ch_count = 0;
+ for (; ch->rc_discrim != 0; ch++) {
+ *byte_count = *byte_count + ch->rc_target.rs_length;
+ *ch_count = *ch_count + 1;
+ }
+}
+
+/*
+ * Decodes a write chunk list. The expected format is as follows:
+ * descrim : xdr_one
+ * nchunks : <count>
+ * handle : u32 RKEY ---+
+ * length : u32 <len of segment> |
+ * offset : remove va + <count>
+ * . . . |
+ * ---+
+ */
+static u32 *decode_write_list(u32 *va, u32 *vaend)
+{
+ int ch_no;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array *)va;
+
+ /* Check for not write-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32 *)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ ary->wc_discrim = ntohl(ary->wc_discrim);
+ ary->wc_nchunks = ntohl(ary->wc_nchunks);
+ if (((unsigned long)&ary->wc_array[0] +
+ (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, ary->wc_nchunks, vaend);
+ return NULL;
+ }
+ for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+ u64 ch_offset;
+
+ ary->wc_array[ch_no].wc_target.rs_handle =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+ ary->wc_array[ch_no].wc_target.rs_length =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+ va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64 *)va);
+ }
+
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ return (u32 *)&ary->wc_array[ch_no].wc_target.rs_length;
+}
+
+static u32 *decode_reply_array(u32 *va, u32 *vaend)
+{
+ int ch_no;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array *)va;
+
+ /* Check for no reply-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32 *)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ ary->wc_discrim = ntohl(ary->wc_discrim);
+ ary->wc_nchunks = ntohl(ary->wc_nchunks);
+ if (((unsigned long)&ary->wc_array[0] +
+ (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, ary->wc_nchunks, vaend);
+ return NULL;
+ }
+ for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+ u64 ch_offset;
+
+ ary->wc_array[ch_no].wc_target.rs_handle =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+ ary->wc_array[ch_no].wc_target.rs_length =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+ va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64 *)va);
+ }
+
+ return (u32 *)&ary->wc_array[ch_no];
+}
+
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
+ struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ u32 *va;
+ u32 *vaend;
+ u32 hdr_len;
+
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+ /* Verify that there's enough bytes for header + something */
+ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ dprintk("svcrdma: header too short = %d\n",
+ rqstp->rq_arg.len);
+ return -EINVAL;
+ }
+
+ /* Decode the header */
+ rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
+ rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
+ rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
+ rmsgp->rm_type = ntohl(rmsgp->rm_type);
+
+ if (rmsgp->rm_vers != RPCRDMA_VERSION)
+ return -ENOSYS;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ int hdrlen;
+ rmsgp->rm_body.rm_padded.rm_align =
+ ntohl(rmsgp->rm_body.rm_padded.rm_align);
+ rmsgp->rm_body.rm_padded.rm_thresh =
+ ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ if (hdrlen > rqstp->rq_arg.len)
+ return -EINVAL;
+ return hdrlen;
+ }
+
+ /* The chunk list may contain either a read chunk list or a write
+ * chunk list and a reply chunk list.
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+ vaend = (u32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+ va = decode_read_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_write_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_reply_array(va, vaend);
+ if (!va)
+ return -EINVAL;
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdr_len = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdr_len;
+
+ *rdma_req = rmsgp;
+ return hdr_len;
+}
+
+int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ struct rpcrdma_read_chunk *ch;
+ struct rpcrdma_write_array *ary;
+ u32 *va;
+ u32 hdrlen;
+
+ dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
+ rqstp);
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ return hdrlen;
+ }
+
+ /*
+ * Skip all chunks to find RPC msg. These were previously processed
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+
+ /* Skip read-list */
+ for (ch = (struct rpcrdma_read_chunk *)va;
+ ch->rc_discrim != xdr_zero; ch++);
+ va = (u32 *)&ch->rc_position;
+
+ /* Skip write-list */
+ ary = (struct rpcrdma_write_array *)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32 *)&ary->wc_nchunks;
+ else
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ va = (u32 *)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
+
+ /* Skip reply-array */
+ ary = (struct rpcrdma_write_array *)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32 *)&ary->wc_nchunks;
+ else
+ va = (u32 *)&ary->wc_array[ary->wc_nchunks];
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+
+ return hdrlen;
+}
+
+int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err, u32 *va)
+{
+ u32 *startp = va;
+
+ *va++ = htonl(rmsgp->rm_xid);
+ *va++ = htonl(rmsgp->rm_vers);
+ *va++ = htonl(xprt->sc_max_requests);
+ *va++ = htonl(RDMA_ERROR);
+ *va++ = htonl(err);
+ if (err == ERR_VERS) {
+ *va++ = htonl(RPCRDMA_VERSION);
+ *va++ = htonl(RPCRDMA_VERSION);
+ }
+
+ return (int)((unsigned long)va - (unsigned long)startp);
+}
+
+int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
+{
+ struct rpcrdma_write_array *wr_ary;
+
+ /* There is no read-list in a reply */
+
+ /* skip write list */
+ wr_ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+ wc_target.rs_length;
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ /* skip reply array */
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ return (unsigned long) wr_ary - (unsigned long) rmsgp;
+}
+
+void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+{
+ struct rpcrdma_write_array *ary;
+
+ /* no read-list */
+ rmsgp->rm_body.rm_chunks[0] = xdr_zero;
+
+ /* write-array discrim */
+ ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+
+ /* write-list terminator */
+ ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
+
+ /* reply-array discriminator */
+ ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
+}
+
+void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
+ int chunks)
+{
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+}
+
+void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
+ int chunk_no,
+ u32 rs_handle, u64 rs_offset,
+ u32 write_len)
+{
+ struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
+ seg->rs_handle = htonl(rs_handle);
+ seg->rs_length = htonl(write_len);
+ xdr_encode_hyper((u32 *) &seg->rs_offset, rs_offset);
+}
+
+void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ enum rpcrdma_proc rdma_type)
+{
+ rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
+ rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
+ rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
+ rdma_resp->rm_type = htonl(rdma_type);
+
+ /* Encode <nul> chunks lists */
+ rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
new file mode 100644
index 0000000..ab54a73
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/*
+ * Replace the pages in the rq_argpages array with the pages from the SGE in
+ * the RDMA_RECV completion. The SGL should contain full pages up until the
+ * last one.
+ */
+static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *ctxt,
+ u32 byte_count)
+{
+ struct page *page;
+ u32 bc;
+ int sge_no;
+
+ /* Swap the page in the SGE with the page in argpages */
+ page = ctxt->pages[0];
+ put_page(rqstp->rq_pages[0]);
+ rqstp->rq_pages[0] = page;
+
+ /* Set up the XDR head */
+ rqstp->rq_arg.head[0].iov_base = page_address(page);
+ rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+ rqstp->rq_arg.len = byte_count;
+ rqstp->rq_arg.buflen = byte_count;
+
+ /* Compute bytes past head in the SGL */
+ bc = byte_count - rqstp->rq_arg.head[0].iov_len;
+
+ /* If data remains, store it in the pagelist */
+ rqstp->rq_arg.page_len = bc;
+ rqstp->rq_arg.page_base = 0;
+ rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+ sge_no = 1;
+ while (bc && sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no];
+ put_page(rqstp->rq_pages[sge_no]);
+ rqstp->rq_pages[sge_no] = page;
+ bc -= min(bc, ctxt->sge[sge_no].length);
+ rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
+ sge_no++;
+ }
+ rqstp->rq_respages = &rqstp->rq_pages[sge_no];
+
+ /* We should never run out of SGE because the limit is defined to
+ * support the max allowed RPC data length
+ */
+ BUG_ON(bc && (sge_no == ctxt->count));
+ BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
+ != byte_count);
+ BUG_ON(rqstp->rq_arg.len != byte_count);
+
+ /* If not all pages were used from the SGL, free the remaining ones */
+ bc = sge_no;
+ while (sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no++];
+ put_page(page);
+ }
+ ctxt->count = bc;
+
+ /* Set up tail */
+ rqstp->rq_arg.tail[0].iov_base = NULL;
+ rqstp->rq_arg.tail[0].iov_len = 0;
+}
+
+struct chunk_sge {
+ int start; /* sge no for this chunk */
+ int count; /* sge count for this chunk */
+};
+
+/* Encode a read-chunk-list as an array of IB SGE
+ *
+ * Assumptions:
+ * - chunk[0]->position points to pages[0] at an offset of 0
+ * - pages[] is not physically or virtually contigous and consists of
+ * PAGE_SIZE elements.
+ *
+ * Output:
+ * - sge array pointing into pages[] array.
+ * - chunk_sge array specifying sge index and count for each
+ * chunk in the read list
+ *
+ */
+static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ struct rpcrdma_msg *rmsgp,
+ struct ib_sge *sge,
+ struct chunk_sge *ch_sge_ary,
+ int ch_count,
+ int byte_count)
+{
+ int sge_no;
+ int sge_bytes;
+ int page_off;
+ int page_no;
+ int ch_bytes;
+ int ch_no;
+ struct rpcrdma_read_chunk *ch;
+
+ sge_no = 0;
+ page_no = 0;
+ page_off = 0;
+ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ ch_no = 0;
+ ch_bytes = ch->rc_target.rs_length;
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = &head->pages[head->count];
+ head->sge[0].length = head->count; /* save count of hdr pages */
+ head->arg.page_base = 0;
+ head->arg.page_len = ch_bytes;
+ head->arg.len = rqstp->rq_arg.len + ch_bytes;
+ head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
+ head->count++;
+ ch_sge_ary[0].start = 0;
+ while (byte_count) {
+ sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
+ sge[sge_no].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ rqstp->rq_arg.pages[page_no],
+ page_off, sge_bytes,
+ DMA_FROM_DEVICE);
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ /*
+ * Don't bump head->count here because the same page
+ * may be used by multiple SGE.
+ */
+ head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+
+ byte_count -= sge_bytes;
+ ch_bytes -= sge_bytes;
+ sge_no++;
+ /*
+ * If all bytes for this chunk have been mapped to an
+ * SGE, move to the next SGE
+ */
+ if (ch_bytes == 0) {
+ ch_sge_ary[ch_no].count =
+ sge_no - ch_sge_ary[ch_no].start;
+ ch_no++;
+ ch++;
+ ch_sge_ary[ch_no].start = sge_no;
+ ch_bytes = ch->rc_target.rs_length;
+ /* If bytes remaining account for next chunk */
+ if (byte_count) {
+ head->arg.page_len += ch_bytes;
+ head->arg.len += ch_bytes;
+ head->arg.buflen += ch_bytes;
+ }
+ }
+ /*
+ * If this SGE consumed all of the page, move to the
+ * next page
+ */
+ if ((sge_bytes + page_off) == PAGE_SIZE) {
+ page_no++;
+ page_off = 0;
+ /*
+ * If there are still bytes left to map, bump
+ * the page count
+ */
+ if (byte_count)
+ head->count++;
+ } else
+ page_off += sge_bytes;
+ }
+ BUG_ON(byte_count != 0);
+ return sge_no;
+}
+
+static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
+ struct ib_sge *sge,
+ u64 *sgl_offset,
+ int count)
+{
+ int i;
+
+ ctxt->count = count;
+ for (i = 0; i < count; i++) {
+ ctxt->sge[i].addr = sge[i].addr;
+ ctxt->sge[i].length = sge[i].length;
+ *sgl_offset = *sgl_offset + sge[i].length;
+ }
+}
+
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
+{
+#ifdef RDMA_TRANSPORT_IWARP
+ if ((RDMA_TRANSPORT_IWARP ==
+ rdma_node_get_transport(xprt->sc_cm_id->
+ device->node_type))
+ && sge_count > 1)
+ return 1;
+ else
+#endif
+ return min_t(int, sge_count, xprt->sc_max_sge);
+}
+
+/*
+ * Use RDMA_READ to read data from the advertised client buffer into the
+ * XDR stream starting at rq_arg.head[0].iov_base.
+ * Each chunk in the array
+ * contains the following fields:
+ * discrim - '1', This isn't used for data placement
+ * position - The xdr stream offset (the same for every chunk)
+ * handle - RMR for client memory region
+ * length - data transfer length
+ * offset - 64 bit tagged offset in remote memory region
+ *
+ * On our side, we need to read into a pagelist. The first page immediately
+ * follows the RPC header.
+ *
+ * This function returns 1 to indicate success. The data is not yet in
+ * the pagelist and therefore the RPC request must be deferred. The
+ * I/O completion will enqueue the transport again and
+ * svc_rdma_recvfrom will complete the request.
+ *
+ * NOTE: The ctxt must not be touched after the last WR has been posted
+ * because the I/O completion processing may occur on another
+ * processor and free / modify the context. Ne touche pas!
+ */
+static int rdma_read_xdr(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *hdr_ctxt)
+{
+ struct ib_send_wr read_wr;
+ int err = 0;
+ int ch_no;
+ struct ib_sge *sge;
+ int ch_count;
+ int byte_count;
+ int sge_count;
+ u64 sgl_offset;
+ struct rpcrdma_read_chunk *ch;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct svc_rdma_op_ctxt *head;
+ struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+ struct svc_rdma_op_ctxt *tmp_ch_ctxt;
+ struct chunk_sge *ch_sge_ary;
+
+ /* If no read list is present, return 0 */
+ ch = svc_rdma_get_read_chunk(rmsgp);
+ if (!ch)
+ return 0;
+
+ /* Allocate temporary contexts to keep SGE */
+ BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
+ tmp_sge_ctxt = svc_rdma_get_context(xprt);
+ sge = tmp_sge_ctxt->sge;
+ tmp_ch_ctxt = svc_rdma_get_context(xprt);
+ ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
+
+ svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+ sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
+ sge, ch_sge_ary,
+ ch_count, byte_count);
+ head = svc_rdma_get_context(xprt);
+ sgl_offset = 0;
+ ch_no = 0;
+
+ for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ ch->rc_discrim != 0; ch++, ch_no++) {
+next_sge:
+ if (!ctxt)
+ ctxt = head;
+ else {
+ ctxt->next = svc_rdma_get_context(xprt);
+ ctxt = ctxt->next;
+ }
+ ctxt->next = NULL;
+ ctxt->direction = DMA_FROM_DEVICE;
+ clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ if ((ch+1)->rc_discrim == 0) {
+ /*
+ * Checked in sq_cq_reap to see if we need to
+ * be enqueued
+ */
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ ctxt->next = hdr_ctxt;
+ hdr_ctxt->next = head;
+ }
+
+ /* Prepare READ WR */
+ memset(&read_wr, 0, sizeof read_wr);
+ ctxt->wr_op = IB_WR_RDMA_READ;
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.opcode = IB_WR_RDMA_READ;
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
+ read_wr.wr.rdma.remote_addr =
+ get_unaligned(&(ch->rc_target.rs_offset)) +
+ sgl_offset;
+ read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+ read_wr.num_sge =
+ rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
+ rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
+ &sgl_offset,
+ read_wr.num_sge);
+
+ /* Post the read */
+ err = svc_rdma_send(xprt, &read_wr);
+ if (err) {
+ printk(KERN_ERR "svcrdma: Error posting send = %d\n",
+ err);
+ /*
+ * Break the circular list so free knows when
+ * to stop if the error happened to occur on
+ * the last read
+ */
+ ctxt->next = NULL;
+ goto out;
+ }
+ atomic_inc(&rdma_stat_read);
+
+ if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
+ ch_sge_ary[ch_no].count -= read_wr.num_sge;
+ ch_sge_ary[ch_no].start += read_wr.num_sge;
+ goto next_sge;
+ }
+ sgl_offset = 0;
+ err = 0;
+ }
+
+ out:
+ svc_rdma_put_context(tmp_sge_ctxt, 0);
+ svc_rdma_put_context(tmp_ch_ctxt, 0);
+
+ /* Detach arg pages. svc_recv will replenish them */
+ for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
+ rqstp->rq_pages[ch_no] = NULL;
+
+ /*
+ * Detach res pages. svc_release must see a resused count of
+ * zero or it will attempt to put them.
+ */
+ while (rqstp->rq_resused)
+ rqstp->rq_respages[--rqstp->rq_resused] = NULL;
+
+ if (err) {
+ printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ /* Free the linked list of read contexts */
+ while (head != NULL) {
+ ctxt = head->next;
+ svc_rdma_put_context(head, 1);
+ head = ctxt;
+ }
+ return 0;
+ }
+
+ return 1;
+}
+
+static int rdma_read_complete(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *data)
+{
+ struct svc_rdma_op_ctxt *head = data->next;
+ int page_no;
+ int ret;
+
+ BUG_ON(!head);
+
+ /* Copy RPC pages */
+ for (page_no = 0; page_no < head->count; page_no++) {
+ put_page(rqstp->rq_pages[page_no]);
+ rqstp->rq_pages[page_no] = head->pages[page_no];
+ }
+ /* Point rq_arg.pages past header */
+ rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+ rqstp->rq_arg.page_len = head->arg.page_len;
+ rqstp->rq_arg.page_base = head->arg.page_base;
+
+ /* rq_respages starts after the last arg page */
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+ rqstp->rq_resused = 0;
+
+ /* Rebuild rq_arg head and tail. */
+ rqstp->rq_arg.head[0] = head->arg.head[0];
+ rqstp->rq_arg.tail[0] = head->arg.tail[0];
+ rqstp->rq_arg.len = head->arg.len;
+ rqstp->rq_arg.buflen = head->arg.buflen;
+
+ /* XXX: What should this be? */
+ rqstp->rq_prot = IPPROTO_MAX;
+
+ /*
+ * Free the contexts we used to build the RDMA_READ. We have
+ * to be careful here because the context list uses the same
+ * next pointer used to chain the contexts associated with the
+ * RDMA_READ
+ */
+ data->next = NULL; /* terminate circular list */
+ do {
+ data = head->next;
+ svc_rdma_put_context(head, 0);
+ head = data;
+ } while (head != NULL);
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+
+ /* Indicate that we've consumed an RQ credit */
+ rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+ svc_xprt_received(rqstp->rq_xprt);
+ return ret;
+}
+
+/*
+ * Set up the rqstp thread context to point to the RQ buffer. If
+ * necessary, pull additional data from the client with an RDMA_READ
+ * request.
+ */
+int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svcxprt_rdma *rdma_xprt =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct rpcrdma_msg *rmsgp;
+ int ret = 0;
+ int len;
+
+ dprintk("svcrdma: rqstp=%p\n", rqstp);
+
+ /*
+ * The rq_xprt_ctxt indicates if we've consumed an RQ credit
+ * or not. It is used in the rdma xpo_release_rqst function to
+ * determine whether or not to return an RQ WQE to the RQ.
+ */
+ rqstp->rq_xprt_ctxt = NULL;
+
+ spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
+ if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
+ ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ }
+ spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
+ if (ctxt)
+ return rdma_read_complete(rqstp, ctxt);
+
+ spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+ ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ } else {
+ atomic_inc(&rdma_stat_rq_starve);
+ clear_bit(XPT_DATA, &xprt->xpt_flags);
+ ctxt = NULL;
+ }
+ spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!ctxt) {
+ /* This is the EAGAIN path. The svc_recv routine will
+ * return -EAGAIN, the nfsd thread will go to call into
+ * svc_recv again and we shouldn't be on the active
+ * transport list
+ */
+ if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+ goto close_out;
+
+ BUG_ON(ret);
+ goto out;
+ }
+ dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
+ ctxt, rdma_xprt, rqstp, ctxt->wc_status);
+ BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
+ atomic_inc(&rdma_stat_recv);
+
+ /* Build up the XDR from the receive buffers. */
+ rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
+
+ /* Decode the RDMA header. */
+ len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ rqstp->rq_xprt_hlen = len;
+
+ /* If the request is invalid, reply with an error */
+ if (len < 0) {
+ if (len == -ENOSYS)
+ (void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+ goto close_out;
+ }
+
+ /* Read read-list data. If we would need to wait, defer
+ * it. Not that in this case, we don't return the RQ credit
+ * until after the read completes.
+ */
+ if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
+ svc_xprt_received(xprt);
+ return 0;
+ }
+
+ /* Indicate we've consumed an RQ credit */
+ rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ svc_rdma_put_context(ctxt, 0);
+ out:
+ dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ ret, rqstp->rq_arg.len,
+ rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+ rqstp->rq_prot = IPPROTO_MAX;
+ svc_xprt_copy_addrs(rqstp, xprt);
+ svc_xprt_received(xprt);
+ return ret;
+
+ close_out:
+ if (ctxt) {
+ svc_rdma_put_context(ctxt, 1);
+ /* Indicate we've consumed an RQ credit */
+ rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+ }
+ dprintk("svcrdma: transport %p is closing\n", xprt);
+ /*
+ * Set the close bit and enqueue it. svc_recv will see the
+ * close bit and call svc_xprt_delete
+ */
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_received(xprt);
+ return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
new file mode 100644
index 0000000..3e32194
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -0,0 +1,520 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/* Encode an XDR as an array of IB SGE
+ *
+ * Assumptions:
+ * - head[0] is physically contiguous.
+ * - tail[0] is physically contiguous.
+ * - pages[] is not physically or virtually contigous and consists of
+ * PAGE_SIZE elements.
+ *
+ * Output:
+ * SGE[0] reserved for RCPRDMA header
+ * SGE[1] data from xdr->head[]
+ * SGE[2..sge_count-2] data from xdr->pages[]
+ * SGE[sge_count-1] data from xdr->tail.
+ *
+ */
+static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
+ struct xdr_buf *xdr,
+ struct ib_sge *sge,
+ int *sge_count)
+{
+ /* Max we need is the length of the XDR / pagesize + one for
+ * head + one for tail + one for RPCRDMA header
+ */
+ int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
+ int sge_no;
+ u32 byte_count = xdr->len;
+ u32 sge_bytes;
+ u32 page_bytes;
+ int page_off;
+ int page_no;
+
+ /* Skip the first sge, this is for the RPCRDMA header */
+ sge_no = 1;
+
+ /* Head SGE */
+ sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
+ xdr->head[0].iov_base,
+ xdr->head[0].iov_len,
+ DMA_TO_DEVICE);
+ sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
+ byte_count -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ sge_no++;
+
+ /* pages SGE */
+ page_no = 0;
+ page_bytes = xdr->page_len;
+ page_off = xdr->page_base;
+ while (byte_count && page_bytes) {
+ sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
+ sge[sge_no].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ xdr->pages[page_no], page_off,
+ sge_bytes, DMA_TO_DEVICE);
+ sge_bytes = min(sge_bytes, page_bytes);
+ byte_count -= sge_bytes;
+ page_bytes -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+
+ sge_no++;
+ page_no++;
+ page_off = 0; /* reset for next time through loop */
+ }
+
+ /* Tail SGE */
+ if (byte_count && xdr->tail[0].iov_len) {
+ sge[sge_no].addr =
+ ib_dma_map_single(xprt->sc_cm_id->device,
+ xdr->tail[0].iov_base,
+ xdr->tail[0].iov_len,
+ DMA_TO_DEVICE);
+ sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
+ byte_count -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ sge_no++;
+ }
+
+ BUG_ON(sge_no > sge_max);
+ BUG_ON(byte_count != 0);
+
+ *sge_count = sge_no;
+ return sge;
+}
+
+
+/* Assumptions:
+ * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+ */
+static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
+ u32 rmr, u64 to,
+ u32 xdr_off, int write_len,
+ struct ib_sge *xdr_sge, int sge_count)
+{
+ struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+ struct ib_send_wr write_wr;
+ struct ib_sge *sge;
+ int xdr_sge_no;
+ int sge_no;
+ int sge_bytes;
+ int sge_off;
+ int bc;
+ struct svc_rdma_op_ctxt *ctxt;
+ int ret = 0;
+
+ BUG_ON(sge_count >= 32);
+ dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
+ "write_len=%d, xdr_sge=%p, sge_count=%d\n",
+ rmr, to, xdr_off, write_len, xdr_sge, sge_count);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 0;
+ tmp_sge_ctxt = svc_rdma_get_context(xprt);
+ sge = tmp_sge_ctxt->sge;
+
+ /* Find the SGE associated with xdr_off */
+ for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < sge_count;
+ xdr_sge_no++) {
+ if (xdr_sge[xdr_sge_no].length > bc)
+ break;
+ bc -= xdr_sge[xdr_sge_no].length;
+ }
+
+ sge_off = bc;
+ bc = write_len;
+ sge_no = 0;
+
+ /* Copy the remaining SGE */
+ while (bc != 0 && xdr_sge_no < sge_count) {
+ sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
+ sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
+ sge_bytes = min((size_t)bc,
+ (size_t)(xdr_sge[xdr_sge_no].length-sge_off));
+ sge[sge_no].length = sge_bytes;
+
+ sge_off = 0;
+ sge_no++;
+ xdr_sge_no++;
+ bc -= sge_bytes;
+ }
+
+ BUG_ON(bc != 0);
+ BUG_ON(xdr_sge_no > sge_count);
+
+ /* Prepare WRITE WR */
+ memset(&write_wr, 0, sizeof write_wr);
+ ctxt->wr_op = IB_WR_RDMA_WRITE;
+ write_wr.wr_id = (unsigned long)ctxt;
+ write_wr.sg_list = &sge[0];
+ write_wr.num_sge = sge_no;
+ write_wr.opcode = IB_WR_RDMA_WRITE;
+ write_wr.send_flags = IB_SEND_SIGNALED;
+ write_wr.wr.rdma.rkey = rmr;
+ write_wr.wr.rdma.remote_addr = to;
+
+ /* Post It */
+ atomic_inc(&rdma_stat_write);
+ if (svc_rdma_send(xprt, &write_wr)) {
+ svc_rdma_put_context(ctxt, 1);
+ /* Fatal error, close transport */
+ ret = -EIO;
+ }
+ svc_rdma_put_context(tmp_sge_ctxt, 0);
+ return ret;
+}
+
+static int send_write_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct ib_sge *sge,
+ int sge_count)
+{
+ u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ int write_len;
+ int max_write;
+ u32 xdr_off;
+ int chunk_off;
+ int chunk_no;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_write_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ res_ary = (struct rpcrdma_write_array *)
+ &rdma_resp->rm_body.rm_chunks[1];
+
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+ /* Write chunks start at the pagelist */
+ for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ struct rpcrdma_segment *arg_ch;
+ u64 rs_offset;
+
+ arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, arg_ch->rs_length);
+
+ /* Prepare the response chunk given the length actually
+ * written */
+ rs_offset = get_unaligned(&(arg_ch->rs_offset));
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ arg_ch->rs_handle,
+ rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ int this_write;
+ this_write = min(write_len, max_write);
+ ret = send_write(xprt, rqstp,
+ arg_ch->rs_handle,
+ rs_offset + chunk_off,
+ xdr_off,
+ this_write,
+ sge,
+ sge_count);
+ if (ret) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+ ret);
+ return -EIO;
+ }
+ chunk_off += this_write;
+ xdr_off += this_write;
+ xfer_len -= this_write;
+ write_len -= this_write;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+
+ return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+}
+
+static int send_reply_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct ib_sge *sge,
+ int sge_count)
+{
+ u32 xfer_len = rqstp->rq_res.len;
+ int write_len;
+ int max_write;
+ u32 xdr_off;
+ int chunk_no;
+ int chunk_off;
+ struct rpcrdma_segment *ch;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ /* XXX: need to fix when reply lists occur with read-list and or
+ * write-list */
+ res_ary = (struct rpcrdma_write_array *)
+ &rdma_resp->rm_body.rm_chunks[2];
+
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+ /* xdr offset starts at RPC message */
+ for (xdr_off = 0, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ u64 rs_offset;
+ ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, ch->rs_length);
+
+
+ /* Prepare the reply chunk given the length actually
+ * written */
+ rs_offset = get_unaligned(&(ch->rs_offset));
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ ch->rs_handle, rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ int this_write;
+
+ this_write = min(write_len, max_write);
+ ret = send_write(xprt, rqstp,
+ ch->rs_handle,
+ rs_offset + chunk_off,
+ xdr_off,
+ this_write,
+ sge,
+ sge_count);
+ if (ret) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+ ret);
+ return -EIO;
+ }
+ chunk_off += this_write;
+ xdr_off += this_write;
+ xfer_len -= this_write;
+ write_len -= this_write;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+
+ return rqstp->rq_res.len;
+}
+
+/* This function prepares the portion of the RPCRDMA message to be
+ * sent in the RDMA_SEND. This function is called after data sent via
+ * RDMA has already been transmitted. There are three cases:
+ * - The RPCRDMA header, RPC header, and payload are all sent in a
+ * single RDMA_SEND. This is the "inline" case.
+ * - The RPCRDMA header and some portion of the RPC header and data
+ * are sent via this RDMA_SEND and another portion of the data is
+ * sent via RDMA.
+ * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
+ * header and data are all transmitted via RDMA.
+ * In all three cases, this function prepares the RPCRDMA header in
+ * sge[0], the 'type' parameter indicates the type to place in the
+ * RPCRDMA header, and the 'byte_count' field indicates how much of
+ * the XDR to include in this RDMA_SEND.
+ */
+static int send_reply(struct svcxprt_rdma *rdma,
+ struct svc_rqst *rqstp,
+ struct page *page,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rdma_op_ctxt *ctxt,
+ int sge_count,
+ int byte_count)
+{
+ struct ib_send_wr send_wr;
+ int sge_no;
+ int sge_bytes;
+ int page_no;
+ int ret;
+
+ /* Prepare the context */
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+
+ /* Prepare the SGE for the RPCRDMA Header */
+ ctxt->sge[0].addr =
+ ib_dma_map_page(rdma->sc_cm_id->device,
+ page, 0, PAGE_SIZE, DMA_TO_DEVICE);
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
+ ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
+
+ /* Determine how many of our SGE are to be transmitted */
+ for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
+ sge_bytes = min((size_t)ctxt->sge[sge_no].length,
+ (size_t)byte_count);
+ byte_count -= sge_bytes;
+ }
+ BUG_ON(byte_count != 0);
+
+ /* Save all respages in the ctxt and remove them from the
+ * respages array. They are our pages until the I/O
+ * completes.
+ */
+ for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
+ ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
+ ctxt->count++;
+ rqstp->rq_respages[page_no] = NULL;
+ }
+
+ BUG_ON(sge_no > rdma->sc_max_sge);
+ memset(&send_wr, 0, sizeof send_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ send_wr.wr_id = (unsigned long)ctxt;
+ send_wr.sg_list = ctxt->sge;
+ send_wr.num_sge = sge_no;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ ret = svc_rdma_send(rdma, &send_wr);
+ if (ret)
+ svc_rdma_put_context(ctxt, 1);
+
+ return ret;
+}
+
+void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+}
+
+/*
+ * Return the start of an xdr buffer.
+ */
+static void *xdr_start(struct xdr_buf *xdr)
+{
+ return xdr->head[0].iov_base -
+ (xdr->len -
+ xdr->page_len -
+ xdr->tail[0].iov_len -
+ xdr->head[0].iov_len);
+}
+
+int svc_rdma_sendto(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ struct rpcrdma_msg *rdma_argp;
+ struct rpcrdma_msg *rdma_resp;
+ struct rpcrdma_write_array *reply_ary;
+ enum rpcrdma_proc reply_type;
+ int ret;
+ int inline_bytes;
+ struct ib_sge *sge;
+ int sge_count = 0;
+ struct page *res_page;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+
+ /* Get the RDMA request header. */
+ rdma_argp = xdr_start(&rqstp->rq_arg);
+
+ /* Build an SGE for the XDR */
+ ctxt = svc_rdma_get_context(rdma);
+ ctxt->direction = DMA_TO_DEVICE;
+ sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
+
+ inline_bytes = rqstp->rq_res.len;
+
+ /* Create the RDMA response header */
+ res_page = svc_rdma_get_page();
+ rdma_resp = page_address(res_page);
+ reply_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (reply_ary)
+ reply_type = RDMA_NOMSG;
+ else
+ reply_type = RDMA_MSG;
+ svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
+ rdma_resp, reply_type);
+
+ /* Send any write-chunk data and build resp write-list */
+ ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, sge, sge_count);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
+ ret);
+ goto error;
+ }
+ inline_bytes -= ret;
+
+ /* Send any reply-list data and update resp reply-list */
+ ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, sge, sge_count);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
+ ret);
+ goto error;
+ }
+ inline_bytes -= ret;
+
+ ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
+ inline_bytes);
+ dprintk("svcrdma: send_reply returns %d\n", ret);
+ return ret;
+ error:
+ svc_rdma_put_context(ctxt, 0);
+ put_page(res_page);
+ return ret;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
new file mode 100644
index 0000000..f09444c
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -0,0 +1,1080 @@
+/*
+ * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/svc_xprt.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+ struct sockaddr *sa, int salen,
+ int flags);
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
+static void svc_rdma_release_rqst(struct svc_rqst *);
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
+static void dto_tasklet_func(unsigned long data);
+static void svc_rdma_detach(struct svc_xprt *xprt);
+static void svc_rdma_free(struct svc_xprt *xprt);
+static int svc_rdma_has_wspace(struct svc_xprt *xprt);
+static void rq_cq_reap(struct svcxprt_rdma *xprt);
+static void sq_cq_reap(struct svcxprt_rdma *xprt);
+
+DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static DEFINE_SPINLOCK(dto_lock);
+static LIST_HEAD(dto_xprt_q);
+
+static struct svc_xprt_ops svc_rdma_ops = {
+ .xpo_create = svc_rdma_create,
+ .xpo_recvfrom = svc_rdma_recvfrom,
+ .xpo_sendto = svc_rdma_sendto,
+ .xpo_release_rqst = svc_rdma_release_rqst,
+ .xpo_detach = svc_rdma_detach,
+ .xpo_free = svc_rdma_free,
+ .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+ .xpo_has_wspace = svc_rdma_has_wspace,
+ .xpo_accept = svc_rdma_accept,
+};
+
+struct svc_xprt_class svc_rdma_class = {
+ .xcl_name = "rdma",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_rdma_ops,
+ .xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+};
+
+static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+{
+ int target;
+ int at_least_one = 0;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
+ xprt->sc_ctxt_max);
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ while (xprt->sc_ctxt_cnt < target) {
+ xprt->sc_ctxt_cnt++;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (ctxt) {
+ at_least_one = 1;
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ } else {
+ /* kmalloc failed...give up for now */
+ xprt->sc_ctxt_cnt--;
+ break;
+ }
+ }
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
+ xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
+ return at_least_one;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+
+ while (1) {
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (unlikely(xprt->sc_ctxt_head == NULL)) {
+ /* Try to bump my cache. */
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ if (rdma_bump_context_cache(xprt))
+ continue;
+
+ printk(KERN_INFO "svcrdma: sleeping waiting for "
+ "context memory on xprt=%p\n",
+ xprt);
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ continue;
+ }
+ ctxt = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt->next;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ ctxt->xprt = xprt;
+ INIT_LIST_HEAD(&ctxt->dto_q);
+ ctxt->count = 0;
+ break;
+ }
+ return ctxt;
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+ struct svcxprt_rdma *xprt;
+ int i;
+
+ BUG_ON(!ctxt);
+ xprt = ctxt->xprt;
+ if (free_pages)
+ for (i = 0; i < ctxt->count; i++)
+ put_page(ctxt->pages[i]);
+
+ for (i = 0; i < ctxt->count; i++)
+ dma_unmap_single(xprt->sc_cm_id->device->dma_device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+}
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+ struct svc_xprt *xprt = context;
+ dprintk("svcrdma: received CQ event id=%d, context=%p\n",
+ event->event, context);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+ struct svc_xprt *xprt = context;
+
+ switch (event->event) {
+ /* These are considered benign events */
+ case IB_EVENT_PATH_MIG:
+ case IB_EVENT_COMM_EST:
+ case IB_EVENT_SQ_DRAINED:
+ case IB_EVENT_QP_LAST_WQE_REACHED:
+ dprintk("svcrdma: QP event %d received for QP=%p\n",
+ event->event, event->element.qp);
+ break;
+ /* These are considered fatal events */
+ case IB_EVENT_PATH_MIG_ERR:
+ case IB_EVENT_QP_FATAL:
+ case IB_EVENT_QP_REQ_ERR:
+ case IB_EVENT_QP_ACCESS_ERR:
+ case IB_EVENT_DEVICE_FATAL:
+ default:
+ dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
+ "closing transport\n",
+ event->event, event->element.qp);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ break;
+ }
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks a list of transports with I/O pending, removing entries as
+ * they are added to the server's I/O pending list. Two bits indicate
+ * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
+ * spinlock that serializes access to the transport list with the RQ
+ * and SQ interrupt handlers.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+ struct svcxprt_rdma *xprt;
+ unsigned long flags;
+
+ spin_lock_irqsave(&dto_lock, flags);
+ while (!list_empty(&dto_xprt_q)) {
+ xprt = list_entry(dto_xprt_q.next,
+ struct svcxprt_rdma, sc_dto_q);
+ list_del_init(&xprt->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
+ ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ rq_cq_reap(xprt);
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ /*
+ * If data arrived before established event,
+ * don't enqueue. This defers RPC I/O until the
+ * RDMA connection is complete.
+ */
+ if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ }
+
+ if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
+ ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ sq_cq_reap(xprt);
+ }
+
+ spin_lock_irqsave(&dto_lock, flags);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * Receive Queue Completion Handler
+ *
+ * Since an RQ completion handler is called on interrupt context, we
+ * need to defer the handling of the I/O to a tasklet
+ */
+static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an SQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completing WC off the CQE and enqueue the associated DTO
+ * context on the dto_q for the transport.
+ */
+static void rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ int ret;
+ struct ib_wc wc;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+
+ atomic_inc(&rdma_stat_rq_poll);
+
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
+ while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+ ctxt->wc_status = wc.status;
+ ctxt->byte_len = wc.byte_len;
+ if (wc.status != IB_WC_SUCCESS) {
+ /* Close the transport */
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 1);
+ continue;
+ }
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ }
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+
+ if (ctxt)
+ atomic_inc(&rdma_stat_rq_prod);
+}
+
+/*
+ * Send Queue Completion Handler - potentially called on interrupt context.
+ */
+static void sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct ib_wc wc;
+ struct ib_cq *cq = xprt->sc_sq_cq;
+ int ret;
+
+ atomic_inc(&rdma_stat_sq_poll);
+ while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+ xprt = ctxt->xprt;
+
+ if (wc.status != IB_WC_SUCCESS)
+ /* Close the transport */
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+
+ /* Decrement used SQ WR count */
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+
+ switch (ctxt->wr_op) {
+ case IB_WR_SEND:
+ case IB_WR_RDMA_WRITE:
+ svc_rdma_put_context(ctxt, 1);
+ break;
+
+ case IB_WR_RDMA_READ:
+ if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ spin_lock_bh(&xprt->sc_read_complete_lock);
+ list_add_tail(&ctxt->dto_q,
+ &xprt->sc_read_complete_q);
+ spin_unlock_bh(&xprt->sc_read_complete_lock);
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ default:
+ printk(KERN_ERR "svcrdma: unexpected completion type, "
+ "opcode=%d, status=%d\n",
+ wc.opcode, wc.status);
+ break;
+ }
+ }
+
+ if (ctxt)
+ atomic_inc(&rdma_stat_sq_prod);
+}
+
+static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an RQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+static void create_context_cache(struct svcxprt_rdma *xprt,
+ int ctxt_count, int ctxt_bump, int ctxt_max)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+ int i;
+
+ xprt->sc_ctxt_max = ctxt_max;
+ xprt->sc_ctxt_bump = ctxt_bump;
+ xprt->sc_ctxt_cnt = 0;
+ xprt->sc_ctxt_head = NULL;
+ for (i = 0; i < ctxt_count; i++) {
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (ctxt) {
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ xprt->sc_ctxt_cnt++;
+ }
+ }
+}
+
+static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+{
+ struct svc_rdma_op_ctxt *next;
+ if (!ctxt)
+ return;
+
+ do {
+ next = ctxt->next;
+ kfree(ctxt);
+ ctxt = next;
+ } while (next);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
+ int listener)
+{
+ struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+ if (!cma_xprt)
+ return NULL;
+ svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
+ INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
+ init_waitqueue_head(&cma_xprt->sc_send_wait);
+
+ spin_lock_init(&cma_xprt->sc_lock);
+ spin_lock_init(&cma_xprt->sc_read_complete_lock);
+ spin_lock_init(&cma_xprt->sc_ctxt_lock);
+ spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+
+ cma_xprt->sc_ord = svcrdma_ord;
+
+ cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+ cma_xprt->sc_max_requests = svcrdma_max_requests;
+ cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+ atomic_set(&cma_xprt->sc_sq_count, 0);
+
+ if (!listener) {
+ int reqs = cma_xprt->sc_max_requests;
+ create_context_cache(cma_xprt,
+ reqs << 1, /* starting size */
+ reqs, /* bump amount */
+ reqs +
+ cma_xprt->sc_sq_depth +
+ RPCRDMA_MAX_THREADS + 1); /* max */
+ if (!cma_xprt->sc_ctxt_head) {
+ kfree(cma_xprt);
+ return NULL;
+ }
+ clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+ } else
+ set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+
+ return cma_xprt;
+}
+
+struct page *svc_rdma_get_page(void)
+{
+ struct page *page;
+
+ while ((page = alloc_page(GFP_KERNEL)) == NULL) {
+ /* If we can't get memory, wait a bit and try again */
+ printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
+ "jiffies.\n");
+ schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+ }
+ return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+ struct ib_recv_wr recv_wr, *bad_recv_wr;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct page *page;
+ unsigned long pa;
+ int sge_no;
+ int buflen;
+ int ret;
+
+ ctxt = svc_rdma_get_context(xprt);
+ buflen = 0;
+ ctxt->direction = DMA_FROM_DEVICE;
+ for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
+ BUG_ON(sge_no >= xprt->sc_max_sge);
+ page = svc_rdma_get_page();
+ ctxt->pages[sge_no] = page;
+ pa = ib_dma_map_page(xprt->sc_cm_id->device,
+ page, 0, PAGE_SIZE,
+ DMA_FROM_DEVICE);
+ ctxt->sge[sge_no].addr = pa;
+ ctxt->sge[sge_no].length = PAGE_SIZE;
+ ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ buflen += PAGE_SIZE;
+ }
+ ctxt->count = sge_no;
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &ctxt->sge[0];
+ recv_wr.num_sge = ctxt->count;
+ recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+ ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+ return ret;
+}
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the svc_xprt
+ * structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listent xprt. When the listen thread is kicked, it
+ * will call the recvfrom method on the listen xprt which will accept the new
+ * connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id)
+{
+ struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+ struct svcxprt_rdma *newxprt;
+
+ /* Create a new transport */
+ newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
+ if (!newxprt) {
+ dprintk("svcrdma: failed to create new transport\n");
+ return;
+ }
+ newxprt->sc_cm_id = new_cma_id;
+ new_cma_id->context = newxprt;
+ dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+ newxprt, newxprt->sc_cm_id, listen_xprt);
+
+ /*
+ * Enqueue the new transport on the accept queue of the listening
+ * transport
+ */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ /*
+ * Can't use svc_xprt_received here because we are not on a
+ * rqstp thread
+ */
+ set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will be
+ * either be incoming connect requests or adapter removal events.
+ */
+static int rdma_listen_handler(struct rdma_cm_id *cma_id,
+ struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
+ "event=%d\n", cma_id, cma_id->context, event->event);
+ handle_connect_req(cma_id);
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
+ "cm_id=%p\n", xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt)
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on listening endpoint %p, "
+ "event=%d\n", cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+static int rdma_cma_handler(struct rdma_cm_id *cma_id,
+ struct rdma_cm_event *event)
+{
+ struct svc_xprt *xprt = cma_id->context;
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ switch (event->event) {
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on DTO xprt=%p, "
+ "cm_id=%p\n", xprt, cma_id);
+ clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
+ svc_xprt_enqueue(xprt);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ }
+ break;
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
+ "event=%d\n", cma_id, xprt, event->event);
+ if (xprt) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ }
+ break;
+ default:
+ dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
+ "event=%d\n", cma_id, event->event);
+ break;
+ }
+ return 0;
+}
+
+/*
+ * Create a listening RDMA service endpoint.
+ */
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ struct rdma_cm_id *listen_id;
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_xprt *xprt;
+ int ret;
+
+ dprintk("svcrdma: Creating RDMA socket\n");
+
+ cma_xprt = rdma_create_xprt(serv, 1);
+ if (!cma_xprt)
+ return ERR_PTR(ENOMEM);
+ xprt = &cma_xprt->sc_xprt;
+
+ listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
+ if (IS_ERR(listen_id)) {
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_create_id failed = %ld\n",
+ PTR_ERR(listen_id));
+ return (void *)listen_id;
+ }
+ ret = rdma_bind_addr(listen_id, sa);
+ if (ret) {
+ rdma_destroy_xprt(cma_xprt);
+ rdma_destroy_id(listen_id);
+ dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+ return ERR_PTR(ret);
+ }
+ cma_xprt->sc_cm_id = listen_id;
+
+ ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+ if (ret) {
+ rdma_destroy_id(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+ }
+
+ /*
+ * We need to use the address from the cm_id in case the
+ * caller specified 0 for the port number.
+ */
+ sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
+
+ return &cma_xprt->sc_xprt;
+}
+
+/*
+ * This is the xpo_recvfrom function for listening endpoints. Its
+ * purpose is to accept incoming connections. The CMA callback handler
+ * has already created a new transport and attached it to the new CMA
+ * ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport. This queue contains the new svc_xprt structure. This
+ * function takes svc_xprt structures off the accept_q and completes
+ * the connection.
+ */
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *listen_rdma;
+ struct svcxprt_rdma *newxprt = NULL;
+ struct rdma_conn_param conn_param;
+ struct ib_qp_init_attr qp_attr;
+ struct ib_device_attr devattr;
+ struct sockaddr *sa;
+ int ret;
+ int i;
+
+ listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ clear_bit(XPT_CONN, &xprt->xpt_flags);
+ /* Get the next entry off the accept list */
+ spin_lock_bh(&listen_rdma->sc_lock);
+ if (!list_empty(&listen_rdma->sc_accept_q)) {
+ newxprt = list_entry(listen_rdma->sc_accept_q.next,
+ struct svcxprt_rdma, sc_accept_q);
+ list_del_init(&newxprt->sc_accept_q);
+ }
+ if (!list_empty(&listen_rdma->sc_accept_q))
+ set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
+ spin_unlock_bh(&listen_rdma->sc_lock);
+ if (!newxprt)
+ return NULL;
+
+ dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+ newxprt, newxprt->sc_cm_id);
+
+ ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+ if (ret) {
+ dprintk("svcrdma: could not query device attributes on "
+ "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
+ goto errout;
+ }
+
+ /* Qualify the transport resource defaults with the
+ * capabilities of this particular device */
+ newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+ (size_t)RPCSVC_MAXPAGES);
+ newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+ (size_t)svcrdma_max_requests);
+ newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+ newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
+ (size_t)svcrdma_ord);
+
+ newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+ if (IS_ERR(newxprt->sc_pd)) {
+ dprintk("svcrdma: error creating PD for connect request\n");
+ goto errout;
+ }
+ newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ sq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_sq_depth,
+ 0);
+ if (IS_ERR(newxprt->sc_sq_cq)) {
+ dprintk("svcrdma: error creating SQ CQ for connect request\n");
+ goto errout;
+ }
+ newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ rq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_max_requests,
+ 0);
+ if (IS_ERR(newxprt->sc_rq_cq)) {
+ dprintk("svcrdma: error creating RQ CQ for connect request\n");
+ goto errout;
+ }
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.event_handler = qp_event_handler;
+ qp_attr.qp_context = &newxprt->sc_xprt;
+ qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+ qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+ qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+ qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ qp_attr.qp_type = IB_QPT_RC;
+ qp_attr.send_cq = newxprt->sc_sq_cq;
+ qp_attr.recv_cq = newxprt->sc_rq_cq;
+ dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+ " cm_id->device=%p, sc_pd->device=%p\n"
+ " cap.max_send_wr = %d\n"
+ " cap.max_recv_wr = %d\n"
+ " cap.max_send_sge = %d\n"
+ " cap.max_recv_sge = %d\n",
+ newxprt->sc_cm_id, newxprt->sc_pd,
+ newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+ qp_attr.cap.max_send_wr,
+ qp_attr.cap.max_recv_wr,
+ qp_attr.cap.max_send_sge,
+ qp_attr.cap.max_recv_sge);
+
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ /*
+ * XXX: This is a hack. We need a xx_request_qp interface
+ * that will adjust the qp_attr's with a best-effort
+ * number
+ */
+ qp_attr.cap.max_send_sge -= 2;
+ qp_attr.cap.max_recv_sge -= 2;
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
+ &qp_attr);
+ if (ret) {
+ dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
+ goto errout;
+ }
+ newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
+ newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
+ newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
+ newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
+ }
+ newxprt->sc_qp = newxprt->sc_cm_id->qp;
+
+ /* Register all of physical memory */
+ newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
+ IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_WRITE);
+ if (IS_ERR(newxprt->sc_phys_mr)) {
+ dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret);
+ goto errout;
+ }
+
+ /* Post receive buffers */
+ for (i = 0; i < newxprt->sc_max_requests; i++) {
+ ret = svc_rdma_post_recv(newxprt);
+ if (ret) {
+ dprintk("svcrdma: failure posting receive buffers\n");
+ goto errout;
+ }
+ }
+
+ /* Swap out the handler */
+ newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+ /* Accept Connection */
+ set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.responder_resources = 0;
+ conn_param.initiator_depth = newxprt->sc_ord;
+ ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+ if (ret) {
+ dprintk("svcrdma: failed to accept new connection, ret=%d\n",
+ ret);
+ goto errout;
+ }
+
+ dprintk("svcrdma: new connection %p accepted with the following "
+ "attributes:\n"
+ " local_ip : %d.%d.%d.%d\n"
+ " local_port : %d\n"
+ " remote_ip : %d.%d.%d.%d\n"
+ " remote_port : %d\n"
+ " max_sge : %d\n"
+ " sq_depth : %d\n"
+ " max_requests : %d\n"
+ " ord : %d\n",
+ newxprt,
+ NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_port),
+ NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_port),
+ newxprt->sc_max_sge,
+ newxprt->sc_sq_depth,
+ newxprt->sc_max_requests,
+ newxprt->sc_ord);
+
+ /* Set the local and remote addresses in the transport */
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+ svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ return &newxprt->sc_xprt;
+
+ errout:
+ dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+ rdma_destroy_id(newxprt->sc_cm_id);
+ rdma_destroy_xprt(newxprt);
+ return NULL;
+}
+
+/*
+ * Post an RQ WQE to the RQ when the rqst is being released. This
+ * effectively returns an RQ credit to the client. The rq_xprt_ctxt
+ * will be null if the request is deferred due to an RDMA_READ or the
+ * transport had no data ready (EAGAIN). Note that an RPC deferred in
+ * svc_process will still return the credit, this is because the data
+ * is copied and no longer consume a WQE/WC.
+ */
+static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
+{
+ int err;
+ struct svcxprt_rdma *rdma =
+ container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+ if (rqstp->rq_xprt_ctxt) {
+ BUG_ON(rqstp->rq_xprt_ctxt != rdma);
+ err = svc_rdma_post_recv(rdma);
+ if (err)
+ dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
+ err);
+ }
+ rqstp->rq_xprt_ctxt = NULL;
+}
+
+/* Disable data ready events for this connection */
+static void svc_rdma_detach(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ unsigned long flags;
+
+ dprintk("svc: svc_rdma_detach(%p)\n", xprt);
+ /*
+ * Shutdown the connection. This will ensure we don't get any
+ * more events from the provider.
+ */
+ rdma_disconnect(rdma->sc_cm_id);
+ rdma_destroy_id(rdma->sc_cm_id);
+
+ /* We may already be on the DTO list */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (!list_empty(&rdma->sc_dto_q))
+ list_del_init(&rdma->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+ dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+ rdma_destroy_xprt(rdma);
+ kfree(rdma);
+}
+
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
+{
+ if (xprt->sc_qp && !IS_ERR(xprt->sc_qp))
+ ib_destroy_qp(xprt->sc_qp);
+
+ if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq))
+ ib_destroy_cq(xprt->sc_sq_cq);
+
+ if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq))
+ ib_destroy_cq(xprt->sc_rq_cq);
+
+ if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr))
+ ib_dereg_mr(xprt->sc_phys_mr);
+
+ if (xprt->sc_pd && !IS_ERR(xprt->sc_pd))
+ ib_dealloc_pd(xprt->sc_pd);
+
+ destroy_context_cache(xprt->sc_ctxt_head);
+}
+
+static int svc_rdma_has_wspace(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+ /*
+ * If there are fewer SQ WR available than required to send a
+ * simple response, return false.
+ */
+ if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
+ return 0;
+
+ /*
+ * ...or there are already waiters on the SQ,
+ * return false.
+ */
+ if (waitqueue_active(&rdma->sc_send_wait))
+ return 0;
+
+ /* Otherwise return true. */
+ return 1;
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr;
+ int ret;
+
+ if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+ return 0;
+
+ BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
+ BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
+ wr->opcode);
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ spin_lock_bh(&xprt->sc_lock);
+ if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
+ spin_unlock_bh(&xprt->sc_lock);
+ atomic_inc(&rdma_stat_sq_starve);
+ /* See if we can reap some SQ WR */
+ sq_cq_reap(xprt);
+
+ /* Wait until SQ WR available if SQ still full */
+ wait_event(xprt->sc_send_wait,
+ atomic_read(&xprt->sc_sq_count) <
+ xprt->sc_sq_depth);
+ continue;
+ }
+ /* Bumped used SQ WR count and post */
+ ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+ if (!ret)
+ atomic_inc(&xprt->sc_sq_count);
+ else
+ dprintk("svcrdma: failed to post SQ WR rc=%d, "
+ "sc_sq_count=%d, sc_sq_depth=%d\n",
+ ret, atomic_read(&xprt->sc_sq_count),
+ xprt->sc_sq_depth);
+ spin_unlock_bh(&xprt->sc_lock);
+ break;
+ }
+ return ret;
+}
+
+int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
+{
+ struct ib_send_wr err_wr;
+ struct ib_sge sge;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ u32 *va;
+ int length;
+ int ret;
+
+ p = svc_rdma_get_page();
+ va = page_address(p);
+
+ /* XDR encode error */
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ /* Prepare SGE for local address */
+ sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
+ sge.lkey = xprt->sc_phys_mr->lkey;
+ sge.length = length;
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof err_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = &sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error posting send = %d\n", ret);
+ svc_rdma_put_context(ctxt, 1);
+ }
+
+ return ret;
+}
OpenPOWER on IntegriCloud