root / tests / qemu-iotests / iotests.py @ 0dbe8a1b
History | View | Annotate | Download (9.2 kB)
1 |
# Common utilities and Python wrappers for qemu-iotests
|
---|---|
2 |
#
|
3 |
# Copyright (C) 2012 IBM Corp.
|
4 |
#
|
5 |
# This program is free software; you can redistribute it and/or modify
|
6 |
# it under the terms of the GNU General Public License as published by
|
7 |
# the Free Software Foundation; either version 2 of the License, or
|
8 |
# (at your option) any later version.
|
9 |
#
|
10 |
# This program is distributed in the hope that it will be useful,
|
11 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
12 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
13 |
# GNU General Public License for more details.
|
14 |
#
|
15 |
# You should have received a copy of the GNU General Public License
|
16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
17 |
#
|
18 |
|
19 |
import os |
20 |
import re |
21 |
import subprocess |
22 |
import string |
23 |
import unittest |
24 |
import sys; sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'QMP')) |
25 |
import qmp |
26 |
import struct |
27 |
|
28 |
__all__ = ['imgfmt', 'imgproto', 'test_dir' 'qemu_img', 'qemu_io', |
29 |
'VM', 'QMPTestCase', 'notrun', 'main'] |
30 |
|
31 |
# This will not work if arguments or path contain spaces but is necessary if we
|
32 |
# want to support the override options that ./check supports.
|
33 |
qemu_img_args = os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ') |
34 |
qemu_io_args = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ') |
35 |
qemu_args = os.environ.get('QEMU', 'qemu').strip().split(' ') |
36 |
|
37 |
imgfmt = os.environ.get('IMGFMT', 'raw') |
38 |
imgproto = os.environ.get('IMGPROTO', 'file') |
39 |
test_dir = os.environ.get('TEST_DIR', '/var/tmp') |
40 |
|
41 |
def qemu_img(*args): |
42 |
'''Run qemu-img and return the exit code'''
|
43 |
devnull = open('/dev/null', 'r+') |
44 |
return subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull) |
45 |
|
46 |
def qemu_img_verbose(*args): |
47 |
'''Run qemu-img without suppressing its output and return the exit code'''
|
48 |
return subprocess.call(qemu_img_args + list(args)) |
49 |
|
50 |
def qemu_io(*args): |
51 |
'''Run qemu-io and return the stdout data'''
|
52 |
args = qemu_io_args + list(args)
|
53 |
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0] |
54 |
|
55 |
def compare_images(img1, img2): |
56 |
'''Return True if two image files are identical'''
|
57 |
return qemu_img('compare', '-f', imgfmt, |
58 |
'-F', imgfmt, img1, img2) == 0 |
59 |
|
60 |
def create_image(name, size): |
61 |
'''Create a fully-allocated raw image with sector markers'''
|
62 |
file = open(name, 'w') |
63 |
i = 0
|
64 |
while i < size:
|
65 |
sector = struct.pack('>l504xl', i / 512, i / 512) |
66 |
file.write(sector)
|
67 |
i = i + 512
|
68 |
file.close()
|
69 |
|
70 |
class VM(object): |
71 |
'''A QEMU VM'''
|
72 |
|
73 |
def __init__(self): |
74 |
self._monitor_path = os.path.join(test_dir, 'qemu-mon.%d' % os.getpid()) |
75 |
self._qemu_log_path = os.path.join(test_dir, 'qemu-log.%d' % os.getpid()) |
76 |
self._args = qemu_args + ['-chardev', |
77 |
'socket,id=mon,path=' + self._monitor_path, |
78 |
'-mon', 'chardev=mon,mode=control', |
79 |
'-qtest', 'stdio', '-machine', 'accel=qtest', |
80 |
'-display', 'none', '-vga', 'none'] |
81 |
self._num_drives = 0 |
82 |
|
83 |
def add_drive(self, path, opts=''): |
84 |
'''Add a virtio-blk drive to the VM'''
|
85 |
options = ['if=virtio',
|
86 |
'format=%s' % imgfmt,
|
87 |
'cache=none',
|
88 |
'file=%s' % path,
|
89 |
'id=drive%d' % self._num_drives] |
90 |
if opts:
|
91 |
options.append(opts) |
92 |
|
93 |
self._args.append('-drive') |
94 |
self._args.append(','.join(options)) |
95 |
self._num_drives += 1 |
96 |
return self |
97 |
|
98 |
def add_fd(self, fd, fdset, opaque, opts=''): |
99 |
'''Pass a file descriptor to the VM'''
|
100 |
options = ['fd=%d' % fd,
|
101 |
'set=%d' % fdset,
|
102 |
'opaque=%s' % opaque]
|
103 |
if opts:
|
104 |
options.append(opts) |
105 |
|
106 |
self._args.append('-add-fd') |
107 |
self._args.append(','.join(options)) |
108 |
return self |
109 |
|
110 |
def launch(self): |
111 |
'''Launch the VM and establish a QMP connection'''
|
112 |
devnull = open('/dev/null', 'rb') |
113 |
qemulog = open(self._qemu_log_path, 'wb') |
114 |
try:
|
115 |
self._qmp = qmp.QEMUMonitorProtocol(self._monitor_path, server=True) |
116 |
self._popen = subprocess.Popen(self._args, stdin=devnull, stdout=qemulog, |
117 |
stderr=subprocess.STDOUT) |
118 |
self._qmp.accept()
|
119 |
except:
|
120 |
os.remove(self._monitor_path)
|
121 |
raise
|
122 |
|
123 |
def shutdown(self): |
124 |
'''Terminate the VM and clean up'''
|
125 |
if not self._popen is None: |
126 |
self._qmp.cmd('quit') |
127 |
self._popen.wait()
|
128 |
os.remove(self._monitor_path)
|
129 |
os.remove(self._qemu_log_path)
|
130 |
self._popen = None |
131 |
|
132 |
underscore_to_dash = string.maketrans('_', '-') |
133 |
def qmp(self, cmd, **args): |
134 |
'''Invoke a QMP command and return the result dict'''
|
135 |
qmp_args = dict()
|
136 |
for k in args.keys(): |
137 |
qmp_args[k.translate(self.underscore_to_dash)] = args[k]
|
138 |
|
139 |
return self._qmp.cmd(cmd, args=qmp_args) |
140 |
|
141 |
def get_qmp_event(self, wait=False): |
142 |
'''Poll for one queued QMP events and return it'''
|
143 |
return self._qmp.pull_event(wait=wait) |
144 |
|
145 |
def get_qmp_events(self, wait=False): |
146 |
'''Poll for queued QMP events and return a list of dicts'''
|
147 |
events = self._qmp.get_events(wait=wait)
|
148 |
self._qmp.clear_events()
|
149 |
return events
|
150 |
|
151 |
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
|
152 |
|
153 |
class QMPTestCase(unittest.TestCase): |
154 |
'''Abstract base class for QMP test cases'''
|
155 |
|
156 |
def dictpath(self, d, path): |
157 |
'''Traverse a path in a nested dict'''
|
158 |
for component in path.split('/'): |
159 |
m = index_re.match(component) |
160 |
if m:
|
161 |
component, idx = m.groups() |
162 |
idx = int(idx)
|
163 |
|
164 |
if not isinstance(d, dict) or component not in d: |
165 |
self.fail('failed path traversal for "%s" in "%s"' % (path, str(d))) |
166 |
d = d[component] |
167 |
|
168 |
if m:
|
169 |
if not isinstance(d, list): |
170 |
self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d))) |
171 |
try:
|
172 |
d = d[idx] |
173 |
except IndexError: |
174 |
self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d))) |
175 |
return d
|
176 |
|
177 |
def assert_qmp_absent(self, d, path): |
178 |
try:
|
179 |
result = self.dictpath(d, path)
|
180 |
except AssertionError: |
181 |
return
|
182 |
self.fail('path "%s" has value "%s"' % (path, str(result))) |
183 |
|
184 |
def assert_qmp(self, d, path, value): |
185 |
'''Assert that the value for a specific path in a QMP dict matches'''
|
186 |
result = self.dictpath(d, path)
|
187 |
self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value))) |
188 |
|
189 |
def assert_no_active_block_jobs(self): |
190 |
result = self.vm.qmp('query-block-jobs') |
191 |
self.assert_qmp(result, 'return', []) |
192 |
|
193 |
def cancel_and_wait(self, drive='drive0', force=False): |
194 |
'''Cancel a block job and wait for it to finish, returning the event'''
|
195 |
result = self.vm.qmp('block-job-cancel', device=drive, force=force) |
196 |
self.assert_qmp(result, 'return', {}) |
197 |
|
198 |
cancelled = False
|
199 |
result = None
|
200 |
while not cancelled: |
201 |
for event in self.vm.get_qmp_events(wait=True): |
202 |
if event['event'] == 'BLOCK_JOB_COMPLETED' or \ |
203 |
event['event'] == 'BLOCK_JOB_CANCELLED': |
204 |
self.assert_qmp(event, 'data/device', drive) |
205 |
result = event |
206 |
cancelled = True
|
207 |
|
208 |
self.assert_no_active_block_jobs()
|
209 |
return result
|
210 |
|
211 |
def wait_until_completed(self, drive='drive0'): |
212 |
'''Wait for a block job to finish, returning the event'''
|
213 |
completed = False
|
214 |
while not completed: |
215 |
for event in self.vm.get_qmp_events(wait=True): |
216 |
if event['event'] == 'BLOCK_JOB_COMPLETED': |
217 |
self.assert_qmp(event, 'data/device', drive) |
218 |
self.assert_qmp_absent(event, 'data/error') |
219 |
self.assert_qmp(event, 'data/offset', self.image_len) |
220 |
self.assert_qmp(event, 'data/len', self.image_len) |
221 |
completed = True
|
222 |
|
223 |
self.assert_no_active_block_jobs()
|
224 |
return event
|
225 |
|
226 |
def notrun(reason): |
227 |
'''Skip this test suite'''
|
228 |
# Each test in qemu-iotests has a number ("seq")
|
229 |
seq = os.path.basename(sys.argv[0])
|
230 |
|
231 |
open('%s.notrun' % seq, 'wb').write(reason + '\n') |
232 |
print '%s not run: %s' % (seq, reason) |
233 |
sys.exit(0)
|
234 |
|
235 |
def main(supported_fmts=[]): |
236 |
'''Run tests'''
|
237 |
|
238 |
if supported_fmts and (imgfmt not in supported_fmts): |
239 |
notrun('not suitable for this image format: %s' % imgfmt)
|
240 |
|
241 |
# We need to filter out the time taken from the output so that qemu-iotest
|
242 |
# can reliably diff the results against master output.
|
243 |
import StringIO |
244 |
output = StringIO.StringIO() |
245 |
|
246 |
class MyTestRunner(unittest.TextTestRunner): |
247 |
def __init__(self, stream=output, descriptions=True, verbosity=1): |
248 |
unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
|
249 |
|
250 |
# unittest.main() will use sys.exit() so expect a SystemExit exception
|
251 |
try:
|
252 |
unittest.main(testRunner=MyTestRunner) |
253 |
finally:
|
254 |
sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue())) |