Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.hypervisor.hv_kvm_unittest.py @ 51129a7f

History | View | Annotate | Download (7.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing the hypervisor.hv_kvm module"""
23

    
24
import threading
25
import tempfile
26
import unittest
27
import socket
28
import os
29

    
30
from ganeti import serializer
31
from ganeti import constants
32
from ganeti import compat
33
from ganeti import objects
34
from ganeti import errors
35

    
36
from ganeti.hypervisor import hv_kvm
37

    
38
import testutils
39

    
40

    
41
class QmpStub(threading.Thread):
42
  """Stub for a QMP endpoint for a KVM instance
43

44
  """
45
  _QMP_BANNER_DATA = {
46
    "QMP": {
47
      "version": {
48
        "package": "",
49
        "qemu": {
50
          "micro": 50,
51
          "minor": 13,
52
          "major": 0,
53
          },
54
        "capabilities": [],
55
        },
56
      }
57
    }
58
  _EMPTY_RESPONSE = {
59
    "return": [],
60
    }
61

    
62
  def __init__(self, socket_filename, server_responses):
63
    """Creates a QMP stub
64

65
    @type socket_filename: string
66
    @param socket_filename: filename of the UNIX socket that will be created
67
                            this class and used for the communication
68
    @type server_responses: list
69
    @param server_responses: list of responses that the server sends in response
70
                             to whatever it receives
71
    """
72
    threading.Thread.__init__(self)
73
    self.socket_filename = socket_filename
74
    self.script = server_responses
75

    
76
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
77
    self.socket.bind(self.socket_filename)
78
    self.socket.listen(1)
79

    
80
  def run(self):
81
    # Hypothesis: the messages we receive contain only a complete QMP message
82
    # encoded in JSON.
83
    conn, addr = self.socket.accept()
84

    
85
    # Send the banner as the first thing
86
    conn.send(self.encode_string(self._QMP_BANNER_DATA))
87

    
88
    # Expect qmp_capabilities and return an empty response
89
    conn.recv(4096)
90
    conn.send(self.encode_string(self._EMPTY_RESPONSE))
91

    
92
    while True:
93
      # We ignore the expected message, as the purpose of this object is not
94
      # to verify the correctness of the communication but to act as a
95
      # partner for the SUT (System Under Test, that is QmpConnection)
96
      msg = conn.recv(4096)
97
      if not msg:
98
        break
99

    
100
      if not self.script:
101
        break
102
      response = self.script.pop(0)
103
      if isinstance(response, str):
104
        conn.send(response)
105
      elif isinstance(response, list):
106
        for chunk in response:
107
          conn.send(chunk)
108
      else:
109
        raise errors.ProgrammerError("Unknown response type for %s" % response)
110

    
111
    conn.close()
112

    
113
  def encode_string(self, message):
114
    return (serializer.DumpJson(message) +
115
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
116

    
117

    
118
class TestQmpMessage(testutils.GanetiTestCase):
119
  def testSerialization(self):
120
    test_data = {
121
      "execute": "command",
122
      "arguments": ["a", "b", "c"],
123
      }
124
    message = hv_kvm.QmpMessage(test_data)
125

    
126
    for k, v in test_data.items():
127
      self.assertEqual(message[k], v)
128

    
129
    serialized = str(message)
130
    self.assertEqual(len(serialized.splitlines()), 1,
131
                     msg="Got multi-line message")
132

    
133
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
134
    self.assertEqual(rebuilt_message, message)
135

    
136

    
137
class TestQmp(testutils.GanetiTestCase):
138
  def testQmp(self):
139
    requests = [
140
      {"execute": "query-kvm", "arguments": []},
141
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
142
      {"execute": "query-status", "arguments": []},
143
      {"execute": "query-name", "arguments": []},
144
      ]
145

    
146
    server_responses = [
147
      # One message, one send()
148
      '{"return": {"enabled": true, "present": true}}\r\n',
149

    
150
      # Message sent using multiple send()
151
      ['{"retur', 'n": {}}\r\n'],
152

    
153
      # Multiple messages sent using one send()
154
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
155
      '{"return": {"running": true, "singlestep": false}}\r\n',
156
      ]
157

    
158
    expected_responses = [
159
      {"return": {"enabled": True, "present": True}},
160
      {"return": {}},
161
      {"return": [{"name": "quit"}, {"name": "eject"}]},
162
      {"return": {"running": True, "singlestep": False}},
163
      ]
164

    
165
    # Set up the stub
166
    socket_file = tempfile.NamedTemporaryFile()
167
    os.remove(socket_file.name)
168
    qmp_stub = QmpStub(socket_file.name, server_responses)
169
    qmp_stub.start()
170

    
171
    # Set up the QMP connection
172
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
173
    qmp_connection.connect()
174

    
175
    # Format the script
176
    for request, expected_response in zip(requests, expected_responses):
177
      response = qmp_connection.Execute(request)
178
      msg = hv_kvm.QmpMessage(expected_response)
179
      self.assertEqual(len(str(msg).splitlines()), 1,
180
                       msg="Got multi-line message")
181
      self.assertEqual(response, msg)
182

    
183

    
184
class TestConsole(unittest.TestCase):
185
  def _Test(self, instance, hvparams):
186
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
187
    self.assertTrue(cons.Validate())
188
    return cons
189

    
190
  def testSerial(self):
191
    instance = objects.Instance(name="kvm.example.com",
192
                                primary_node="node6017")
193
    hvparams = {
194
      constants.HV_SERIAL_CONSOLE: True,
195
      constants.HV_VNC_BIND_ADDRESS: None,
196
      constants.HV_KVM_SPICE_BIND: None,
197
      }
198
    cons = self._Test(instance, hvparams)
199
    self.assertEqual(cons.kind, constants.CONS_SSH)
200
    self.assertEqual(cons.host, instance.primary_node)
201
    self.assertEqual(cons.command[0], constants.KVM_CONSOLE_WRAPPER)
202
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)
203

    
204
  def testVnc(self):
205
    instance = objects.Instance(name="kvm.example.com",
206
                                primary_node="node7235",
207
                                network_port=constants.VNC_BASE_PORT + 10)
208
    hvparams = {
209
      constants.HV_SERIAL_CONSOLE: False,
210
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
211
      constants.HV_KVM_SPICE_BIND: None,
212
      }
213
    cons = self._Test(instance, hvparams)
214
    self.assertEqual(cons.kind, constants.CONS_VNC)
215
    self.assertEqual(cons.host, "192.0.2.1")
216
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
217
    self.assertEqual(cons.display, 10)
218

    
219
  def testSpice(self):
220
    instance = objects.Instance(name="kvm.example.com",
221
                                primary_node="node7235",
222
                                network_port=11000)
223
    hvparams = {
224
      constants.HV_SERIAL_CONSOLE: False,
225
      constants.HV_VNC_BIND_ADDRESS: None,
226
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
227
      }
228
    cons = self._Test(instance, hvparams)
229
    self.assertEqual(cons.kind, constants.CONS_SPICE)
230
    self.assertEqual(cons.host, "192.0.2.1")
231
    self.assertEqual(cons.port, 11000)
232

    
233
  def testNoConsole(self):
234
    instance = objects.Instance(name="kvm.example.com",
235
                                primary_node="node24325",
236
                                network_port=0)
237
    hvparams = {
238
      constants.HV_SERIAL_CONSOLE: False,
239
      constants.HV_VNC_BIND_ADDRESS: None,
240
      constants.HV_KVM_SPICE_BIND: None,
241
      }
242
    cons = self._Test(instance, hvparams)
243
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)
244

    
245

    
246
if __name__ == "__main__":
247
  testutils.GanetiTestProgram()