mirror of
https://github.com/qemu/qemu.git
synced 2024-12-04 09:13:39 +08:00
QIOChannel: Add flags on io_writev and introduce io_flush callback
Add flags to io_writev and introduce io_flush as optional callback to QIOChannelClass, allowing the implementation of zero copy writes by subclasses. How to use them: - Write data using qio_channel_writev*(...,QIO_CHANNEL_WRITE_FLAG_ZERO_COPY), - Wait write completion with qio_channel_flush(). Notes: As some zero copy write implementations work asynchronously, it's recommended to keep the write buffer untouched until the return of qio_channel_flush(), to avoid the risk of sending an updated buffer instead of the buffer state during write. As io_flush callback is optional, if a subclass does not implement it, then: - io_flush will return 0 without changing anything. Also, some functions like qio_channel_writev_full_all() were adapted to receive a flag parameter. That allows shared code between zero copy and non-zero copy writev, and also an easier implementation on new flags. Signed-off-by: Leonardo Bras <leobras@redhat.com> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> Reviewed-by: Peter Xu <peterx@redhat.com> Reviewed-by: Juan Quintela <quintela@redhat.com> Message-Id: <20220513062836.965425-3-leobras@redhat.com> Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
This commit is contained in:
parent
354081d43d
commit
b88651cb4d
@ -122,7 +122,7 @@ int io_channel_send_full(QIOChannel *ioc,
|
|||||||
|
|
||||||
ret = qio_channel_writev_full(
|
ret = qio_channel_writev_full(
|
||||||
ioc, &iov, 1,
|
ioc, &iov, 1,
|
||||||
fds, nfds, NULL);
|
fds, nfds, 0, NULL);
|
||||||
if (ret == QIO_CHANNEL_ERR_BLOCK) {
|
if (ret == QIO_CHANNEL_ERR_BLOCK) {
|
||||||
if (offset) {
|
if (offset) {
|
||||||
return offset;
|
return offset;
|
||||||
|
@ -68,7 +68,7 @@ bool mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc, Error **errp)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!qio_channel_writev_full_all(ioc, send, G_N_ELEMENTS(send),
|
if (!qio_channel_writev_full_all(ioc, send, G_N_ELEMENTS(send),
|
||||||
fds, nfds, errp)) {
|
fds, nfds, 0, errp)) {
|
||||||
ret = true;
|
ret = true;
|
||||||
} else {
|
} else {
|
||||||
trace_mpqemu_send_io_error(msg->cmd, msg->size, nfds);
|
trace_mpqemu_send_io_error(msg->cmd, msg->size, nfds);
|
||||||
|
@ -32,12 +32,15 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
|
|||||||
|
|
||||||
#define QIO_CHANNEL_ERR_BLOCK -2
|
#define QIO_CHANNEL_ERR_BLOCK -2
|
||||||
|
|
||||||
|
#define QIO_CHANNEL_WRITE_FLAG_ZERO_COPY 0x1
|
||||||
|
|
||||||
typedef enum QIOChannelFeature QIOChannelFeature;
|
typedef enum QIOChannelFeature QIOChannelFeature;
|
||||||
|
|
||||||
enum QIOChannelFeature {
|
enum QIOChannelFeature {
|
||||||
QIO_CHANNEL_FEATURE_FD_PASS,
|
QIO_CHANNEL_FEATURE_FD_PASS,
|
||||||
QIO_CHANNEL_FEATURE_SHUTDOWN,
|
QIO_CHANNEL_FEATURE_SHUTDOWN,
|
||||||
QIO_CHANNEL_FEATURE_LISTEN,
|
QIO_CHANNEL_FEATURE_LISTEN,
|
||||||
|
QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -104,6 +107,7 @@ struct QIOChannelClass {
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp);
|
Error **errp);
|
||||||
ssize_t (*io_readv)(QIOChannel *ioc,
|
ssize_t (*io_readv)(QIOChannel *ioc,
|
||||||
const struct iovec *iov,
|
const struct iovec *iov,
|
||||||
@ -136,6 +140,8 @@ struct QIOChannelClass {
|
|||||||
IOHandler *io_read,
|
IOHandler *io_read,
|
||||||
IOHandler *io_write,
|
IOHandler *io_write,
|
||||||
void *opaque);
|
void *opaque);
|
||||||
|
int (*io_flush)(QIOChannel *ioc,
|
||||||
|
Error **errp);
|
||||||
};
|
};
|
||||||
|
|
||||||
/* General I/O handling functions */
|
/* General I/O handling functions */
|
||||||
@ -228,6 +234,7 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
|
|||||||
* @niov: the length of the @iov array
|
* @niov: the length of the @iov array
|
||||||
* @fds: an array of file handles to send
|
* @fds: an array of file handles to send
|
||||||
* @nfds: number of file handles in @fds
|
* @nfds: number of file handles in @fds
|
||||||
|
* @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
|
||||||
* @errp: pointer to a NULL-initialized error object
|
* @errp: pointer to a NULL-initialized error object
|
||||||
*
|
*
|
||||||
* Write data to the IO channel, reading it from the
|
* Write data to the IO channel, reading it from the
|
||||||
@ -260,6 +267,7 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp);
|
Error **errp);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -837,6 +845,7 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
|
|||||||
* @niov: the length of the @iov array
|
* @niov: the length of the @iov array
|
||||||
* @fds: an array of file handles to send
|
* @fds: an array of file handles to send
|
||||||
* @nfds: number of file handles in @fds
|
* @nfds: number of file handles in @fds
|
||||||
|
* @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
|
||||||
* @errp: pointer to a NULL-initialized error object
|
* @errp: pointer to a NULL-initialized error object
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
@ -846,6 +855,14 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
|
|||||||
* to be written, yielding from the current coroutine
|
* to be written, yielding from the current coroutine
|
||||||
* if required.
|
* if required.
|
||||||
*
|
*
|
||||||
|
* If QIO_CHANNEL_WRITE_FLAG_ZERO_COPY is passed in flags,
|
||||||
|
* instead of waiting for all requested data to be written,
|
||||||
|
* this function will wait until it's all queued for writing.
|
||||||
|
* In this case, if the buffer gets changed between queueing and
|
||||||
|
* sending, the updated buffer will be sent. If this is not a
|
||||||
|
* desired behavior, it's suggested to call qio_channel_flush()
|
||||||
|
* before reusing the buffer.
|
||||||
|
*
|
||||||
* Returns: 0 if all bytes were written, or -1 on error
|
* Returns: 0 if all bytes were written, or -1 on error
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -853,6 +870,25 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
|
|||||||
const struct iovec *iov,
|
const struct iovec *iov,
|
||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds, size_t nfds,
|
int *fds, size_t nfds,
|
||||||
Error **errp);
|
int flags, Error **errp);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* qio_channel_flush:
|
||||||
|
* @ioc: the channel object
|
||||||
|
* @errp: pointer to a NULL-initialized error object
|
||||||
|
*
|
||||||
|
* Will block until every packet queued with
|
||||||
|
* qio_channel_writev_full() + QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
|
||||||
|
* is sent, or return in case of any error.
|
||||||
|
*
|
||||||
|
* If not implemented, acts as a no-op, and returns 0.
|
||||||
|
*
|
||||||
|
* Returns -1 if any error is found,
|
||||||
|
* 1 if every send failed to use zero copy.
|
||||||
|
* 0 otherwise.
|
||||||
|
*/
|
||||||
|
|
||||||
|
int qio_channel_flush(QIOChannel *ioc,
|
||||||
|
Error **errp);
|
||||||
|
|
||||||
#endif /* QIO_CHANNEL_H */
|
#endif /* QIO_CHANNEL_H */
|
||||||
|
@ -81,6 +81,7 @@ static ssize_t qio_channel_buffer_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
|
QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
|
||||||
|
@ -276,6 +276,7 @@ static ssize_t qio_channel_command_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
|
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
|
||||||
|
@ -114,6 +114,7 @@ static ssize_t qio_channel_file_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
|
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
|
||||||
|
@ -524,6 +524,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
|
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
|
||||||
@ -619,6 +620,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
|
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
|
||||||
|
@ -301,6 +301,7 @@ static ssize_t qio_channel_tls_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
|
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
|
||||||
|
@ -1127,6 +1127,7 @@ static ssize_t qio_channel_websock_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
|
QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
|
||||||
|
49
io/channel.c
49
io/channel.c
@ -72,18 +72,32 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
|
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
|
||||||
|
|
||||||
if ((fds || nfds) &&
|
if (fds || nfds) {
|
||||||
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
|
if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
|
||||||
|
error_setg_errno(errp, EINVAL,
|
||||||
|
"Channel does not support file descriptor passing");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
|
||||||
|
error_setg_errno(errp, EINVAL,
|
||||||
|
"Zero Copy does not support file descriptor passing");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
|
||||||
|
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
|
||||||
error_setg_errno(errp, EINVAL,
|
error_setg_errno(errp, EINVAL,
|
||||||
"Channel does not support file descriptor passing");
|
"Requested Zero Copy feature is not available");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
|
return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -217,14 +231,14 @@ int qio_channel_writev_all(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
|
return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
|
||||||
}
|
}
|
||||||
|
|
||||||
int qio_channel_writev_full_all(QIOChannel *ioc,
|
int qio_channel_writev_full_all(QIOChannel *ioc,
|
||||||
const struct iovec *iov,
|
const struct iovec *iov,
|
||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds, size_t nfds,
|
int *fds, size_t nfds,
|
||||||
Error **errp)
|
int flags, Error **errp)
|
||||||
{
|
{
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
struct iovec *local_iov = g_new(struct iovec, niov);
|
struct iovec *local_iov = g_new(struct iovec, niov);
|
||||||
@ -237,8 +251,10 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
|
|||||||
|
|
||||||
while (nlocal_iov > 0) {
|
while (nlocal_iov > 0) {
|
||||||
ssize_t len;
|
ssize_t len;
|
||||||
len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
|
|
||||||
errp);
|
len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
|
||||||
|
nfds, flags, errp);
|
||||||
|
|
||||||
if (len == QIO_CHANNEL_ERR_BLOCK) {
|
if (len == QIO_CHANNEL_ERR_BLOCK) {
|
||||||
if (qemu_in_coroutine()) {
|
if (qemu_in_coroutine()) {
|
||||||
qio_channel_yield(ioc, G_IO_OUT);
|
qio_channel_yield(ioc, G_IO_OUT);
|
||||||
@ -277,7 +293,7 @@ ssize_t qio_channel_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
|
return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -297,7 +313,7 @@ ssize_t qio_channel_write(QIOChannel *ioc,
|
|||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
|
struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
|
||||||
return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
|
return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -473,6 +489,19 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
|
|||||||
return klass->io_seek(ioc, offset, whence, errp);
|
return klass->io_seek(ioc, offset, whence, errp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int qio_channel_flush(QIOChannel *ioc,
|
||||||
|
Error **errp)
|
||||||
|
{
|
||||||
|
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
|
||||||
|
|
||||||
|
if (!klass->io_flush ||
|
||||||
|
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return klass->io_flush(ioc, errp);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void qio_channel_restart_read(void *opaque)
|
static void qio_channel_restart_read(void *opaque)
|
||||||
{
|
{
|
||||||
|
@ -2840,6 +2840,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
|
|||||||
size_t niov,
|
size_t niov,
|
||||||
int *fds,
|
int *fds,
|
||||||
size_t nfds,
|
size_t nfds,
|
||||||
|
int flags,
|
||||||
Error **errp)
|
Error **errp)
|
||||||
{
|
{
|
||||||
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
|
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
|
||||||
|
@ -77,7 +77,7 @@ static int pr_manager_helper_write(PRManagerHelper *pr_mgr,
|
|||||||
iov.iov_base = (void *)buf;
|
iov.iov_base = (void *)buf;
|
||||||
iov.iov_len = sz;
|
iov.iov_len = sz;
|
||||||
n_written = qio_channel_writev_full(QIO_CHANNEL(pr_mgr->ioc), &iov, 1,
|
n_written = qio_channel_writev_full(QIO_CHANNEL(pr_mgr->ioc), &iov, 1,
|
||||||
nfds ? &fd : NULL, nfds, errp);
|
nfds ? &fd : NULL, nfds, 0, errp);
|
||||||
|
|
||||||
if (n_written <= 0) {
|
if (n_written <= 0) {
|
||||||
assert(n_written != QIO_CHANNEL_ERR_BLOCK);
|
assert(n_written != QIO_CHANNEL_ERR_BLOCK);
|
||||||
|
@ -444,6 +444,7 @@ static void test_io_channel_unix_fd_pass(void)
|
|||||||
G_N_ELEMENTS(iosend),
|
G_N_ELEMENTS(iosend),
|
||||||
fdsend,
|
fdsend,
|
||||||
G_N_ELEMENTS(fdsend),
|
G_N_ELEMENTS(fdsend),
|
||||||
|
0,
|
||||||
&error_abort);
|
&error_abort);
|
||||||
|
|
||||||
qio_channel_readv_full(dst,
|
qio_channel_readv_full(dst,
|
||||||
|
Loading…
Reference in New Issue
Block a user