Statistics
| Branch: | Revision:

root / tests / qemu-iotests / 030 @ db58f9c0

History | View | Annotate | Download (5.4 kB)

1
#!/usr/bin/env python
2
#
3
# Tests for image streaming.
4
#
5
# Copyright (C) 2012 IBM Corp.
6
#
7
# This program is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU General Public License as published by
9
# the Free Software Foundation; either version 2 of the License, or
10
# (at your option) any later version.
11
#
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
# GNU General Public License for more details.
16
#
17
# You should have received a copy of the GNU General Public License
18
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
#
20

    
21
import os
22
import iotests
23
from iotests import qemu_img, qemu_io
24

    
25
backing_img = os.path.join(iotests.test_dir, 'backing.img')
26
test_img = os.path.join(iotests.test_dir, 'test.img')
27

    
28
class ImageStreamingTestCase(iotests.QMPTestCase):
29
    '''Abstract base class for image streaming test cases'''
30

    
31
    def assert_no_active_streams(self):
32
        result = self.vm.qmp('query-block-jobs')
33
        self.assert_qmp(result, 'return', [])
34

    
35
class TestSingleDrive(ImageStreamingTestCase):
36
    image_len = 1 * 1024 * 1024 # MB
37

    
38
    def setUp(self):
39
        qemu_img('create', backing_img, str(TestSingleDrive.image_len))
40
        qemu_img('create', '-f', iotests.imgfmt, '-o', 'backing_file=%s' % backing_img, test_img)
41
        self.vm = iotests.VM().add_drive(test_img)
42
        self.vm.launch()
43

    
44
    def tearDown(self):
45
        self.vm.shutdown()
46
        os.remove(test_img)
47
        os.remove(backing_img)
48

    
49
    def test_stream(self):
50
        self.assert_no_active_streams()
51

    
52
        result = self.vm.qmp('block-stream', device='drive0')
53
        self.assert_qmp(result, 'return', {})
54

    
55
        completed = False
56
        while not completed:
57
            for event in self.vm.get_qmp_events(wait=True):
58
                if event['event'] == 'BLOCK_JOB_COMPLETED':
59
                    self.assert_qmp(event, 'data/type', 'stream')
60
                    self.assert_qmp(event, 'data/device', 'drive0')
61
                    self.assert_qmp(event, 'data/offset', self.image_len)
62
                    self.assert_qmp(event, 'data/len', self.image_len)
63
                    completed = True
64

    
65
        self.assert_no_active_streams()
66

    
67
        self.assertFalse('sectors not allocated' in qemu_io('-c', 'map', test_img),
68
                         'image file not fully populated after streaming')
69

    
70
    def test_device_not_found(self):
71
        result = self.vm.qmp('block-stream', device='nonexistent')
72
        self.assert_qmp(result, 'error/class', 'DeviceNotFound')
73

    
74
class TestStreamStop(ImageStreamingTestCase):
75
    image_len = 8 * 1024 * 1024 * 1024 # GB
76

    
77
    def setUp(self):
78
        qemu_img('create', backing_img, str(TestStreamStop.image_len))
79
        qemu_img('create', '-f', iotests.imgfmt, '-o', 'backing_file=%s' % backing_img, test_img)
80
        self.vm = iotests.VM().add_drive(test_img)
81
        self.vm.launch()
82

    
83
    def tearDown(self):
84
        self.vm.shutdown()
85
        os.remove(test_img)
86
        os.remove(backing_img)
87

    
88
    def test_stream_stop(self):
89
        import time
90

    
91
        self.assert_no_active_streams()
92

    
93
        result = self.vm.qmp('block-stream', device='drive0')
94
        self.assert_qmp(result, 'return', {})
95

    
96
        time.sleep(1)
97
        events = self.vm.get_qmp_events(wait=False)
98
        self.assertEqual(events, [], 'unexpected QMP event: %s' % events)
99

    
100
        self.vm.qmp('block-job-cancel', device='drive0')
101
        self.assert_qmp(result, 'return', {})
102

    
103
        cancelled = False
104
        while not cancelled:
105
            for event in self.vm.get_qmp_events(wait=True):
106
                if event['event'] == 'BLOCK_JOB_CANCELLED':
107
                    self.assert_qmp(event, 'data/type', 'stream')
108
                    self.assert_qmp(event, 'data/device', 'drive0')
109
                    cancelled = True
110

    
111
        self.assert_no_active_streams()
112

    
113
# This is a short performance test which is not run by default.
114
# Invoke "IMGFMT=qed ./030 TestSetSpeed.perf_test_set_speed"
115
class TestSetSpeed(ImageStreamingTestCase):
116
    image_len = 80 * 1024 * 1024 # MB
117

    
118
    def setUp(self):
119
        qemu_img('create', backing_img, str(TestSetSpeed.image_len))
120
        qemu_img('create', '-f', iotests.imgfmt, '-o', 'backing_file=%s' % backing_img, test_img)
121
        self.vm = iotests.VM().add_drive(test_img)
122
        self.vm.launch()
123

    
124
    def tearDown(self):
125
        self.vm.shutdown()
126
        os.remove(test_img)
127
        os.remove(backing_img)
128

    
129
    def perf_test_set_speed(self):
130
        self.assert_no_active_streams()
131

    
132
        result = self.vm.qmp('block-stream', device='drive0')
133
        self.assert_qmp(result, 'return', {})
134

    
135
        result = self.vm.qmp('block-job-set-speed', device='drive0', value=8 * 1024 * 1024)
136
        self.assert_qmp(result, 'return', {})
137

    
138
        completed = False
139
        while not completed:
140
            for event in self.vm.get_qmp_events(wait=True):
141
                if event['event'] == 'BLOCK_JOB_COMPLETED':
142
                    self.assert_qmp(event, 'data/type', 'stream')
143
                    self.assert_qmp(event, 'data/device', 'drive0')
144
                    self.assert_qmp(event, 'data/offset', self.image_len)
145
                    self.assert_qmp(event, 'data/len', self.image_len)
146
                    completed = True
147

    
148
        self.assert_no_active_streams()
149

    
150
if __name__ == '__main__':
151
    iotests.main(supported_fmts=['qcow2', 'qed'])