summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sys/kern/sys_socket.c76
-rw-r--r--tests/sys/aio/aio_test.c65
2 files changed, 109 insertions, 32 deletions
diff --git a/sys/kern/sys_socket.c b/sys/kern/sys_socket.c
index 4c314a1..ffd59df 100644
--- a/sys/kern/sys_socket.c
+++ b/sys/kern/sys_socket.c
@@ -556,7 +556,7 @@ soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
struct file *fp;
struct uio uio;
struct iovec iov;
- size_t cnt;
+ size_t cnt, done;
int error, flags;
SOCKBUF_UNLOCK(sb);
@@ -567,8 +567,9 @@ retry:
td_savedcred = td->td_ucred;
td->td_ucred = job->cred;
- cnt = job->uaiocb.aio_nbytes;
- iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf;
+ done = job->uaiocb._aiocb_private.status;
+ cnt = job->uaiocb.aio_nbytes - done;
+ iov.iov_base = (void *)((uintptr_t)job->uaiocb.aio_buf + done);
iov.iov_len = cnt;
uio.uio_iov = &iov;
uio.uio_iovcnt = 1;
@@ -602,42 +603,52 @@ retry:
}
}
- cnt -= uio.uio_resid;
+ done += cnt - uio.uio_resid;
+ job->uaiocb._aiocb_private.status = done;
td->td_ucred = td_savedcred;
- if (cnt != 0 && (error == ERESTART || error == EINTR ||
- error == EWOULDBLOCK))
- error = 0;
if (error == EWOULDBLOCK) {
/*
- * A read() or write() on the socket raced with this
- * request. If the socket is now ready, try again.
- * If it is not, place this request at the head of the
+ * The request was either partially completed or not
+ * completed at all due to racing with a read() or
+ * write() on the socket. If the socket is
+ * non-blocking, return with any partial completion.
+ * If the socket is blocking or if no progress has
+ * been made, requeue this request at the head of the
* queue to try again when the socket is ready.
*/
- SOCKBUF_LOCK(sb);
- empty_results++;
- if (soaio_ready(so, sb)) {
- empty_retries++;
- SOCKBUF_UNLOCK(sb);
- goto retry;
- }
-
- if (!aio_set_cancel_function(job, soo_aio_cancel)) {
- MPASS(cnt == 0);
- SOCKBUF_UNLOCK(sb);
- aio_cancel(job);
- SOCKBUF_LOCK(sb);
- } else {
- TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
- }
- } else {
- if (error)
- aio_complete(job, -1, error);
- else
- aio_complete(job, cnt, 0);
+ MPASS(done != job->uaiocb.aio_nbytes);
SOCKBUF_LOCK(sb);
- }
+ if (done == 0 || !(so->so_state & SS_NBIO)) {
+ empty_results++;
+ if (soaio_ready(so, sb)) {
+ empty_retries++;
+ SOCKBUF_UNLOCK(sb);
+ goto retry;
+ }
+
+ if (!aio_set_cancel_function(job, soo_aio_cancel)) {
+ SOCKBUF_UNLOCK(sb);
+ if (done != 0)
+ aio_complete(job, done, 0);
+ else
+ aio_cancel(job);
+ SOCKBUF_LOCK(sb);
+ } else {
+ TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
+ }
+ return;
+ }
+ SOCKBUF_UNLOCK(sb);
+ }
+ if (done != 0 && (error == ERESTART || error == EINTR ||
+ error == EWOULDBLOCK))
+ error = 0;
+ if (error)
+ aio_complete(job, -1, error);
+ else
+ aio_complete(job, done, 0);
+ SOCKBUF_LOCK(sb);
}
static void
@@ -758,6 +769,7 @@ soo_aio_queue(struct file *fp, struct kaiocb *job)
if (!aio_set_cancel_function(job, soo_aio_cancel))
panic("new job was cancelled");
TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
+ job->uaiocb._aiocb_private.status = 0;
if (!(sb->sb_flags & SB_AIO_RUNNING)) {
if (soaio_ready(so, sb))
sowakeup_aio(so, sb);
diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c
index 1b10088..c81b8d6 100644
--- a/tests/sys/aio/aio_test.c
+++ b/tests/sys/aio/aio_test.c
@@ -781,6 +781,70 @@ ATF_TC_BODY(aio_socket_two_reads, tc)
close(s[0]);
}
+/*
+ * This test ensures that aio_write() on a blocking socket of a "large"
+ * buffer does not return a short completion.
+ */
+ATF_TC_WITHOUT_HEAD(aio_socket_blocking_short_write);
+ATF_TC_BODY(aio_socket_blocking_short_write, tc)
+{
+ struct aiocb iocb, *iocbp;
+ char *buffer[2];
+ ssize_t done;
+ int buffer_size, sb_size;
+ socklen_t len;
+ int s[2];
+
+ ATF_REQUIRE_KERNEL_MODULE("aio");
+
+ ATF_REQUIRE(socketpair(PF_UNIX, SOCK_STREAM, 0, s) != -1);
+
+ len = sizeof(sb_size);
+ ATF_REQUIRE(getsockopt(s[0], SOL_SOCKET, SO_RCVBUF, &sb_size, &len) !=
+ -1);
+ ATF_REQUIRE(len == sizeof(sb_size));
+ buffer_size = sb_size;
+
+ ATF_REQUIRE(getsockopt(s[1], SOL_SOCKET, SO_SNDBUF, &sb_size, &len) !=
+ -1);
+ ATF_REQUIRE(len == sizeof(sb_size));
+ if (sb_size > buffer_size)
+ buffer_size = sb_size;
+
+ /*
+ * Use twice the size of the MAX(receive buffer, send buffer)
+ * to ensure that the write is split up into multiple writes
+ * internally.
+ */
+ buffer_size *= 2;
+
+ buffer[0] = malloc(buffer_size);
+ ATF_REQUIRE(buffer[0] != NULL);
+ buffer[1] = malloc(buffer_size);
+ ATF_REQUIRE(buffer[1] != NULL);
+
+ srandomdev();
+ aio_fill_buffer(buffer[1], buffer_size, random());
+
+ memset(&iocb, 0, sizeof(iocb));
+ iocb.aio_fildes = s[1];
+ iocb.aio_buf = buffer[1];
+ iocb.aio_nbytes = buffer_size;
+ ATF_REQUIRE(aio_write(&iocb) == 0);
+
+ done = recv(s[0], buffer[0], buffer_size, MSG_WAITALL);
+ ATF_REQUIRE(done == buffer_size);
+
+ done = aio_waitcomplete(&iocbp, NULL);
+ ATF_REQUIRE(iocbp == &iocb);
+ ATF_REQUIRE(done == buffer_size);
+
+ ATF_REQUIRE(memcmp(buffer[0], buffer[1], buffer_size) == 0);
+
+ close(s[1]);
+ close(s[0]);
+}
+
ATF_TP_ADD_TCS(tp)
{
@@ -792,6 +856,7 @@ ATF_TP_ADD_TCS(tp)
ATF_TP_ADD_TC(tp, aio_md_test);
ATF_TP_ADD_TC(tp, aio_large_read_test);
ATF_TP_ADD_TC(tp, aio_socket_two_reads);
+ ATF_TP_ADD_TC(tp, aio_socket_blocking_short_write);
return (atf_no_error());
}
OpenPOWER on IntegriCloud