Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.hypervisor.hv_kvm_unittest.py @ cd3b4ff4

History | View | Annotate | Download (12.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing the hypervisor.hv_kvm module"""
23

    
24
import threading
25
import tempfile
26
import unittest
27
import socket
28
import os
29
import struct
30

    
31
from ganeti import serializer
32
from ganeti import constants
33
from ganeti import compat
34
from ganeti import objects
35
from ganeti import errors
36
from ganeti import utils
37
from ganeti import pathutils
38

    
39
from ganeti.hypervisor import hv_kvm
40

    
41
import testutils
42

    
43

    
44
class QmpStub(threading.Thread):
45
  """Stub for a QMP endpoint for a KVM instance
46

47
  """
48
  _QMP_BANNER_DATA = {
49
    "QMP": {
50
      "version": {
51
        "package": "",
52
        "qemu": {
53
          "micro": 50,
54
          "minor": 13,
55
          "major": 0,
56
          },
57
        "capabilities": [],
58
        },
59
      }
60
    }
61
  _EMPTY_RESPONSE = {
62
    "return": [],
63
    }
64

    
65
  def __init__(self, socket_filename, server_responses):
66
    """Creates a QMP stub
67

68
    @type socket_filename: string
69
    @param socket_filename: filename of the UNIX socket that will be created
70
                            this class and used for the communication
71
    @type server_responses: list
72
    @param server_responses: list of responses that the server sends in response
73
                             to whatever it receives
74
    """
75
    threading.Thread.__init__(self)
76
    self.socket_filename = socket_filename
77
    self.script = server_responses
78

    
79
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
80
    self.socket.bind(self.socket_filename)
81
    self.socket.listen(1)
82

    
83
  def run(self):
84
    # Hypothesis: the messages we receive contain only a complete QMP message
85
    # encoded in JSON.
86
    conn, addr = self.socket.accept()
87

    
88
    # Send the banner as the first thing
89
    conn.send(self.encode_string(self._QMP_BANNER_DATA))
90

    
91
    # Expect qmp_capabilities and return an empty response
92
    conn.recv(4096)
93
    conn.send(self.encode_string(self._EMPTY_RESPONSE))
94

    
95
    while True:
96
      # We ignore the expected message, as the purpose of this object is not
97
      # to verify the correctness of the communication but to act as a
98
      # partner for the SUT (System Under Test, that is QmpConnection)
99
      msg = conn.recv(4096)
100
      if not msg:
101
        break
102

    
103
      if not self.script:
104
        break
105
      response = self.script.pop(0)
106
      if isinstance(response, str):
107
        conn.send(response)
108
      elif isinstance(response, list):
109
        for chunk in response:
110
          conn.send(chunk)
111
      else:
112
        raise errors.ProgrammerError("Unknown response type for %s" % response)
113

    
114
    conn.close()
115

    
116
  def encode_string(self, message):
117
    return (serializer.DumpJson(message) +
118
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
119

    
120

    
121
class TestQmpMessage(testutils.GanetiTestCase):
122
  def testSerialization(self):
123
    test_data = {
124
      "execute": "command",
125
      "arguments": ["a", "b", "c"],
126
      }
127
    message = hv_kvm.QmpMessage(test_data)
128

    
129
    for k, v in test_data.items():
130
      self.assertEqual(message[k], v)
131

    
132
    serialized = str(message)
133
    self.assertEqual(len(serialized.splitlines()), 1,
134
                     msg="Got multi-line message")
135

    
136
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
137
    self.assertEqual(rebuilt_message, message)
138

    
139

    
140
class TestQmp(testutils.GanetiTestCase):
141
  def testQmp(self):
142
    requests = [
143
      {"execute": "query-kvm", "arguments": []},
144
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
145
      {"execute": "query-status", "arguments": []},
146
      {"execute": "query-name", "arguments": []},
147
      ]
148

    
149
    server_responses = [
150
      # One message, one send()
151
      '{"return": {"enabled": true, "present": true}}\r\n',
152

    
153
      # Message sent using multiple send()
154
      ['{"retur', 'n": {}}\r\n'],
155

    
156
      # Multiple messages sent using one send()
157
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
158
      '{"return": {"running": true, "singlestep": false}}\r\n',
159
      ]
160

    
161
    expected_responses = [
162
      {"return": {"enabled": True, "present": True}},
163
      {"return": {}},
164
      {"return": [{"name": "quit"}, {"name": "eject"}]},
165
      {"return": {"running": True, "singlestep": False}},
166
      ]
167

    
168
    # Set up the stub
169
    socket_file = tempfile.NamedTemporaryFile()
170
    os.remove(socket_file.name)
171
    qmp_stub = QmpStub(socket_file.name, server_responses)
172
    qmp_stub.start()
173

    
174
    # Set up the QMP connection
175
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
176
    qmp_connection.connect()
177

    
178
    # Format the script
179
    for request, expected_response in zip(requests, expected_responses):
180
      response = qmp_connection.Execute(request)
181
      msg = hv_kvm.QmpMessage(expected_response)
182
      self.assertEqual(len(str(msg).splitlines()), 1,
183
                       msg="Got multi-line message")
184
      self.assertEqual(response, msg)
185

    
186

    
187
class TestConsole(unittest.TestCase):
188
  def _Test(self, instance, node, hvparams):
189
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, node, hvparams, {})
190
    self.assertTrue(cons.Validate())
191
    return cons
192

    
193
  def testSerial(self):
194
    instance = objects.Instance(name="kvm.example.com",
195
                                primary_node="node6017-uuid")
196
    node = objects.Node(name="node6017", uuid="node6017-uuid")
197
    hvparams = {
198
      constants.HV_SERIAL_CONSOLE: True,
199
      constants.HV_VNC_BIND_ADDRESS: None,
200
      constants.HV_KVM_SPICE_BIND: None,
201
      }
202
    cons = self._Test(instance, node, hvparams)
203
    self.assertEqual(cons.kind, constants.CONS_SSH)
204
    self.assertEqual(cons.host, node.name)
205
    self.assertEqual(cons.command[0], pathutils.KVM_CONSOLE_WRAPPER)
206
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)
207

    
208
  def testVnc(self):
209
    instance = objects.Instance(name="kvm.example.com",
210
                                primary_node="node7235-uuid",
211
                                network_port=constants.VNC_BASE_PORT + 10)
212
    node = objects.Node(name="node7235", uuid="node7235-uuid")
213
    hvparams = {
214
      constants.HV_SERIAL_CONSOLE: False,
215
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
216
      constants.HV_KVM_SPICE_BIND: None,
217
      }
218
    cons = self._Test(instance, node, hvparams)
219
    self.assertEqual(cons.kind, constants.CONS_VNC)
220
    self.assertEqual(cons.host, "192.0.2.1")
221
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
222
    self.assertEqual(cons.display, 10)
223

    
224
  def testSpice(self):
225
    instance = objects.Instance(name="kvm.example.com",
226
                                primary_node="node7235",
227
                                network_port=11000)
228
    node = objects.Node(name="node7235", uuid="node7235-uuid")
229
    hvparams = {
230
      constants.HV_SERIAL_CONSOLE: False,
231
      constants.HV_VNC_BIND_ADDRESS: None,
232
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
233
      }
234
    cons = self._Test(instance, node, hvparams)
235
    self.assertEqual(cons.kind, constants.CONS_SPICE)
236
    self.assertEqual(cons.host, "192.0.2.1")
237
    self.assertEqual(cons.port, 11000)
238

    
239
  def testNoConsole(self):
240
    instance = objects.Instance(name="kvm.example.com",
241
                                primary_node="node24325",
242
                                network_port=0)
243
    node = objects.Node(name="node24325", uuid="node24325-uuid")
244
    hvparams = {
245
      constants.HV_SERIAL_CONSOLE: False,
246
      constants.HV_VNC_BIND_ADDRESS: None,
247
      constants.HV_KVM_SPICE_BIND: None,
248
      }
249
    cons = self._Test(instance, node, hvparams)
250
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)
251

    
252

    
253
class TestVersionChecking(testutils.GanetiTestCase):
254
  def testParseVersion(self):
255
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion
256
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
257
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
258
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
259
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
260
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
261
    self.assertEqual(parse(help_112), ("1.1.2", 1, 1, 2))
262
    self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
263
    self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
264
    self.assertEqual(parse(help_0125), ("0.12.5", 0, 12, 5))
265
    self.assertEqual(parse(help_091), ("0.9.1", 0, 9, 1))
266

    
267

    
268
class TestSpiceParameterList(unittest.TestCase):
269
  def test(self):
270
    defaults = constants.HVC_DEFAULTS[constants.HT_KVM]
271

    
272
    params = \
273
      compat.UniqueFrozenset(getattr(constants, name)
274
                             for name in dir(constants)
275
                             if name.startswith("HV_KVM_SPICE_"))
276

    
277
    # Parameters whose default value evaluates to True and don't need to be set
278
    defaults_true = frozenset(filter(defaults.__getitem__, params))
279

    
280
    self.assertEqual(defaults_true, frozenset([
281
      constants.HV_KVM_SPICE_AUDIO_COMPR,
282
      constants.HV_KVM_SPICE_USE_VDAGENT,
283
      constants.HV_KVM_SPICE_TLS_CIPHERS,
284
      ]))
285

    
286
    # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
287
    # their default evaluates to False
288
    assert constants.HV_KVM_SPICE_BIND in params
289
    assert constants.HV_KVM_SPICE_BIND not in defaults_true
290

    
291
    # Exclude some parameters
292
    params -= defaults_true | frozenset([
293
      constants.HV_KVM_SPICE_BIND,
294
      ])
295

    
296
    self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, params)
297

    
298

    
299
class TestHelpRegexps(testutils.GanetiTestCase):
300
  def testBootRe(self):
301
    """Check _BOOT_RE
302

303
    It has too match -drive.*boot=on|off except if there is another dash-option
304
    at the beginning of the line.
305

306
    """
307
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE
308
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
309
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
310
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
311
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
312
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
313
    help_091_fake = testutils.ReadTestData("kvm_0.9.1_help_boot_test.txt")
314

    
315
    self.assertTrue(boot_re.search(help_091))
316
    self.assertTrue(boot_re.search(help_0125))
317
    self.assertFalse(boot_re.search(help_091_fake))
318
    self.assertFalse(boot_re.search(help_112))
319
    self.assertFalse(boot_re.search(help_10))
320
    self.assertFalse(boot_re.search(help_01590))
321

    
322

    
323
class TestGetTunFeatures(unittest.TestCase):
324
  def testWrongIoctl(self):
325
    tmpfile = tempfile.NamedTemporaryFile()
326
    # A file does not have the right ioctls, so this must always fail
327
    result = hv_kvm._GetTunFeatures(tmpfile.fileno())
328
    self.assertTrue(result is None)
329

    
330
  def _FakeIoctl(self, features, fd, request, buf):
331
    self.assertEqual(request, hv_kvm.TUNGETFEATURES)
332

    
333
    (reqno, ) = struct.unpack("I", buf)
334
    self.assertEqual(reqno, 0)
335

    
336
    return struct.pack("I", features)
337

    
338
  def test(self):
339
    tmpfile = tempfile.NamedTemporaryFile()
340
    fd = tmpfile.fileno()
341

    
342
    for features in [0, hv_kvm.IFF_VNET_HDR]:
343
      fn = compat.partial(self._FakeIoctl, features)
344
      result = hv_kvm._GetTunFeatures(fd, _ioctl=fn)
345
      self.assertEqual(result, features)
346

    
347

    
348
class TestProbeTapVnetHdr(unittest.TestCase):
349
  def _FakeTunFeatures(self, expected_fd, flags, fd):
350
    self.assertEqual(fd, expected_fd)
351
    return flags
352

    
353
  def test(self):
354
    tmpfile = tempfile.NamedTemporaryFile()
355
    fd = tmpfile.fileno()
356

    
357
    for flags in [0, hv_kvm.IFF_VNET_HDR]:
358
      fn = compat.partial(self._FakeTunFeatures, fd, flags)
359

    
360
      result = hv_kvm._ProbeTapVnetHdr(fd, _features_fn=fn)
361
      if flags == 0:
362
        self.assertFalse(result)
363
      else:
364
        self.assertTrue(result)
365

    
366
  def testUnsupported(self):
367
    tmpfile = tempfile.NamedTemporaryFile()
368
    fd = tmpfile.fileno()
369

    
370
    self.assertFalse(hv_kvm._ProbeTapVnetHdr(fd, _features_fn=lambda _: None))
371

    
372

    
373
if __name__ == "__main__":
374
  testutils.GanetiTestProgram()