utils.algo: Add a function to insert a list into a list
[ganeti-local] / test / ganeti.hypervisor.hv_kvm_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing the hypervisor.hv_kvm module"""
23
24 import threading
25 import tempfile
26 import unittest
27 import socket
28 import os
29
30 from ganeti import serializer
31 from ganeti import constants
32 from ganeti import compat
33 from ganeti import objects
34 from ganeti import errors
35
36 from ganeti.hypervisor import hv_kvm
37
38 import testutils
39
40
class QmpStub(threading.Thread):
  """Stub for a QMP endpoint for a KVM instance

  The stub listens on a UNIX socket, serves exactly one client, performs the
  initial exchange (banner, then an empty reply to the first message received)
  and afterwards answers every received message with the next entry of a
  pre-recorded script.

  """
  _QMP_BANNER_DATA = {"QMP": {"version": {
                      "package": "",
                      "qemu": {"micro": 50, "minor": 13, "major": 0},
                      "capabilities": [],
                      }}}
  _EMPTY_RESPONSE = {"return": []}

  def __init__(self, socket_filename, server_responses):
    """Creates a QMP stub

    @type socket_filename: string
    @param socket_filename: filename of the UNIX socket that will be created by
                            this class and used for the communication
    @type server_responses: list
    @param server_responses: list of responses that the server sends in response
                             to whatever it receives; each entry is either a
                             string (sent with a single send()) or a list of
                             strings (one send() per element)

    """
    threading.Thread.__init__(self)
    self.socket_filename = socket_filename
    # run() consumes the script entry by entry; work on a private copy so the
    # list owned by the caller is left untouched
    self.script = list(server_responses)

    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      self.socket.bind(self.socket_filename)
      self.socket.listen(1)
    except socket.error:
      # Do not leak the file descriptor if the socket cannot be set up
      self.socket.close()
      raise

  def run(self):
    """Serves a single client until it disconnects or the script is exhausted

    @raise errors.ProgrammerError: if a script entry is neither a string nor
                                   a list

    """
    # Hypothesis: the messages we receive contain only a complete QMP message
    # encoded in JSON.
    try:
      conn, _ = self.socket.accept()
    finally:
      # Only one client is ever served, hence the listening socket is no
      # longer needed (and must be released even if accept() fails)
      self.socket.close()

    try:
      # Send the banner as the first thing
      conn.send(self.encode_string(self._QMP_BANNER_DATA))

      # Expect qmp_capabilities and return an empty response
      conn.recv(4096)
      conn.send(self.encode_string(self._EMPTY_RESPONSE))

      while True:
        # We ignore the expected message, as the purpose of this object is not
        # to verify the correctness of the communication but to act as a
        # partner for the SUT (System Under Test, that is QmpConnection)
        msg = conn.recv(4096)
        if not msg:
          # The peer closed the connection
          break

        if not self.script:
          # Nothing left to answer with
          break
        response = self.script.pop(0)
        if isinstance(response, str):
          conn.send(response)
        elif isinstance(response, list):
          for chunk in response:
            conn.send(chunk)
        else:
          # Wrap the value in a tuple, otherwise a tuple-typed response would
          # be expanded as multiple formatting arguments and raise TypeError
          raise errors.ProgrammerError("Unknown response type for %s" %
                                       (response, ))
    finally:
      # Always release the connection, also when the conversation fails
      conn.close()

  def encode_string(self, message):
    """Serializes a message to JSON and appends the QMP end-of-message token

    @param message: data structure to serialize
    @return: the value of serializer.DumpJson for the message followed by
             hv_kvm.QmpConnection._MESSAGE_END_TOKEN

    """
    return (serializer.DumpJson(message, indent=False) +
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
106
107
class TestQmpMessage(testutils.GanetiTestCase):
  """Tests for hv_kvm.QmpMessage

  """
  def testSerialization(self):
    """Checks item access and a round trip through the string representation

    """
    test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
    message = hv_kvm.QmpMessage(test_data)

    # Every key of the source dictionary must be readable from the message.
    # assertEqual is used instead of the deprecated failUnless(a == b) form so
    # that a failure reports both values.
    for k, v in test_data.items():
      self.assertEqual(message[k], v)

    # Parsing the string form must yield a message equal to the original one
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
    self.assertEqual(rebuilt_message, message)
118
119
class TestQmp(testutils.GanetiTestCase):
  """Tests hv_kvm.QmpConnection against a scripted QmpStub server

  """
  def testQmp(self):
    """Executes a series of requests and compares the parsed responses

    The scripted server responses exercise three framings: one message per
    send(), one message split over several send() calls and several messages
    delivered by a single send().

    """
    requests = [
      {"execute": "query-kvm", "arguments": []},
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
      {"execute": "query-status", "arguments": []},
      {"execute": "query-name", "arguments": []},
      ]

    server_responses = [
      # One message, one send()
      '{"return": {"enabled": true, "present": true}}\r\n',

      # Message sent using multiple send()
      ['{"retur', 'n": {}}\r\n'],

      # Multiple messages sent using one send()
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
      '{"return": {"running": true, "singlestep": false}}\r\n',
      ]

    expected_responses = [
      {"return": {"enabled": True, "present": True}},
      {"return": {}},
      {"return": [{"name": "quit"}, {"name": "eject"}]},
      {"return": {"running": True, "singlestep": False}},
      ]

    # zip() below stops at the shorter list; make sure no request or expected
    # response is silently skipped
    self.assertEqual(len(requests), len(expected_responses))

    # Set up the stub; the temporary file is only used to obtain a unique
    # name and is removed so that the stub can bind a socket to that path
    socket_file = tempfile.NamedTemporaryFile()
    os.remove(socket_file.name)
    qmp_stub = QmpStub(socket_file.name, server_responses)
    qmp_stub.start()

    # Set up the QMP connection
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
    qmp_connection.connect()

    # Execute every request and compare the reply with the expected message
    for request, expected_response in zip(requests, expected_responses):
      response = qmp_connection.Execute(request)
      self.assertEqual(response, hv_kvm.QmpMessage(expected_response))
162
163
class TestConsole(unittest.TestCase):
  """Checks the console object returned for the console-related settings

  """
  def _Test(self, instance, hvparams):
    """Requests the console of an instance and checks that it validates

    @param instance: instance object passed to GetInstanceConsole
    @param hvparams: hypervisor parameters passed to GetInstanceConsole
    @return: the console object returned by GetInstanceConsole

    """
    console = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
    self.assertTrue(console.Validate())
    return console

  def testSerial(self):
    """Serial console enabled, no VNC or SPICE bind address

    """
    inst = objects.Instance(name="kvm.example.com", primary_node="node6017")
    console = self._Test(inst, {
      constants.HV_SERIAL_CONSOLE: True,
      constants.HV_VNC_BIND_ADDRESS: None,
      constants.HV_KVM_SPICE_BIND: None,
      })

    self.assertEqual(console.kind, constants.CONS_SSH)
    self.assertEqual(console.host, inst.primary_node)
    # The command must start with the console wrapper followed by socat
    self.assertEqual(console.command[0], constants.KVM_CONSOLE_WRAPPER)
    self.assertEqual(console.command[1], constants.SOCAT_PATH)

  def testVnc(self):
    """VNC bind address set, serial console and SPICE disabled

    """
    display = 10
    port = constants.VNC_BASE_PORT + 10
    inst = objects.Instance(name="kvm.example.com", primary_node="node7235",
                            network_port=port)
    console = self._Test(inst, {
      constants.HV_SERIAL_CONSOLE: False,
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
      constants.HV_KVM_SPICE_BIND: None,
      })

    self.assertEqual(console.kind, constants.CONS_VNC)
    self.assertEqual(console.host, "192.0.2.1")
    self.assertEqual(console.port, constants.VNC_BASE_PORT + 10)
    self.assertEqual(console.display, display)

  def testSpice(self):
    """SPICE bind address set, serial console and VNC disabled

    """
    inst = objects.Instance(name="kvm.example.com", primary_node="node7235",
                            network_port=11000)
    console = self._Test(inst, {
      constants.HV_SERIAL_CONSOLE: False,
      constants.HV_VNC_BIND_ADDRESS: None,
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
      })

    self.assertEqual(console.kind, constants.CONS_SPICE)
    self.assertEqual(console.host, "192.0.2.1")
    self.assertEqual(console.port, 11000)

  def testNoConsole(self):
    """Serial console, VNC and SPICE all disabled

    """
    inst = objects.Instance(name="kvm.example.com", primary_node="node24325",
                            network_port=0)
    console = self._Test(inst, {
      constants.HV_SERIAL_CONSOLE: False,
      constants.HV_VNC_BIND_ADDRESS: None,
      constants.HV_KVM_SPICE_BIND: None,
      })

    self.assertEqual(console.kind, constants.CONS_MESSAGE)
224
225
# When executed as a script (not imported), hand control to
# testutils.GanetiTestProgram() -- presumably the project's unittest runner;
# confirm in testutils
if __name__ == "__main__":
  testutils.GanetiTestProgram()