sheepdog: move coroutine send/recv function to generic code

Outside coroutines, avoid busy waiting on EAGAIN by temporarily
making the socket blocking.

The API of qemu_recvv/qemu_sendv is slightly different from
do_readv/do_writev because they do not handle coroutines.  It
returns the number of bytes written before encountering an
EAGAIN.  The specificity of yielding on EAGAIN is entirely in
qemu-coroutine.c.

Reviewed-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
Paolo Bonzini 2011-09-08 13:46:25 +02:00
parent 993295fedc
commit 8c5135f90e
5 changed files with 260 additions and 211 deletions

View File

@ -12,7 +12,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o
#######################################################################
# coroutines
coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o
coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
ifeq ($(CONFIG_UCONTEXT_COROUTINE),y)
coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o
else

View File

@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
return acb;
}
#ifdef _WIN32
struct msghdr {
struct iovec *msg_iov;
size_t msg_iovlen;
};
static ssize_t sendmsg(int s, const struct msghdr *msg, int flags)
{
size_t size = 0;
char *buf, *p;
int i, ret;
/* count the msg size */
for (i = 0; i < msg->msg_iovlen; i++) {
size += msg->msg_iov[i].iov_len;
}
buf = g_malloc(size);
p = buf;
for (i = 0; i < msg->msg_iovlen; i++) {
memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
p += msg->msg_iov[i].iov_len;
}
ret = send(s, buf, size, flags);
g_free(buf);
return ret;
}
static ssize_t recvmsg(int s, struct msghdr *msg, int flags)
{
size_t size = 0;
char *buf, *p;
int i, ret;
/* count the msg size */
for (i = 0; i < msg->msg_iovlen; i++) {
size += msg->msg_iov[i].iov_len;
}
buf = g_malloc(size);
ret = qemu_recv(s, buf, size, flags);
if (ret < 0) {
goto out;
}
p = buf;
for (i = 0; i < msg->msg_iovlen; i++) {
memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len);
p += msg->msg_iov[i].iov_len;
}
out:
g_free(buf);
return ret;
}
#endif
/*
* Send/recv data with iovec buffers
*
* This function send/recv data from/to the iovec buffer directly.
* The first `offset' bytes in the iovec buffer are skipped and next
* `len' bytes are used.
*
* For example,
*
* do_send_recv(sockfd, iov, len, offset, 1);
*
* is equals to
*
* char *buf = malloc(size);
* iov_to_buf(iov, iovcnt, buf, offset, size);
* send(sockfd, buf, size, 0);
* free(buf);
*/
static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
int write)
{
struct msghdr msg;
int ret, diff;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = iov;
msg.msg_iovlen = 1;
len += offset;
while (iov->iov_len < len) {
len -= iov->iov_len;
iov++;
msg.msg_iovlen++;
}
diff = iov->iov_len - len;
iov->iov_len -= diff;
while (msg.msg_iov->iov_len <= offset) {
offset -= msg.msg_iov->iov_len;
msg.msg_iov++;
msg.msg_iovlen--;
}
msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
msg.msg_iov->iov_len -= offset;
if (write) {
ret = sendmsg(sockfd, &msg, 0);
} else {
ret = recvmsg(sockfd, &msg, 0);
}
msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
msg.msg_iov->iov_len += offset;
iov->iov_len += diff;
return ret;
}
static int connect_to_sdog(const char *addr, const char *port)
{
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
@ -618,83 +495,19 @@ success:
return fd;
}
static int do_readv_writev(int sockfd, struct iovec *iov, int len,
int iov_offset, int write)
{
int ret;
again:
ret = do_send_recv(sockfd, iov, len, iov_offset, write);
if (ret < 0) {
if (errno == EINTR) {
goto again;
}
if (errno == EAGAIN) {
if (qemu_in_coroutine()) {
qemu_coroutine_yield();
}
goto again;
}
error_report("failed to recv a rsp, %s", strerror(errno));
return 1;
}
iov_offset += ret;
len -= ret;
if (len) {
goto again;
}
return 0;
}
static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
{
return do_readv_writev(sockfd, iov, len, iov_offset, 0);
}
static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
{
return do_readv_writev(sockfd, iov, len, iov_offset, 1);
}
static int do_read_write(int sockfd, void *buf, int len, int write)
{
struct iovec iov;
iov.iov_base = buf;
iov.iov_len = len;
return do_readv_writev(sockfd, &iov, len, 0, write);
}
static int do_read(int sockfd, void *buf, int len)
{
return do_read_write(sockfd, buf, len, 0);
}
static int do_write(int sockfd, void *buf, int len)
{
return do_read_write(sockfd, buf, len, 1);
}
static int send_req(int sockfd, SheepdogReq *hdr, void *data,
unsigned int *wlen)
{
int ret;
struct iovec iov[2];
iov[0].iov_base = hdr;
iov[0].iov_len = sizeof(*hdr);
if (*wlen) {
iov[1].iov_base = data;
iov[1].iov_len = *wlen;
ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0);
if (ret < sizeof(*hdr)) {
error_report("failed to send a req, %s", strerror(errno));
}
ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
if (ret) {
ret = qemu_send_full(sockfd, data, *wlen, 0);
if (ret < *wlen) {
error_report("failed to send a req, %s", strerror(errno));
ret = -1;
}
return ret;
@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
{
int ret;
socket_set_block(sockfd);
ret = send_req(sockfd, hdr, data, wlen);
if (ret) {
ret = -1;
if (ret < 0) {
goto out;
}
ret = do_read(sockfd, hdr, sizeof(*hdr));
if (ret) {
ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0);
if (ret < sizeof(*hdr)) {
error_report("failed to get a rsp, %s", strerror(errno));
ret = -1;
goto out;
}
@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
}
if (*rlen) {
ret = do_read(sockfd, data, *rlen);
if (ret) {
ret = qemu_recv_full(sockfd, data, *rlen, 0);
if (ret < *rlen) {
error_report("failed to get the data, %s", strerror(errno));
ret = -1;
goto out;
}
}
ret = 0;
out:
socket_set_nonblock(sockfd);
return ret;
}
@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque)
}
/* read a header */
ret = do_read(fd, &rsp, sizeof(rsp));
if (ret) {
ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
if (ret < 0) {
error_report("failed to get the header, %s", strerror(errno));
goto out;
}
@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque)
}
break;
case AIOCB_READ_UDATA:
ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
aio_req->iov_offset);
if (ret) {
ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
aio_req->iov_offset);
if (ret < 0) {
error_report("failed to get the data, %s", strerror(errno));
goto out;
}
@ -1114,16 +926,16 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
set_cork(s->fd, 1);
/* send a header */
ret = do_write(s->fd, &hdr, sizeof(hdr));
if (ret) {
ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
if (ret < 0) {
qemu_co_mutex_unlock(&s->lock);
error_report("failed to send a req, %s", strerror(errno));
return -EIO;
}
if (wlen) {
ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
if (ret) {
ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
if (ret < 0) {
qemu_co_mutex_unlock(&s->lock);
error_report("failed to send a data, %s", strerror(errno));
return -EIO;

111
cutils.c
View File

@ -25,6 +25,8 @@
#include "host-utils.h"
#include <math.h>
#include "qemu_socket.h"
void pstrcpy(char *buf, int buf_size, const char *str)
{
int c;
@ -403,3 +405,112 @@ int qemu_parse_fd(const char *param)
}
return fd;
}
/*
* Send/recv data with iovec buffers
*
* This function send/recv data from/to the iovec buffer directly.
* The first `offset' bytes in the iovec buffer are skipped and next
* `len' bytes are used.
*
* For example,
*
* do_sendv_recvv(sockfd, iov, len, offset, 1);
*
* is equal to
*
* char *buf = malloc(size);
* iov_to_buf(iov, iovcnt, buf, offset, size);
* send(sockfd, buf, size, 0);
* free(buf);
*/
static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset,
int do_sendv)
{
int ret, diff, iovlen;
struct iovec *last_iov;
/* last_iov is inclusive, so count from one. */
iovlen = 1;
last_iov = iov;
len += offset;
while (last_iov->iov_len < len) {
len -= last_iov->iov_len;
last_iov++;
iovlen++;
}
diff = last_iov->iov_len - len;
last_iov->iov_len -= diff;
while (iov->iov_len <= offset) {
offset -= iov->iov_len;
iov++;
iovlen--;
}
iov->iov_base = (char *) iov->iov_base + offset;
iov->iov_len -= offset;
{
#if defined CONFIG_IOVEC && defined CONFIG_POSIX
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = iov;
msg.msg_iovlen = iovlen;
do {
if (do_sendv) {
ret = sendmsg(sockfd, &msg, 0);
} else {
ret = recvmsg(sockfd, &msg, 0);
}
} while (ret == -1 && errno == EINTR);
#else
struct iovec *p = iov;
ret = 0;
while (iovlen > 0) {
int rc;
if (do_sendv) {
rc = send(sockfd, p->iov_base, p->iov_len, 0);
} else {
rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
}
if (rc == -1) {
if (errno == EINTR) {
continue;
}
if (ret == 0) {
ret = -1;
}
break;
}
if (rc == 0) {
break;
}
ret += rc;
iovlen--, p++;
}
#endif
}
/* Undo the changes above */
iov->iov_base = (char *) iov->iov_base - offset;
iov->iov_len += offset;
last_iov->iov_len += diff;
return ret;
}
int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset)
{
return do_sendv_recvv(sockfd, iov, len, iov_offset, 0);
}
int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset)
{
return do_sendv_recvv(sockfd, iov, len, iov_offset, 1);
}

View File

@ -175,7 +175,7 @@ ssize_t qemu_write_full(int fd, const void *buf, size_t count)
QEMU_WARN_UNUSED_RESULT;
ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags)
QEMU_WARN_UNUSED_RESULT;
ssize_t qemu_recv_full(int fd, const void *buf, size_t count, int flags)
ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags)
QEMU_WARN_UNUSED_RESULT;
void qemu_set_cloexec(int fd);
@ -190,6 +190,9 @@ int qemu_pipe(int pipefd[2]);
#define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags)
#endif
int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset);
int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset);
/* Error handling. */
void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2);
@ -276,6 +279,33 @@ struct qemu_work_item {
void qemu_init_vcpu(void *env);
#endif
/**
* Sends an iovec (or optionally a part of it) down a socket, yielding
* when the socket is full.
*/
int qemu_co_sendv(int sockfd, struct iovec *iov,
int len, int iov_offset);
/**
* Receives data into an iovec (or optionally into a part of it) from
* a socket, yielding when there is no data in the socket.
*/
int qemu_co_recvv(int sockfd, struct iovec *iov,
int len, int iov_offset);
/**
* Sends a buffer down a socket, yielding when the socket is full.
*/
int qemu_co_send(int sockfd, void *buf, int len);
/**
* Receives data into a buffer from a socket, yielding when there
* is no data in the socket.
*/
int qemu_co_recv(int sockfd, void *buf, int len);
typedef struct QEMUIOVector {
struct iovec *iov;
int niov;

96
qemu-coroutine-io.c Normal file
View File

@ -0,0 +1,96 @@
/*
* Coroutine-aware I/O functions
*
* Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
* Copyright (c) 2011, Red Hat, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu-common.h"
#include "qemu_socket.h"
#include "qemu-coroutine.h"
int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov,
int len, int iov_offset)
{
int total = 0;
int ret;
while (len) {
ret = qemu_recvv(sockfd, iov, len, iov_offset + total);
if (ret < 0) {
if (errno == EAGAIN) {
qemu_coroutine_yield();
continue;
}
if (total == 0) {
total = -1;
}
break;
}
if (ret == 0) {
break;
}
total += ret, len -= ret;
}
return total;
}
int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov,
int len, int iov_offset)
{
int total = 0;
int ret;
while (len) {
ret = qemu_sendv(sockfd, iov, len, iov_offset + total);
if (ret < 0) {
if (errno == EAGAIN) {
qemu_coroutine_yield();
continue;
}
if (total == 0) {
total = -1;
}
break;
}
total += ret, len -= ret;
}
return total;
}
int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len)
{
struct iovec iov;
iov.iov_base = buf;
iov.iov_len = len;
return qemu_co_recvv(sockfd, &iov, len, 0);
}
int coroutine_fn qemu_co_send(int sockfd, void *buf, int len)
{
struct iovec iov;
iov.iov_base = buf;
iov.iov_len = len;
return qemu_co_sendv(sockfd, &iov, len, 0);
}