Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.hypervisor.hv_xen_unittest.py @ 64a66bd2

History | View | Annotate | Download (9.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2011, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.hypervisor.hv_lxc"""
23

    
24
import string # pylint: disable=W0402
25
import unittest
26
import tempfile
27
import shutil
28

    
29
from ganeti import constants
30
from ganeti import objects
31
from ganeti import hypervisor
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import compat
35

    
36
from ganeti.hypervisor import hv_xen
37

    
38
import testutils
39

    
40

    
41
class TestConsole(unittest.TestCase):
42
  def test(self):
43
    for cls in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
44
      instance = objects.Instance(name="xen.example.com",
45
                                  primary_node="node24828")
46
      cons = cls.GetInstanceConsole(instance, {}, {})
47
      self.assertTrue(cons.Validate())
48
      self.assertEqual(cons.kind, constants.CONS_SSH)
49
      self.assertEqual(cons.host, instance.primary_node)
50
      self.assertEqual(cons.command[-1], instance.name)
51

    
52

    
53
class TestCreateConfigCpus(unittest.TestCase):
54
  def testEmpty(self):
55
    for cpu_mask in [None, ""]:
56
      self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask),
57
                       "cpus = [  ]")
58

    
59
  def testAll(self):
60
    self.assertEqual(hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL),
61
                     None)
62

    
63
  def testOne(self):
64
    self.assertEqual(hv_xen._CreateConfigCpus("9"), "cpu = \"9\"")
65

    
66
  def testMultiple(self):
67
    self.assertEqual(hv_xen._CreateConfigCpus("0-2,4,5-5:3:all"),
68
                     ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
69
                      constants.CPU_PINNING_ALL_XEN))
70

    
71

    
72
class TestParseXmList(testutils.GanetiTestCase):
73
  def test(self):
74
    data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
75

    
76
    # Exclude node
77
    self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
78

    
79
    # Include node
80
    result = hv_xen._ParseXmList(data.splitlines(), True)
81
    self.assertEqual(len(result), 1)
82
    self.assertEqual(len(result[0]), 6)
83

    
84
    # Name
85
    self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
86

    
87
    # ID
88
    self.assertEqual(result[0][1], 0)
89

    
90
    # Memory
91
    self.assertEqual(result[0][2], 1023)
92

    
93
    # VCPUs
94
    self.assertEqual(result[0][3], 1)
95

    
96
    # State
97
    self.assertEqual(result[0][4], "r-----")
98

    
99
    # Time
100
    self.assertAlmostEqual(result[0][5], 121152.6)
101

    
102
  def testWrongLineFormat(self):
103
    tests = [
104
      ["three fields only"],
105
      ["name InvalidID 128 1 r----- 12345"],
106
      ]
107

    
108
    for lines in tests:
109
      try:
110
        hv_xen._ParseXmList(["Header would be here"] + lines, False)
111
      except errors.HypervisorError, err:
112
        self.assertTrue("Can't parse output of xm list" in str(err))
113
      else:
114
        self.fail("Exception was not raised")
115

    
116

    
117
class TestGetXmList(testutils.GanetiTestCase):
118
  def _Fail(self):
119
    return utils.RunResult(constants.EXIT_FAILURE, None,
120
                           "stdout", "stderr", None,
121
                           NotImplemented, NotImplemented)
122

    
123
  def testTimeout(self):
124
    fn = testutils.CallCounter(self._Fail)
125
    try:
126
      hv_xen._GetXmList(fn, False, _timeout=0.1)
127
    except errors.HypervisorError, err:
128
      self.assertTrue("timeout exceeded" in str(err))
129
    else:
130
      self.fail("Exception was not raised")
131

    
132
    self.assertTrue(fn.Count() < 10,
133
                    msg="'xm list' was called too many times")
134

    
135
  def _Success(self, stdout):
136
    return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
137
                           NotImplemented, NotImplemented)
138

    
139
  def testSuccess(self):
140
    data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
141

    
142
    fn = testutils.CallCounter(compat.partial(self._Success, data))
143

    
144
    result = hv_xen._GetXmList(fn, True, _timeout=0.1)
145

    
146
    self.assertEqual(len(result), 4)
147

    
148
    self.assertEqual(map(compat.fst, result), [
149
      "Domain-0",
150
      "server01.example.com",
151
      "web3106215069.example.com",
152
      "testinstance.example.com",
153
      ])
154

    
155
    self.assertEqual(fn.Count(), 1)
156

    
157

    
158
class TestParseNodeInfo(testutils.GanetiTestCase):
159
  def testEmpty(self):
160
    self.assertEqual(hv_xen._ParseNodeInfo(""), {})
161

    
162
  def testUnknownInput(self):
163
    data = "\n".join([
164
      "foo bar",
165
      "something else goes",
166
      "here",
167
      ])
168
    self.assertEqual(hv_xen._ParseNodeInfo(data), {})
169

    
170
  def testBasicInfo(self):
171
    data = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
172
    result = hv_xen._ParseNodeInfo(data)
173
    self.assertEqual(result, {
174
      "cpu_nodes": 1,
175
      "cpu_sockets": 2,
176
      "cpu_total": 4,
177
      "hv_version": (4, 0),
178
      "memory_free": 8004,
179
      "memory_total": 16378,
180
      })
181

    
182

    
183
class TestMergeInstanceInfo(testutils.GanetiTestCase):
184
  def testEmpty(self):
185
    self.assertEqual(hv_xen._MergeInstanceInfo({}, lambda _: []), {})
186

    
187
  def _FakeXmList(self, include_node):
188
    self.assertTrue(include_node)
189
    return [
190
      (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
191
       NotImplemented),
192
      ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
193
       NotImplemented),
194
      ]
195

    
196
  def testMissingNodeInfo(self):
197
    result = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
198
    self.assertEqual(result, {
199
      "memory_dom0": 4096,
200
      "dom0_cpus": 7,
201
      })
202

    
203
  def testWithNodeInfo(self):
204
    info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
205
    result = hv_xen._GetNodeInfo(info, self._FakeXmList)
206
    self.assertEqual(result, {
207
      "cpu_nodes": 1,
208
      "cpu_sockets": 2,
209
      "cpu_total": 4,
210
      "dom0_cpus": 7,
211
      "hv_version": (4, 0),
212
      "memory_dom0": 4096,
213
      "memory_free": 8004,
214
      "memory_hv": 2230,
215
      "memory_total": 16378,
216
      })
217

    
218

    
219
class TestGetConfigFileDiskData(unittest.TestCase):
220
  def testLetterCount(self):
221
    self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
222

    
223
  def testNoDisks(self):
224
    self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
225

    
226
  def testManyDisks(self):
227
    for offset in [0, 1, 10]:
228
      disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
229
               for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
230

    
231
      if offset == 0:
232
        result = hv_xen._GetConfigFileDiskData(disks, "hd")
233
        self.assertEqual(result, [
234
          "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
235
          for idx in range(len(hv_xen._DISK_LETTERS) + offset)
236
          ])
237
      else:
238
        try:
239
          hv_xen._GetConfigFileDiskData(disks, "hd")
240
        except errors.HypervisorError, err:
241
          self.assertEqual(str(err), "Too many disks")
242
        else:
243
          self.fail("Exception was not raised")
244

    
245
  def testTwoLvDisksWithMode(self):
246
    disks = [
247
      (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
248
       "/tmp/diskFirst"),
249
      (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
250
       "/tmp/diskLast"),
251
      ]
252

    
253
    result = hv_xen._GetConfigFileDiskData(disks, "hd")
254
    self.assertEqual(result, [
255
      "'phy:/tmp/diskFirst,hda,w'",
256
      "'phy:/tmp/diskLast,hdb,r'",
257
      ])
258

    
259
  def testFileDisks(self):
260
    disks = [
261
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
262
                    physical_id=[constants.FD_LOOP]),
263
       "/tmp/diskFirst"),
264
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
265
                    physical_id=[constants.FD_BLKTAP]),
266
       "/tmp/diskTwo"),
267
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
268
                    physical_id=[constants.FD_LOOP]),
269
       "/tmp/diskThree"),
270
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
271
                    physical_id=[constants.FD_BLKTAP]),
272
       "/tmp/diskLast"),
273
      ]
274

    
275
    result = hv_xen._GetConfigFileDiskData(disks, "sd")
276
    self.assertEqual(result, [
277
      "'file:/tmp/diskFirst,sda,w'",
278
      "'tap:aio:/tmp/diskTwo,sdb,r'",
279
      "'file:/tmp/diskThree,sdc,w'",
280
      "'tap:aio:/tmp/diskLast,sdd,w'",
281
      ])
282

    
283
  def testInvalidFileDisk(self):
284
    disks = [
285
      (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
286
                    physical_id=["#unknown#"]),
287
       "/tmp/diskinvalid"),
288
      ]
289

    
290
    self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
291

    
292

    
293
class TestXenHypervisorUnknownCommand(unittest.TestCase):
294
  def test(self):
295
    cmd = "#unknown command#"
296
    self.assertFalse(cmd in constants.KNOWN_XEN_COMMANDS)
297
    hv = hv_xen.XenHypervisor(_cfgdir=NotImplemented,
298
                              _run_cmd_fn=NotImplemented,
299
                              _cmd=cmd)
300
    self.assertRaises(errors.ProgrammerError, hv._RunXen, [])
301

    
302

    
303
class TestXenHypervisorWriteConfigFile(unittest.TestCase):
304
  def setUp(self):
305
    self.tmpdir = tempfile.mkdtemp()
306

    
307
  def tearDown(self):
308
    shutil.rmtree(self.tmpdir)
309

    
310
  def testWriteError(self):
311
    cfgdir = utils.PathJoin(self.tmpdir, "foobar")
312

    
313
    hv = hv_xen.XenHypervisor(_cfgdir=cfgdir,
314
                              _run_cmd_fn=NotImplemented,
315
                              _cmd=NotImplemented)
316

    
317
    self.assertFalse(os.path.exists(cfgdir))
318

    
319
    try:
320
      hv._WriteConfigFile("name", "data")
321
    except errors.HypervisorError, err:
322
      self.assertTrue(str(err).startswith("Cannot write Xen instance"))
323
    else:
324
      self.fail("Exception was not raised")
325

    
326

    
327
if __name__ == "__main__":
328
  testutils.GanetiTestProgram()