summaryrefslogtreecommitdiffstats
path: root/sys/kern
diff options
context:
space:
mode:
authordyson <dyson@FreeBSD.org>1996-02-04 19:56:35 +0000
committerdyson <dyson@FreeBSD.org>1996-02-04 19:56:35 +0000
commit276899d730ed09c8d2362c150aa0908e69928c36 (patch)
treec4581fd40d3e230191bb0fb5da9e47cd2b097e1f /sys/kern
parentda644672140f1509396e47cf079f636657af7168 (diff)
downloadFreeBSD-src-276899d730ed09c8d2362c150aa0908e69928c36.zip
FreeBSD-src-276899d730ed09c8d2362c150aa0908e69928c36.tar.gz
Improve the performance for pipe(2) again. Also include some
fixes for previous version of new pipes from Bruce Evans. This new version: Supports more properly the semantics of select (BDE). Supports "OLD_PIPE" correctly (kern_descrip.c, BDE). Eliminates incorrect EPIPE returns (bash 'pipe broken' messages.) Much faster yet, currently tuned relatively conservatively -- but now gives approx 50% more perf than the new pipes code did originally. (That was about 50% more perf than the original BSD pipe code.) Known bugs outstanding: No support for async io (SIGIO). Will be included soon. Next to do: Merge support for FIFOs. Submitted by: bde
Diffstat (limited to 'sys/kern')
-rw-r--r--sys/kern/kern_descrip.c8
-rw-r--r--sys/kern/sys_pipe.c488
2 files changed, 449 insertions, 47 deletions
diff --git a/sys/kern/kern_descrip.c b/sys/kern/kern_descrip.c
index 5c781e5..d214f0d 100644
--- a/sys/kern/kern_descrip.c
+++ b/sys/kern/kern_descrip.c
@@ -36,7 +36,7 @@
* SUCH DAMAGE.
*
* @(#)kern_descrip.c 8.6 (Berkeley) 4/19/94
- * $Id: kern_descrip.c,v 1.23 1995/12/14 08:31:14 phk Exp $
+ * $Id: kern_descrip.c,v 1.24 1996/01/28 23:41:39 dyson Exp $
*/
#include <sys/param.h>
@@ -425,9 +425,11 @@ ofstat(p, uap, retval)
error = soo_stat((struct socket *)fp->f_data, &ub);
break;
+#ifndef OLD_PIPE
case DTYPE_PIPE:
error = pipe_stat((struct pipe *)fp->f_data, &ub);
break;
+#endif
default:
panic("ofstat");
@@ -474,9 +476,11 @@ fstat(p, uap, retval)
error = soo_stat((struct socket *)fp->f_data, &ub);
break;
+#ifndef OLD_PIPE
case DTYPE_PIPE:
error = pipe_stat((struct pipe *)fp->f_data, &ub);
break;
+#endif
default:
panic("fstat");
@@ -512,7 +516,9 @@ fpathconf(p, uap, retval)
return (EBADF);
switch (fp->f_type) {
+#ifndef OLD_PIPE
case DTYPE_PIPE:
+#endif
case DTYPE_SOCKET:
if (uap->name != _PC_PIPE_BUF)
return (EINVAL);
diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c
index dda604a..4e563c9 100644
--- a/sys/kern/sys_pipe.c
+++ b/sys/kern/sys_pipe.c
@@ -18,7 +18,7 @@
* 5. Modifications may be freely made to this file if the above conditions
* are met.
*
- * $Id: sys_pipe.c,v 1.3 1996/01/31 02:05:12 dyson Exp $
+ * $Id: sys_pipe.c,v 1.2 1996/01/29 02:57:33 dyson Exp $
*/
#ifndef OLD_PIPE
@@ -30,6 +30,29 @@
* do.
*/
+/*
+ * This code has two modes of operation, a small write mode and a large
+ * write mode. The small write mode acts like conventional pipes with
+ * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the
+ * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT
+ * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
+ * the receiving process can copy it directly from the pages in the sending
+ * process.
+ *
+ * If the sending process receives a signal, it is possible that it will
+ * go away, and certainly it's address space can change, because control
+ * is returned back to the user-mode side. In that case, the pipe code
+ * arranges to copy the buffer supplied by the user process, to a pageable
+ * kernel buffer, and the receiving process will grab the data from the
+ * pageable kernel buffer. Since signals don't happen all that often,
+ * the copy operation is normally eliminated.
+ *
+ * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
+ * happen for small transfers so that the system will not spend all of
+ * it's time context switching. PIPE_SIZE is constrained by the
+ * amount of kernel virtual memory.
+ */
+
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
@@ -58,6 +81,7 @@
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
+#include <vm/vm_page.h>
static int pipe_read __P((struct file *fp, struct uio *uio,
struct ucred *cred));
@@ -76,15 +100,34 @@ static struct fileops pipeops =
* reference for performance reasons, so small amounts of outstanding I/O
* will not wipe the cache.
*/
-#define PIPESIZE (16384)
-#define MINPIPESIZE (PIPESIZE/3)
-#define MAXPIPESIZE (2*PIPESIZE/3)
+#define MINPIPESIZE (PIPE_SIZE/3)
+#define MAXPIPESIZE (2*PIPE_SIZE/3)
+
+/*
+ * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
+ * is there so that on large systems, we don't exhaust it.
+ */
+#define MAXPIPEKVA (8*1024*1024)
+
+/*
+ * Limit for direct transfers, we cannot, of course limit
+ * the amount of kva for pipes in general though.
+ */
+#define LIMITPIPEKVA (16*1024*1024)
+int amountpipekva;
static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
-static __inline int pipelock __P((struct pipe *cpipe));
+static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
+static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
+static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
+static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
+static void pipe_clone_write_buffer __P((struct pipe *wpipe));
+static void pipe_mark_pages_clean __P((struct pipe *cpipe));
+static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
+static void pipespace __P((struct pipe *cpipe));
/*
* The pipe system call for the DTYPE_PIPE type of pipes
@@ -106,8 +149,10 @@ pipe(p, uap, retval)
rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
pipeinit(rpipe);
+ rpipe->pipe_state |= PIPE_DIRECTOK;
wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
pipeinit(wpipe);
+ wpipe->pipe_state |= PIPE_DIRECTOK;
error = falloc(p, &rf, &fd);
if (error)
@@ -140,17 +185,13 @@ free1:
return (error);
}
-/*
- * initialize and allocate VM and memory for pipe
- */
static void
-pipeinit(cpipe)
+pipespace(cpipe)
struct pipe *cpipe;
{
int npages, error;
- npages = round_page(PIPESIZE)/PAGE_SIZE;
-
+ npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
/*
* Create an object, I don't like the idea of paging to/from
* kernel_object.
@@ -163,16 +204,29 @@ pipeinit(cpipe)
* The map entry is, by default, pageable.
*/
error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
- (vm_offset_t *) &cpipe->pipe_buffer.buffer, PIPESIZE, 1,
+ (vm_offset_t *) &cpipe->pipe_buffer.buffer,
+ cpipe->pipe_buffer.size, 1,
VM_PROT_ALL, VM_PROT_ALL, 0);
if (error != KERN_SUCCESS)
panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
+ amountpipekva += cpipe->pipe_buffer.size;
+}
+
+/*
+ * initialize and allocate VM and memory for pipe
+ */
+static void
+pipeinit(cpipe)
+ struct pipe *cpipe;
+{
cpipe->pipe_buffer.in = 0;
cpipe->pipe_buffer.out = 0;
cpipe->pipe_buffer.cnt = 0;
- cpipe->pipe_buffer.size = PIPESIZE;
+ cpipe->pipe_buffer.size = PIPE_SIZE;
+ /* Buffer kva gets dynamically allocated */
+ cpipe->pipe_buffer.buffer = NULL;
cpipe->pipe_state = 0;
cpipe->pipe_peer = NULL;
@@ -181,6 +235,14 @@ pipeinit(cpipe)
cpipe->pipe_atime = time;
cpipe->pipe_mtime = time;
bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
+
+ /*
+ * pipe data structure initializations to support direct pipe I/O
+ */
+ cpipe->pipe_map.cnt = 0;
+ cpipe->pipe_map.kva = 0;
+ cpipe->pipe_map.pos = 0;
+ cpipe->pipe_map.npages = 0;
}
@@ -188,13 +250,15 @@ pipeinit(cpipe)
* lock a pipe for I/O, blocking other access
*/
static __inline int
-pipelock(cpipe)
+pipelock(cpipe, catch)
struct pipe *cpipe;
+ int catch;
{
int error;
while (cpipe->pipe_state & PIPE_LOCK) {
cpipe->pipe_state |= PIPE_LWANT;
- if (error = tsleep( &cpipe->pipe_state, PRIBIO|PCATCH, "pipelk", 0)) {
+ if (error = tsleep( &cpipe->pipe_state,
+ catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
return error;
}
}
@@ -217,6 +281,24 @@ pipeunlock(cpipe)
return;
}
+#if 0
+static void
+pipe_mark_pages_clean(cpipe)
+ struct pipe *cpipe;
+{
+ vm_size_t off;
+ vm_page_t m;
+
+ for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
+ m = vm_page_lookup(cpipe->pipe_buffer.object, off);
+ if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
+ m->dirty = 0;
+ pmap_clear_modify(VM_PAGE_TO_PHYS(m));
+ }
+ }
+}
+#endif
+
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
@@ -228,16 +310,20 @@ pipe_read(fp, uio, cred)
struct pipe *rpipe = (struct pipe *) fp->f_data;
int error = 0;
int nread = 0;
+ int size;
++rpipe->pipe_busy;
while (uio->uio_resid) {
+ /*
+ * normal pipe buffer receive
+ */
if (rpipe->pipe_buffer.cnt > 0) {
int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
if (size > rpipe->pipe_buffer.cnt)
size = rpipe->pipe_buffer.cnt;
if (size > uio->uio_resid)
size = uio->uio_resid;
- if ((error = pipelock(rpipe)) == 0) {
+ if ((error = pipelock(rpipe,1)) == 0) {
error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
size, uio);
pipeunlock(rpipe);
@@ -252,6 +338,29 @@ pipe_read(fp, uio, cred)
rpipe->pipe_buffer.cnt -= size;
nread += size;
rpipe->pipe_atime = time;
+ /*
+ * Direct copy, bypassing a kernel buffer.
+ */
+ } else if ((size = rpipe->pipe_map.cnt) &&
+ (rpipe->pipe_state & PIPE_DIRECTW)) {
+ caddr_t va;
+ if (size > uio->uio_resid)
+ size = uio->uio_resid;
+ if ((error = pipelock(rpipe,1)) == 0) {
+ va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
+ error = uiomove(va, size, uio);
+ pipeunlock(rpipe);
+ }
+ if (error)
+ break;
+ nread += size;
+ rpipe->pipe_atime = time;
+ rpipe->pipe_map.pos += size;
+ rpipe->pipe_map.cnt -= size;
+ if (rpipe->pipe_map.cnt == 0) {
+ rpipe->pipe_state &= ~PIPE_DIRECTW;
+ wakeup(rpipe);
+ }
} else {
/*
* detect EOF condition
@@ -272,8 +381,6 @@ pipe_read(fp, uio, cred)
error = EAGAIN;
break;
}
- if (rpipe->pipe_peer == NULL)
- break;
/*
* If there is no more to read in the pipe, reset
@@ -281,7 +388,7 @@ pipe_read(fp, uio, cred)
* cache hit stats.
*/
- if ((error = pipelock(rpipe)) == 0) {
+ if ((error = pipelock(rpipe,1)) == 0) {
if (rpipe->pipe_buffer.cnt == 0) {
rpipe->pipe_buffer.in = 0;
rpipe->pipe_buffer.out = 0;
@@ -307,8 +414,11 @@ pipe_read(fp, uio, cred)
* it's pointers to the beginning. This improves
* cache hit stats.
*/
- if ((error == 0) && (error = pipelock(rpipe)) == 0) {
+ if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
if (rpipe->pipe_buffer.cnt == 0) {
+#if 0
+ pipe_mark_pages_clean(rpipe);
+#endif
rpipe->pipe_buffer.in = 0;
rpipe->pipe_buffer.out = 0;
}
@@ -330,35 +440,273 @@ pipe_read(fp, uio, cred)
return error;
}
-/* ARGSUSED */
+/*
+ * Map the sending processes' buffer into kernel space and wire it.
+ * This is similar to a physical write operation.
+ */
static int
-pipe_write(fp, uio, cred)
- struct file *fp;
+pipe_build_write_buffer(wpipe, uio)
+ struct pipe *wpipe;
struct uio *uio;
- struct ucred *cred;
{
- struct pipe *rpipe = (struct pipe *) fp->f_data;
- struct pipe *wpipe = rpipe->pipe_peer;
+ int size;
+ int i;
+ vm_offset_t addr, endaddr, paddr;
+
+ size = uio->uio_iov->iov_len;
+ if (size > wpipe->pipe_buffer.size)
+ size = wpipe->pipe_buffer.size;
+
+ endaddr = round_page(uio->uio_iov->iov_base + size);
+ for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
+ addr < endaddr;
+ addr += PAGE_SIZE, i+=1) {
+
+ vm_page_t m;
+
+ vm_fault_quick( addr, VM_PROT_READ);
+ paddr = pmap_kextract(addr);
+ if (!paddr) {
+ int j;
+ for(j=0;j<i;j++)
+ vm_page_unwire(wpipe->pipe_map.ms[j]);
+ return EFAULT;
+ }
+
+ m = PHYS_TO_VM_PAGE(paddr);
+ vm_page_wire(m);
+ wpipe->pipe_map.ms[i] = m;
+ }
+
+/*
+ * set up the control block
+ */
+ wpipe->pipe_map.npages = i;
+ wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
+ wpipe->pipe_map.cnt = size;
+
+/*
+ * and map the buffer
+ */
+ if (wpipe->pipe_map.kva == 0) {
+ wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
+ wpipe->pipe_buffer.size);
+ amountpipekva += wpipe->pipe_buffer.size;
+ }
+ pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
+ wpipe->pipe_map.npages);
+
+/*
+ * and update the uio data
+ */
+
+ uio->uio_iov->iov_len -= size;
+ uio->uio_iov->iov_base += size;
+ if (uio->uio_iov->iov_len == 0)
+ uio->uio_iov++;
+ uio->uio_resid -= size;
+ uio->uio_offset += size;
+ return 0;
+}
+
+/*
+ * unmap and unwire the process buffer
+ */
+static void
+pipe_destroy_write_buffer(wpipe)
+struct pipe *wpipe;
+{
+ int i;
+ pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
+
+ if (wpipe->pipe_map.kva) {
+ if (amountpipekva > MAXPIPEKVA) {
+ vm_offset_t kva = wpipe->pipe_map.kva;
+ wpipe->pipe_map.kva = 0;
+ kmem_free(kernel_map, kva,
+ wpipe->pipe_buffer.size);
+ amountpipekva -= wpipe->pipe_buffer.size;
+ }
+ }
+ for (i=0;i<wpipe->pipe_map.npages;i++)
+ vm_page_unwire(wpipe->pipe_map.ms[i]);
+}
+
+/*
+ * In the case of a signal, the writing process might go away. This
+ * code copies the data into the circular buffer so that the source
+ * pages can be freed without loss of data.
+ */
+static void
+pipe_clone_write_buffer(wpipe)
+struct pipe *wpipe;
+{
+ int size;
+ int pos;
+
+ size = wpipe->pipe_map.cnt;
+ pos = wpipe->pipe_map.pos;
+ bcopy((caddr_t) wpipe->pipe_map.kva+pos,
+ (caddr_t) wpipe->pipe_buffer.buffer,
+ size);
+
+ wpipe->pipe_buffer.in = size;
+ wpipe->pipe_buffer.out = 0;
+ wpipe->pipe_buffer.cnt = size;
+ wpipe->pipe_state &= ~PIPE_DIRECTW;
+
+ pipe_destroy_write_buffer(wpipe);
+}
+
+/*
+ * This implements the pipe buffer write mechanism. Note that only
+ * a direct write OR a normal pipe write can be pending at any given time.
+ * If there are any characters in the pipe buffer, the direct write will
+ * be deferred until the receiving process grabs all of the bytes from
+ * the pipe buffer. Then the direct mapping write is set-up.
+ */
+static int
+pipe_direct_write(wpipe, uio)
+ struct pipe *wpipe;
+ struct uio *uio;
+{
+ int error;
+ while (wpipe->pipe_state & PIPE_DIRECTW) {
+ error = tsleep(wpipe,
+ PRIBIO|PCATCH, "pipdww", 0);
+ if (error || (wpipe->pipe_state & PIPE_EOF))
+ goto error1;
+ }
+ wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
+ wpipe->pipe_state |= PIPE_DIRECTW;
+ while (wpipe->pipe_buffer.cnt > 0) {
+ error = tsleep(wpipe,
+ PRIBIO|PCATCH, "pipdwc", 0);
+ if (error || (wpipe->pipe_state & PIPE_EOF)) {
+ wpipe->pipe_state &= ~PIPE_DIRECTW;
+ if (error == 0)
+ error = EPIPE;
+ goto error1;
+ }
+ }
+
+ error = pipe_build_write_buffer(wpipe, uio);
+ if (error) {
+ wpipe->pipe_state &= ~PIPE_DIRECTW;
+ goto error1;
+ }
+
+ if (wpipe->pipe_state & PIPE_WANTR) {
+ wpipe->pipe_state &= ~PIPE_WANTR;
+ wakeup(wpipe);
+ }
+
+ error = 0;
+ while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
+ if (wpipe->pipe_state & PIPE_EOF) {
+ pipelock(wpipe, 0);
+ pipe_destroy_write_buffer(wpipe);
+ pipeunlock(wpipe);
+ wakeup(wpipe);
+ return EPIPE;
+ }
+ error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
+ }
+
+ pipelock(wpipe,0);
+ if (wpipe->pipe_state & PIPE_DIRECTW) {
+ /*
+ * this bit of trickery substitutes a kernel buffer for
+ * the process that might be going away.
+ */
+ pipe_clone_write_buffer(wpipe);
+ } else {
+ pipe_destroy_write_buffer(wpipe);
+ }
+ pipeunlock(wpipe);
+ return error;
+
+error1:
+ wakeup(wpipe);
+ return error;
+}
+
+static __inline int
+pipewrite(wpipe, uio, nbio)
+ struct pipe *wpipe;
+ struct uio *uio;
+ int nbio;
+{
int error = 0;
/*
* detect loss of pipe read side, issue SIGPIPE if lost.
*/
if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
- psignal(curproc, SIGPIPE);
return EPIPE;
}
+ if( wpipe->pipe_buffer.buffer == NULL) {
+ if ((error = pipelock(wpipe,1)) == 0) {
+ pipespace(wpipe);
+ pipeunlock(wpipe);
+ } else {
+ return error;
+ }
+ }
+
++wpipe->pipe_busy;
while (uio->uio_resid) {
- int space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
- if (space > 0) {
+ int space;
+ /*
+ * If the transfer is large, we can gain performance if
+ * we do process-to-process copies directly.
+ */
+ if ((amountpipekva < LIMITPIPEKVA) &&
+ (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
+ error = pipe_direct_write( wpipe, uio);
+ if (error) {
+ break;
+ }
+ continue;
+ }
+
+ /*
+ * Pipe buffered writes cannot be coincidental with
+ * direct writes. We wait until the currently executing
+ * direct write is completed before we start filling the
+ * pipe buffer.
+ */
+ retrywrite:
+ while (wpipe->pipe_state & PIPE_DIRECTW) {
+ error = tsleep(wpipe,
+ PRIBIO|PCATCH, "pipbww", 0);
+ if (error)
+ break;
+ }
+
+ space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
+
+ /*
+ * We must afford contiguous writes on buffers of size
+ * PIPE_BUF or less.
+ */
+ if ((space > 0) &&
+ ((uio->uio_resid > PIPE_BUF) || (uio->uio_resid <= space))) {
int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
if (size > space)
size = space;
if (size > uio->uio_resid)
size = uio->uio_resid;
- if ((error = pipelock(wpipe)) == 0) {
+ if ((error = pipelock(wpipe,1)) == 0) {
+ /*
+ * It is possible for a direct write to
+ * slip in on us... handle it here...
+ */
+ if (wpipe->pipe_state & PIPE_DIRECTW) {
+ pipeunlock(wpipe);
+ goto retrywrite;
+ }
error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
size, uio);
pipeunlock(wpipe);
@@ -383,9 +731,11 @@ pipe_write(fp, uio, cred)
/*
* don't block on non-blocking I/O
*/
- if (wpipe->pipe_state & PIPE_NBIO) {
+ if (nbio) {
+ error = EAGAIN;
break;
}
+
wpipe->pipe_state |= PIPE_WANTW;
if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
break;
@@ -395,13 +745,12 @@ pipe_write(fp, uio, cred)
* to ourselves.
*/
if (wpipe->pipe_state & PIPE_EOF) {
- psignal(curproc, SIGPIPE);
error = EPIPE;
+ break;
}
}
}
- --wpipe->pipe_busy;
if ((wpipe->pipe_busy == 0) &&
(wpipe->pipe_state & PIPE_WANT)) {
wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
@@ -416,13 +765,32 @@ pipe_write(fp, uio, cred)
wakeup(wpipe);
}
}
+ if ((wpipe->pipe_buffer.cnt == 0) &&
+ (uio->uio_resid == 0) &&
+ (error == EPIPE))
+ error = 0;
+
if (wpipe->pipe_state & PIPE_SEL) {
wpipe->pipe_state &= ~PIPE_SEL;
selwakeup(&wpipe->pipe_sel);
}
+
+ --wpipe->pipe_busy;
return error;
}
+/* ARGSUSED */
+static int
+pipe_write(fp, uio, cred)
+ struct file *fp;
+ struct uio *uio;
+ struct ucred *cred;
+{
+ struct pipe *rpipe = (struct pipe *) fp->f_data;
+ struct pipe *wpipe = rpipe->pipe_peer;
+ return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
+}
+
/*
* we implement a very minimal set of ioctls for compatibility with sockets.
*/
@@ -482,7 +850,8 @@ pipe_select(fp, which, p)
switch (which) {
case FREAD:
- if (rpipe->pipe_buffer.cnt > 0) {
+ if (rpipe->pipe_buffer.cnt > 0 ||
+ (rpipe->pipe_state & PIPE_EOF)) {
splx(s);
return (1);
}
@@ -491,11 +860,9 @@ pipe_select(fp, which, p)
break;
case FWRITE:
- if (wpipe == 0) {
- splx(s);
- return (1);
- }
- if (wpipe->pipe_buffer.cnt < wpipe->pipe_buffer.size) {
+ if ((wpipe == NULL) ||
+ (wpipe->pipe_state & PIPE_EOF) ||
+ ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
splx(s);
return (1);
}
@@ -504,6 +871,13 @@ pipe_select(fp, which, p)
break;
case 0:
+ if ((rpipe->pipe_state & PIPE_EOF) ||
+ (wpipe == NULL) ||
+ (wpipe->pipe_state & PIPE_EOF)) {
+ splx(s);
+ return (1);
+ }
+
selrecord(p, &rpipe->pipe_sel);
rpipe->pipe_state |= PIPE_SEL;
break;
@@ -519,7 +893,7 @@ pipe_stat(pipe, ub)
{
bzero((caddr_t)ub, sizeof (*ub));
ub->st_mode = S_IFSOCK;
- ub->st_blksize = pipe->pipe_buffer.size / 2;
+ ub->st_blksize = pipe->pipe_buffer.size;
ub->st_size = pipe->pipe_buffer.cnt;
ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
@@ -548,7 +922,14 @@ static void
pipeclose(cpipe)
struct pipe *cpipe;
{
+ struct pipe *ppipe;
if (cpipe) {
+
+ if (cpipe->pipe_state & PIPE_SEL) {
+ cpipe->pipe_state &= ~PIPE_SEL;
+ selwakeup(&cpipe->pipe_sel);
+ }
+
/*
* If the other side is blocked, wake it up saying that
* we want to close it down.
@@ -562,17 +943,32 @@ pipeclose(cpipe)
/*
* Disconnect from peer
*/
- if (cpipe->pipe_peer) {
- cpipe->pipe_peer->pipe_state |= PIPE_EOF;
- wakeup(cpipe->pipe_peer);
- cpipe->pipe_peer->pipe_peer = NULL;
+ if (ppipe = cpipe->pipe_peer) {
+ if (ppipe->pipe_state & PIPE_SEL) {
+ ppipe->pipe_state &= ~PIPE_SEL;
+ selwakeup(&ppipe->pipe_sel);
+ }
+
+ ppipe->pipe_state |= PIPE_EOF;
+ wakeup(ppipe);
+ ppipe->pipe_peer = NULL;
}
/*
* free resources
*/
- kmem_free(kernel_map, (vm_offset_t)cpipe->pipe_buffer.buffer,
- cpipe->pipe_buffer.size);
+ if (cpipe->pipe_buffer.buffer) {
+ amountpipekva -= cpipe->pipe_buffer.size;
+ kmem_free(kernel_map,
+ (vm_offset_t)cpipe->pipe_buffer.buffer,
+ cpipe->pipe_buffer.size);
+ }
+ if (cpipe->pipe_map.kva) {
+ amountpipekva -= cpipe->pipe_buffer.size;
+ kmem_free(kernel_map,
+ cpipe->pipe_map.kva,
+ cpipe->pipe_buffer.size);
+ }
free(cpipe, M_TEMP);
}
}
OpenPOWER on IntegriCloud