Pull request

v2:
  * Fixed qcow2 sanitizer warnings [Peter]
  * Renamed get_error test cases to get_error_all to avoid tripping "error:"
    grep scripts [Peter]
  * Added Fam's iothread stop patch
 -----BEGIN PGP SIGNATURE-----
 
 iQEcBAABAgAGBQJX1862AAoJEJykq7OBq3PIK/YH/jLBxEu/H4hu+NzR7V7ur8lv
 F5E633tVzfxC7iM5CGUV+6qKM1UsIXvXn98JDxuGPh8aZaPdhsaalrS5iAJby6LN
 nPxam3ps1RlPeFA3hS3OqMizlO0qhp2gQZ5T6KPEktNXgSKP3kyiumXDo26XQoj6
 dtW1yap9lcvk3z9XBdTTpAfPLmAurvcxUptZsxQgo6O6/7jCuLyxEDuFenL13tOU
 vUvdrWvBn44YlzDjJc75nw2EBnCFLphw6SCj6seMj8RXDn2JUdVM0DQIZ2HQtBcT
 YlVjYl8hNyBwRZGi1QjSzdIhPYZQen0gg3EVJUBdwg/zgkrS9w91JImSwJGMxyg=
 =AHxB
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

Pull request

v2:
 * Fixed qcow2 sanitizer warnings [Peter]
 * Renamed get_error test cases to get_error_all to avoid tripping "error:"
   grep scripts [Peter]
 * Added Fam's iothread stop patch

# gpg: Signature made Tue 13 Sep 2016 11:02:30 BST
# gpg:                using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request:
  iothread: Stop threads before main() quits
  tests: fix qvirtqueue_kick
  MAINTAINERS: add maintainer for replication
  support replication driver in blockdev-add
  tests: add unit test case for replication
  replication: Implement new driver for block replication
  replication: Introduce new APIs to do replication operation
  configure: support replication
  mirror: auto complete active commit
  docs: block replication's description
  block: Link backup into block core
  Backup: export interfaces for extra serialization
  Backup: clear all bitmap when doing block checkpoint
  block: unblock backup operations in backing file
  virtio-blk: rename virtio_device_info to virtio_blk_info
  linux-aio: process completions from ioq_submit()
  linux-aio: split processing events function
  linux-aio: consume events in userspace instead of calling io_getevents
  qcow2: avoid memcpy(dst, NULL, len)

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2016-09-13 14:31:18 +01:00
commit 4dfbe3767a
27 changed files with 2116 additions and 68 deletions

View File

@ -1624,6 +1624,15 @@ L: qemu-block@nongnu.org
S: Supported
F: tests/image-fuzzer/
Replication
M: Wen Congyang <wency@cn.fujitsu.com>
M: Changlong Xie <xiecl.fnst@cn.fujitsu.com>
S: Supported
F: replication*
F: block/replication.c
F: tests/test-replication.c
F: docs/block-replication.txt
Build and test automation
-------------------------
M: Alex Bennée <alex.bennee@linaro.org>

View File

@ -15,6 +15,7 @@ block-obj-$(CONFIG_POSIX) += aio-posix.o
block-obj-$(CONFIG_WIN32) += aio-win32.o
block-obj-y += block/
block-obj-y += qemu-io-cmds.o
block-obj-$(CONFIG_REPLICATION) += replication.o
block-obj-m = block/

17
block.c
View File

@ -1312,6 +1312,23 @@ void bdrv_set_backing_hd(BlockDriverState *bs, BlockDriverState *backing_hd)
/* Otherwise we won't be able to commit due to check in bdrv_commit */
bdrv_op_unblock(backing_hd, BLOCK_OP_TYPE_COMMIT_TARGET,
bs->backing_blocker);
/*
* We do backup in 3 ways:
* 1. drive backup
* The target bs is new opened, and the source is top BDS
* 2. blockdev backup
* Both the source and the target are top BDSes.
* 3. internal backup(used for block replication)
* Both the source and the target are backing file
*
* In case 1 and 2, neither the source nor the target is the backing file.
* In case 3, we will block the top BDS, so there is only one block job
* for the top BDS and its backing chain.
*/
bdrv_op_unblock(backing_hd, BLOCK_OP_TYPE_BACKUP_SOURCE,
bs->backing_blocker);
bdrv_op_unblock(backing_hd, BLOCK_OP_TYPE_BACKUP_TARGET,
bs->backing_blocker);
out:
bdrv_refresh_limits(bs, NULL);
}

View File

@ -22,11 +22,12 @@ block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
block-obj-$(CONFIG_LIBSSH2) += ssh.o
block-obj-y += accounting.o dirty-bitmap.o
block-obj-y += write-threshold.o
block-obj-y += backup.o
block-obj-$(CONFIG_REPLICATION) += replication.o
block-obj-y += crypto.o
common-obj-y += stream.o
common-obj-y += backup.o
iscsi.o-cflags := $(LIBISCSI_CFLAGS)
iscsi.o-libs := $(LIBISCSI_LIBS)

View File

@ -17,6 +17,7 @@
#include "block/block.h"
#include "block/block_int.h"
#include "block/blockjob.h"
#include "block/block_backup.h"
#include "qapi/error.h"
#include "qapi/qmp/qerror.h"
#include "qemu/ratelimit.h"
@ -27,13 +28,6 @@
#define BACKUP_CLUSTER_SIZE_DEFAULT (1 << 16)
#define SLICE_TIME 100000000ULL /* ns */
typedef struct CowRequest {
int64_t start;
int64_t end;
QLIST_ENTRY(CowRequest) list;
CoQueue wait_queue; /* coroutines blocked on this request */
} CowRequest;
typedef struct BackupBlockJob {
BlockJob common;
BlockBackend *target;
@ -255,6 +249,57 @@ static void backup_attached_aio_context(BlockJob *job, AioContext *aio_context)
blk_set_aio_context(s->target, aio_context);
}
void backup_do_checkpoint(BlockJob *job, Error **errp)
{
BackupBlockJob *backup_job = container_of(job, BackupBlockJob, common);
int64_t len;
assert(job->driver->job_type == BLOCK_JOB_TYPE_BACKUP);
if (backup_job->sync_mode != MIRROR_SYNC_MODE_NONE) {
error_setg(errp, "The backup job only supports block checkpoint in"
" sync=none mode");
return;
}
len = DIV_ROUND_UP(backup_job->common.len, backup_job->cluster_size);
bitmap_zero(backup_job->done_bitmap, len);
}
void backup_wait_for_overlapping_requests(BlockJob *job, int64_t sector_num,
int nb_sectors)
{
BackupBlockJob *backup_job = container_of(job, BackupBlockJob, common);
int64_t sectors_per_cluster = cluster_size_sectors(backup_job);
int64_t start, end;
assert(job->driver->job_type == BLOCK_JOB_TYPE_BACKUP);
start = sector_num / sectors_per_cluster;
end = DIV_ROUND_UP(sector_num + nb_sectors, sectors_per_cluster);
wait_for_overlapping_requests(backup_job, start, end);
}
void backup_cow_request_begin(CowRequest *req, BlockJob *job,
int64_t sector_num,
int nb_sectors)
{
BackupBlockJob *backup_job = container_of(job, BackupBlockJob, common);
int64_t sectors_per_cluster = cluster_size_sectors(backup_job);
int64_t start, end;
assert(job->driver->job_type == BLOCK_JOB_TYPE_BACKUP);
start = sector_num / sectors_per_cluster;
end = DIV_ROUND_UP(sector_num + nb_sectors, sectors_per_cluster);
cow_request_begin(req, backup_job, start, end);
}
void backup_cow_request_end(CowRequest *req)
{
cow_request_end(req);
}
static const BlockJobDriver backup_job_driver = {
.instance_size = sizeof(BackupBlockJob),
.job_type = BLOCK_JOB_TYPE_BACKUP,

View File

@ -59,7 +59,6 @@ struct LinuxAioState {
/* I/O completion processing */
QEMUBH *completion_bh;
struct io_event events[MAX_EVENTS];
int event_idx;
int event_max;
};
@ -95,64 +94,153 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
laiocb->ret = ret;
if (laiocb->co) {
qemu_coroutine_enter(laiocb->co);
/* Jump and continue completion for foreign requests, don't do
* anything for current request, it will be completed shortly. */
if (laiocb->co != qemu_coroutine_self()) {
qemu_coroutine_enter(laiocb->co);
}
} else {
laiocb->common.cb(laiocb->common.opaque, ret);
qemu_aio_unref(laiocb);
}
}
/* The completion BH fetches completed I/O requests and invokes their
* callbacks.
/**
* aio_ring buffer which is shared between userspace and kernel.
*
* This copied from linux/fs/aio.c, common header does not exist
* but AIO exists for ages so we assume ABI is stable.
*/
struct aio_ring {
unsigned id; /* kernel internal index number */
unsigned nr; /* number of io_events */
unsigned head; /* Written to by userland or by kernel. */
unsigned tail;
unsigned magic;
unsigned compat_features;
unsigned incompat_features;
unsigned header_length; /* size of aio_ring */
struct io_event io_events[0];
};
/**
* io_getevents_peek:
* @ctx: AIO context
* @events: pointer on events array, output value
* Returns the number of completed events and sets a pointer
* on events array. This function does not update the internal
* ring buffer, only reads head and tail. When @events has been
* processed io_getevents_commit() must be called.
*/
static inline unsigned int io_getevents_peek(io_context_t ctx,
struct io_event **events)
{
struct aio_ring *ring = (struct aio_ring *)ctx;
unsigned int head = ring->head, tail = ring->tail;
unsigned int nr;
nr = tail >= head ? tail - head : ring->nr - head;
*events = ring->io_events + head;
/* To avoid speculative loads of s->events[i] before observing tail.
Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
smp_rmb();
return nr;
}
/**
* io_getevents_commit:
* @ctx: AIO context
* @nr: the number of events on which head should be advanced
*
* Advances head of a ring buffer.
*/
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
{
struct aio_ring *ring = (struct aio_ring *)ctx;
if (nr) {
ring->head = (ring->head + nr) % ring->nr;
}
}
/**
* io_getevents_advance_and_peek:
* @ctx: AIO context
* @events: pointer on events array, output value
* @nr: the number of events on which head should be advanced
*
* Advances head of a ring buffer and returns number of elements left.
*/
static inline unsigned int
io_getevents_advance_and_peek(io_context_t ctx,
struct io_event **events,
unsigned int nr)
{
io_getevents_commit(ctx, nr);
return io_getevents_peek(ctx, events);
}
/**
* qemu_laio_process_completions:
* @s: AIO state
*
* Fetches completed I/O requests and invokes their callbacks.
*
* The function is somewhat tricky because it supports nested event loops, for
* example when a request callback invokes aio_poll(). In order to do this,
* the completion events array and index are kept in LinuxAioState. The BH
* reschedules itself as long as there are completions pending so it will
* either be called again in a nested event loop or will be called after all
* events have been completed. When there are no events left to complete, the
* BH returns without rescheduling.
* indices are kept in LinuxAioState. Function schedules BH completion so it
* can be called again in a nested event loop. When there are no events left
* to complete the BH is being canceled.
*/
static void qemu_laio_completion_bh(void *opaque)
static void qemu_laio_process_completions(LinuxAioState *s)
{
LinuxAioState *s = opaque;
/* Fetch more completion events when empty */
if (s->event_idx == s->event_max) {
do {
struct timespec ts = { 0 };
s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
s->events, &ts);
} while (s->event_max == -EINTR);
s->event_idx = 0;
if (s->event_max <= 0) {
s->event_max = 0;
return; /* no more events */
}
s->io_q.in_flight -= s->event_max;
}
struct io_event *events;
/* Reschedule so nested event loops see currently pending completions */
qemu_bh_schedule(s->completion_bh);
/* Process completion events */
while (s->event_idx < s->event_max) {
struct iocb *iocb = s->events[s->event_idx].obj;
struct qemu_laiocb *laiocb =
while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
s->event_idx))) {
for (s->event_idx = 0; s->event_idx < s->event_max; ) {
struct iocb *iocb = events[s->event_idx].obj;
struct qemu_laiocb *laiocb =
container_of(iocb, struct qemu_laiocb, iocb);
laiocb->ret = io_event_ret(&s->events[s->event_idx]);
s->event_idx++;
laiocb->ret = io_event_ret(&events[s->event_idx]);
qemu_laio_process_completion(laiocb);
}
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
/* Change counters one-by-one because we can be nested. */
s->io_q.in_flight--;
s->event_idx++;
qemu_laio_process_completion(laiocb);
}
}
qemu_bh_cancel(s->completion_bh);
/* If we are nested we have to notify the level above that we are done
* by setting event_max to zero, upper level will then jump out of it's
* own `for` loop. If we are the last all counters droped to zero. */
s->event_max = 0;
s->event_idx = 0;
}
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
qemu_laio_process_completions(s);
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
}
}
static void qemu_laio_completion_bh(void *opaque)
{
LinuxAioState *s = opaque;
qemu_laio_process_completions_and_submit(s);
}
static void qemu_laio_completion_cb(EventNotifier *e)
@ -160,7 +248,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
LinuxAioState *s = container_of(e, LinuxAioState, e);
if (event_notifier_test_and_clear(&s->e)) {
qemu_laio_completion_bh(s);
qemu_laio_process_completions_and_submit(s);
}
}
@ -236,6 +324,19 @@ static void ioq_submit(LinuxAioState *s)
QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
} while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
s->io_q.blocked = (s->io_q.in_queue > 0);
if (s->io_q.in_flight) {
/* We can try to complete something just right away if there are
* still requests in-flight. */
qemu_laio_process_completions(s);
/*
* Even we have completed everything (in_flight == 0), the queue can
* have still pended requests (in_queue > 0). We do not attempt to
* repeat submission to avoid IO hang. The reason is simple: s->e is
* still set and completion callback will be called shortly and all
* pended requests will be submitted from there.
*/
}
}
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
@ -293,6 +394,7 @@ int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
.co = qemu_coroutine_self(),
.nbytes = qiov->size,
.ctx = s,
.ret = -EINPROGRESS,
.is_read = (type == QEMU_AIO_READ),
.qiov = qiov,
};
@ -302,7 +404,9 @@ int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
return ret;
}
qemu_coroutine_yield();
if (laiocb.ret == -EINPROGRESS) {
qemu_coroutine_yield();
}
return laiocb.ret;
}

View File

@ -916,7 +916,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
BlockCompletionFunc *cb,
void *opaque, Error **errp,
const BlockJobDriver *driver,
bool is_none_mode, BlockDriverState *base)
bool is_none_mode, BlockDriverState *base,
bool auto_complete)
{
MirrorBlockJob *s;
@ -952,6 +953,9 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
s->granularity = granularity;
s->buf_size = ROUND_UP(buf_size, granularity);
s->unmap = unmap;
if (auto_complete) {
s->should_complete = true;
}
s->dirty_bitmap = bdrv_create_dirty_bitmap(bs, granularity, NULL, errp);
if (!s->dirty_bitmap) {
@ -990,14 +994,15 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
mirror_start_job(job_id, bs, target, replaces,
speed, granularity, buf_size, backing_mode,
on_source_error, on_target_error, unmap, cb, opaque, errp,
&mirror_job_driver, is_none_mode, base);
&mirror_job_driver, is_none_mode, base, false);
}
void commit_active_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, int64_t speed,
BlockdevOnError on_error,
BlockCompletionFunc *cb,
void *opaque, Error **errp)
void *opaque, Error **errp,
bool auto_complete)
{
int64_t length, base_length;
int orig_base_flags;
@ -1038,7 +1043,7 @@ void commit_active_start(const char *job_id, BlockDriverState *bs,
mirror_start_job(job_id, bs, base, NULL, speed, 0, 0,
MIRROR_LEAVE_BACKING_CHAIN,
on_error, on_error, false, cb, opaque, &local_err,
&commit_active_job_driver, false, base);
&commit_active_job_driver, false, base, auto_complete);
if (local_err) {
error_propagate(errp, local_err);
goto error_restore_flags;

View File

@ -83,7 +83,9 @@ int qcow2_grow_l1_table(BlockDriverState *bs, uint64_t min_size,
}
memset(new_l1_table, 0, align_offset(new_l1_size2, 512));
memcpy(new_l1_table, s->l1_table, s->l1_size * sizeof(uint64_t));
if (s->l1_size) {
memcpy(new_l1_table, s->l1_table, s->l1_size * sizeof(uint64_t));
}
/* write new table (align to cluster) */
BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ALLOC_TABLE);

View File

@ -1804,7 +1804,10 @@ static size_t header_ext_add(char *buf, uint32_t magic, const void *s,
.magic = cpu_to_be32(magic),
.len = cpu_to_be32(len),
};
memcpy(buf + sizeof(QCowExtension), s, len);
if (len) {
memcpy(buf + sizeof(QCowExtension), s, len);
}
return ext_len;
}

659
block/replication.c Normal file
View File

@ -0,0 +1,659 @@
/*
* Replication Block filter
*
* Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
* Copyright (c) 2016 Intel Corporation
* Copyright (c) 2016 FUJITSU LIMITED
*
* Author:
* Wen Congyang <wency@cn.fujitsu.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/nbd.h"
#include "block/blockjob.h"
#include "block/block_int.h"
#include "block/block_backup.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "replication.h"
typedef struct BDRVReplicationState {
ReplicationMode mode;
int replication_state;
BdrvChild *active_disk;
BdrvChild *hidden_disk;
BdrvChild *secondary_disk;
char *top_id;
ReplicationState *rs;
Error *blocker;
int orig_hidden_flags;
int orig_secondary_flags;
int error;
} BDRVReplicationState;
enum {
BLOCK_REPLICATION_NONE, /* block replication is not started */
BLOCK_REPLICATION_RUNNING, /* block replication is running */
BLOCK_REPLICATION_FAILOVER, /* failover is running in background */
BLOCK_REPLICATION_FAILOVER_FAILED, /* failover failed */
BLOCK_REPLICATION_DONE, /* block replication is done */
};
static void replication_start(ReplicationState *rs, ReplicationMode mode,
Error **errp);
static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
static void replication_get_error(ReplicationState *rs, Error **errp);
static void replication_stop(ReplicationState *rs, bool failover,
Error **errp);
#define REPLICATION_MODE "mode"
#define REPLICATION_TOP_ID "top-id"
static QemuOptsList replication_runtime_opts = {
.name = "replication",
.head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
.desc = {
{
.name = REPLICATION_MODE,
.type = QEMU_OPT_STRING,
},
{
.name = REPLICATION_TOP_ID,
.type = QEMU_OPT_STRING,
},
{ /* end of list */ }
},
};
static ReplicationOps replication_ops = {
.start = replication_start,
.checkpoint = replication_do_checkpoint,
.get_error = replication_get_error,
.stop = replication_stop,
};
static int replication_open(BlockDriverState *bs, QDict *options,
int flags, Error **errp)
{
int ret;
BDRVReplicationState *s = bs->opaque;
Error *local_err = NULL;
QemuOpts *opts = NULL;
const char *mode;
const char *top_id;
ret = -EINVAL;
opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
qemu_opts_absorb_qdict(opts, options, &local_err);
if (local_err) {
goto fail;
}
mode = qemu_opt_get(opts, REPLICATION_MODE);
if (!mode) {
error_setg(&local_err, "Missing the option mode");
goto fail;
}
if (!strcmp(mode, "primary")) {
s->mode = REPLICATION_MODE_PRIMARY;
} else if (!strcmp(mode, "secondary")) {
s->mode = REPLICATION_MODE_SECONDARY;
top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
s->top_id = g_strdup(top_id);
if (!s->top_id) {
error_setg(&local_err, "Missing the option top-id");
goto fail;
}
} else {
error_setg(&local_err,
"The option mode's value should be primary or secondary");
goto fail;
}
s->rs = replication_new(bs, &replication_ops);
ret = 0;
fail:
qemu_opts_del(opts);
error_propagate(errp, local_err);
return ret;
}
static void replication_close(BlockDriverState *bs)
{
BDRVReplicationState *s = bs->opaque;
if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
replication_stop(s->rs, false, NULL);
}
if (s->mode == REPLICATION_MODE_SECONDARY) {
g_free(s->top_id);
}
replication_remove(s->rs);
}
static int64_t replication_getlength(BlockDriverState *bs)
{
return bdrv_getlength(bs->file->bs);
}
static int replication_get_io_status(BDRVReplicationState *s)
{
switch (s->replication_state) {
case BLOCK_REPLICATION_NONE:
return -EIO;
case BLOCK_REPLICATION_RUNNING:
return 0;
case BLOCK_REPLICATION_FAILOVER:
return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
case BLOCK_REPLICATION_FAILOVER_FAILED:
return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
case BLOCK_REPLICATION_DONE:
/*
* active commit job completes, and active disk and secondary_disk
* is swapped, so we can operate bs->file directly
*/
return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
default:
abort();
}
}
static int replication_return_value(BDRVReplicationState *s, int ret)
{
if (s->mode == REPLICATION_MODE_SECONDARY) {
return ret;
}
if (ret < 0) {
s->error = ret;
ret = 0;
}
return ret;
}
static coroutine_fn int replication_co_readv(BlockDriverState *bs,
int64_t sector_num,
int remaining_sectors,
QEMUIOVector *qiov)
{
BDRVReplicationState *s = bs->opaque;
BdrvChild *child = s->secondary_disk;
BlockJob *job = NULL;
CowRequest req;
int ret;
if (s->mode == REPLICATION_MODE_PRIMARY) {
/* We only use it to forward primary write requests */
return -EIO;
}
ret = replication_get_io_status(s);
if (ret < 0) {
return ret;
}
if (child && child->bs) {
job = child->bs->job;
}
if (job) {
backup_wait_for_overlapping_requests(child->bs->job, sector_num,
remaining_sectors);
backup_cow_request_begin(&req, child->bs->job, sector_num,
remaining_sectors);
ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors,
qiov);
backup_cow_request_end(&req);
goto out;
}
ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors, qiov);
out:
return replication_return_value(s, ret);
}
static coroutine_fn int replication_co_writev(BlockDriverState *bs,
int64_t sector_num,
int remaining_sectors,
QEMUIOVector *qiov)
{
BDRVReplicationState *s = bs->opaque;
QEMUIOVector hd_qiov;
uint64_t bytes_done = 0;
BdrvChild *top = bs->file;
BdrvChild *base = s->secondary_disk;
BdrvChild *target;
int ret, n;
ret = replication_get_io_status(s);
if (ret < 0) {
goto out;
}
if (ret == 0) {
ret = bdrv_co_writev(top, sector_num,
remaining_sectors, qiov);
return replication_return_value(s, ret);
}
/*
* Failover failed, only write to active disk if the sectors
* have already been allocated in active disk/hidden disk.
*/
qemu_iovec_init(&hd_qiov, qiov->niov);
while (remaining_sectors > 0) {
ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
remaining_sectors, &n);
if (ret < 0) {
goto out1;
}
qemu_iovec_reset(&hd_qiov);
qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
target = ret ? top : base;
ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
if (ret < 0) {
goto out1;
}
remaining_sectors -= n;
sector_num += n;
bytes_done += n * BDRV_SECTOR_SIZE;
}
out1:
qemu_iovec_destroy(&hd_qiov);
out:
return ret;
}
static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
BlockDriverState *candidate)
{
return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
}
static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
{
Error *local_err = NULL;
int ret;
if (!s->secondary_disk->bs->job) {
error_setg(errp, "Backup job was cancelled unexpectedly");
return;
}
backup_do_checkpoint(s->secondary_disk->bs->job, &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
}
ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
if (ret < 0) {
error_setg(errp, "Cannot make active disk empty");
return;
}
ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
if (ret < 0) {
error_setg(errp, "Cannot make hidden disk empty");
return;
}
}
static void reopen_backing_file(BDRVReplicationState *s, bool writable,
Error **errp)
{
BlockReopenQueue *reopen_queue = NULL;
int orig_hidden_flags, orig_secondary_flags;
int new_hidden_flags, new_secondary_flags;
Error *local_err = NULL;
if (writable) {
orig_hidden_flags = s->orig_hidden_flags =
bdrv_get_flags(s->hidden_disk->bs);
new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
~BDRV_O_INACTIVE;
orig_secondary_flags = s->orig_secondary_flags =
bdrv_get_flags(s->secondary_disk->bs);
new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
~BDRV_O_INACTIVE;
} else {
orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
~BDRV_O_INACTIVE;
new_hidden_flags = s->orig_hidden_flags;
orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
~BDRV_O_INACTIVE;
new_secondary_flags = s->orig_secondary_flags;
}
if (orig_hidden_flags != new_hidden_flags) {
reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
new_hidden_flags);
}
if (!(orig_secondary_flags & BDRV_O_RDWR)) {
reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
NULL, new_secondary_flags);
}
if (reopen_queue) {
bdrv_reopen_multiple(reopen_queue, &local_err);
error_propagate(errp, local_err);
}
}
static void backup_job_cleanup(BDRVReplicationState *s)
{
BlockDriverState *top_bs;
top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
if (!top_bs) {
return;
}
bdrv_op_unblock_all(top_bs, s->blocker);
error_free(s->blocker);
reopen_backing_file(s, false, NULL);
}
static void backup_job_completed(void *opaque, int ret)
{
BDRVReplicationState *s = opaque;
if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
/* The backup job is cancelled unexpectedly */
s->error = -EIO;
}
backup_job_cleanup(s);
}
static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
{
BdrvChild *child;
/* The bs itself is the top_bs */
if (top_bs == bs) {
return true;
}
/* Iterate over top_bs's children */
QLIST_FOREACH(child, &top_bs->children, next) {
if (child->bs == bs || check_top_bs(child->bs, bs)) {
return true;
}
}
return false;
}
static void replication_start(ReplicationState *rs, ReplicationMode mode,
Error **errp)
{
BlockDriverState *bs = rs->opaque;
BDRVReplicationState *s;
BlockDriverState *top_bs;
int64_t active_length, hidden_length, disk_length;
AioContext *aio_context;
Error *local_err = NULL;
aio_context = bdrv_get_aio_context(bs);
aio_context_acquire(aio_context);
s = bs->opaque;
if (s->replication_state != BLOCK_REPLICATION_NONE) {
error_setg(errp, "Block replication is running or done");
aio_context_release(aio_context);
return;
}
if (s->mode != mode) {
error_setg(errp, "The parameter mode's value is invalid, needs %d,"
" but got %d", s->mode, mode);
aio_context_release(aio_context);
return;
}
switch (s->mode) {
case REPLICATION_MODE_PRIMARY:
break;
case REPLICATION_MODE_SECONDARY:
s->active_disk = bs->file;
if (!s->active_disk || !s->active_disk->bs ||
!s->active_disk->bs->backing) {
error_setg(errp, "Active disk doesn't have backing file");
aio_context_release(aio_context);
return;
}
s->hidden_disk = s->active_disk->bs->backing;
if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
error_setg(errp, "Hidden disk doesn't have backing file");
aio_context_release(aio_context);
return;
}
s->secondary_disk = s->hidden_disk->bs->backing;
if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
error_setg(errp, "The secondary disk doesn't have block backend");
aio_context_release(aio_context);
return;
}
/* verify the length */
active_length = bdrv_getlength(s->active_disk->bs);
hidden_length = bdrv_getlength(s->hidden_disk->bs);
disk_length = bdrv_getlength(s->secondary_disk->bs);
if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
active_length != hidden_length || hidden_length != disk_length) {
error_setg(errp, "Active disk, hidden disk, secondary disk's length"
" are not the same");
aio_context_release(aio_context);
return;
}
if (!s->active_disk->bs->drv->bdrv_make_empty ||
!s->hidden_disk->bs->drv->bdrv_make_empty) {
error_setg(errp,
"Active disk or hidden disk doesn't support make_empty");
aio_context_release(aio_context);
return;
}
/* reopen the backing file in r/w mode */
reopen_backing_file(s, true, &local_err);
if (local_err) {
error_propagate(errp, local_err);
aio_context_release(aio_context);
return;
}
/* start backup job now */
error_setg(&s->blocker,
"Block device is in use by internal backup job");
top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
if (!top_bs || !bdrv_is_root_node(top_bs) ||
!check_top_bs(top_bs, bs)) {
error_setg(errp, "No top_bs or it is invalid");
reopen_backing_file(s, false, NULL);
aio_context_release(aio_context);
return;
}
bdrv_op_block_all(top_bs, s->blocker);
bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
backup_start("replication-backup", s->secondary_disk->bs,
s->hidden_disk->bs, 0, MIRROR_SYNC_MODE_NONE, NULL, false,
BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
backup_job_completed, s, NULL, &local_err);
if (local_err) {
error_propagate(errp, local_err);
backup_job_cleanup(s);
aio_context_release(aio_context);
return;
}
break;
default:
aio_context_release(aio_context);
abort();
}
s->replication_state = BLOCK_REPLICATION_RUNNING;
if (s->mode == REPLICATION_MODE_SECONDARY) {
secondary_do_checkpoint(s, errp);
}
s->error = 0;
aio_context_release(aio_context);
}
static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
{
BlockDriverState *bs = rs->opaque;
BDRVReplicationState *s;
AioContext *aio_context;
aio_context = bdrv_get_aio_context(bs);
aio_context_acquire(aio_context);
s = bs->opaque;
if (s->mode == REPLICATION_MODE_SECONDARY) {
secondary_do_checkpoint(s, errp);
}
aio_context_release(aio_context);
}
static void replication_get_error(ReplicationState *rs, Error **errp)
{
BlockDriverState *bs = rs->opaque;
BDRVReplicationState *s;
AioContext *aio_context;
aio_context = bdrv_get_aio_context(bs);
aio_context_acquire(aio_context);
s = bs->opaque;
if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
error_setg(errp, "Block replication is not running");
aio_context_release(aio_context);
return;
}
if (s->error) {
error_setg(errp, "I/O error occurred");
aio_context_release(aio_context);
return;
}
aio_context_release(aio_context);
}
static void replication_done(void *opaque, int ret)
{
BlockDriverState *bs = opaque;
BDRVReplicationState *s = bs->opaque;
if (ret == 0) {
s->replication_state = BLOCK_REPLICATION_DONE;
/* refresh top bs's filename */
bdrv_refresh_filename(bs);
s->active_disk = NULL;
s->secondary_disk = NULL;
s->hidden_disk = NULL;
s->error = 0;
} else {
s->replication_state = BLOCK_REPLICATION_FAILOVER_FAILED;
s->error = -EIO;
}
}
static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
{
BlockDriverState *bs = rs->opaque;
BDRVReplicationState *s;
AioContext *aio_context;
aio_context = bdrv_get_aio_context(bs);
aio_context_acquire(aio_context);
s = bs->opaque;
if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
error_setg(errp, "Block replication is not running");
aio_context_release(aio_context);
return;
}
switch (s->mode) {
case REPLICATION_MODE_PRIMARY:
s->replication_state = BLOCK_REPLICATION_DONE;
s->error = 0;
break;
case REPLICATION_MODE_SECONDARY:
/*
* This BDS will be closed, and the job should be completed
* before the BDS is closed, because we will access hidden
* disk, secondary disk in backup_job_completed().
*/
if (s->secondary_disk->bs->job) {
block_job_cancel_sync(s->secondary_disk->bs->job);
}
if (!failover) {
secondary_do_checkpoint(s, errp);
s->replication_state = BLOCK_REPLICATION_DONE;
aio_context_release(aio_context);
return;
}
s->replication_state = BLOCK_REPLICATION_FAILOVER;
commit_active_start("replication-commit", s->active_disk->bs,
s->secondary_disk->bs, 0, BLOCKDEV_ON_ERROR_REPORT,
replication_done,
bs, errp, true);
break;
default:
aio_context_release(aio_context);
abort();
}
aio_context_release(aio_context);
}
BlockDriver bdrv_replication = {
.format_name = "replication",
.protocol_name = "replication",
.instance_size = sizeof(BDRVReplicationState),
.bdrv_open = replication_open,
.bdrv_close = replication_close,
.bdrv_getlength = replication_getlength,
.bdrv_co_readv = replication_co_readv,
.bdrv_co_writev = replication_co_writev,
.is_filter = true,
.bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,
.has_variable_length = true,
};
static void bdrv_replication_init(void)
{
bdrv_register(&bdrv_replication);
}
block_init(bdrv_replication_init);

View File

@ -3090,7 +3090,7 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
goto out;
}
commit_active_start(has_job_id ? job_id : NULL, bs, base_bs, speed,
on_error, block_job_cb, bs, &local_err);
on_error, block_job_cb, bs, &local_err, false);
} else {
commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, speed,
on_error, block_job_cb, bs,

11
configure vendored
View File

@ -321,6 +321,7 @@ vhdx=""
numa=""
tcmalloc="no"
jemalloc="no"
replication="yes"
# parse CC options first
for opt do
@ -1156,6 +1157,10 @@ for opt do
;;
--enable-jemalloc) jemalloc="yes"
;;
--disable-replication) replication="no"
;;
--enable-replication) replication="yes"
;;
*)
echo "ERROR: unknown option $opt"
echo "Try '$0 --help' for more information"
@ -1386,6 +1391,7 @@ disabled with --disable-FEATURE, default is enabled if available:
numa libnuma support
tcmalloc tcmalloc support
jemalloc jemalloc support
replication replication support
NOTE: The object files are built at the place where configure is launched
EOF
@ -4927,6 +4933,7 @@ echo "NUMA host support $numa"
echo "tcmalloc support $tcmalloc"
echo "jemalloc support $jemalloc"
echo "avx2 optimization $avx2_opt"
echo "replication support $replication"
if test "$sdl_too_old" = "yes"; then
echo "-> Your SDL version is too old - please upgrade to have SDL support"
@ -5507,6 +5514,10 @@ if test "$have_rtnetlink" = "yes" ; then
echo "CONFIG_RTNETLINK=y" >> $config_host_mak
fi
if test "$replication" = "yes" ; then
echo "CONFIG_REPLICATION=y" >> $config_host_mak
fi
# Hold two types of flag:
# CONFIG_THREAD_SETNAME_BYTHREAD - we've got a way of setting the name on
# a thread we have a handle to

239
docs/block-replication.txt Normal file
View File

@ -0,0 +1,239 @@
Block replication
----------------------------------------
Copyright Fujitsu, Corp. 2016
Copyright (c) 2016 Intel Corporation
Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
This work is licensed under the terms of the GNU GPL, version 2 or later.
See the COPYING file in the top-level directory.
Block replication is used for continuous checkpoints. It is designed
for COLO (COarse-grain LOck-stepping) where the Secondary VM is running.
It can also be applied for FT/HA (Fault-tolerance/High Assurance) scenario,
where the Secondary VM is not running.
This document gives an overview of block replication's design.
== Background ==
High availability solutions such as micro checkpoint and COLO will do
consecutive checkpoints. The VM state of the Primary and Secondary VM is
identical right after a VM checkpoint, but becomes different as the VM
executes till the next checkpoint. To support disk contents checkpoint,
the modified disk contents in the Secondary VM must be buffered, and are
only dropped at next checkpoint time. To reduce the network transportation
effort during a vmstate checkpoint, the disk modification operations of
the Primary disk are asynchronously forwarded to the Secondary node.
== Workflow ==
The following is the image of block replication workflow:
+----------------------+ +------------------------+
|Primary Write Requests| |Secondary Write Requests|
+----------------------+ +------------------------+
| |
| (4)
| V
| /-------------\
| Copy and Forward | |
|---------(1)----------+ | Disk Buffer |
| | | |
| (3) \-------------/
| speculative ^
| write through (2)
| | |
V V |
+--------------+ +----------------+
| Primary Disk | | Secondary Disk |
+--------------+ +----------------+
1) Primary write requests will be copied and forwarded to Secondary
QEMU.
2) Before Primary write requests are written to Secondary disk, the
original sector content will be read from Secondary disk and
buffered in the Disk buffer, but it will not overwrite the existing
sector content (it could be from either "Secondary Write Requests" or
previous COW of "Primary Write Requests") in the Disk buffer.
3) Primary write requests will be written to Secondary disk.
4) Secondary write requests will be buffered in the Disk buffer and it
will overwrite the existing sector content in the buffer.
== Architecture ==
We are going to implement block replication from many basic
blocks that are already in QEMU.
virtio-blk ||
^ || .----------
| || | Secondary
1 Quorum || '----------
/ \ ||
/ \ ||
Primary 2 filter
disk ^ virtio-blk
| ^
3 NBD -------> 3 NBD |
client || server 2 filter
|| ^ ^
--------. || | |
Primary | || Secondary disk <--------- hidden-disk 5 <--------- active-disk 4
--------' || | backing ^ backing
|| | |
|| | |
|| '-------------------------'
|| drive-backup sync=none 6
1) The disk on the primary is represented by a block device with two
children, providing replication between a primary disk and the host that
runs the secondary VM. The read pattern (fifo) for quorum can be extended
to make the primary always read from the local disk instead of going through
NBD.
2) The new block filter (the name is replication) will control the block
replication.
3) The secondary disk receives writes from the primary VM through QEMU's
embedded NBD server (speculative write-through).
4) The disk on the secondary is represented by a custom block device
(called active-disk). It should start as an empty disk, and the format
should support bdrv_make_empty() and backing file.
5) The hidden-disk is created automatically. It buffers the original content
that is modified by the primary VM. It should also start as an empty disk,
and the driver supports bdrv_make_empty() and backing file.
6) The drive-backup job (sync=none) is run to allow hidden-disk to buffer
any state that would otherwise be lost by the speculative write-through
of the NBD server into the secondary disk. So before block replication,
the primary disk and secondary disk should contain the same data.
== Failure Handling ==
There are 7 internal errors when block replication is running:
1. I/O error on primary disk
2. Forwarding primary write requests failed
3. Backup failed
4. I/O error on secondary disk
5. I/O error on active disk
6. Making active disk or hidden disk empty failed
7. Doing failover failed
In case 1 and 5, we just report the error to the disk layer. In case 2, 3,
4 and 6, we just report block replication's error to FT/HA manager (which
decides when to do a new checkpoint, when to do failover).
In case 7, if active commit failed, we use replication failover failed state
in Secondary's write operation (what decides which target to write).
== New block driver interface ==
We add four block driver interfaces to control block replication:
a. replication_start_all()
Start block replication, called in migration/checkpoint thread.
We must call block_replication_start_all() in secondary QEMU before
calling block_replication_start_all() in primary QEMU. The caller
must hold the I/O mutex lock if it is in migration/checkpoint
thread.
b. replication_do_checkpoint_all()
This interface is called after all VM state is transferred to
Secondary QEMU. The Disk buffer will be dropped in this interface.
The caller must hold the I/O mutex lock if it is in migration/checkpoint
thread.
c. replication_get_error_all()
This interface is called to check if error happened in replication.
The caller must hold the I/O mutex lock if it is in migration/checkpoint
thread.
d. replication_stop_all()
It is called on failover. We will flush the Disk buffer into
Secondary Disk and stop block replication. The vm should be stopped
before calling it if you use this API to shutdown the guest, or other
things except failover. The caller must hold the I/O mutex lock if it is
in migration/checkpoint thread.
== Usage ==
Primary:
-drive if=xxx,driver=quorum,read-pattern=fifo,id=colo1,vote-threshold=1,\
children.0.file.filename=1.raw,\
children.0.driver=raw
Run qmp command in primary qemu:
{ 'execute': 'human-monitor-command',
'arguments': {
'command-line': 'drive_add -n buddy driver=replication,mode=primary,file.driver=nbd,file.host=xxxx,file.port=xxxx,file.export=colo1,node-name=nbd_client1'
}
}
{ 'execute': 'x-blockdev-change',
'arguments': {
'parent': 'colo1',
'node': 'nbd_client1'
}
}
Note:
1. There should be only one NBD Client for each primary disk.
2. host is the secondary physical machine's hostname or IP
3. Each disk must have its own export name.
4. It is all a single argument to -drive and you should ignore the
leading whitespace.
5. The qmp command line must be run after running qmp command line in
secondary qemu.
6. After failover we need remove children.1 (replication driver).
Secondary:
-drive if=none,driver=raw,file.filename=1.raw,id=colo1 \
-drive if=xxx,id=topxxx,driver=replication,mode=secondary,top-id=topxxx\
file.file.filename=active_disk.qcow2,\
file.driver=qcow2,\
file.backing.file.filename=hidden_disk.qcow2,\
file.backing.driver=qcow2,\
file.backing.backing=colo1
Then run qmp command in secondary qemu:
{ 'execute': 'nbd-server-start',
'arguments': {
'addr': {
'type': 'inet',
'data': {
'host': 'xxx',
'port': 'xxx'
}
}
}
}
{ 'execute': 'nbd-server-add',
'arguments': {
'device': 'colo1',
'writable': true
}
}
Note:
1. The export name in secondary QEMU command line is the secondary
disk's id.
2. The export name for the same disk must be the same
3. The qmp command nbd-server-start and nbd-server-add must be run
before running the qmp command migrate on primary QEMU
4. Active disk, hidden disk and nbd target's length should be the
same.
5. It is better to put active disk and hidden disk in ramdisk.
6. It is all a single argument to -drive, and you should ignore
the leading whitespace.
After Failover:
Primary:
The secondary host is down, so we should run the following qmp command
to remove the nbd child from the quorum:
{ 'execute': 'x-blockdev-change',
'arguments': {
'parent': 'colo1',
'child': 'children.1'
}
}
{ 'execute': 'human-monitor-command',
'arguments': {
'command-line': 'drive_del xxxx'
}
}
Note: there is no qmp command to remove the blockdev now
Secondary:
The primary host is down, so we should do the following thing:
{ 'execute': 'nbd-server-stop' }
TODO:
1. Continuous block replication
2. Shared disk

View File

@ -992,7 +992,7 @@ static void virtio_blk_class_init(ObjectClass *klass, void *data)
vdc->load = virtio_blk_load_device;
}
static const TypeInfo virtio_device_info = {
static const TypeInfo virtio_blk_info = {
.name = TYPE_VIRTIO_BLK,
.parent = TYPE_VIRTIO_DEVICE,
.instance_size = sizeof(VirtIOBlock),
@ -1002,7 +1002,7 @@ static const TypeInfo virtio_device_info = {
static void virtio_register_types(void)
{
type_register_static(&virtio_device_info);
type_register_static(&virtio_blk_info);
}
type_init(virtio_register_types)

View File

@ -0,0 +1,39 @@
/*
* QEMU backup
*
* Copyright (c) 2013 Proxmox Server Solutions
* Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
* Copyright (c) 2016 Intel Corporation
* Copyright (c) 2016 FUJITSU LIMITED
*
* Authors:
* Dietmar Maurer <dietmar@proxmox.com>
* Changlong Xie <xiecl.fnst@cn.fujitsu.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*
*/
#ifndef BLOCK_BACKUP_H
#define BLOCK_BACKUP_H
#include "block/block_int.h"
typedef struct CowRequest {
int64_t start;
int64_t end;
QLIST_ENTRY(CowRequest) list;
CoQueue wait_queue; /* coroutines blocked on this request */
} CowRequest;
void backup_wait_for_overlapping_requests(BlockJob *job, int64_t sector_num,
int nb_sectors);
void backup_cow_request_begin(CowRequest *req, BlockJob *job,
int64_t sector_num,
int nb_sectors);
void backup_cow_request_end(CowRequest *req);
void backup_do_checkpoint(BlockJob *job, Error **errp);
#endif

View File

@ -702,13 +702,14 @@ void commit_start(const char *job_id, BlockDriverState *bs,
* @cb: Completion function for the job.
* @opaque: Opaque pointer value passed to @cb.
* @errp: Error object.
* @auto_complete: Auto complete the job.
*
*/
void commit_active_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, int64_t speed,
BlockdevOnError on_error,
BlockCompletionFunc *cb,
void *opaque, Error **errp);
void *opaque, Error **errp, bool auto_complete);
/*
* mirror_start:
* @job_id: The id of the newly-created job, or %NULL to use the

View File

@ -35,5 +35,6 @@ typedef struct {
char *iothread_get_id(IOThread *iothread);
AioContext *iothread_get_aio_context(IOThread *iothread);
void iothread_stop_all(void);
#endif /* IOTHREAD_H */

View File

@ -54,16 +54,25 @@ static void *iothread_run(void *opaque)
return NULL;
}
static void iothread_instance_finalize(Object *obj)
static int iothread_stop(Object *object, void *opaque)
{
IOThread *iothread = IOTHREAD(obj);
IOThread *iothread;
if (!iothread->ctx) {
return;
iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
if (!iothread || !iothread->ctx) {
return 0;
}
iothread->stopping = true;
aio_notify(iothread->ctx);
qemu_thread_join(&iothread->thread);
return 0;
}
static void iothread_instance_finalize(Object *obj)
{
IOThread *iothread = IOTHREAD(obj);
iothread_stop(obj, NULL);
qemu_cond_destroy(&iothread->init_done_cond);
qemu_mutex_destroy(&iothread->init_done_lock);
aio_context_unref(iothread->ctx);
@ -174,3 +183,10 @@ IOThreadInfoList *qmp_query_iothreads(Error **errp)
object_child_foreach(container, query_one_iothread, &prev);
return head;
}
void iothread_stop_all(void)
{
Object *container = object_get_objects_root();
object_child_foreach(container, iothread_stop, NULL);
}

View File

@ -252,6 +252,7 @@
# 2.3: 'host_floppy' deprecated
# 2.5: 'host_floppy' dropped
# 2.6: 'luks' added
# 2.8: 'replication' added
#
# @backing_file: #optional the name of the backing file (for copy-on-write)
#
@ -1712,8 +1713,8 @@
'data': [ 'archipelago', 'blkdebug', 'blkverify', 'bochs', 'cloop',
'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom',
'host_device', 'http', 'https', 'luks', 'null-aio', 'null-co',
'parallels', 'qcow', 'qcow2', 'qed', 'quorum', 'raw', 'tftp',
'vdi', 'vhdx', 'vmdk', 'vpc', 'vvfat' ] }
'parallels', 'qcow', 'qcow2', 'qed', 'quorum', 'raw',
'replication', 'tftp', 'vdi', 'vhdx', 'vmdk', 'vpc', 'vvfat' ] }
##
# @BlockdevOptionsFile
@ -2177,6 +2178,36 @@
'*debug-level': 'int',
'*logfile': 'str' } }
##
# @ReplicationMode
#
# An enumeration of replication modes.
#
# @primary: Primary mode, the vm's state will be sent to secondary QEMU.
#
# @secondary: Secondary mode, receive the vm's state from primary QEMU.
#
# Since: 2.8
##
{ 'enum' : 'ReplicationMode', 'data' : [ 'primary', 'secondary' ] }
##
# @BlockdevOptionsReplication
#
# Driver specific block device options for replication
#
# @mode: the replication mode
#
# @top-id: #optional In secondary mode, node name or device ID of the root
# node who owns the replication node chain. Ignored in primary mode.
#
# Since: 2.8
##
{ 'struct': 'BlockdevOptionsReplication',
'base': 'BlockdevOptionsGenericFormat',
'data': { 'mode': 'ReplicationMode',
'*top-id': 'str' } }
##
# @BlockdevOptions
#
@ -2242,6 +2273,7 @@
'quorum': 'BlockdevOptionsQuorum',
'raw': 'BlockdevOptionsGenericFormat',
# TODO rbd: Wait for structured options
'replication':'BlockdevOptionsReplication',
# TODO sheepdog: Wait for structured options
# TODO ssh: Should take InetSocketAddress for 'host'?
'tftp': 'BlockdevOptionsFile',

View File

@ -921,7 +921,7 @@ static int img_commit(int argc, char **argv)
};
commit_active_start("commit", bs, base_bs, 0, BLOCKDEV_ON_ERROR_REPORT,
common_block_job_cb, &cbi, &local_err);
common_block_job_cb, &cbi, &local_err, false);
if (local_err) {
goto done;
}

107
replication.c Normal file
View File

@ -0,0 +1,107 @@
/*
* Replication filter
*
* Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
* Copyright (c) 2016 Intel Corporation
* Copyright (c) 2016 FUJITSU LIMITED
*
* Author:
* Changlong Xie <xiecl.fnst@cn.fujitsu.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "replication.h"
static QLIST_HEAD(, ReplicationState) replication_states;
ReplicationState *replication_new(void *opaque, ReplicationOps *ops)
{
ReplicationState *rs;
assert(ops != NULL);
rs = g_new0(ReplicationState, 1);
rs->opaque = opaque;
rs->ops = ops;
QLIST_INSERT_HEAD(&replication_states, rs, node);
return rs;
}
void replication_remove(ReplicationState *rs)
{
if (rs) {
QLIST_REMOVE(rs, node);
g_free(rs);
}
}
/*
* The caller of the function MUST make sure vm stopped
*/
void replication_start_all(ReplicationMode mode, Error **errp)
{
ReplicationState *rs, *next;
Error *local_err = NULL;
QLIST_FOREACH_SAFE(rs, &replication_states, node, next) {
if (rs->ops && rs->ops->start) {
rs->ops->start(rs, mode, &local_err);
}
if (local_err) {
error_propagate(errp, local_err);
return;
}
}
}
void replication_do_checkpoint_all(Error **errp)
{
ReplicationState *rs, *next;
Error *local_err = NULL;
QLIST_FOREACH_SAFE(rs, &replication_states, node, next) {
if (rs->ops && rs->ops->checkpoint) {
rs->ops->checkpoint(rs, &local_err);
}
if (local_err) {
error_propagate(errp, local_err);
return;
}
}
}
void replication_get_error_all(Error **errp)
{
ReplicationState *rs, *next;
Error *local_err = NULL;
QLIST_FOREACH_SAFE(rs, &replication_states, node, next) {
if (rs->ops && rs->ops->get_error) {
rs->ops->get_error(rs, &local_err);
}
if (local_err) {
error_propagate(errp, local_err);
return;
}
}
}
void replication_stop_all(bool failover, Error **errp)
{
ReplicationState *rs, *next;
Error *local_err = NULL;
QLIST_FOREACH_SAFE(rs, &replication_states, node, next) {
if (rs->ops && rs->ops->stop) {
rs->ops->stop(rs, failover, &local_err);
}
if (local_err) {
error_propagate(errp, local_err);
return;
}
}
}

174
replication.h Normal file
View File

@ -0,0 +1,174 @@
/*
* Replication filter
*
* Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
* Copyright (c) 2016 Intel Corporation
* Copyright (c) 2016 FUJITSU LIMITED
*
* Author:
* Changlong Xie <xiecl.fnst@cn.fujitsu.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#ifndef REPLICATION_H
#define REPLICATION_H
#include "qemu/queue.h"
typedef struct ReplicationOps ReplicationOps;
typedef struct ReplicationState ReplicationState;
/**
* SECTION:replication.h
* @title:Base Replication System
* @short_description: interfaces for handling replication
*
* The Replication Model provides a framework for handling Replication
*
* <example>
* <title>How to use replication interfaces</title>
* <programlisting>
* #include "replication.h"
*
* typedef struct BDRVReplicationState {
* ReplicationState *rs;
* } BDRVReplicationState;
*
* static void replication_start(ReplicationState *rs, ReplicationMode mode,
* Error **errp);
* static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
* static void replication_get_error(ReplicationState *rs, Error **errp);
* static void replication_stop(ReplicationState *rs, bool failover,
* Error **errp);
*
* static ReplicationOps replication_ops = {
* .start = replication_start,
* .checkpoint = replication_do_checkpoint,
* .get_error = replication_get_error,
* .stop = replication_stop,
* }
*
* static int replication_open(BlockDriverState *bs, QDict *options,
* int flags, Error **errp)
* {
* BDRVReplicationState *s = bs->opaque;
* s->rs = replication_new(bs, &replication_ops);
* return 0;
* }
*
* static void replication_close(BlockDriverState *bs)
* {
* BDRVReplicationState *s = bs->opaque;
* replication_remove(s->rs);
* }
*
* BlockDriver bdrv_replication = {
* .format_name = "replication",
* .protocol_name = "replication",
* .instance_size = sizeof(BDRVReplicationState),
*
* .bdrv_open = replication_open,
* .bdrv_close = replication_close,
* };
*
* static void bdrv_replication_init(void)
* {
* bdrv_register(&bdrv_replication);
* }
*
* block_init(bdrv_replication_init);
* </programlisting>
* </example>
*
* We create an example about how to use replication interfaces in above.
* Then in migration, we can use replication_(start/stop/do_checkpoint/
* get_error)_all to handle all replication operations.
*/
/**
* ReplicationState:
* @opaque: opaque pointer value passed to this ReplicationState
* @ops: replication operation of this ReplicationState
* @node: node that we will insert into @replication_states QLIST
*/
struct ReplicationState {
void *opaque;
ReplicationOps *ops;
QLIST_ENTRY(ReplicationState) node;
};
/**
* ReplicationOps:
* @start: callback to start replication
* @stop: callback to stop replication
* @checkpoint: callback to do checkpoint
* @get_error: callback to check if error occurred during replication
*/
struct ReplicationOps {
void (*start)(ReplicationState *rs, ReplicationMode mode, Error **errp);
void (*stop)(ReplicationState *rs, bool failover, Error **errp);
void (*checkpoint)(ReplicationState *rs, Error **errp);
void (*get_error)(ReplicationState *rs, Error **errp);
};
/**
* replication_new:
* @opaque: opaque pointer value passed to ReplicationState
* @ops: replication operation of the new relevant ReplicationState
*
* Called to create a new ReplicationState instance, and then insert it
* into @replication_states QLIST
*
* Returns: the new ReplicationState instance
*/
ReplicationState *replication_new(void *opaque, ReplicationOps *ops);
/**
* replication_remove:
* @rs: the ReplicationState instance to remove
*
* Called to remove a ReplicationState instance, and then delete it from
* @replication_states QLIST
*/
void replication_remove(ReplicationState *rs);
/**
* replication_start_all:
* @mode: replication mode that could be "primary" or "secondary"
* @errp: returns an error if this function fails
*
* Start replication, called in migration/checkpoint thread
*
* Note: the caller of the function MUST make sure vm stopped
*/
void replication_start_all(ReplicationMode mode, Error **errp);
/**
* replication_do_checkpoint_all:
* @errp: returns an error if this function fails
*
* This interface is called after all VM state is transferred to Secondary QEMU
*/
void replication_do_checkpoint_all(Error **errp);
/**
* replication_get_error_all:
* @errp: returns an error if this function fails
*
* This interface is called to check if error occurred during replication
*/
void replication_get_error_all(Error **errp);
/**
* replication_stop_all:
* @failover: boolean value that indicates if we need do failover or not
* @errp: returns an error if this function fails
*
* It is called on failover. The vm should be stopped before calling it, if you
* use this API to shutdown the guest, or other things except failover
*/
void replication_stop_all(bool failover, Error **errp);
#endif /* REPLICATION_H */

1
tests/.gitignore vendored
View File

@ -63,6 +63,7 @@ test-qmp-introspect.[ch]
test-qmp-marshal.c
test-qmp-output-visitor
test-rcu-list
test-replication
test-rfifolock
test-string-input-visitor
test-string-output-visitor

View File

@ -112,6 +112,7 @@ check-unit-y += tests/test-crypto-xts$(EXESUF)
check-unit-y += tests/test-crypto-block$(EXESUF)
gcov-files-test-logging-y = tests/test-logging.c
check-unit-y += tests/test-logging$(EXESUF)
check-unit-$(CONFIG_REPLICATION) += tests/test-replication$(EXESUF)
check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh
@ -502,6 +503,9 @@ tests/test-base64$(EXESUF): tests/test-base64.o \
tests/test-logging$(EXESUF): tests/test-logging.o $(test-util-obj-y)
tests/test-replication$(EXESUF): tests/test-replication.o $(test-util-obj-y) \
$(test-block-obj-y)
tests/test-qapi-types.c tests/test-qapi-types.h :\
$(SRC_PATH)/tests/qapi-schema/qapi-schema-test.json $(SRC_PATH)/scripts/qapi-types.py $(qapi-py)
$(call quiet-command,$(PYTHON) $(SRC_PATH)/scripts/qapi-types.py \

View File

@ -257,16 +257,16 @@ void qvirtqueue_kick(const QVirtioBus *bus, QVirtioDevice *d, QVirtQueue *vq,
uint32_t free_head)
{
/* vq->avail->idx */
uint16_t idx = readl(vq->avail + 2);
uint16_t idx = readw(vq->avail + 2);
/* vq->used->flags */
uint16_t flags;
/* vq->used->avail_event */
uint16_t avail_event;
/* vq->avail->ring[idx % vq->size] */
writel(vq->avail + 4 + (2 * (idx % vq->size)), free_head);
writew(vq->avail + 4 + (2 * (idx % vq->size)), free_head);
/* vq->avail->idx */
writel(vq->avail + 2, idx + 1);
writew(vq->avail + 2, idx + 1);
/* Must read after idx is updated */
flags = readw(vq->avail);

575
tests/test-replication.c Normal file
View File

@ -0,0 +1,575 @@
/*
* Block replication tests
*
* Copyright (c) 2016 FUJITSU LIMITED
* Author: Changlong Xie <xiecl.fnst@cn.fujitsu.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or
* later. See the COPYING file in the top-level directory.
*/
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "replication.h"
#include "block/block_int.h"
#include "sysemu/block-backend.h"
#define IMG_SIZE (64 * 1024 * 1024)
/* primary */
#define P_ID "primary-id"
static char p_local_disk[] = "/tmp/p_local_disk.XXXXXX";
/* secondary */
#define S_ID "secondary-id"
#define S_LOCAL_DISK_ID "secondary-local-disk-id"
static char s_local_disk[] = "/tmp/s_local_disk.XXXXXX";
static char s_active_disk[] = "/tmp/s_active_disk.XXXXXX";
static char s_hidden_disk[] = "/tmp/s_hidden_disk.XXXXXX";
/* FIXME: steal from blockdev.c */
QemuOptsList qemu_drive_opts = {
.name = "drive",
.head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head),
.desc = {
{ /* end of list */ }
},
};
#define NOT_DONE 0x7fffffff
static void blk_rw_done(void *opaque, int ret)
{
*(int *)opaque = ret;
}
static void test_blk_read(BlockBackend *blk, long pattern,
int64_t pattern_offset, int64_t pattern_count,
int64_t offset, int64_t count,
bool expect_failed)
{
void *pattern_buf = NULL;
QEMUIOVector qiov;
void *cmp_buf = NULL;
int async_ret = NOT_DONE;
if (pattern) {
cmp_buf = g_malloc(pattern_count);
memset(cmp_buf, pattern, pattern_count);
}
pattern_buf = g_malloc(count);
if (pattern) {
memset(pattern_buf, pattern, count);
} else {
memset(pattern_buf, 0x00, count);
}
qemu_iovec_init(&qiov, 1);
qemu_iovec_add(&qiov, pattern_buf, count);
blk_aio_preadv(blk, offset, &qiov, 0, blk_rw_done, &async_ret);
while (async_ret == NOT_DONE) {
main_loop_wait(false);
}
if (expect_failed) {
g_assert(async_ret != 0);
} else {
g_assert(async_ret == 0);
if (pattern) {
g_assert(memcmp(pattern_buf + pattern_offset,
cmp_buf, pattern_count) <= 0);
}
}
g_free(pattern_buf);
}
static void test_blk_write(BlockBackend *blk, long pattern, int64_t offset,
int64_t count, bool expect_failed)
{
void *pattern_buf = NULL;
QEMUIOVector qiov;
int async_ret = NOT_DONE;
pattern_buf = g_malloc(count);
if (pattern) {
memset(pattern_buf, pattern, count);
} else {
memset(pattern_buf, 0x00, count);
}
qemu_iovec_init(&qiov, 1);
qemu_iovec_add(&qiov, pattern_buf, count);
blk_aio_pwritev(blk, offset, &qiov, 0, blk_rw_done, &async_ret);
while (async_ret == NOT_DONE) {
main_loop_wait(false);
}
if (expect_failed) {
g_assert(async_ret != 0);
} else {
g_assert(async_ret == 0);
}
g_free(pattern_buf);
}
/*
* Create a uniquely-named empty temporary file.
*/
static void make_temp(char *template)
{
int fd;
fd = mkstemp(template);
g_assert(fd >= 0);
close(fd);
}
static void prepare_imgs(void)
{
Error *local_err = NULL;
make_temp(p_local_disk);
make_temp(s_local_disk);
make_temp(s_active_disk);
make_temp(s_hidden_disk);
/* Primary */
bdrv_img_create(p_local_disk, "qcow2", NULL, NULL, NULL, IMG_SIZE,
BDRV_O_RDWR, &local_err, true);
g_assert(!local_err);
/* Secondary */
bdrv_img_create(s_local_disk, "qcow2", NULL, NULL, NULL, IMG_SIZE,
BDRV_O_RDWR, &local_err, true);
g_assert(!local_err);
bdrv_img_create(s_active_disk, "qcow2", NULL, NULL, NULL, IMG_SIZE,
BDRV_O_RDWR, &local_err, true);
g_assert(!local_err);
bdrv_img_create(s_hidden_disk, "qcow2", NULL, NULL, NULL, IMG_SIZE,
BDRV_O_RDWR, &local_err, true);
g_assert(!local_err);
}
static void cleanup_imgs(void)
{
/* Primary */
unlink(p_local_disk);
/* Secondary */
unlink(s_local_disk);
unlink(s_active_disk);
unlink(s_hidden_disk);
}
static BlockBackend *start_primary(void)
{
BlockBackend *blk;
QemuOpts *opts;
QDict *qdict;
Error *local_err = NULL;
char *cmdline;
cmdline = g_strdup_printf("driver=replication,mode=primary,node-name=xxx,"
"file.driver=qcow2,file.file.filename=%s"
, p_local_disk);
opts = qemu_opts_parse_noisily(&qemu_drive_opts, cmdline, false);
g_free(cmdline);
qdict = qemu_opts_to_qdict(opts, NULL);
qdict_set_default_str(qdict, BDRV_OPT_CACHE_DIRECT, "off");
qdict_set_default_str(qdict, BDRV_OPT_CACHE_NO_FLUSH, "off");
blk = blk_new_open(NULL, NULL, qdict, BDRV_O_RDWR, &local_err);
g_assert(blk);
g_assert(!local_err);
monitor_add_blk(blk, P_ID, &local_err);
g_assert(!local_err);
qemu_opts_del(opts);
return blk;
}
static void teardown_primary(void)
{
BlockBackend *blk;
/* remove P_ID */
blk = blk_by_name(P_ID);
assert(blk);
monitor_remove_blk(blk);
blk_unref(blk);
}
static void test_primary_read(void)
{
BlockBackend *blk;
blk = start_primary();
/* read from 0 to IMG_SIZE */
test_blk_read(blk, 0, 0, IMG_SIZE, 0, IMG_SIZE, true);
teardown_primary();
}
static void test_primary_write(void)
{
BlockBackend *blk;
blk = start_primary();
/* write from 0 to IMG_SIZE */
test_blk_write(blk, 0, 0, IMG_SIZE, true);
teardown_primary();
}
static void test_primary_start(void)
{
BlockBackend *blk = NULL;
Error *local_err = NULL;
blk = start_primary();
replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
g_assert(!local_err);
/* read from 0 to IMG_SIZE */
test_blk_read(blk, 0, 0, IMG_SIZE, 0, IMG_SIZE, true);
/* write 0x22 from 0 to IMG_SIZE */
test_blk_write(blk, 0x22, 0, IMG_SIZE, false);
teardown_primary();
}
static void test_primary_stop(void)
{
Error *local_err = NULL;
bool failover = true;
start_primary();
replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
g_assert(!local_err);
replication_stop_all(failover, &local_err);
g_assert(!local_err);
teardown_primary();
}
static void test_primary_do_checkpoint(void)
{
Error *local_err = NULL;
start_primary();
replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
g_assert(!local_err);
replication_do_checkpoint_all(&local_err);
g_assert(!local_err);
teardown_primary();
}
static void test_primary_get_error_all(void)
{
Error *local_err = NULL;
start_primary();
replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
g_assert(!local_err);
replication_get_error_all(&local_err);
g_assert(!local_err);
teardown_primary();
}
static BlockBackend *start_secondary(void)
{
QemuOpts *opts;
QDict *qdict;
BlockBackend *blk;
char *cmdline;
Error *local_err = NULL;
/* add s_local_disk and forge S_LOCAL_DISK_ID */
cmdline = g_strdup_printf("file.filename=%s,driver=qcow2", s_local_disk);
opts = qemu_opts_parse_noisily(&qemu_drive_opts, cmdline, false);
g_free(cmdline);
qdict = qemu_opts_to_qdict(opts, NULL);
qdict_set_default_str(qdict, BDRV_OPT_CACHE_DIRECT, "off");
qdict_set_default_str(qdict, BDRV_OPT_CACHE_NO_FLUSH, "off");
blk = blk_new_open(NULL, NULL, qdict, BDRV_O_RDWR, &local_err);
assert(blk);
monitor_add_blk(blk, S_LOCAL_DISK_ID, &local_err);
g_assert(!local_err);
/* format s_local_disk with pattern "0x11" */
test_blk_write(blk, 0x11, 0, IMG_SIZE, false);
qemu_opts_del(opts);
/* add S_(ACTIVE/HIDDEN)_DISK and forge S_ID */
cmdline = g_strdup_printf("driver=replication,mode=secondary,top-id=%s,"
"file.driver=qcow2,file.file.filename=%s,"
"file.backing.driver=qcow2,"
"file.backing.file.filename=%s,"
"file.backing.backing=%s"
, S_ID, s_active_disk, s_hidden_disk
, S_LOCAL_DISK_ID);
opts = qemu_opts_parse_noisily(&qemu_drive_opts, cmdline, false);
g_free(cmdline);
qdict = qemu_opts_to_qdict(opts, NULL);
qdict_set_default_str(qdict, BDRV_OPT_CACHE_DIRECT, "off");
qdict_set_default_str(qdict, BDRV_OPT_CACHE_NO_FLUSH, "off");
blk = blk_new_open(NULL, NULL, qdict, BDRV_O_RDWR, &local_err);
assert(blk);
monitor_add_blk(blk, S_ID, &local_err);
g_assert(!local_err);
qemu_opts_del(opts);
return blk;
}
static void teardown_secondary(void)
{
/* only need to destroy two BBs */
BlockBackend *blk;
/* remove S_LOCAL_DISK_ID */
blk = blk_by_name(S_LOCAL_DISK_ID);
assert(blk);
monitor_remove_blk(blk);
blk_unref(blk);
/* remove S_ID */
blk = blk_by_name(S_ID);
assert(blk);
monitor_remove_blk(blk);
blk_unref(blk);
}
static void test_secondary_read(void)
{
BlockBackend *blk;
blk = start_secondary();
/* read from 0 to IMG_SIZE */
test_blk_read(blk, 0, 0, IMG_SIZE, 0, IMG_SIZE, true);
teardown_secondary();
}
static void test_secondary_write(void)
{
BlockBackend *blk;
blk = start_secondary();
/* write from 0 to IMG_SIZE */
test_blk_write(blk, 0, 0, IMG_SIZE, true);
teardown_secondary();
}
static void test_secondary_start(void)
{
BlockBackend *top_blk, *local_blk;
Error *local_err = NULL;
bool failover = true;
top_blk = start_secondary();
replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
g_assert(!local_err);
/* read from s_local_disk (0, IMG_SIZE) */
test_blk_read(top_blk, 0x11, 0, IMG_SIZE, 0, IMG_SIZE, false);
/* write 0x22 to s_local_disk (IMG_SIZE / 2, IMG_SIZE) */
local_blk = blk_by_name(S_LOCAL_DISK_ID);
test_blk_write(local_blk, 0x22, IMG_SIZE / 2, IMG_SIZE / 2, false);
/* replication will backup s_local_disk to s_hidden_disk */
test_blk_read(top_blk, 0x11, IMG_SIZE / 2,
IMG_SIZE / 2, 0, IMG_SIZE, false);
/* write 0x33 to s_active_disk (0, IMG_SIZE / 2) */
test_blk_write(top_blk, 0x33, 0, IMG_SIZE / 2, false);
/* read from s_active_disk (0, IMG_SIZE/2) */
test_blk_read(top_blk, 0x33, 0, IMG_SIZE / 2,
0, IMG_SIZE / 2, false);
/* unblock top_bs */
replication_stop_all(failover, &local_err);
g_assert(!local_err);
teardown_secondary();
}
static void test_secondary_stop(void)
{
BlockBackend *top_blk, *local_blk;
Error *local_err = NULL;
bool failover = true;
top_blk = start_secondary();
replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
g_assert(!local_err);
/* write 0x22 to s_local_disk (IMG_SIZE / 2, IMG_SIZE) */
local_blk = blk_by_name(S_LOCAL_DISK_ID);
test_blk_write(local_blk, 0x22, IMG_SIZE / 2, IMG_SIZE / 2, false);
/* replication will backup s_local_disk to s_hidden_disk */
test_blk_read(top_blk, 0x11, IMG_SIZE / 2,
IMG_SIZE / 2, 0, IMG_SIZE, false);
/* write 0x33 to s_active_disk (0, IMG_SIZE / 2) */
test_blk_write(top_blk, 0x33, 0, IMG_SIZE / 2, false);
/* do active commit */
replication_stop_all(failover, &local_err);
g_assert(!local_err);
/* read from s_local_disk (0, IMG_SIZE / 2) */
test_blk_read(top_blk, 0x33, 0, IMG_SIZE / 2,
0, IMG_SIZE / 2, false);
/* read from s_local_disk (IMG_SIZE / 2, IMG_SIZE) */
test_blk_read(top_blk, 0x22, IMG_SIZE / 2,
IMG_SIZE / 2, 0, IMG_SIZE, false);
teardown_secondary();
}
static void test_secondary_do_checkpoint(void)
{
BlockBackend *top_blk, *local_blk;
Error *local_err = NULL;
bool failover = true;
top_blk = start_secondary();
replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
g_assert(!local_err);
/* write 0x22 to s_local_disk (IMG_SIZE / 2, IMG_SIZE) */
local_blk = blk_by_name(S_LOCAL_DISK_ID);
test_blk_write(local_blk, 0x22, IMG_SIZE / 2,
IMG_SIZE / 2, false);
/* replication will backup s_local_disk to s_hidden_disk */
test_blk_read(top_blk, 0x11, IMG_SIZE / 2,
IMG_SIZE / 2, 0, IMG_SIZE, false);
replication_do_checkpoint_all(&local_err);
g_assert(!local_err);
/* after checkpoint, read pattern 0x22 from s_local_disk */
test_blk_read(top_blk, 0x22, IMG_SIZE / 2,
IMG_SIZE / 2, 0, IMG_SIZE, false);
/* unblock top_bs */
replication_stop_all(failover, &local_err);
g_assert(!local_err);
teardown_secondary();
}
static void test_secondary_get_error_all(void)
{
Error *local_err = NULL;
bool failover = true;
start_secondary();
replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
g_assert(!local_err);
replication_get_error_all(&local_err);
g_assert(!local_err);
/* unblock top_bs */
replication_stop_all(failover, &local_err);
g_assert(!local_err);
teardown_secondary();
}
static void sigabrt_handler(int signo)
{
cleanup_imgs();
}
static void setup_sigabrt_handler(void)
{
struct sigaction sigact;
sigact = (struct sigaction) {
.sa_handler = sigabrt_handler,
.sa_flags = SA_RESETHAND,
};
sigemptyset(&sigact.sa_mask);
sigaction(SIGABRT, &sigact, NULL);
}
int main(int argc, char **argv)
{
int ret;
qemu_init_main_loop(&error_fatal);
bdrv_init();
g_test_init(&argc, &argv, NULL);
setup_sigabrt_handler();
prepare_imgs();
/* Primary */
g_test_add_func("/replication/primary/read", test_primary_read);
g_test_add_func("/replication/primary/write", test_primary_write);
g_test_add_func("/replication/primary/start", test_primary_start);
g_test_add_func("/replication/primary/stop", test_primary_stop);
g_test_add_func("/replication/primary/do_checkpoint",
test_primary_do_checkpoint);
g_test_add_func("/replication/primary/get_error_all",
test_primary_get_error_all);
/* Secondary */
g_test_add_func("/replication/secondary/read", test_secondary_read);
g_test_add_func("/replication/secondary/write", test_secondary_write);
g_test_add_func("/replication/secondary/start", test_secondary_start);
g_test_add_func("/replication/secondary/stop", test_secondary_stop);
g_test_add_func("/replication/secondary/do_checkpoint",
test_secondary_do_checkpoint);
g_test_add_func("/replication/secondary/get_error_all",
test_secondary_get_error_all);
ret = g_test_run();
cleanup_imgs();
return ret;
}

2
vl.c
View File

@ -121,6 +121,7 @@ int main(int argc, char **argv)
#include "crypto/init.h"
#include "sysemu/replay.h"
#include "qapi/qmp/qerror.h"
#include "sysemu/iothread.h"
#define MAX_VIRTIO_CONSOLES 1
#define MAX_SCLP_CONSOLES 1
@ -4616,6 +4617,7 @@ int main(int argc, char **argv, char **envp)
trace_init_vcpu_events();
main_loop();
replay_disable_events();
iothread_stop_all();
bdrv_close_all();
pause_all_vcpus();