Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.hypervisor.hv_kvm_unittest.py @ a182a3ed

History | View | Annotate | Download (7.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing the hypervisor.hv_kvm module"""
23

    
24
import threading
25
import tempfile
26
import unittest
27
import socket
28
import os
29

    
30
from ganeti import serializer
31
from ganeti import constants
32
from ganeti import compat
33
from ganeti import objects
34
from ganeti import errors
35

    
36
from ganeti.hypervisor import hv_kvm
37

    
38
import testutils
39

    
40

    
41
class QmpStub(threading.Thread):
42
  """Stub for a QMP endpoint for a KVM instance
43

44
  """
45
  _QMP_BANNER_DATA = {"QMP": {"version": {
46
                      "package": "",
47
                      "qemu": {"micro": 50, "minor": 13, "major": 0},
48
                      "capabilities": [],
49
                      }}}
50
  _EMPTY_RESPONSE = {"return": []}
51

    
52
  def __init__(self, socket_filename, server_responses):
53
    """Creates a QMP stub
54

55
    @type socket_filename: string
56
    @param socket_filename: filename of the UNIX socket that will be created
57
                            this class and used for the communication
58
    @type server_responses: list
59
    @param server_responses: list of responses that the server sends in response
60
                             to whatever it receives
61
    """
62
    threading.Thread.__init__(self)
63
    self.socket_filename = socket_filename
64
    self.script = server_responses
65

    
66
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
67
    self.socket.bind(self.socket_filename)
68
    self.socket.listen(1)
69

    
70
  def run(self):
71
    # Hypothesis: the messages we receive contain only a complete QMP message
72
    # encoded in JSON.
73
    conn, addr = self.socket.accept()
74

    
75
    # Send the banner as the first thing
76
    conn.send(self.encode_string(self._QMP_BANNER_DATA))
77

    
78
    # Expect qmp_capabilities and return an empty response
79
    conn.recv(4096)
80
    conn.send(self.encode_string(self._EMPTY_RESPONSE))
81

    
82
    while True:
83
      # We ignore the expected message, as the purpose of this object is not
84
      # to verify the correctness of the communication but to act as a
85
      # partner for the SUT (System Under Test, that is QmpConnection)
86
      msg = conn.recv(4096)
87
      if not msg:
88
        break
89

    
90
      if not self.script:
91
        break
92
      response = self.script.pop(0)
93
      if isinstance(response, str):
94
        conn.send(response)
95
      elif isinstance(response, list):
96
        for chunk in response:
97
          conn.send(chunk)
98
      else:
99
        raise errors.ProgrammerError("Unknown response type for %s" % response)
100

    
101
    conn.close()
102

    
103
  def encode_string(self, message):
104
    return (serializer.DumpJson(message) +
105
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
106

    
107

    
108
class TestQmpMessage(testutils.GanetiTestCase):
109
  def testSerialization(self):
110
    test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
111
    message = hv_kvm.QmpMessage(test_data)
112

    
113
    for k, v in test_data.items():
114
      self.failUnless(message[k] == v)
115

    
116
    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
117
    self.failUnless(rebuilt_message == message)
118

    
119

    
120
class TestQmp(testutils.GanetiTestCase):
121
  def testQmp(self):
122
    requests = [
123
      {"execute": "query-kvm", "arguments": []},
124
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
125
      {"execute": "query-status", "arguments": []},
126
      {"execute": "query-name", "arguments": []},
127
      ]
128

    
129
    server_responses = [
130
      # One message, one send()
131
      '{"return": {"enabled": true, "present": true}}\r\n',
132

    
133
      # Message sent using multiple send()
134
      ['{"retur', 'n": {}}\r\n'],
135

    
136
      # Multiple messages sent using one send()
137
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
138
      '{"return": {"running": true, "singlestep": false}}\r\n',
139
      ]
140

    
141
    expected_responses = [
142
      {"return": {"enabled": True, "present": True}},
143
      {"return": {}},
144
      {"return": [{"name": "quit"}, {"name": "eject"}]},
145
      {"return": {"running": True, "singlestep": False}},
146
      ]
147

    
148
    # Set up the stub
149
    socket_file = tempfile.NamedTemporaryFile()
150
    os.remove(socket_file.name)
151
    qmp_stub = QmpStub(socket_file.name, server_responses)
152
    qmp_stub.start()
153

    
154
    # Set up the QMP connection
155
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
156
    qmp_connection.connect()
157

    
158
    # Format the script
159
    for request, expected_response in zip(requests, expected_responses):
160
      response = qmp_connection.Execute(request)
161
      self.failUnless(response == hv_kvm.QmpMessage(expected_response))
162

    
163

    
164
class TestConsole(unittest.TestCase):
165
  def _Test(self, instance, hvparams):
166
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
167
    self.assertTrue(cons.Validate())
168
    return cons
169

    
170
  def testSerial(self):
171
    instance = objects.Instance(name="kvm.example.com",
172
                                primary_node="node6017")
173
    hvparams = {
174
      constants.HV_SERIAL_CONSOLE: True,
175
      constants.HV_VNC_BIND_ADDRESS: None,
176
      constants.HV_KVM_SPICE_BIND: None,
177
      }
178
    cons = self._Test(instance, hvparams)
179
    self.assertEqual(cons.kind, constants.CONS_SSH)
180
    self.assertEqual(cons.host, instance.primary_node)
181
    self.assertEqual(cons.command[0], constants.KVM_CONSOLE_WRAPPER)
182
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)
183

    
184
  def testVnc(self):
185
    instance = objects.Instance(name="kvm.example.com",
186
                                primary_node="node7235",
187
                                network_port=constants.VNC_BASE_PORT + 10)
188
    hvparams = {
189
      constants.HV_SERIAL_CONSOLE: False,
190
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
191
      constants.HV_KVM_SPICE_BIND: None,
192
      }
193
    cons = self._Test(instance, hvparams)
194
    self.assertEqual(cons.kind, constants.CONS_VNC)
195
    self.assertEqual(cons.host, "192.0.2.1")
196
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
197
    self.assertEqual(cons.display, 10)
198

    
199
  def testSpice(self):
200
    instance = objects.Instance(name="kvm.example.com",
201
                                primary_node="node7235",
202
                                network_port=11000)
203
    hvparams = {
204
      constants.HV_SERIAL_CONSOLE: False,
205
      constants.HV_VNC_BIND_ADDRESS: None,
206
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
207
      }
208
    cons = self._Test(instance, hvparams)
209
    self.assertEqual(cons.kind, constants.CONS_SPICE)
210
    self.assertEqual(cons.host, "192.0.2.1")
211
    self.assertEqual(cons.port, 11000)
212

    
213
  def testNoConsole(self):
214
    instance = objects.Instance(name="kvm.example.com",
215
                                primary_node="node24325",
216
                                network_port=0)
217
    hvparams = {
218
      constants.HV_SERIAL_CONSOLE: False,
219
      constants.HV_VNC_BIND_ADDRESS: None,
220
      constants.HV_KVM_SPICE_BIND: None,
221
      }
222
    cons = self._Test(instance, hvparams)
223
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)
224

    
225

    
226
if __name__ == "__main__":
227
  testutils.GanetiTestProgram()