diff --git a/include/io/task.h b/include/io/task.h new file mode 100644 index 0000000000..2418714156 --- /dev/null +++ b/include/io/task.h @@ -0,0 +1,256 @@ +/* + * QEMU I/O task + * + * Copyright (c) 2015 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + * + */ + +#ifndef QIO_TASK_H__ +#define QIO_TASK_H__ + +#include "qemu-common.h" +#include "qapi/error.h" +#include "qom/object.h" + +typedef struct QIOTask QIOTask; + +typedef void (*QIOTaskFunc)(Object *source, + Error *err, + gpointer opaque); + +typedef int (*QIOTaskWorker)(QIOTask *task, + Error **errp, + gpointer opaque); + +/** + * QIOTask: + * + * The QIOTask object provides a simple mechanism for reporting + * success / failure of long running background operations. + * + * A object on which the operation is to be performed could have + * a public API which accepts a task callback: + * + * + * Task callback function signature + * + * void myobject_operation(QMyObject *obj, + * QIOTaskFunc *func, + * gpointer opaque, + * GDestroyNotify *notify); + * + * + * + * The 'func' parameter is the callback to be invoked, and 'opaque' + * is data to pass to it. The optional 'notify' function is used + * to free 'opaque' when no longer needed. + * + * Now, lets say the implementation of this method wants to set + * a timer to run once a second checking for completion of some + * activity. It would do something like + * + * + * Task callback function implementation + * + * void myobject_operation(QMyObject *obj, + * QIOTaskFunc *func, + * gpointer opaque, + * GDestroyNotify *notify) + * { + * QIOTask *task; + * + * task = qio_task_new(OBJECT(obj), func, opaque, notify); + * + * g_timeout_add_full(G_PRIORITY_DEFAULT, + * 1000, + * myobject_operation_timer, + * task, + * NULL); + * } + * + * + * + * It could equally have setup a watch on a file descriptor or + * created a background thread, or something else entirely. + * Notice that the source object is passed to the task, and + * QIOTask will hold a reference on that. This ensure that + * the QMyObject instance cannot be garbage collected while + * the async task is still in progress. + * + * In this case, myobject_operation_timer will fire after + * 3 secs and do + * + * + * Task timer function + * + * gboolean myobject_operation_timer(gpointer opaque) + * { + * QIOTask *task = QIO_TASK(opaque); + * Error *err;* + * + * ...check something important... + * if (err) { + * qio_task_abort(task, err); + * error_free(task); + * return FALSE; + * } else if (...work is completed ...) { + * qio_task_complete(task); + * return FALSE; + * } + * ...carry on polling ... + * return TRUE; + * } + * + * + * + * Once this function returns false, object_unref will be called + * automatically on the task causing it to be released and the + * ref on QMyObject dropped too. + * + * The QIOTask module can also be used to perform operations + * in a background thread context, while still reporting the + * results in the main event thread. This allows code which + * cannot easily be rewritten to be asychronous (such as DNS + * lookups) to be easily run non-blocking. Reporting the + * results in the main thread context means that the caller + * typically does not need to be concerned about thread + * safety wrt the QEMU global mutex. + * + * For example, the socket_listen() method will block the caller + * while DNS lookups take place if given a name, instead of IP + * address. The C library often do not provide a practical async + * DNS API, so the to get non-blocking DNS lookups in a portable + * manner requires use of a thread. So achieve a non-blocking + * socket listen using QIOTask would require: + * + * + * static int myobject_listen_worker(QIOTask *task, + * Error **errp, + * gpointer opaque) + * { + * QMyObject obj = QMY_OBJECT(qio_task_get_source(task)); + * SocketAddress *addr = opaque; + * + * obj->fd = socket_listen(addr, errp); + * if (obj->fd < 0) { + * return -1; + * } + * return 0; + * } + * + * void myobject_listen_async(QMyObject *obj, + * SocketAddress *addr, + * QIOTaskFunc *func, + * gpointer opaque, + * GDestroyNotify *notify) + * { + * QIOTask *task; + * SocketAddress *addrCopy; + * + * qapi_copy_SocketAddress(&addrCopy, addr); + * task = qio_task_new(OBJECT(obj), func, opaque, notify); + * + * qio_task_run_in_thread(task, myobject_listen_worker, + * addrCopy, + * qapi_free_SocketAddress); + * } + * + * + * NB, The 'func' callback passed into myobject_listen_async + * will be invoked from the main event thread, despite the + * actual operation being performed in a different thread. + */ + +/** + * qio_task_new: + * @source: the object on which the operation is invoked + * @func: the callback to invoke when the task completes + * @opaque: opaque data to pass to @func when invoked + * @destroy: optional callback to free @opaque + * + * Creates a new task struct to track completion of a + * background operation running on the object @source. + * When the operation completes or fails, the callback + * @func will be invoked. The callback can access the + * 'err' attribute in the task object to determine if + * the operation was successful or not. + * + * The returned task will be released when one of + * qio_task_abort() or qio_task_complete() are invoked. + * + * Returns: the task struct + */ +QIOTask *qio_task_new(Object *source, + QIOTaskFunc func, + gpointer opaque, + GDestroyNotify destroy); + +/** + * qio_task_run_in_thread: + * @task: the task struct + * @worker: the function to invoke in a thread + * @opaque: opaque data to pass to @worker + * @destroy: function to free @opaque + * + * Run a task in a background thread. If @worker + * returns 0 it will call qio_task_complete() in + * the main event thread context. If @worker + * returns -1 it will call qio_task_abort() in + * the main event thread context. + */ +void qio_task_run_in_thread(QIOTask *task, + QIOTaskWorker worker, + gpointer opaque, + GDestroyNotify destroy); + +/** + * qio_task_complete: + * @task: the task struct + * + * Mark the operation as succesfully completed + * and free the memory for @task. + */ +void qio_task_complete(QIOTask *task); + +/** + * qio_task_abort: + * @task: the task struct + * @err: the error to record for the operation + * + * Mark the operation as failed, with @err providing + * details about the failure. The @err may be freed + * afer the function returns, as the notification + * callback is invoked synchronously. The @task will + * be freed when this call completes. + */ +void qio_task_abort(QIOTask *task, + Error *err); + + +/** + * qio_task_get_source: + * @task: the task struct + * + * Get the source object associated with the background + * task. This returns a new reference to the object, + * which the caller must released with object_unref() + * when no longer required. + * + * Returns: the source object + */ +Object *qio_task_get_source(QIOTask *task); + +#endif /* QIO_TASK_H__ */ diff --git a/io/Makefile.objs b/io/Makefile.objs index b02ea908ef..503b95c30b 100644 --- a/io/Makefile.objs +++ b/io/Makefile.objs @@ -1,2 +1,3 @@ io-obj-y = channel.o io-obj-y += channel-watch.o +io-obj-y += task.o diff --git a/io/task.c b/io/task.c new file mode 100644 index 0000000000..3127fca771 --- /dev/null +++ b/io/task.c @@ -0,0 +1,159 @@ +/* + * QEMU I/O task + * + * Copyright (c) 2015 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + * + */ + +#include "io/task.h" +#include "qemu/thread.h" +#include "trace.h" + +struct QIOTask { + Object *source; + QIOTaskFunc func; + gpointer opaque; + GDestroyNotify destroy; +}; + + +QIOTask *qio_task_new(Object *source, + QIOTaskFunc func, + gpointer opaque, + GDestroyNotify destroy) +{ + QIOTask *task; + + task = g_new0(QIOTask, 1); + + task->source = source; + object_ref(source); + task->func = func; + task->opaque = opaque; + task->destroy = destroy; + + trace_qio_task_new(task, source, func, opaque); + + return task; +} + +static void qio_task_free(QIOTask *task) +{ + if (task->destroy) { + task->destroy(task->opaque); + } + object_unref(task->source); + + g_free(task); +} + + +struct QIOTaskThreadData { + QIOTask *task; + QIOTaskWorker worker; + gpointer opaque; + GDestroyNotify destroy; + Error *err; + int ret; +}; + + +static gboolean gio_task_thread_result(gpointer opaque) +{ + struct QIOTaskThreadData *data = opaque; + + trace_qio_task_thread_result(data->task); + if (data->ret == 0) { + qio_task_complete(data->task); + } else { + qio_task_abort(data->task, data->err); + } + + error_free(data->err); + if (data->destroy) { + data->destroy(data->opaque); + } + + g_free(data); + + return FALSE; +} + + +static gpointer qio_task_thread_worker(gpointer opaque) +{ + struct QIOTaskThreadData *data = opaque; + + trace_qio_task_thread_run(data->task); + data->ret = data->worker(data->task, &data->err, data->opaque); + if (data->ret < 0 && data->err == NULL) { + error_setg(&data->err, "Task worker failed but did not set an error"); + } + + /* We're running in the background thread, and must only + * ever report the task results in the main event loop + * thread. So we schedule an idle callback to report + * the worker results + */ + trace_qio_task_thread_exit(data->task); + g_idle_add(gio_task_thread_result, data); + return NULL; +} + + +void qio_task_run_in_thread(QIOTask *task, + QIOTaskWorker worker, + gpointer opaque, + GDestroyNotify destroy) +{ + struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1); + QemuThread thread; + + data->task = task; + data->worker = worker; + data->opaque = opaque; + data->destroy = destroy; + + trace_qio_task_thread_start(task, worker, opaque); + qemu_thread_create(&thread, + "io-task-worker", + qio_task_thread_worker, + data, + QEMU_THREAD_DETACHED); +} + + +void qio_task_complete(QIOTask *task) +{ + task->func(task->source, NULL, task->opaque); + trace_qio_task_complete(task); + qio_task_free(task); +} + +void qio_task_abort(QIOTask *task, + Error *err) +{ + task->func(task->source, err, task->opaque); + trace_qio_task_abort(task); + qio_task_free(task); +} + + +Object *qio_task_get_source(QIOTask *task) +{ + object_ref(task->source); + return task->source; +} diff --git a/tests/.gitignore b/tests/.gitignore index 1e55722b6a..eec12cc2db 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -24,6 +24,7 @@ test-cutils test-hbitmap test-int128 test-iov +test-io-task test-mul64 test-opts-visitor test-qapi-event.[ch] diff --git a/tests/Makefile b/tests/Makefile index 053c1ae481..515a7c7c30 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -84,6 +84,7 @@ check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlscredsx509$(EXESUF) check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlssession$(EXESUF) check-unit-$(CONFIG_LINUX) += tests/test-qga$(EXESUF) check-unit-y += tests/test-timed-average$(EXESUF) +check-unit-y += tests/test-io-task$(EXESUF) check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh @@ -381,6 +382,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \ $(test-qom-obj-y) test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y) test-block-obj-y = $(block-obj-y) $(test-crypto-obj-y) +test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y) tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y) tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y) @@ -469,6 +471,7 @@ tests/test-crypto-tlscredsx509$(EXESUF): tests/test-crypto-tlscredsx509.o \ tests/test-crypto-tlssession.o-cflags := $(TASN1_CFLAGS) tests/test-crypto-tlssession$(EXESUF): tests/test-crypto-tlssession.o \ tests/crypto-tls-x509-helpers.o tests/pkix_asn1_tab.o $(test-crypto-obj-y) +tests/test-io-task$(EXESUF): tests/test-io-task.o $(test-io-obj-y) libqos-obj-y = tests/libqos/pci.o tests/libqos/fw_cfg.o tests/libqos/malloc.o libqos-obj-y += tests/libqos/i2c.o tests/libqos/libqos.o diff --git a/tests/test-io-task.c b/tests/test-io-task.c new file mode 100644 index 0000000000..3344382c7f --- /dev/null +++ b/tests/test-io-task.c @@ -0,0 +1,268 @@ +/* + * QEMU I/O task tests + * + * Copyright (c) 2015 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + * + */ + +#include + +#include "io/task.h" + +#define TYPE_DUMMY "qemu:dummy" + +typedef struct DummyObject DummyObject; +typedef struct DummyObjectClass DummyObjectClass; + +struct DummyObject { + Object parent; +}; + +struct DummyObjectClass { + ObjectClass parent; +}; + +static const TypeInfo dummy_info = { + .parent = TYPE_OBJECT, + .name = TYPE_DUMMY, + .instance_size = sizeof(DummyObject), + .class_size = sizeof(DummyObjectClass), +}; + +struct TestTaskData { + Object *source; + Error *err; + bool freed; +}; + + +static void task_callback(Object *source, + Error *err, + gpointer opaque) +{ + struct TestTaskData *data = opaque; + + data->source = source; + data->err = err; +} + + +static void test_task_complete(void) +{ + QIOTask *task; + Object *obj = object_new(TYPE_DUMMY); + Object *src; + struct TestTaskData data = { NULL, NULL, false }; + + task = qio_task_new(obj, task_callback, &data, NULL); + src = qio_task_get_source(task); + + qio_task_complete(task); + + g_assert(obj == src); + + object_unref(obj); + object_unref(src); + + g_assert(data.source == obj); + g_assert(data.err == NULL); + g_assert(data.freed == false); +} + + +static void task_data_free(gpointer opaque) +{ + struct TestTaskData *data = opaque; + + data->freed = true; +} + + +static void test_task_data_free(void) +{ + QIOTask *task; + Object *obj = object_new(TYPE_DUMMY); + struct TestTaskData data = { NULL, NULL, false }; + + task = qio_task_new(obj, task_callback, &data, task_data_free); + + qio_task_complete(task); + + object_unref(obj); + + g_assert(data.source == obj); + g_assert(data.err == NULL); + g_assert(data.freed == true); +} + + +static void test_task_error(void) +{ + QIOTask *task; + Object *obj = object_new(TYPE_DUMMY); + struct TestTaskData data = { NULL, NULL, false }; + Error *err = NULL; + + task = qio_task_new(obj, task_callback, &data, NULL); + + error_setg(&err, "Some error"); + + qio_task_abort(task, err); + + error_free(err); + object_unref(obj); + + g_assert(data.source == obj); + g_assert(data.err == err); + g_assert(data.freed == false); + +} + + +struct TestThreadWorkerData { + Object *source; + Error *err; + bool fail; + GThread *worker; + GThread *complete; + GMainLoop *loop; +}; + +static int test_task_thread_worker(QIOTask *task, + Error **errp, + gpointer opaque) +{ + struct TestThreadWorkerData *data = opaque; + + data->worker = g_thread_self(); + + if (data->fail) { + error_setg(errp, "Testing fail"); + return -1; + } + + return 0; +} + + +static void test_task_thread_callback(Object *source, + Error *err, + gpointer opaque) +{ + struct TestThreadWorkerData *data = opaque; + + data->source = source; + data->err = err; + + data->complete = g_thread_self(); + + g_main_loop_quit(data->loop); +} + + +static void test_task_thread_complete(void) +{ + QIOTask *task; + Object *obj = object_new(TYPE_DUMMY); + struct TestThreadWorkerData data = { 0 }; + GThread *self; + + data.loop = g_main_loop_new(g_main_context_default(), + TRUE); + + task = qio_task_new(obj, + test_task_thread_callback, + &data, + NULL); + + qio_task_run_in_thread(task, + test_task_thread_worker, + &data, + NULL); + + g_main_loop_run(data.loop); + + g_main_loop_unref(data.loop); + object_unref(obj); + + g_assert(data.source == obj); + g_assert(data.err == NULL); + + self = g_thread_self(); + + /* Make sure the test_task_thread_worker actually got + * run in a different thread */ + g_assert(data.worker != self); + + /* And that the test_task_thread_callback got rnu in + * the main loop thread (ie this one) */ + g_assert(data.complete == self); +} + + +static void test_task_thread_error(void) +{ + QIOTask *task; + Object *obj = object_new(TYPE_DUMMY); + struct TestThreadWorkerData data = { 0 }; + GThread *self; + + data.loop = g_main_loop_new(g_main_context_default(), + TRUE); + data.fail = true; + + task = qio_task_new(obj, + test_task_thread_callback, + &data, + NULL); + + qio_task_run_in_thread(task, + test_task_thread_worker, + &data, + NULL); + + g_main_loop_run(data.loop); + + g_main_loop_unref(data.loop); + object_unref(obj); + + g_assert(data.source == obj); + g_assert(data.err != NULL); + + self = g_thread_self(); + + /* Make sure the test_task_thread_worker actually got + * run in a different thread */ + g_assert(data.worker != self); + + /* And that the test_task_thread_callback got rnu in + * the main loop thread (ie this one) */ + g_assert(data.complete == self); +} + + +int main(int argc, char **argv) +{ + g_test_init(&argc, &argv, NULL); + module_call_init(MODULE_INIT_QOM); + type_register_static(&dummy_info); + g_test_add_func("/crypto/task/complete", test_task_complete); + g_test_add_func("/crypto/task/datafree", test_task_data_free); + g_test_add_func("/crypto/task/error", test_task_error); + g_test_add_func("/crypto/task/thread_complete", test_task_thread_complete); + g_test_add_func("/crypto/task/thread_error", test_task_thread_error); + return g_test_run(); +} diff --git a/trace-events b/trace-events index fa504cf494..acf2484b7d 100644 --- a/trace-events +++ b/trace-events @@ -1808,3 +1808,12 @@ user_handle_signal(void *env, int target_sig) "env=%p signal %d" user_host_signal(void *env, int host_sig, int target_sig) "env=%p signal %d (target %d(" user_queue_signal(void *env, int target_sig) "env=%p signal %d" user_s390x_restore_sigregs(void *env, uint64_t sc_psw_addr, uint64_t env_psw_addr) "env=%p frame psw.addr "PRIx64 " current psw.addr "PRIx64"" + +# io/task.c +qio_task_new(void *task, void *source, void *func, void *opaque) "Task new task=%p source=%p func=%p opaque=%p" +qio_task_complete(void *task) "Task complete task=%p" +qio_task_abort(void *task) "Task abort task=%p" +qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start task=%p worker=%p opaque=%p" +qio_task_thread_run(void *task) "Task thread run task=%p" +qio_task_thread_exit(void *task) "Task thread exit task=%p" +qio_task_thread_result(void *task) "Task thread result task=%p"