diff --git a/block/nbd.c b/block/nbd.c index 1b0e384c39..e0af5b4725 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -151,10 +151,18 @@ static void nbd_reply_ready(void *opaque) { BDRVNBDState *s = opaque; uint64_t i; + int ret; if (s->reply.handle == 0) { - /* No reply already in flight. Fetch a header. */ - if (nbd_receive_reply(s->sock, &s->reply) < 0) { + /* No reply already in flight. Fetch a header. It is possible + * that another thread has done the same thing in parallel, so + * the socket is not readable anymore. + */ + ret = nbd_receive_reply(s->sock, &s->reply); + if (ret == -EAGAIN) { + return; + } + if (ret < 0) { s->reply.handle = 0; goto fail; } diff --git a/nbd.c b/nbd.c index b31b3b2a4f..4c6d7f193d 100644 --- a/nbd.c +++ b/nbd.c @@ -78,9 +78,6 @@ /* That's all folks */ -#define read_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, true) -#define write_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, false) - ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; @@ -107,7 +104,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) err = socket_error(); /* recoverable error */ - if (err == EINTR || err == EAGAIN) { + if (err == EINTR || (offset > 0 && err == EAGAIN)) { continue; } @@ -126,6 +123,26 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) return offset; } +static ssize_t read_sync(int fd, void *buffer, size_t size) +{ + /* Sockets are kept in blocking mode in the negotiation phase. After + * that, a non-readable socket simply means that another thread stole + * our request/reply. Synchronization is done with recv_coroutine, so + * that this is coroutine-safe. + */ + return nbd_wr_sync(fd, buffer, size, true); +} + +static ssize_t write_sync(int fd, void *buffer, size_t size) +{ + int ret; + do { + /* For writes, we do expect the socket to be writable. */ + ret = nbd_wr_sync(fd, buffer, size, false); + } while (ret == -EAGAIN); + return ret; +} + static void combine_addr(char *buf, size_t len, const char* address, uint16_t port) { @@ -203,6 +220,7 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags) [28 .. 151] reserved (0) */ + socket_set_block(csock); rc = -EINVAL; TRACE("Beginning negotiation."); @@ -222,6 +240,7 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags) TRACE("Negotiation succeeded."); rc = 0; fail: + socket_set_nonblock(csock); return rc; } @@ -235,6 +254,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, TRACE("Receiving negotiation."); + socket_set_block(csock); rc = -EINVAL; if (read_sync(csock, buf, 8) != 8) { @@ -349,6 +369,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, rc = 0; fail: + socket_set_nonblock(csock); return rc; } @@ -742,8 +763,11 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque ssize_t rc; client->recv_coroutine = qemu_coroutine_self(); - if (nbd_receive_request(csock, request) < 0) { - rc = -EIO; + rc = nbd_receive_request(csock, request); + if (rc < 0) { + if (rc != -EAGAIN) { + rc = -EIO; + } goto out; } @@ -791,6 +815,9 @@ static void nbd_trip(void *opaque) TRACE("Reading request."); ret = nbd_co_receive_request(req, &request); + if (ret == -EAGAIN) { + goto done; + } if (ret == -EIO) { goto out; } @@ -901,6 +928,7 @@ static void nbd_trip(void *opaque) TRACE("Request/Reply complete"); +done: nbd_request_put(req); return;