fd05933c192bee6dc4cc7a5118c64b7638a20732
[ganeti-local] / test / py / ganeti.hypervisor.hv_xen_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.hypervisor.hv_lxc"""
23
24 import string # pylint: disable=W0402
25 import unittest
26 import tempfile
27 import shutil
28
29 from ganeti import constants
30 from ganeti import objects
31 from ganeti import hypervisor
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import compat
35
36 from ganeti.hypervisor import hv_xen
37
38 import testutils
39
40
41 class TestConsole(unittest.TestCase):
42   def test(self):
43     for cls in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
44       instance = objects.Instance(name="xen.example.com",
45                                   primary_node="node24828")
46       cons = cls.GetInstanceConsole(instance, {}, {})
47       self.assertTrue(cons.Validate())
48       self.assertEqual(cons.kind, constants.CONS_SSH)
49       self.assertEqual(cons.host, instance.primary_node)
50       self.assertEqual(cons.command[-1], instance.name)
51
52
53 class TestCreateConfigCpus(unittest.TestCase):
54   def testEmpty(self):
55     for cpu_mask in [None, ""]:
56       self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask),
57                        "cpus = [  ]")
58
59   def testAll(self):
60     self.assertEqual(hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL),
61                      None)
62
63   def testOne(self):
64     self.assertEqual(hv_xen._CreateConfigCpus("9"), "cpu = \"9\"")
65
66   def testMultiple(self):
67     self.assertEqual(hv_xen._CreateConfigCpus("0-2,4,5-5:3:all"),
68                      ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
69                       constants.CPU_PINNING_ALL_XEN))
70
71
72 class TestParseXmList(testutils.GanetiTestCase):
73   def test(self):
74     data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
75
76     # Exclude node
77     self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
78
79     # Include node
80     result = hv_xen._ParseXmList(data.splitlines(), True)
81     self.assertEqual(len(result), 1)
82     self.assertEqual(len(result[0]), 6)
83
84     # Name
85     self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
86
87     # ID
88     self.assertEqual(result[0][1], 0)
89
90     # Memory
91     self.assertEqual(result[0][2], 1023)
92
93     # VCPUs
94     self.assertEqual(result[0][3], 1)
95
96     # State
97     self.assertEqual(result[0][4], "r-----")
98
99     # Time
100     self.assertAlmostEqual(result[0][5], 121152.6)
101
102   def testWrongLineFormat(self):
103     tests = [
104       ["three fields only"],
105       ["name InvalidID 128 1 r----- 12345"],
106       ]
107
108     for lines in tests:
109       try:
110         hv_xen._ParseXmList(["Header would be here"] + lines, False)
111       except errors.HypervisorError, err:
112         self.assertTrue("Can't parse output of xm list" in str(err))
113       else:
114         self.fail("Exception was not raised")
115
116
117 class TestGetXmList(testutils.GanetiTestCase):
118   def _Fail(self):
119     return utils.RunResult(constants.EXIT_FAILURE, None,
120                            "stdout", "stderr", None,
121                            NotImplemented, NotImplemented)
122
123   def testTimeout(self):
124     fn = testutils.CallCounter(self._Fail)
125     try:
126       hv_xen._GetXmList(fn, False, _timeout=0.1)
127     except errors.HypervisorError, err:
128       self.assertTrue("timeout exceeded" in str(err))
129     else:
130       self.fail("Exception was not raised")
131
132     self.assertTrue(fn.Count() < 10,
133                     msg="'xm list' was called too many times")
134
135   def _Success(self, stdout):
136     return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
137                            NotImplemented, NotImplemented)
138
139   def testSuccess(self):
140     data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
141
142     fn = testutils.CallCounter(compat.partial(self._Success, data))
143
144     result = hv_xen._GetXmList(fn, True, _timeout=0.1)
145
146     self.assertEqual(len(result), 4)
147
148     self.assertEqual(map(compat.fst, result), [
149       "Domain-0",
150       "server01.example.com",
151       "web3106215069.example.com",
152       "testinstance.example.com",
153       ])
154
155     self.assertEqual(fn.Count(), 1)
156
157
158 class TestParseNodeInfo(testutils.GanetiTestCase):
159   def testEmpty(self):
160     self.assertEqual(hv_xen._ParseNodeInfo(""), {})
161
162   def testUnknownInput(self):
163     data = "\n".join([
164       "foo bar",
165       "something else goes",
166       "here",
167       ])
168     self.assertEqual(hv_xen._ParseNodeInfo(data), {})
169
170   def testBasicInfo(self):
171     data = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
172     result = hv_xen._ParseNodeInfo(data)
173     self.assertEqual(result, {
174       "cpu_nodes": 1,
175       "cpu_sockets": 2,
176       "cpu_total": 4,
177       "hv_version": (4, 0),
178       "memory_free": 8004,
179       "memory_total": 16378,
180       })
181
182
183 class TestMergeInstanceInfo(testutils.GanetiTestCase):
184   def testEmpty(self):
185     self.assertEqual(hv_xen._MergeInstanceInfo({}, lambda _: []), {})
186
187   def _FakeXmList(self, include_node):
188     self.assertTrue(include_node)
189     return [
190       (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
191        NotImplemented),
192       ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
193        NotImplemented),
194       ]
195
196   def testMissingNodeInfo(self):
197     result = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
198     self.assertEqual(result, {
199       "memory_dom0": 4096,
200       "dom0_cpus": 7,
201       })
202
203   def testWithNodeInfo(self):
204     info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
205     result = hv_xen._GetNodeInfo(info, self._FakeXmList)
206     self.assertEqual(result, {
207       "cpu_nodes": 1,
208       "cpu_sockets": 2,
209       "cpu_total": 4,
210       "dom0_cpus": 7,
211       "hv_version": (4, 0),
212       "memory_dom0": 4096,
213       "memory_free": 8004,
214       "memory_hv": 2230,
215       "memory_total": 16378,
216       })
217
218
219 class TestGetConfigFileDiskData(unittest.TestCase):
220   def testLetterCount(self):
221     self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
222
223   def testNoDisks(self):
224     self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
225
226   def testManyDisks(self):
227     for offset in [0, 1, 10]:
228       disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
229                for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
230
231       if offset == 0:
232         result = hv_xen._GetConfigFileDiskData(disks, "hd")
233         self.assertEqual(result, [
234           "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
235           for idx in range(len(hv_xen._DISK_LETTERS) + offset)
236           ])
237       else:
238         try:
239           hv_xen._GetConfigFileDiskData(disks, "hd")
240         except errors.HypervisorError, err:
241           self.assertEqual(str(err), "Too many disks")
242         else:
243           self.fail("Exception was not raised")
244
245   def testTwoLvDisksWithMode(self):
246     disks = [
247       (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
248        "/tmp/diskFirst"),
249       (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
250        "/tmp/diskLast"),
251       ]
252
253     result = hv_xen._GetConfigFileDiskData(disks, "hd")
254     self.assertEqual(result, [
255       "'phy:/tmp/diskFirst,hda,w'",
256       "'phy:/tmp/diskLast,hdb,r'",
257       ])
258
259   def testFileDisks(self):
260     disks = [
261       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
262                     physical_id=[constants.FD_LOOP]),
263        "/tmp/diskFirst"),
264       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
265                     physical_id=[constants.FD_BLKTAP]),
266        "/tmp/diskTwo"),
267       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
268                     physical_id=[constants.FD_LOOP]),
269        "/tmp/diskThree"),
270       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
271                     physical_id=[constants.FD_BLKTAP]),
272        "/tmp/diskLast"),
273       ]
274
275     result = hv_xen._GetConfigFileDiskData(disks, "sd")
276     self.assertEqual(result, [
277       "'file:/tmp/diskFirst,sda,w'",
278       "'tap:aio:/tmp/diskTwo,sdb,r'",
279       "'file:/tmp/diskThree,sdc,w'",
280       "'tap:aio:/tmp/diskLast,sdd,w'",
281       ])
282
283   def testInvalidFileDisk(self):
284     disks = [
285       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
286                     physical_id=["#unknown#"]),
287        "/tmp/diskinvalid"),
288       ]
289
290     self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
291
292
293 class TestXenHypervisorUnknownCommand(unittest.TestCase):
294   def test(self):
295     cmd = "#unknown command#"
296     self.assertFalse(cmd in constants.KNOWN_XEN_COMMANDS)
297     hv = hv_xen.XenHypervisor(_cfgdir=NotImplemented,
298                               _run_cmd_fn=NotImplemented,
299                               _cmd=cmd)
300     self.assertRaises(errors.ProgrammerError, hv._RunXen, [])
301
302
303 class TestXenHypervisorWriteConfigFile(unittest.TestCase):
304   def setUp(self):
305     self.tmpdir = tempfile.mkdtemp()
306
307   def tearDown(self):
308     shutil.rmtree(self.tmpdir)
309
310   def testWriteError(self):
311     cfgdir = utils.PathJoin(self.tmpdir, "foobar")
312
313     hv = hv_xen.XenHypervisor(_cfgdir=cfgdir,
314                               _run_cmd_fn=NotImplemented,
315                               _cmd=NotImplemented)
316
317     self.assertFalse(os.path.exists(cfgdir))
318
319     try:
320       hv._WriteConfigFile("name", "data")
321     except errors.HypervisorError, err:
322       self.assertTrue(str(err).startswith("Cannot write Xen instance"))
323     else:
324       self.fail("Exception was not raised")
325
326
327 if __name__ == "__main__":
328   testutils.GanetiTestProgram()