Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.hypervisor.hv_xen_unittest.py @ 1dee2041

History | View | Annotate | Download (9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2011, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.hypervisor.hv_lxc"""
23

    
24
import string # pylint: disable=W0402
25
import unittest
26

    
27
from ganeti import constants
28
from ganeti import objects
29
from ganeti import hypervisor
30
from ganeti import utils
31
from ganeti import errors
32
from ganeti import compat
33

    
34
from ganeti.hypervisor import hv_xen
35

    
36
import testutils
37

    
38

    
39
class TestConsole(unittest.TestCase):
40
  def test(self):
41
    for cls in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
42
      instance = objects.Instance(name="xen.example.com",
43
                                  primary_node="node24828")
44
      cons = cls.GetInstanceConsole(instance, {}, {})
45
      self.assertTrue(cons.Validate())
46
      self.assertEqual(cons.kind, constants.CONS_SSH)
47
      self.assertEqual(cons.host, instance.primary_node)
48
      self.assertEqual(cons.command[-1], instance.name)
49

    
50

    
51
class TestCreateConfigCpus(unittest.TestCase):
52
  def testEmpty(self):
53
    for cpu_mask in [None, ""]:
54
      self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask),
55
                       "cpus = [  ]")
56

    
57
  def testAll(self):
58
    self.assertEqual(hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL),
59
                     None)
60

    
61
  def testOne(self):
62
    self.assertEqual(hv_xen._CreateConfigCpus("9"), "cpu = \"9\"")
63

    
64
  def testMultiple(self):
65
    self.assertEqual(hv_xen._CreateConfigCpus("0-2,4,5-5:3:all"),
66
                     ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
67
                      constants.CPU_PINNING_ALL_XEN))
68

    
69

    
70
class TestParseXmList(testutils.GanetiTestCase):
71
  def test(self):
72
    data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
73

    
74
    # Exclude node
75
    self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
76

    
77
    # Include node
78
    result = hv_xen._ParseXmList(data.splitlines(), True)
79
    self.assertEqual(len(result), 1)
80
    self.assertEqual(len(result[0]), 6)
81

    
82
    # Name
83
    self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
84

    
85
    # ID
86
    self.assertEqual(result[0][1], 0)
87

    
88
    # Memory
89
    self.assertEqual(result[0][2], 1023)
90

    
91
    # VCPUs
92
    self.assertEqual(result[0][3], 1)
93

    
94
    # State
95
    self.assertEqual(result[0][4], "r-----")
96

    
97
    # Time
98
    self.assertAlmostEqual(result[0][5], 121152.6)
99

    
100
  def testWrongLineFormat(self):
101
    tests = [
102
      ["three fields only"],
103
      ["name InvalidID 128 1 r----- 12345"],
104
      ]
105

    
106
    for lines in tests:
107
      try:
108
        hv_xen._ParseXmList(["Header would be here"] + lines, False)
109
      except errors.HypervisorError, err:
110
        self.assertTrue("Can't parse output of xm list" in str(err))
111
      else:
112
        self.fail("Exception was not raised")
113

    
114

    
115
class TestGetXmList(testutils.GanetiTestCase):
116
  def _Fail(self):
117
    return utils.RunResult(constants.EXIT_FAILURE, None,
118
                           "stdout", "stderr", None,
119
                           NotImplemented, NotImplemented)
120

    
121
  def testTimeout(self):
122
    fn = testutils.CallCounter(self._Fail)
123
    try:
124
      hv_xen._GetXmList(fn, False, _timeout=0.1)
125
    except errors.HypervisorError, err:
126
      self.assertTrue("timeout exceeded" in str(err))
127
    else:
128
      self.fail("Exception was not raised")
129

    
130
    self.assertTrue(fn.Count() < 10,
131
                    msg="'xm list' was called too many times")
132

    
133
  def _Success(self, stdout):
134
    return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
135
                           NotImplemented, NotImplemented)
136

    
137
  def testSuccess(self):
138
    data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
139

    
140
    fn = testutils.CallCounter(compat.partial(self._Success, data))
141

    
142
    result = hv_xen._GetXmList(fn, True, _timeout=0.1)
143

    
144
    self.assertEqual(len(result), 4)
145

    
146
    self.assertEqual(map(compat.fst, result), [
147
      "Domain-0",
148
      "server01.example.com",
149
      "web3106215069.example.com",
150
      "testinstance.example.com",
151
      ])
152

    
153
    self.assertEqual(fn.Count(), 1)
154

    
155

    
156
class TestParseNodeInfo(testutils.GanetiTestCase):
157
  def testEmpty(self):
158
    self.assertEqual(hv_xen._ParseNodeInfo(""), {})
159

    
160
  def testUnknownInput(self):
161
    data = "\n".join([
162
      "foo bar",
163
      "something else goes",
164
      "here",
165
      ])
166
    self.assertEqual(hv_xen._ParseNodeInfo(data), {})
167

    
168
  def testBasicInfo(self):
169
    data = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
170
    result = hv_xen._ParseNodeInfo(data)
171
    self.assertEqual(result, {
172
      "cpu_nodes": 1,
173
      "cpu_sockets": 2,
174
      "cpu_total": 4,
175
      "hv_version": (4, 0),
176
      "memory_free": 8004,
177
      "memory_total": 16378,
178
      })
179

    
180

    
181
class TestMergeInstanceInfo(testutils.GanetiTestCase):
182
  def testEmpty(self):
183
    self.assertEqual(hv_xen._MergeInstanceInfo({}, lambda _: []), {})
184

    
185
  def _FakeXmList(self, include_node):
186
    self.assertTrue(include_node)
187
    return [
188
      (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
189
       NotImplemented),
190
      ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
191
       NotImplemented),
192
      ]
193

    
194
  def testMissingNodeInfo(self):
195
    result = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
196
    self.assertEqual(result, {
197
      "memory_dom0": 4096,
198
      "dom0_cpus": 7,
199
      })
200

    
201
  def testWithNodeInfo(self):
202
    info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
203
    result = hv_xen._GetNodeInfo(info, self._FakeXmList)
204
    self.assertEqual(result, {
205
      "cpu_nodes": 1,
206
      "cpu_sockets": 2,
207
      "cpu_total": 4,
208
      "dom0_cpus": 7,
209
      "hv_version": (4, 0),
210
      "memory_dom0": 4096,
211
      "memory_free": 8004,
212
      "memory_hv": 2230,
213
      "memory_total": 16378,
214
      })
215

    
216

    
217
class TestGetConfigFileDiskData(unittest.TestCase):
218
  def testLetterCount(self):
219
    self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
220

    
221
  def testNoDisks(self):
222
    self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
223

    
224
  def testManyDisks(self):
225
    for offset in [0, 1, 10]:
226
      disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
227
               for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
228

    
229
      if offset == 0:
230
        result = hv_xen._GetConfigFileDiskData(disks, "hd")
231
        self.assertEqual(result, [
232
          "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
233
          for idx in range(len(hv_xen._DISK_LETTERS) + offset)
234
          ])
235
      else:
236
        try:
237
          hv_xen._GetConfigFileDiskData(disks, "hd")
238
        except errors.HypervisorError, err:
239
          self.assertEqual(str(err), "Too many disks")
240
        else:
241
          self.fail("Exception was not raised")
242

    
243
  def testTwoLvDisksWithMode(self):
244
    disks = [
245
      (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
246
       "/tmp/diskFirst"),
247
      (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
248
       "/tmp/diskLast"),
249
      ]
250

    
251
    result = hv_xen._GetConfigFileDiskData(disks, "hd")
252
    self.assertEqual(result, [
253
      "'phy:/tmp/diskFirst,hda,w'",
254
      "'phy:/tmp/diskLast,hdb,r'",
255
      ])
256

    
257
  def testFileDisks(self):
258
    disks = [
259
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
260
                    physical_id=[constants.FD_LOOP]),
261
       "/tmp/diskFirst"),
262
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
263
                    physical_id=[constants.FD_BLKTAP]),
264
       "/tmp/diskTwo"),
265
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
266
                    physical_id=[constants.FD_LOOP]),
267
       "/tmp/diskThree"),
268
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
269
                    physical_id=[constants.FD_BLKTAP]),
270
       "/tmp/diskLast"),
271
      ]
272

    
273
    result = hv_xen._GetConfigFileDiskData(disks, "sd")
274
    self.assertEqual(result, [
275
      "'file:/tmp/diskFirst,sda,w'",
276
      "'tap:aio:/tmp/diskTwo,sdb,r'",
277
      "'file:/tmp/diskThree,sdc,w'",
278
      "'tap:aio:/tmp/diskLast,sdd,w'",
279
      ])
280

    
281
  def testInvalidFileDisk(self):
282
    disks = [
283
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
284
                    physical_id=["#unknown#"]),
285
       "/tmp/diskinvalid"),
286
      ]
287

    
288
    self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
289

    
290

    
291
class TestXenHypervisorUnknownCommand(unittest.TestCase):
292
  def test(self):
293
    cmd = "#unknown command#"
294
    self.assertFalse(cmd in constants.KNOWN_XEN_COMMANDS)
295
    hv = hv_xen.XenHypervisor(_cfgdir=NotImplemented,
296
                              _run_cmd_fn=NotImplemented,
297
                              _cmd=cmd)
298
    self.assertRaises(errors.ProgrammerError, hv._RunXen, [])
299

    
300

    
301
if __name__ == "__main__":
302
  testutils.GanetiTestProgram()