summaryrefslogtreecommitdiffstats
path: root/tests/qemu-iotests/iotests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r--tests/qemu-iotests/iotests.py38
1 files changed, 38 insertions, 0 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py
index 569ca3d..8a8f181 100644
--- a/tests/qemu-iotests/iotests.py
+++ b/tests/qemu-iotests/iotests.py
@@ -23,6 +23,7 @@ import string
import unittest
import sys; sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'QMP'))
import qmp
+import struct
__all__ = ['imgfmt', 'imgproto', 'test_dir' 'qemu_img', 'qemu_io',
'VM', 'QMPTestCase', 'notrun', 'main']
@@ -51,6 +52,21 @@ def qemu_io(*args):
args = qemu_io_args + list(args)
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]
+def compare_images(img1, img2):
+ '''Return True if two image files are identical'''
+ return qemu_img('compare', '-f', imgfmt,
+ '-F', imgfmt, img1, img2) == 0
+
+def create_image(name, size):
+ '''Create a fully-allocated raw image with sector markers'''
+ file = open(name, 'w')
+ i = 0
+ while i < size:
+ sector = struct.pack('>l504xl', i / 512, i / 512)
+ file.write(sector)
+ i = i + 512
+ file.close()
+
class VM(object):
'''A QEMU VM'''
@@ -170,6 +186,28 @@ class QMPTestCase(unittest.TestCase):
result = self.dictpath(d, path)
self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
+ def assert_no_active_block_jobs(self):
+ result = self.vm.qmp('query-block-jobs')
+ self.assert_qmp(result, 'return', [])
+
+ def cancel_and_wait(self, drive='drive0', force=False):
+ '''Cancel a block job and wait for it to finish, returning the event'''
+ result = self.vm.qmp('block-job-cancel', device=drive, force=force)
+ self.assert_qmp(result, 'return', {})
+
+ cancelled = False
+ result = None
+ while not cancelled:
+ for event in self.vm.get_qmp_events(wait=True):
+ if event['event'] == 'BLOCK_JOB_COMPLETED' or \
+ event['event'] == 'BLOCK_JOB_CANCELLED':
+ self.assert_qmp(event, 'data/device', drive)
+ result = event
+ cancelled = True
+
+ self.assert_no_active_block_jobs()
+ return result
+
def notrun(reason):
'''Skip this test suite'''
# Each test in qemu-iotests has a number ("seq")
OpenPOWER on IntegriCloud