Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.hypervisor.hv_kvm_unittest.py @ 0ad7f5d8

History | View | Annotate | Download (10.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing the hypervisor.hv_kvm module"""
23

    
24
import threading
25
import tempfile
26
import unittest
27
import socket
28
import os
29

    
30
from ganeti import serializer
31
from ganeti import constants
32
from ganeti import compat
33
from ganeti import objects
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import pathutils
37

    
38
from ganeti.hypervisor import hv_kvm
39

    
40
import testutils
41

    
42

    
43
class QmpStub(threading.Thread):
44
  """Stub for a QMP endpoint for a KVM instance
45

46
  """
47
  _QMP_BANNER_DATA = {
48
    "QMP": {
49
      "version": {
50
        "package": "",
51
        "qemu": {
52
          "micro": 50,
53
          "minor": 13,
54
          "major": 0,
55
          },
56
        "capabilities": [],
57
        },
58
      }
59
    }
60
  _EMPTY_RESPONSE = {
61
    "return": [],
62
    }
63

    
64
  def __init__(self, socket_filename, server_responses):
65
    """Creates a QMP stub
66

67
    @type socket_filename: string
68
    @param socket_filename: filename of the UNIX socket that will be created
69
                            this class and used for the communication
70
    @type server_responses: list
71
    @param server_responses: list of responses that the server sends in response
72
                             to whatever it receives
73
    """
74
    threading.Thread.__init__(self)
75
    self.socket_filename = socket_filename
76
    self.script = server_responses
77

    
78
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
79
    self.socket.bind(self.socket_filename)
80
    self.socket.listen(1)
81

    
82
  def run(self):
83
    # Hypothesis: the messages we receive contain only a complete QMP message
84
    # encoded in JSON.
85
    conn, addr = self.socket.accept()
86

    
87
    # Send the banner as the first thing
88
    conn.send(self.encode_string(self._QMP_BANNER_DATA))
89

    
90
    # Expect qmp_capabilities and return an empty response
91
    conn.recv(4096)
92
    conn.send(self.encode_string(self._EMPTY_RESPONSE))
93

    
94
    while True:
95
      # We ignore the expected message, as the purpose of this object is not
96
      # to verify the correctness of the communication but to act as a
97
      # partner for the SUT (System Under Test, that is QmpConnection)
98
      msg = conn.recv(4096)
99
      if not msg:
100
        break
101

    
102
      if not self.script:
103
        break
104
      response = self.script.pop(0)
105
      if isinstance(response, str):
106
        conn.send(response)
107
      elif isinstance(response, list):
108
        for chunk in response:
109
          conn.send(chunk)
110
      else:
111
        raise errors.ProgrammerError("Unknown response type for %s" % response)
112

    
113
    conn.close()
114

    
115
  def encode_string(self, message):
116
    return (serializer.DumpJson(message) +
117
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
118

    
119

    
120
class TestQmpMessage(testutils.GanetiTestCase):
121
  def testSerialization(self):
122
    test_data = {
123
      "execute": "command",
124
      "arguments": ["a", "b", "c"],
125
      }
126
    message = hv_kvm.QmpMessage(test_data)
127

    
128
    for k, v in test_data.items():
129
      self.assertEqual(message[k], v)
130

    
131
    serialized = str(message)
132
    self.assertEqual(len(serialized.splitlines()), 1,
133
                     msg="Got multi-line message")
134

    
135
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
136
    self.assertEqual(rebuilt_message, message)
137

    
138

    
139
class TestQmp(testutils.GanetiTestCase):
140
  def testQmp(self):
141
    requests = [
142
      {"execute": "query-kvm", "arguments": []},
143
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
144
      {"execute": "query-status", "arguments": []},
145
      {"execute": "query-name", "arguments": []},
146
      ]
147

    
148
    server_responses = [
149
      # One message, one send()
150
      '{"return": {"enabled": true, "present": true}}\r\n',
151

    
152
      # Message sent using multiple send()
153
      ['{"retur', 'n": {}}\r\n'],
154

    
155
      # Multiple messages sent using one send()
156
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
157
      '{"return": {"running": true, "singlestep": false}}\r\n',
158
      ]
159

    
160
    expected_responses = [
161
      {"return": {"enabled": True, "present": True}},
162
      {"return": {}},
163
      {"return": [{"name": "quit"}, {"name": "eject"}]},
164
      {"return": {"running": True, "singlestep": False}},
165
      ]
166

    
167
    # Set up the stub
168
    socket_file = tempfile.NamedTemporaryFile()
169
    os.remove(socket_file.name)
170
    qmp_stub = QmpStub(socket_file.name, server_responses)
171
    qmp_stub.start()
172

    
173
    # Set up the QMP connection
174
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
175
    qmp_connection.connect()
176

    
177
    # Format the script
178
    for request, expected_response in zip(requests, expected_responses):
179
      response = qmp_connection.Execute(request)
180
      msg = hv_kvm.QmpMessage(expected_response)
181
      self.assertEqual(len(str(msg).splitlines()), 1,
182
                       msg="Got multi-line message")
183
      self.assertEqual(response, msg)
184

    
185

    
186
class TestConsole(unittest.TestCase):
187
  def _Test(self, instance, hvparams):
188
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
189
    self.assertTrue(cons.Validate())
190
    return cons
191

    
192
  def testSerial(self):
193
    instance = objects.Instance(name="kvm.example.com",
194
                                primary_node="node6017")
195
    hvparams = {
196
      constants.HV_SERIAL_CONSOLE: True,
197
      constants.HV_VNC_BIND_ADDRESS: None,
198
      constants.HV_KVM_SPICE_BIND: None,
199
      }
200
    cons = self._Test(instance, hvparams)
201
    self.assertEqual(cons.kind, constants.CONS_SSH)
202
    self.assertEqual(cons.host, instance.primary_node)
203
    self.assertEqual(cons.command[0], pathutils.KVM_CONSOLE_WRAPPER)
204
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)
205

    
206
  def testVnc(self):
207
    instance = objects.Instance(name="kvm.example.com",
208
                                primary_node="node7235",
209
                                network_port=constants.VNC_BASE_PORT + 10)
210
    hvparams = {
211
      constants.HV_SERIAL_CONSOLE: False,
212
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
213
      constants.HV_KVM_SPICE_BIND: None,
214
      }
215
    cons = self._Test(instance, hvparams)
216
    self.assertEqual(cons.kind, constants.CONS_VNC)
217
    self.assertEqual(cons.host, "192.0.2.1")
218
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
219
    self.assertEqual(cons.display, 10)
220

    
221
  def testSpice(self):
222
    instance = objects.Instance(name="kvm.example.com",
223
                                primary_node="node7235",
224
                                network_port=11000)
225
    hvparams = {
226
      constants.HV_SERIAL_CONSOLE: False,
227
      constants.HV_VNC_BIND_ADDRESS: None,
228
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
229
      }
230
    cons = self._Test(instance, hvparams)
231
    self.assertEqual(cons.kind, constants.CONS_SPICE)
232
    self.assertEqual(cons.host, "192.0.2.1")
233
    self.assertEqual(cons.port, 11000)
234

    
235
  def testNoConsole(self):
236
    instance = objects.Instance(name="kvm.example.com",
237
                                primary_node="node24325",
238
                                network_port=0)
239
    hvparams = {
240
      constants.HV_SERIAL_CONSOLE: False,
241
      constants.HV_VNC_BIND_ADDRESS: None,
242
      constants.HV_KVM_SPICE_BIND: None,
243
      }
244
    cons = self._Test(instance, hvparams)
245
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)
246

    
247

    
248
class TestVersionChecking(testutils.GanetiTestCase):
249
  def testParseVersion(self):
250
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion
251
    help_10 = utils.ReadFile(self._TestDataFilename("kvm_1.0_help.txt"))
252
    help_01590 = utils.ReadFile(self._TestDataFilename("kvm_0.15.90_help.txt"))
253
    help_0125 = utils.ReadFile(self._TestDataFilename("kvm_0.12.5_help.txt"))
254
    help_091 = utils.ReadFile(self._TestDataFilename("kvm_0.9.1_help.txt"))
255
    self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
256
    self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
257
    self.assertEqual(parse(help_0125), ("0.12.5", 0, 12, 5))
258
    self.assertEqual(parse(help_091), ("0.9.1", 0, 9, 1))
259

    
260

    
261
class TestSpiceParameterList(unittest.TestCase):
262
  def test(self):
263
    defaults = constants.HVC_DEFAULTS[constants.HT_KVM]
264

    
265
    params = \
266
      compat.UniqueFrozenset(getattr(constants, name)
267
                             for name in dir(constants)
268
                             if name.startswith("HV_KVM_SPICE_"))
269

    
270
    # Parameters whose default value evaluates to True and don't need to be set
271
    defaults_true = frozenset(filter(defaults.__getitem__, params))
272

    
273
    self.assertEqual(defaults_true, frozenset([
274
      constants.HV_KVM_SPICE_AUDIO_COMPR,
275
      constants.HV_KVM_SPICE_USE_VDAGENT,
276
      constants.HV_KVM_SPICE_TLS_CIPHERS,
277
      ]))
278

    
279
    # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
280
    # their default evaluates to False
281
    assert constants.HV_KVM_SPICE_BIND in params
282
    assert constants.HV_KVM_SPICE_BIND not in defaults_true
283

    
284
    # Exclude some parameters
285
    params -= defaults_true | frozenset([
286
      constants.HV_KVM_SPICE_BIND,
287
      ])
288

    
289
    self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, params)
290

    
291

    
292
class TestHelpRegexps(testutils.GanetiTestCase):
293
  def testBootRe(self):
294
    """Check _BOOT_RE
295

296
    It has too match -drive.*boot=on|off except if there is another dash-option
297
    at the beginning of the line.
298

299
    """
300
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE
301
    help_10 = utils.ReadFile(self._TestDataFilename("kvm_1.0_help.txt"))
302
    help_01590 = utils.ReadFile(self._TestDataFilename("kvm_0.15.90_help.txt"))
303
    help_0125 = utils.ReadFile(self._TestDataFilename("kvm_0.12.5_help.txt"))
304
    help_091 = utils.ReadFile(self._TestDataFilename("kvm_0.9.1_help.txt"))
305

    
306
    self.assertTrue(boot_re.search(help_091))
307
    self.assertTrue(boot_re.search(help_0125))
308
    self.assertFalse(boot_re.search(help_10))
309
    self.assertFalse(boot_re.search(help_01590))
310

    
311

    
312
if __name__ == "__main__":
313
  testutils.GanetiTestProgram()