4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing the hypervisor.hv_kvm module"""
30 from ganeti import serializer
31 from ganeti import constants
32 from ganeti import compat
33 from ganeti import objects
34 from ganeti import errors
35 from ganeti import utils
36 from ganeti import pathutils
38 from ganeti.hypervisor import hv_kvm
43 class QmpStub(threading.Thread):
44 """Stub for a QMP endpoint for a KVM instance
64 def __init__(self, socket_filename, server_responses):
67 @type socket_filename: string
68 @param socket_filename: filename of the UNIX socket that will be created
69 this class and used for the communication
70 @type server_responses: list
71 @param server_responses: list of responses that the server sends in response
72 to whatever it receives
74 threading.Thread.__init__(self)
75 self.socket_filename = socket_filename
76 self.script = server_responses
78 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
79 self.socket.bind(self.socket_filename)
83 # Hypothesis: the messages we receive contain only a complete QMP message
85 conn, addr = self.socket.accept()
87 # Send the banner as the first thing
88 conn.send(self.encode_string(self._QMP_BANNER_DATA))
90 # Expect qmp_capabilities and return an empty response
92 conn.send(self.encode_string(self._EMPTY_RESPONSE))
95 # We ignore the expected message, as the purpose of this object is not
96 # to verify the correctness of the communication but to act as a
97 # partner for the SUT (System Under Test, that is QmpConnection)
104 response = self.script.pop(0)
105 if isinstance(response, str):
107 elif isinstance(response, list):
108 for chunk in response:
111 raise errors.ProgrammerError("Unknown response type for %s" % response)
115 def encode_string(self, message):
116 return (serializer.DumpJson(message) +
117 hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
120 class TestQmpMessage(testutils.GanetiTestCase):
121 def testSerialization(self):
123 "execute": "command",
124 "arguments": ["a", "b", "c"],
126 message = hv_kvm.QmpMessage(test_data)
128 for k, v in test_data.items():
129 self.assertEqual(message[k], v)
131 serialized = str(message)
132 self.assertEqual(len(serialized.splitlines()), 1,
133 msg="Got multi-line message")
135 rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
136 self.assertEqual(rebuilt_message, message)
139 class TestQmp(testutils.GanetiTestCase):
142 {"execute": "query-kvm", "arguments": []},
143 {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
144 {"execute": "query-status", "arguments": []},
145 {"execute": "query-name", "arguments": []},
149 # One message, one send()
150 '{"return": {"enabled": true, "present": true}}\r\n',
152 # Message sent using multiple send()
153 ['{"retur', 'n": {}}\r\n'],
155 # Multiple messages sent using one send()
156 '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
157 '{"return": {"running": true, "singlestep": false}}\r\n',
160 expected_responses = [
161 {"return": {"enabled": True, "present": True}},
163 {"return": [{"name": "quit"}, {"name": "eject"}]},
164 {"return": {"running": True, "singlestep": False}},
168 socket_file = tempfile.NamedTemporaryFile()
169 os.remove(socket_file.name)
170 qmp_stub = QmpStub(socket_file.name, server_responses)
173 # Set up the QMP connection
174 qmp_connection = hv_kvm.QmpConnection(socket_file.name)
175 qmp_connection.connect()
178 for request, expected_response in zip(requests, expected_responses):
179 response = qmp_connection.Execute(request)
180 msg = hv_kvm.QmpMessage(expected_response)
181 self.assertEqual(len(str(msg).splitlines()), 1,
182 msg="Got multi-line message")
183 self.assertEqual(response, msg)
186 class TestConsole(unittest.TestCase):
187 def _Test(self, instance, hvparams):
188 cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
189 self.assertTrue(cons.Validate())
192 def testSerial(self):
193 instance = objects.Instance(name="kvm.example.com",
194 primary_node="node6017")
196 constants.HV_SERIAL_CONSOLE: True,
197 constants.HV_VNC_BIND_ADDRESS: None,
198 constants.HV_KVM_SPICE_BIND: None,
200 cons = self._Test(instance, hvparams)
201 self.assertEqual(cons.kind, constants.CONS_SSH)
202 self.assertEqual(cons.host, instance.primary_node)
203 self.assertEqual(cons.command[0], pathutils.KVM_CONSOLE_WRAPPER)
204 self.assertEqual(cons.command[1], constants.SOCAT_PATH)
207 instance = objects.Instance(name="kvm.example.com",
208 primary_node="node7235",
209 network_port=constants.VNC_BASE_PORT + 10)
211 constants.HV_SERIAL_CONSOLE: False,
212 constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
213 constants.HV_KVM_SPICE_BIND: None,
215 cons = self._Test(instance, hvparams)
216 self.assertEqual(cons.kind, constants.CONS_VNC)
217 self.assertEqual(cons.host, "192.0.2.1")
218 self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
219 self.assertEqual(cons.display, 10)
222 instance = objects.Instance(name="kvm.example.com",
223 primary_node="node7235",
226 constants.HV_SERIAL_CONSOLE: False,
227 constants.HV_VNC_BIND_ADDRESS: None,
228 constants.HV_KVM_SPICE_BIND: "192.0.2.1",
230 cons = self._Test(instance, hvparams)
231 self.assertEqual(cons.kind, constants.CONS_SPICE)
232 self.assertEqual(cons.host, "192.0.2.1")
233 self.assertEqual(cons.port, 11000)
235 def testNoConsole(self):
236 instance = objects.Instance(name="kvm.example.com",
237 primary_node="node24325",
240 constants.HV_SERIAL_CONSOLE: False,
241 constants.HV_VNC_BIND_ADDRESS: None,
242 constants.HV_KVM_SPICE_BIND: None,
244 cons = self._Test(instance, hvparams)
245 self.assertEqual(cons.kind, constants.CONS_MESSAGE)
248 class TestVersionChecking(testutils.GanetiTestCase):
249 def testParseVersion(self):
250 parse = hv_kvm.KVMHypervisor._ParseKVMVersion
251 help_10 = utils.ReadFile(self._TestDataFilename("kvm_1.0_help.txt"))
252 help_01590 = utils.ReadFile(self._TestDataFilename("kvm_0.15.90_help.txt"))
253 help_0125 = utils.ReadFile(self._TestDataFilename("kvm_0.12.5_help.txt"))
254 help_091 = utils.ReadFile(self._TestDataFilename("kvm_0.9.1_help.txt"))
255 self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
256 self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
257 self.assertEqual(parse(help_0125), ("0.12.5", 0, 12, 5))
258 self.assertEqual(parse(help_091), ("0.9.1", 0, 9, 1))
261 class TestSpiceParameterList(unittest.TestCase):
263 defaults = constants.HVC_DEFAULTS[constants.HT_KVM]
266 compat.UniqueFrozenset(getattr(constants, name)
267 for name in dir(constants)
268 if name.startswith("HV_KVM_SPICE_"))
270 # Parameters whose default value evaluates to True and don't need to be set
271 defaults_true = frozenset(filter(defaults.__getitem__, params))
273 self.assertEqual(defaults_true, frozenset([
274 constants.HV_KVM_SPICE_AUDIO_COMPR,
275 constants.HV_KVM_SPICE_USE_VDAGENT,
276 constants.HV_KVM_SPICE_TLS_CIPHERS,
279 # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
280 # their default evaluates to False
281 assert constants.HV_KVM_SPICE_BIND in params
282 assert constants.HV_KVM_SPICE_BIND not in defaults_true
284 # Exclude some parameters
285 params -= defaults_true | frozenset([
286 constants.HV_KVM_SPICE_BIND,
289 self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, params)
292 if __name__ == "__main__":
293 testutils.GanetiTestProgram()