Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.hypervisor.hv_kvm_unittest.py @ dcedd81a

History | View | Annotate | Download (13.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing the hypervisor.hv_kvm module"""
23

    
24
import threading
25
import tempfile
26
import unittest
27
import socket
28
import os
29
import struct
30
import re
31

    
32
from ganeti import serializer
33
from ganeti import constants
34
from ganeti import compat
35
from ganeti import objects
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import pathutils
39

    
40
from ganeti.hypervisor import hv_kvm
41

    
42
import testutils
43

    
44

    
45
class QmpStub(threading.Thread):
46
  """Stub for a QMP endpoint for a KVM instance
47

48
  """
49
  _QMP_BANNER_DATA = {
50
    "QMP": {
51
      "version": {
52
        "package": "",
53
        "qemu": {
54
          "micro": 50,
55
          "minor": 13,
56
          "major": 0,
57
          },
58
        "capabilities": [],
59
        },
60
      }
61
    }
62
  _EMPTY_RESPONSE = {
63
    "return": [],
64
    }
65

    
66
  def __init__(self, socket_filename, server_responses):
67
    """Creates a QMP stub
68

69
    @type socket_filename: string
70
    @param socket_filename: filename of the UNIX socket that will be created
71
                            this class and used for the communication
72
    @type server_responses: list
73
    @param server_responses: list of responses that the server sends in response
74
                             to whatever it receives
75
    """
76
    threading.Thread.__init__(self)
77
    self.socket_filename = socket_filename
78
    self.script = server_responses
79

    
80
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
81
    self.socket.bind(self.socket_filename)
82
    self.socket.listen(1)
83

    
84
  def run(self):
85
    # Hypothesis: the messages we receive contain only a complete QMP message
86
    # encoded in JSON.
87
    conn, addr = self.socket.accept()
88

    
89
    # Send the banner as the first thing
90
    conn.send(self.encode_string(self._QMP_BANNER_DATA))
91

    
92
    # Expect qmp_capabilities and return an empty response
93
    conn.recv(4096)
94
    conn.send(self.encode_string(self._EMPTY_RESPONSE))
95

    
96
    while True:
97
      # We ignore the expected message, as the purpose of this object is not
98
      # to verify the correctness of the communication but to act as a
99
      # partner for the SUT (System Under Test, that is QmpConnection)
100
      msg = conn.recv(4096)
101
      if not msg:
102
        break
103

    
104
      if not self.script:
105
        break
106
      response = self.script.pop(0)
107
      if isinstance(response, str):
108
        conn.send(response)
109
      elif isinstance(response, list):
110
        for chunk in response:
111
          conn.send(chunk)
112
      else:
113
        raise errors.ProgrammerError("Unknown response type for %s" % response)
114

    
115
    conn.close()
116

    
117
  def encode_string(self, message):
118
    return (serializer.DumpJson(message) +
119
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
120

    
121

    
122
class TestQmpMessage(testutils.GanetiTestCase):
123
  def testSerialization(self):
124
    test_data = {
125
      "execute": "command",
126
      "arguments": ["a", "b", "c"],
127
      }
128
    message = hv_kvm.QmpMessage(test_data)
129

    
130
    for k, v in test_data.items():
131
      self.assertEqual(message[k], v)
132

    
133
    serialized = str(message)
134
    self.assertEqual(len(serialized.splitlines()), 1,
135
                     msg="Got multi-line message")
136

    
137
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
138
    self.assertEqual(rebuilt_message, message)
139

    
140

    
141
class TestQmp(testutils.GanetiTestCase):
142
  def testQmp(self):
143
    requests = [
144
      {"execute": "query-kvm", "arguments": []},
145
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
146
      {"execute": "query-status", "arguments": []},
147
      {"execute": "query-name", "arguments": []},
148
      ]
149

    
150
    server_responses = [
151
      # One message, one send()
152
      '{"return": {"enabled": true, "present": true}}\r\n',
153

    
154
      # Message sent using multiple send()
155
      ['{"retur', 'n": {}}\r\n'],
156

    
157
      # Multiple messages sent using one send()
158
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
159
      '{"return": {"running": true, "singlestep": false}}\r\n',
160
      ]
161

    
162
    expected_responses = [
163
      {"return": {"enabled": True, "present": True}},
164
      {"return": {}},
165
      {"return": [{"name": "quit"}, {"name": "eject"}]},
166
      {"return": {"running": True, "singlestep": False}},
167
      ]
168

    
169
    # Set up the stub
170
    socket_file = tempfile.NamedTemporaryFile()
171
    os.remove(socket_file.name)
172
    qmp_stub = QmpStub(socket_file.name, server_responses)
173
    qmp_stub.start()
174

    
175
    # Set up the QMP connection
176
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
177
    qmp_connection.connect()
178

    
179
    # Format the script
180
    for request, expected_response in zip(requests, expected_responses):
181
      response = qmp_connection.Execute(request)
182
      msg = hv_kvm.QmpMessage(expected_response)
183
      self.assertEqual(len(str(msg).splitlines()), 1,
184
                       msg="Got multi-line message")
185
      self.assertEqual(response, msg)
186

    
187

    
188
class TestConsole(unittest.TestCase):
189
  def _Test(self, instance, hvparams):
190
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
191
    self.assertTrue(cons.Validate())
192
    return cons
193

    
194
  def testSerial(self):
195
    instance = objects.Instance(name="kvm.example.com",
196
                                primary_node="node6017")
197
    hvparams = {
198
      constants.HV_SERIAL_CONSOLE: True,
199
      constants.HV_VNC_BIND_ADDRESS: None,
200
      constants.HV_KVM_SPICE_BIND: None,
201
      }
202
    cons = self._Test(instance, hvparams)
203
    self.assertEqual(cons.kind, constants.CONS_SSH)
204
    self.assertEqual(cons.host, instance.primary_node)
205
    self.assertEqual(cons.command[0], pathutils.KVM_CONSOLE_WRAPPER)
206
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)
207

    
208
  def testVnc(self):
209
    instance = objects.Instance(name="kvm.example.com",
210
                                primary_node="node7235",
211
                                network_port=constants.VNC_BASE_PORT + 10)
212
    hvparams = {
213
      constants.HV_SERIAL_CONSOLE: False,
214
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
215
      constants.HV_KVM_SPICE_BIND: None,
216
      }
217
    cons = self._Test(instance, hvparams)
218
    self.assertEqual(cons.kind, constants.CONS_VNC)
219
    self.assertEqual(cons.host, "192.0.2.1")
220
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
221
    self.assertEqual(cons.display, 10)
222

    
223
  def testSpice(self):
224
    instance = objects.Instance(name="kvm.example.com",
225
                                primary_node="node7235",
226
                                network_port=11000)
227
    hvparams = {
228
      constants.HV_SERIAL_CONSOLE: False,
229
      constants.HV_VNC_BIND_ADDRESS: None,
230
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
231
      }
232
    cons = self._Test(instance, hvparams)
233
    self.assertEqual(cons.kind, constants.CONS_SPICE)
234
    self.assertEqual(cons.host, "192.0.2.1")
235
    self.assertEqual(cons.port, 11000)
236

    
237
  def testNoConsole(self):
238
    instance = objects.Instance(name="kvm.example.com",
239
                                primary_node="node24325",
240
                                network_port=0)
241
    hvparams = {
242
      constants.HV_SERIAL_CONSOLE: False,
243
      constants.HV_VNC_BIND_ADDRESS: None,
244
      constants.HV_KVM_SPICE_BIND: None,
245
      }
246
    cons = self._Test(instance, hvparams)
247
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)
248

    
249

    
250
class TestVersionChecking(testutils.GanetiTestCase):
251
  def testParseVersion(self):
252
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion
253
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
254
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
255
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
256
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
257
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
258
    self.assertEqual(parse(help_112), ("1.1.2", 1, 1, 2))
259
    self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
260
    self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
261
    self.assertEqual(parse(help_0125), ("0.12.5", 0, 12, 5))
262
    self.assertEqual(parse(help_091), ("0.9.1", 0, 9, 1))
263

    
264

    
265
class TestSpiceParameterList(unittest.TestCase):
266
  def test(self):
267
    defaults = constants.HVC_DEFAULTS[constants.HT_KVM]
268

    
269
    params = \
270
      compat.UniqueFrozenset(getattr(constants, name)
271
                             for name in dir(constants)
272
                             if name.startswith("HV_KVM_SPICE_"))
273

    
274
    # Parameters whose default value evaluates to True and don't need to be set
275
    defaults_true = frozenset(filter(defaults.__getitem__, params))
276

    
277
    self.assertEqual(defaults_true, frozenset([
278
      constants.HV_KVM_SPICE_AUDIO_COMPR,
279
      constants.HV_KVM_SPICE_USE_VDAGENT,
280
      constants.HV_KVM_SPICE_TLS_CIPHERS,
281
      ]))
282

    
283
    # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
284
    # their default evaluates to False
285
    assert constants.HV_KVM_SPICE_BIND in params
286
    assert constants.HV_KVM_SPICE_BIND not in defaults_true
287

    
288
    # Exclude some parameters
289
    params -= defaults_true | frozenset([
290
      constants.HV_KVM_SPICE_BIND,
291
      ])
292

    
293
    self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, params)
294

    
295

    
296
class TestHelpRegexps(testutils.GanetiTestCase):
297
  def testBootRe(self):
298
    """Check _BOOT_RE
299

300
    It has too match -drive.*boot=on|off except if there is another dash-option
301
    at the beginning of the line.
302

303
    """
304
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE
305
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
306
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
307
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
308
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
309
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
310
    help_091_fake = testutils.ReadTestData("kvm_0.9.1_help_boot_test.txt")
311

    
312
    self.assertTrue(boot_re.search(help_091))
313
    self.assertTrue(boot_re.search(help_0125))
314
    self.assertFalse(boot_re.search(help_091_fake))
315
    self.assertFalse(boot_re.search(help_112))
316
    self.assertFalse(boot_re.search(help_10))
317
    self.assertFalse(boot_re.search(help_01590))
318

    
319

    
320
class TestGetTunFeatures(unittest.TestCase):
321
  def testWrongIoctl(self):
322
    tmpfile = tempfile.NamedTemporaryFile()
323
    # A file does not have the right ioctls, so this must always fail
324
    result = hv_kvm._GetTunFeatures(tmpfile.fileno())
325
    self.assertTrue(result is None)
326

    
327
  def _FakeIoctl(self, features, fd, request, buf):
328
    self.assertEqual(request, hv_kvm.TUNGETFEATURES)
329

    
330
    (reqno, ) = struct.unpack("I", buf)
331
    self.assertEqual(reqno, 0)
332

    
333
    return struct.pack("I", features)
334

    
335
  def test(self):
336
    tmpfile = tempfile.NamedTemporaryFile()
337
    fd = tmpfile.fileno()
338

    
339
    for features in [0, hv_kvm.IFF_VNET_HDR]:
340
      fn = compat.partial(self._FakeIoctl, features)
341
      result = hv_kvm._GetTunFeatures(fd, _ioctl=fn)
342
      self.assertEqual(result, features)
343

    
344

    
345
class TestProbeTapVnetHdr(unittest.TestCase):
346
  def _FakeTunFeatures(self, expected_fd, flags, fd):
347
    self.assertEqual(fd, expected_fd)
348
    return flags
349

    
350
  def test(self):
351
    tmpfile = tempfile.NamedTemporaryFile()
352
    fd = tmpfile.fileno()
353

    
354
    for flags in [0, hv_kvm.IFF_VNET_HDR]:
355
      fn = compat.partial(self._FakeTunFeatures, fd, flags)
356

    
357
      result = hv_kvm._ProbeTapVnetHdr(fd, _features_fn=fn)
358
      if flags == 0:
359
        self.assertFalse(result)
360
      else:
361
        self.assertTrue(result)
362

    
363
  def testUnsupported(self):
364
    tmpfile = tempfile.NamedTemporaryFile()
365
    fd = tmpfile.fileno()
366

    
367
    self.assertFalse(hv_kvm._ProbeTapVnetHdr(fd, _features_fn=lambda _: None))
368

    
369

    
370
class TestGenerateDeviceKVMId(unittest.TestCase):
371
  def test(self):
372
    device = objects.NIC()
373
    target = constants.HOTPLUG_TARGET_NIC
374
    fn = hv_kvm._GenerateDeviceKVMId
375
    self.assertRaises(errors.HotplugError, fn, target, device)
376

    
377
    device.pci = 5
378
    device.uuid = "003fc157-66a8-4e6d-8b7e-ec4f69751396"
379
    self.assertTrue(re.match("hotnic-003fc157-pci-5", fn(target, device)))
380

    
381

    
382
class TestGetRuntimeInfo(unittest.TestCase):
383
  @classmethod
384
  def _GetRuntime(cls):
385
    data = testutils.ReadTestData("kvm_runtime.json")
386
    return hv_kvm._AnalyzeSerializedRuntime(data)
387

    
388
  def _fail(self, target, device, runtime):
389
    device.uuid = "aaaaaaaa-66a8-4e6d-8b7e-ec4f69751396"
390
    self.assertRaises(errors.HotplugError,
391
                      hv_kvm._GetExistingDeviceInfo,
392
                      target, device, runtime)
393

    
394
  def testNIC(self):
395
    device = objects.NIC()
396
    target = constants.HOTPLUG_TARGET_NIC
397
    runtime = self._GetRuntime()
398

    
399
    self._fail(target, device, runtime)
400

    
401
    device.uuid = "003fc157-66a8-4e6d-8b7e-ec4f69751396"
402
    devinfo = hv_kvm._GetExistingDeviceInfo(target, device, runtime)
403
    self.assertTrue(devinfo.pci==6)
404

    
405
  def testDisk(self):
406
    device = objects.Disk()
407
    target = constants.HOTPLUG_TARGET_DISK
408
    runtime = self._GetRuntime()
409

    
410
    self._fail(target, device, runtime)
411

    
412
    device.uuid = "9f5c5bd4-6f60-480b-acdc-9bb1a4b7df79"
413
    (devinfo, _) = hv_kvm._GetExistingDeviceInfo(target, device, runtime)
414
    self.assertTrue(devinfo.pci==5)
415

    
416

    
417
if __name__ == "__main__":
418
  testutils.GanetiTestProgram()