diff options
-rw-r--r-- | sys/kern/sys_socket.c | 76 | ||||
-rw-r--r-- | tests/sys/aio/aio_test.c | 65 |
2 files changed, 109 insertions, 32 deletions
diff --git a/sys/kern/sys_socket.c b/sys/kern/sys_socket.c index 4c314a1..ffd59df 100644 --- a/sys/kern/sys_socket.c +++ b/sys/kern/sys_socket.c @@ -556,7 +556,7 @@ soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job) struct file *fp; struct uio uio; struct iovec iov; - size_t cnt; + size_t cnt, done; int error, flags; SOCKBUF_UNLOCK(sb); @@ -567,8 +567,9 @@ retry: td_savedcred = td->td_ucred; td->td_ucred = job->cred; - cnt = job->uaiocb.aio_nbytes; - iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf; + done = job->uaiocb._aiocb_private.status; + cnt = job->uaiocb.aio_nbytes - done; + iov.iov_base = (void *)((uintptr_t)job->uaiocb.aio_buf + done); iov.iov_len = cnt; uio.uio_iov = &iov; uio.uio_iovcnt = 1; @@ -602,42 +603,52 @@ retry: } } - cnt -= uio.uio_resid; + done += cnt - uio.uio_resid; + job->uaiocb._aiocb_private.status = done; td->td_ucred = td_savedcred; - if (cnt != 0 && (error == ERESTART || error == EINTR || - error == EWOULDBLOCK)) - error = 0; if (error == EWOULDBLOCK) { /* - * A read() or write() on the socket raced with this - * request. If the socket is now ready, try again. - * If it is not, place this request at the head of the + * The request was either partially completed or not + * completed at all due to racing with a read() or + * write() on the socket. If the socket is + * non-blocking, return with any partial completion. + * If the socket is blocking or if no progress has + * been made, requeue this request at the head of the * queue to try again when the socket is ready. */ - SOCKBUF_LOCK(sb); - empty_results++; - if (soaio_ready(so, sb)) { - empty_retries++; - SOCKBUF_UNLOCK(sb); - goto retry; - } - - if (!aio_set_cancel_function(job, soo_aio_cancel)) { - MPASS(cnt == 0); - SOCKBUF_UNLOCK(sb); - aio_cancel(job); - SOCKBUF_LOCK(sb); - } else { - TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list); - } - } else { - if (error) - aio_complete(job, -1, error); - else - aio_complete(job, cnt, 0); + MPASS(done != job->uaiocb.aio_nbytes); SOCKBUF_LOCK(sb); - } + if (done == 0 || !(so->so_state & SS_NBIO)) { + empty_results++; + if (soaio_ready(so, sb)) { + empty_retries++; + SOCKBUF_UNLOCK(sb); + goto retry; + } + + if (!aio_set_cancel_function(job, soo_aio_cancel)) { + SOCKBUF_UNLOCK(sb); + if (done != 0) + aio_complete(job, done, 0); + else + aio_cancel(job); + SOCKBUF_LOCK(sb); + } else { + TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list); + } + return; + } + SOCKBUF_UNLOCK(sb); + } + if (done != 0 && (error == ERESTART || error == EINTR || + error == EWOULDBLOCK)) + error = 0; + if (error) + aio_complete(job, -1, error); + else + aio_complete(job, done, 0); + SOCKBUF_LOCK(sb); } static void @@ -758,6 +769,7 @@ soo_aio_queue(struct file *fp, struct kaiocb *job) if (!aio_set_cancel_function(job, soo_aio_cancel)) panic("new job was cancelled"); TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list); + job->uaiocb._aiocb_private.status = 0; if (!(sb->sb_flags & SB_AIO_RUNNING)) { if (soaio_ready(so, sb)) sowakeup_aio(so, sb); diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c index 1b10088..c81b8d6 100644 --- a/tests/sys/aio/aio_test.c +++ b/tests/sys/aio/aio_test.c @@ -781,6 +781,70 @@ ATF_TC_BODY(aio_socket_two_reads, tc) close(s[0]); } +/* + * This test ensures that aio_write() on a blocking socket of a "large" + * buffer does not return a short completion. + */ +ATF_TC_WITHOUT_HEAD(aio_socket_blocking_short_write); +ATF_TC_BODY(aio_socket_blocking_short_write, tc) +{ + struct aiocb iocb, *iocbp; + char *buffer[2]; + ssize_t done; + int buffer_size, sb_size; + socklen_t len; + int s[2]; + + ATF_REQUIRE_KERNEL_MODULE("aio"); + + ATF_REQUIRE(socketpair(PF_UNIX, SOCK_STREAM, 0, s) != -1); + + len = sizeof(sb_size); + ATF_REQUIRE(getsockopt(s[0], SOL_SOCKET, SO_RCVBUF, &sb_size, &len) != + -1); + ATF_REQUIRE(len == sizeof(sb_size)); + buffer_size = sb_size; + + ATF_REQUIRE(getsockopt(s[1], SOL_SOCKET, SO_SNDBUF, &sb_size, &len) != + -1); + ATF_REQUIRE(len == sizeof(sb_size)); + if (sb_size > buffer_size) + buffer_size = sb_size; + + /* + * Use twice the size of the MAX(receive buffer, send buffer) + * to ensure that the write is split up into multiple writes + * internally. + */ + buffer_size *= 2; + + buffer[0] = malloc(buffer_size); + ATF_REQUIRE(buffer[0] != NULL); + buffer[1] = malloc(buffer_size); + ATF_REQUIRE(buffer[1] != NULL); + + srandomdev(); + aio_fill_buffer(buffer[1], buffer_size, random()); + + memset(&iocb, 0, sizeof(iocb)); + iocb.aio_fildes = s[1]; + iocb.aio_buf = buffer[1]; + iocb.aio_nbytes = buffer_size; + ATF_REQUIRE(aio_write(&iocb) == 0); + + done = recv(s[0], buffer[0], buffer_size, MSG_WAITALL); + ATF_REQUIRE(done == buffer_size); + + done = aio_waitcomplete(&iocbp, NULL); + ATF_REQUIRE(iocbp == &iocb); + ATF_REQUIRE(done == buffer_size); + + ATF_REQUIRE(memcmp(buffer[0], buffer[1], buffer_size) == 0); + + close(s[1]); + close(s[0]); +} + ATF_TP_ADD_TCS(tp) { @@ -792,6 +856,7 @@ ATF_TP_ADD_TCS(tp) ATF_TP_ADD_TC(tp, aio_md_test); ATF_TP_ADD_TC(tp, aio_large_read_test); ATF_TP_ADD_TC(tp, aio_socket_two_reads); + ATF_TP_ADD_TC(tp, aio_socket_blocking_short_write); return (atf_no_error()); } |