4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing the hypervisor.hv_kvm module"""
30 from ganeti import serializer
31 from ganeti import constants
32 from ganeti import compat
33 from ganeti import objects
34 from ganeti import errors
36 from ganeti.hypervisor import hv_kvm
# Test double for the server side of a QMP socket. It is a thread object that
# binds a UNIX stream socket, accepts one connection, sends the QMP banner,
# sends an empty response, and then works through the scripted
# server_responses handed to the constructor.
# NOTE(review): several physical lines of this class are not visible in this
# listing (docstring terminators, the rest of the banner dict, the header of
# the method that holds the accept/serve body, the bodies of the isinstance
# branches). Comments below describe only what the visible lines show.
41 class QmpStub(threading.Thread):
42 """Stub for a QMP endpoint for a KVM instance
# Greeting payload; it is the first thing sent on the accepted connection.
45 _QMP_BANNER_DATA = {"QMP": {"version": {
47 "qemu": {"micro": 50, "minor": 13, "major": 0},
# Payload sent right after the banner, as the reply to the expected
# qmp_capabilities command.
50 _EMPTY_RESPONSE = {"return": []}
# NOTE(review): in the docstring below, "will be created this class"
# presumably means "will be created by this class".
52 def __init__(self, socket_filename, server_responses):
55 @type socket_filename: string
56 @param socket_filename: filename of the UNIX socket that will be created
57 this class and used for the communication
58 @type server_responses: list
59 @param server_responses: list of responses that the server sends in response
60 to whatever it receives
62 threading.Thread.__init__(self)
63 self.socket_filename = socket_filename
# No copy is made: self.script is the caller's list object, and pop(0)
# further down consumes it in place.
64 self.script = server_responses
# The socket is created and bound here, in the constructor, before the
# thread body runs.
66 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
67 self.socket.bind(self.socket_filename)
# NOTE(review): the header of the method holding the body below is not
# visible; for a threading.Thread subclass it is presumably run() -- confirm.
71 # Hypothesis: the messages we receive contain only a complete QMP message
# Waits for a client; addr is not used in the visible lines.
73 conn, addr = self.socket.accept()
75 # Send the banner as the first thing
76 conn.send(self.encode_string(self._QMP_BANNER_DATA))
78 # Expect qmp_capabilities and return an empty response
# NOTE(review): the read of the client's command is presumably on a line
# that is not visible between the comment above and the send below.
80 conn.send(self.encode_string(self._EMPTY_RESPONSE))
83 # We ignore the expected message, as the purpose of this object is not
84 # to verify the correctness of the communication but to act as a
85 # partner for the SUT (System Under Test, that is QmpConnection)
# Take the next scripted entry from the front of the list. Per the script in
# TestQmp, a str entry is delivered with one send() and a list entry holds
# the pieces for several send() calls; the send calls themselves are not
# visible here.
92 response = self.script.pop(0)
93 if isinstance(response, str):
95 elif isinstance(response, list):
96 for chunk in response:
# Presumably the fallback branch for an entry that is neither str nor list.
99 raise errors.ProgrammerError("Unknown response type for %s" % response)
# Serializes message with serializer.DumpJson (indent disabled) and appends
# QmpConnection's message end token.
103 def encode_string(self, message):
104 return (serializer.DumpJson(message, indent=False) +
105 hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
class TestQmpMessage(testutils.GanetiTestCase):
  """Tests for L{hv_kvm.QmpMessage}.

  """
  def testSerialization(self):
    """Checks item access and the string round trip of a QmpMessage.

    A message built from a dict must return the same value for every key,
    and C{str(message)} fed to C{QmpMessage.BuildFromJsonString} must give
    a message that compares equal to the original.

    """
    test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
    message = hv_kvm.QmpMessage(test_data)

    # assertEqual performs the same "==" comparison as the former
    # failUnless(a == b), but reports both operands on failure and does not
    # rely on the deprecated failUnless alias
    for key, value in test_data.items():
      self.assertEqual(message[key], value)

    # Serialize to text and parse it back
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
    self.assertEqual(rebuilt_message, message)
# Drives hv_kvm.QmpConnection against a QmpStub listening on a temporary
# UNIX socket path: each request is executed and the reply is compared with
# the QmpMessage built from the matching expected response.
# NOTE(review): the test method header and the opening lines of the request
# list and of the server script list are not visible in this listing.
120 class TestQmp(testutils.GanetiTestCase):
# Presumably the elements of the requests list iterated at the bottom; each
# one is passed to QmpConnection.Execute.
123 {"execute": "query-kvm", "arguments": []},
124 {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
125 {"execute": "query-status", "arguments": []},
126 {"execute": "query-name", "arguments": []},
# Presumably the elements of server_responses, the script given to QmpStub.
# Every message ends with "\r\n".
130 # One message, one send()
131 '{"return": {"enabled": true, "present": true}}\r\n',
133 # Message sent using multiple send()
134 ['{"retur', 'n": {}}\r\n'],
136 # Multiple messages sent using one send()
# The two adjacent literals below have no comma between them, so they form a
# single string entry carrying two messages.
137 '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
138 '{"return": {"running": true, "singlestep": false}}\r\n',
# Parsed form of the scripted replies, paired with the requests by zip()
# below. zip() stops at the shorter sequence, so a length mismatch between
# the two lists would pass silently.
# NOTE(review): an entry matching the chunked '{"return": {}}' message is
# presumably on a line that is not visible here.
141 expected_responses = [
142 {"return": {"enabled": True, "present": True}},
144 {"return": [{"name": "quit"}, {"name": "eject"}]},
145 {"return": {"running": True, "singlestep": False}},
# The temporary file only supplies a unique path: it is removed right away
# and the stub binds its UNIX socket at that same name.
149 socket_file = tempfile.NamedTemporaryFile()
150 os.remove(socket_file.name)
151 qmp_stub = QmpStub(socket_file.name, server_responses)
# NOTE(review): no start of the stub thread is visible between these lines;
# presumably it happens on a line that is not shown.
154 # Set up the QMP connection
155 qmp_connection = hv_kvm.QmpConnection(socket_file.name)
156 qmp_connection.connect()
159 for request, expected_response in zip(requests, expected_responses):
160 response = qmp_connection.Execute(request)
# NOTE(review): failUnless is a deprecated alias of assertTrue.
161 self.failUnless(response == hv_kvm.QmpMessage(expected_response))
# Checks the console object returned by
# hv_kvm.KVMHypervisor.GetInstanceConsole for different combinations of the
# serial console, VNC bind address and SPICE bind hypervisor parameters.
# NOTE(review): some physical lines are not visible in this listing (the
# "hvparams = {" openers and their closers, two test method headers, some
# trailing keyword arguments, and whatever _Test returns).
164 class TestConsole(unittest.TestCase):
# Helper shared by all cases: builds the console for instance and hvparams
# (an empty dict is passed as the third argument) and asserts that it
# validates. Callers assign its result, so it presumably returns cons on a
# line that is not visible.
165 def _Test(self, instance, hvparams):
166 cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
167 self.assertTrue(cons.Validate())
# Serial console enabled, no VNC or SPICE address: expects an SSH console on
# the instance's primary node whose command starts with the KVM console
# wrapper followed by the socat path.
170 def testSerial(self):
171 instance = objects.Instance(name="kvm.example.com",
172 primary_node="node6017")
174 constants.HV_SERIAL_CONSOLE: True,
175 constants.HV_VNC_BIND_ADDRESS: None,
176 constants.HV_KVM_SPICE_BIND: None,
178 cons = self._Test(instance, hvparams)
179 self.assertEqual(cons.kind, constants.CONS_SSH)
180 self.assertEqual(cons.host, instance.primary_node)
181 self.assertEqual(cons.command[0], constants.KVM_CONSOLE_WRAPPER)
182 self.assertEqual(cons.command[1], constants.SOCAT_PATH)
# NOTE(review): the header of this test method is not visible.
# VNC bound to 192.0.2.1 with network_port = VNC_BASE_PORT + 10: expects a
# VNC console on that address and port, with display 10 (the port's offset
# from VNC_BASE_PORT).
185 instance = objects.Instance(name="kvm.example.com",
186 primary_node="node7235",
187 network_port=constants.VNC_BASE_PORT + 10)
189 constants.HV_SERIAL_CONSOLE: False,
190 constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
191 constants.HV_KVM_SPICE_BIND: None,
193 cons = self._Test(instance, hvparams)
194 self.assertEqual(cons.kind, constants.CONS_VNC)
195 self.assertEqual(cons.host, "192.0.2.1")
196 self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
197 self.assertEqual(cons.display, 10)
# NOTE(review): the header of this test method and the end of the Instance()
# call are not visible; the port asserted below (11000) presumably comes
# from a network_port argument on a line that is not shown.
# SPICE bound to 192.0.2.1: expects a SPICE console on that address.
200 instance = objects.Instance(name="kvm.example.com",
201 primary_node="node7235",
204 constants.HV_SERIAL_CONSOLE: False,
205 constants.HV_VNC_BIND_ADDRESS: None,
206 constants.HV_KVM_SPICE_BIND: "192.0.2.1",
208 cons = self._Test(instance, hvparams)
209 self.assertEqual(cons.kind, constants.CONS_SPICE)
210 self.assertEqual(cons.host, "192.0.2.1")
211 self.assertEqual(cons.port, 11000)
# Serial console off and no VNC or SPICE address: expects a console of kind
# CONS_MESSAGE.
213 def testNoConsole(self):
214 instance = objects.Instance(name="kvm.example.com",
215 primary_node="node24325",
218 constants.HV_SERIAL_CONSOLE: False,
219 constants.HV_VNC_BIND_ADDRESS: None,
220 constants.HV_KVM_SPICE_BIND: None,
222 cons = self._Test(instance, hvparams)
223 self.assertEqual(cons.kind, constants.CONS_MESSAGE)
# Script entry point: when the module is executed directly (not imported),
# run the tests through testutils.GanetiTestProgram.
if __name__ == "__main__":
  testutils.GanetiTestProgram()