mirror of
https://github.com/qemu/qemu.git
synced 2024-12-15 07:23:52 +08:00
fd9c577b24
This case will test whether the monitor can receive fd at runtime. To verify better, additional monitor is created to see if qemu can handler two monitor instances correctly. Signed-off-by: Wenchao Xia <xiawenc@linux.vnet.ibm.com> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
179 lines
7.1 KiB
Python
Executable File
179 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# Tests for fdsets and getfd.
|
|
#
|
|
# Copyright (C) 2012 IBM Corp.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import os
|
|
import iotests
|
|
from iotests import qemu_img
|
|
|
|
image0 = os.path.join(iotests.test_dir, 'image0')
|
|
image1 = os.path.join(iotests.test_dir, 'image1')
|
|
image2 = os.path.join(iotests.test_dir, 'image2')
|
|
image3 = os.path.join(iotests.test_dir, 'image3')
|
|
image4 = os.path.join(iotests.test_dir, 'image4')
|
|
|
|
class TestFdSets(iotests.QMPTestCase):
|
|
|
|
def setUp(self):
|
|
self.vm = iotests.VM()
|
|
qemu_img('create', '-f', iotests.imgfmt, image0, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image1, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image2, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image3, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image4, '128K')
|
|
self.file0 = open(image0, 'r')
|
|
self.file1 = open(image1, 'w+')
|
|
self.file2 = open(image2, 'r')
|
|
self.file3 = open(image3, 'r')
|
|
self.file4 = open(image4, 'r')
|
|
self.vm.add_fd(self.file0.fileno(), 1, 'image0:r')
|
|
self.vm.add_fd(self.file1.fileno(), 1, 'image1:w+')
|
|
self.vm.add_fd(self.file2.fileno(), 0, 'image2:r')
|
|
self.vm.add_fd(self.file3.fileno(), 2, 'image3:r')
|
|
self.vm.add_fd(self.file4.fileno(), 2, 'image4:r')
|
|
self.vm.add_drive("/dev/fdset/1")
|
|
self.vm.launch()
|
|
|
|
def tearDown(self):
|
|
self.vm.shutdown()
|
|
self.file0.close()
|
|
self.file1.close()
|
|
self.file2.close()
|
|
self.file3.close()
|
|
self.file4.close()
|
|
os.remove(image0)
|
|
os.remove(image1)
|
|
os.remove(image2)
|
|
os.remove(image3)
|
|
os.remove(image4)
|
|
|
|
def test_query_fdset(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
self.assert_qmp(result, 'return[0]/fdset-id', 2)
|
|
self.assert_qmp(result, 'return[1]/fdset-id', 1)
|
|
self.assert_qmp(result, 'return[2]/fdset-id', 0)
|
|
self.assert_qmp(result, 'return[0]/fds[0]/opaque', 'image3:r')
|
|
self.assert_qmp(result, 'return[0]/fds[1]/opaque', 'image4:r')
|
|
self.assert_qmp(result, 'return[1]/fds[0]/opaque', 'image0:r')
|
|
self.assert_qmp(result, 'return[1]/fds[1]/opaque', 'image1:w+')
|
|
self.assert_qmp(result, 'return[2]/fds[0]/opaque', 'image2:r')
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fdset(self):
|
|
result = self.vm.qmp('remove-fd', fdset_id=2)
|
|
self.assert_qmp(result, 'return', {})
|
|
result = self.vm.qmp('query-fdsets')
|
|
self.assert_qmp(result, 'return[0]/fdset-id', 1)
|
|
self.assert_qmp(result, 'return[1]/fdset-id', 0)
|
|
self.assert_qmp(result, 'return[0]/fds[0]/opaque', 'image0:r')
|
|
self.assert_qmp(result, 'return[0]/fds[1]/opaque', 'image1:w+')
|
|
self.assert_qmp(result, 'return[1]/fds[0]/opaque', 'image2:r')
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fd(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
fd_image3 = result['return'][0]['fds'][0]['fd']
|
|
result = self.vm.qmp('remove-fd', fdset_id=2, fd=fd_image3)
|
|
self.assert_qmp(result, 'return', {})
|
|
result = self.vm.qmp('query-fdsets')
|
|
self.assert_qmp(result, 'return[0]/fdset-id', 2)
|
|
self.assert_qmp(result, 'return[1]/fdset-id', 1)
|
|
self.assert_qmp(result, 'return[2]/fdset-id', 0)
|
|
self.assert_qmp(result, 'return[0]/fds[0]/opaque', 'image4:r')
|
|
self.assert_qmp(result, 'return[1]/fds[0]/opaque', 'image0:r')
|
|
self.assert_qmp(result, 'return[1]/fds[1]/opaque', 'image1:w+')
|
|
self.assert_qmp(result, 'return[2]/fds[0]/opaque', 'image2:r')
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fd_invalid_fdset(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
fd_image3 = result['return'][0]['fds'][0]['fd']
|
|
result = self.vm.qmp('remove-fd', fdset_id=3, fd=fd_image3)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
'File descriptor named \'fdset-id:3, fd:%d\' not found' % fd_image3)
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fd_invalid_fd(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
result = self.vm.qmp('remove-fd', fdset_id=2, fd=999)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
'File descriptor named \'fdset-id:2, fd:999\' not found')
|
|
self.vm.shutdown()
|
|
|
|
def test_add_fd_invalid_fd(self):
|
|
result = self.vm.qmp('add-fd', fdset_id=2)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
'No file descriptor supplied via SCM_RIGHTS')
|
|
self.vm.shutdown()
|
|
|
|
# Add fd at runtime, there are two ways: monitor related or fdset related
|
|
class TestSCMFd(iotests.QMPTestCase):
|
|
def setUp(self):
|
|
self.vm = iotests.VM()
|
|
qemu_img('create', '-f', iotests.imgfmt, image0, '128K')
|
|
# Add an unused monitor, to verify it works fine when two monitor
|
|
# instances present
|
|
self.vm.add_monitor_telnet("0",4445)
|
|
self.vm.launch()
|
|
|
|
def tearDown(self):
|
|
self.vm.shutdown()
|
|
os.remove(image0)
|
|
|
|
def _send_fd_by_SCM(self):
|
|
ret = self.vm.send_fd_scm(image0)
|
|
self.assertEqual(ret, 0, 'Failed to send fd with UNIX SCM')
|
|
|
|
def test_add_fd(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('add-fd', fdset_id=2, opaque='image0:r')
|
|
self.assert_qmp(result, 'return/fdset-id', 2)
|
|
|
|
def test_getfd(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('getfd', fdname='image0:r')
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
def test_getfd_invalid_fdname(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('getfd', fdname='0image0:r')
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
"Parameter 'fdname' expects a name not starting with a digit")
|
|
|
|
def test_closefd(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('getfd', fdname='image0:r')
|
|
self.assert_qmp(result, 'return', {})
|
|
result = self.vm.qmp('closefd', fdname='image0:r')
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
def test_closefd_fd_not_found(self):
|
|
fdname = 'image0:r'
|
|
result = self.vm.qmp('closefd', fdname=fdname)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
"File descriptor named '%s' not found" % fdname)
|
|
|
|
if __name__ == '__main__':
|
|
iotests.main(supported_fmts=['raw'])
|