Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.hypervisor.hv_kvm_unittest.py @ 5e34123e

History | View | Annotate | Download (12.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing the hypervisor.hv_kvm module"""
23

    
24
import threading
25
import tempfile
26
import unittest
27
import socket
28
import os
29
import struct
30

    
31
from ganeti import serializer
32
from ganeti import constants
33
from ganeti import compat
34
from ganeti import objects
35
from ganeti import errors
36
from ganeti import utils
37
from ganeti import pathutils
38

    
39
from ganeti.hypervisor import hv_kvm
40

    
41
import testutils
42

    
43

    
44
class QmpStub(threading.Thread):
45
  """Stub for a QMP endpoint for a KVM instance
46

47
  """
48
  _QMP_BANNER_DATA = {
49
    "QMP": {
50
      "version": {
51
        "package": "",
52
        "qemu": {
53
          "micro": 50,
54
          "minor": 13,
55
          "major": 0,
56
          },
57
        "capabilities": [],
58
        },
59
      }
60
    }
61
  _EMPTY_RESPONSE = {
62
    "return": [],
63
    }
64

    
65
  def __init__(self, socket_filename, server_responses):
66
    """Creates a QMP stub
67

68
    @type socket_filename: string
69
    @param socket_filename: filename of the UNIX socket that will be created
70
                            this class and used for the communication
71
    @type server_responses: list
72
    @param server_responses: list of responses that the server sends in response
73
                             to whatever it receives
74
    """
75
    threading.Thread.__init__(self)
76
    self.socket_filename = socket_filename
77
    self.script = server_responses
78

    
79
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
80
    self.socket.bind(self.socket_filename)
81
    self.socket.listen(1)
82

    
83
  def run(self):
84
    # Hypothesis: the messages we receive contain only a complete QMP message
85
    # encoded in JSON.
86
    conn, addr = self.socket.accept()
87

    
88
    # Send the banner as the first thing
89
    conn.send(self.encode_string(self._QMP_BANNER_DATA))
90

    
91
    # Expect qmp_capabilities and return an empty response
92
    conn.recv(4096)
93
    conn.send(self.encode_string(self._EMPTY_RESPONSE))
94

    
95
    while True:
96
      # We ignore the expected message, as the purpose of this object is not
97
      # to verify the correctness of the communication but to act as a
98
      # partner for the SUT (System Under Test, that is QmpConnection)
99
      msg = conn.recv(4096)
100
      if not msg:
101
        break
102

    
103
      if not self.script:
104
        break
105
      response = self.script.pop(0)
106
      if isinstance(response, str):
107
        conn.send(response)
108
      elif isinstance(response, list):
109
        for chunk in response:
110
          conn.send(chunk)
111
      else:
112
        raise errors.ProgrammerError("Unknown response type for %s" % response)
113

    
114
    conn.close()
115

    
116
  def encode_string(self, message):
117
    return (serializer.DumpJson(message) +
118
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
119

    
120

    
121
class TestQmpMessage(testutils.GanetiTestCase):
122
  def testSerialization(self):
123
    test_data = {
124
      "execute": "command",
125
      "arguments": ["a", "b", "c"],
126
      }
127
    message = hv_kvm.QmpMessage(test_data)
128

    
129
    for k, v in test_data.items():
130
      self.assertEqual(message[k], v)
131

    
132
    serialized = str(message)
133
    self.assertEqual(len(serialized.splitlines()), 1,
134
                     msg="Got multi-line message")
135

    
136
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
137
    self.assertEqual(rebuilt_message, message)
138
    self.assertEqual(len(rebuilt_message), len(test_data))
139

    
140
  def testDelete(self):
141
    toDelete = "execute"
142
    test_data = {
143
      toDelete: "command",
144
      "arguments": ["a", "b", "c"],
145
      }
146
    message = hv_kvm.QmpMessage(test_data)
147

    
148
    oldLen = len(message)
149
    del(message[toDelete])
150
    newLen = len(message)
151
    self.assertEqual(oldLen - 1, newLen)
152

    
153

    
154
class TestQmp(testutils.GanetiTestCase):
155
  def testQmp(self):
156
    requests = [
157
      {"execute": "query-kvm", "arguments": []},
158
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
159
      {"execute": "query-status", "arguments": []},
160
      {"execute": "query-name", "arguments": []},
161
      ]
162

    
163
    server_responses = [
164
      # One message, one send()
165
      '{"return": {"enabled": true, "present": true}}\r\n',
166

    
167
      # Message sent using multiple send()
168
      ['{"retur', 'n": {}}\r\n'],
169

    
170
      # Multiple messages sent using one send()
171
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
172
      '{"return": {"running": true, "singlestep": false}}\r\n',
173
      ]
174

    
175
    expected_responses = [
176
      {"return": {"enabled": True, "present": True}},
177
      {"return": {}},
178
      {"return": [{"name": "quit"}, {"name": "eject"}]},
179
      {"return": {"running": True, "singlestep": False}},
180
      ]
181

    
182
    # Set up the stub
183
    socket_file = tempfile.NamedTemporaryFile()
184
    os.remove(socket_file.name)
185
    qmp_stub = QmpStub(socket_file.name, server_responses)
186
    qmp_stub.start()
187

    
188
    # Set up the QMP connection
189
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
190
    qmp_connection.connect()
191

    
192
    # Format the script
193
    for request, expected_response in zip(requests, expected_responses):
194
      response = qmp_connection.Execute(request)
195
      msg = hv_kvm.QmpMessage(expected_response)
196
      self.assertEqual(len(str(msg).splitlines()), 1,
197
                       msg="Got multi-line message")
198
      self.assertEqual(response, msg)
199

    
200

    
201
class TestConsole(unittest.TestCase):
202
  def _Test(self, instance, node, hvparams):
203
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, node, hvparams, {})
204
    self.assertTrue(cons.Validate())
205
    return cons
206

    
207
  def testSerial(self):
208
    instance = objects.Instance(name="kvm.example.com",
209
                                primary_node="node6017-uuid")
210
    node = objects.Node(name="node6017", uuid="node6017-uuid")
211
    hvparams = {
212
      constants.HV_SERIAL_CONSOLE: True,
213
      constants.HV_VNC_BIND_ADDRESS: None,
214
      constants.HV_KVM_SPICE_BIND: None,
215
      }
216
    cons = self._Test(instance, node, hvparams)
217
    self.assertEqual(cons.kind, constants.CONS_SSH)
218
    self.assertEqual(cons.host, node.name)
219
    self.assertEqual(cons.command[0], pathutils.KVM_CONSOLE_WRAPPER)
220
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)
221

    
222
  def testVnc(self):
223
    instance = objects.Instance(name="kvm.example.com",
224
                                primary_node="node7235-uuid",
225
                                network_port=constants.VNC_BASE_PORT + 10)
226
    node = objects.Node(name="node7235", uuid="node7235-uuid")
227
    hvparams = {
228
      constants.HV_SERIAL_CONSOLE: False,
229
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
230
      constants.HV_KVM_SPICE_BIND: None,
231
      }
232
    cons = self._Test(instance, node, hvparams)
233
    self.assertEqual(cons.kind, constants.CONS_VNC)
234
    self.assertEqual(cons.host, "192.0.2.1")
235
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
236
    self.assertEqual(cons.display, 10)
237

    
238
  def testSpice(self):
239
    instance = objects.Instance(name="kvm.example.com",
240
                                primary_node="node7235",
241
                                network_port=11000)
242
    node = objects.Node(name="node7235", uuid="node7235-uuid")
243
    hvparams = {
244
      constants.HV_SERIAL_CONSOLE: False,
245
      constants.HV_VNC_BIND_ADDRESS: None,
246
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
247
      }
248
    cons = self._Test(instance, node, hvparams)
249
    self.assertEqual(cons.kind, constants.CONS_SPICE)
250
    self.assertEqual(cons.host, "192.0.2.1")
251
    self.assertEqual(cons.port, 11000)
252

    
253
  def testNoConsole(self):
254
    instance = objects.Instance(name="kvm.example.com",
255
                                primary_node="node24325",
256
                                network_port=0)
257
    node = objects.Node(name="node24325", uuid="node24325-uuid")
258
    hvparams = {
259
      constants.HV_SERIAL_CONSOLE: False,
260
      constants.HV_VNC_BIND_ADDRESS: None,
261
      constants.HV_KVM_SPICE_BIND: None,
262
      }
263
    cons = self._Test(instance, node, hvparams)
264
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)
265

    
266

    
267
class TestVersionChecking(testutils.GanetiTestCase):
268
  def testParseVersion(self):
269
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion
270
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
271
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
272
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
273
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
274
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
275
    self.assertEqual(parse(help_112), ("1.1.2", 1, 1, 2))
276
    self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
277
    self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
278
    self.assertEqual(parse(help_0125), ("0.12.5", 0, 12, 5))
279
    self.assertEqual(parse(help_091), ("0.9.1", 0, 9, 1))
280

    
281

    
282
class TestSpiceParameterList(unittest.TestCase):
283
  def test(self):
284
    defaults = constants.HVC_DEFAULTS[constants.HT_KVM]
285

    
286
    params = \
287
      compat.UniqueFrozenset(getattr(constants, name)
288
                             for name in dir(constants)
289
                             if name.startswith("HV_KVM_SPICE_"))
290

    
291
    # Parameters whose default value evaluates to True and don't need to be set
292
    defaults_true = frozenset(filter(defaults.__getitem__, params))
293

    
294
    self.assertEqual(defaults_true, frozenset([
295
      constants.HV_KVM_SPICE_AUDIO_COMPR,
296
      constants.HV_KVM_SPICE_USE_VDAGENT,
297
      constants.HV_KVM_SPICE_TLS_CIPHERS,
298
      ]))
299

    
300
    # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
301
    # their default evaluates to False
302
    assert constants.HV_KVM_SPICE_BIND in params
303
    assert constants.HV_KVM_SPICE_BIND not in defaults_true
304

    
305
    # Exclude some parameters
306
    params -= defaults_true | frozenset([
307
      constants.HV_KVM_SPICE_BIND,
308
      ])
309

    
310
    self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, params)
311

    
312

    
313
class TestHelpRegexps(testutils.GanetiTestCase):
314
  def testBootRe(self):
315
    """Check _BOOT_RE
316

317
    It has too match -drive.*boot=on|off except if there is another dash-option
318
    at the beginning of the line.
319

320
    """
321
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE
322
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
323
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
324
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
325
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
326
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
327
    help_091_fake = testutils.ReadTestData("kvm_0.9.1_help_boot_test.txt")
328

    
329
    self.assertTrue(boot_re.search(help_091))
330
    self.assertTrue(boot_re.search(help_0125))
331
    self.assertFalse(boot_re.search(help_091_fake))
332
    self.assertFalse(boot_re.search(help_112))
333
    self.assertFalse(boot_re.search(help_10))
334
    self.assertFalse(boot_re.search(help_01590))
335

    
336

    
337
class TestGetTunFeatures(unittest.TestCase):
338
  def testWrongIoctl(self):
339
    tmpfile = tempfile.NamedTemporaryFile()
340
    # A file does not have the right ioctls, so this must always fail
341
    result = hv_kvm._GetTunFeatures(tmpfile.fileno())
342
    self.assertTrue(result is None)
343

    
344
  def _FakeIoctl(self, features, fd, request, buf):
345
    self.assertEqual(request, hv_kvm.TUNGETFEATURES)
346

    
347
    (reqno, ) = struct.unpack("I", buf)
348
    self.assertEqual(reqno, 0)
349

    
350
    return struct.pack("I", features)
351

    
352
  def test(self):
353
    tmpfile = tempfile.NamedTemporaryFile()
354
    fd = tmpfile.fileno()
355

    
356
    for features in [0, hv_kvm.IFF_VNET_HDR]:
357
      fn = compat.partial(self._FakeIoctl, features)
358
      result = hv_kvm._GetTunFeatures(fd, _ioctl=fn)
359
      self.assertEqual(result, features)
360

    
361

    
362
class TestProbeTapVnetHdr(unittest.TestCase):
363
  def _FakeTunFeatures(self, expected_fd, flags, fd):
364
    self.assertEqual(fd, expected_fd)
365
    return flags
366

    
367
  def test(self):
368
    tmpfile = tempfile.NamedTemporaryFile()
369
    fd = tmpfile.fileno()
370

    
371
    for flags in [0, hv_kvm.IFF_VNET_HDR]:
372
      fn = compat.partial(self._FakeTunFeatures, fd, flags)
373

    
374
      result = hv_kvm._ProbeTapVnetHdr(fd, _features_fn=fn)
375
      if flags == 0:
376
        self.assertFalse(result)
377
      else:
378
        self.assertTrue(result)
379

    
380
  def testUnsupported(self):
381
    tmpfile = tempfile.NamedTemporaryFile()
382
    fd = tmpfile.fileno()
383

    
384
    self.assertFalse(hv_kvm._ProbeTapVnetHdr(fd, _features_fn=lambda _: None))
385

    
386

    
387
if __name__ == "__main__":
388
  testutils.GanetiTestProgram()