4 # Copyright (C) 2011, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.hypervisor.hv_lxc"""
24 import string # pylint: disable=W0402
29 from ganeti import constants
30 from ganeti import objects
31 from ganeti import hypervisor
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import compat
36 from ganeti.hypervisor import hv_xen
41 class TestConsole(unittest.TestCase):
43 for cls in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
44 instance = objects.Instance(name="xen.example.com",
45 primary_node="node24828")
46 cons = cls.GetInstanceConsole(instance, {}, {})
47 self.assertTrue(cons.Validate())
48 self.assertEqual(cons.kind, constants.CONS_SSH)
49 self.assertEqual(cons.host, instance.primary_node)
50 self.assertEqual(cons.command[-1], instance.name)
53 class TestCreateConfigCpus(unittest.TestCase):
55 for cpu_mask in [None, ""]:
56 self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask),
60 self.assertEqual(hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL),
64 self.assertEqual(hv_xen._CreateConfigCpus("9"), "cpu = \"9\"")
66 def testMultiple(self):
67 self.assertEqual(hv_xen._CreateConfigCpus("0-2,4,5-5:3:all"),
68 ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
69 constants.CPU_PINNING_ALL_XEN))
72 class TestParseXmList(testutils.GanetiTestCase):
74 data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
77 self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
80 result = hv_xen._ParseXmList(data.splitlines(), True)
81 self.assertEqual(len(result), 1)
82 self.assertEqual(len(result[0]), 6)
85 self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
88 self.assertEqual(result[0][1], 0)
91 self.assertEqual(result[0][2], 1023)
94 self.assertEqual(result[0][3], 1)
97 self.assertEqual(result[0][4], "r-----")
100 self.assertAlmostEqual(result[0][5], 121152.6)
102 def testWrongLineFormat(self):
104 ["three fields only"],
105 ["name InvalidID 128 1 r----- 12345"],
110 hv_xen._ParseXmList(["Header would be here"] + lines, False)
111 except errors.HypervisorError, err:
112 self.assertTrue("Can't parse output of xm list" in str(err))
114 self.fail("Exception was not raised")
117 class TestGetXmList(testutils.GanetiTestCase):
119 return utils.RunResult(constants.EXIT_FAILURE, None,
120 "stdout", "stderr", None,
121 NotImplemented, NotImplemented)
123 def testTimeout(self):
124 fn = testutils.CallCounter(self._Fail)
126 hv_xen._GetXmList(fn, False, _timeout=0.1)
127 except errors.HypervisorError, err:
128 self.assertTrue("timeout exceeded" in str(err))
130 self.fail("Exception was not raised")
132 self.assertTrue(fn.Count() < 10,
133 msg="'xm list' was called too many times")
135 def _Success(self, stdout):
136 return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
137 NotImplemented, NotImplemented)
139 def testSuccess(self):
140 data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
142 fn = testutils.CallCounter(compat.partial(self._Success, data))
144 result = hv_xen._GetXmList(fn, True, _timeout=0.1)
146 self.assertEqual(len(result), 4)
148 self.assertEqual(map(compat.fst, result), [
150 "server01.example.com",
151 "web3106215069.example.com",
152 "testinstance.example.com",
155 self.assertEqual(fn.Count(), 1)
158 class TestParseNodeInfo(testutils.GanetiTestCase):
160 self.assertEqual(hv_xen._ParseNodeInfo(""), {})
162 def testUnknownInput(self):
165 "something else goes",
168 self.assertEqual(hv_xen._ParseNodeInfo(data), {})
170 def testBasicInfo(self):
171 data = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
172 result = hv_xen._ParseNodeInfo(data)
173 self.assertEqual(result, {
177 "hv_version": (4, 0),
179 "memory_total": 16378,
183 class TestMergeInstanceInfo(testutils.GanetiTestCase):
185 self.assertEqual(hv_xen._MergeInstanceInfo({}, lambda _: []), {})
187 def _FakeXmList(self, include_node):
188 self.assertTrue(include_node)
190 (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
192 ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
196 def testMissingNodeInfo(self):
197 result = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
198 self.assertEqual(result, {
203 def testWithNodeInfo(self):
204 info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
205 result = hv_xen._GetNodeInfo(info, self._FakeXmList)
206 self.assertEqual(result, {
211 "hv_version": (4, 0),
215 "memory_total": 16378,
219 class TestGetConfigFileDiskData(unittest.TestCase):
220 def testLetterCount(self):
221 self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
223 def testNoDisks(self):
224 self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
226 def testManyDisks(self):
227 for offset in [0, 1, 10]:
228 disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
229 for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
232 result = hv_xen._GetConfigFileDiskData(disks, "hd")
233 self.assertEqual(result, [
234 "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
235 for idx in range(len(hv_xen._DISK_LETTERS) + offset)
239 hv_xen._GetConfigFileDiskData(disks, "hd")
240 except errors.HypervisorError, err:
241 self.assertEqual(str(err), "Too many disks")
243 self.fail("Exception was not raised")
245 def testTwoLvDisksWithMode(self):
247 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
249 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
253 result = hv_xen._GetConfigFileDiskData(disks, "hd")
254 self.assertEqual(result, [
255 "'phy:/tmp/diskFirst,hda,w'",
256 "'phy:/tmp/diskLast,hdb,r'",
259 def testFileDisks(self):
261 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
262 physical_id=[constants.FD_LOOP]),
264 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
265 physical_id=[constants.FD_BLKTAP]),
267 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
268 physical_id=[constants.FD_LOOP]),
270 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
271 physical_id=[constants.FD_BLKTAP]),
275 result = hv_xen._GetConfigFileDiskData(disks, "sd")
276 self.assertEqual(result, [
277 "'file:/tmp/diskFirst,sda,w'",
278 "'tap:aio:/tmp/diskTwo,sdb,r'",
279 "'file:/tmp/diskThree,sdc,w'",
280 "'tap:aio:/tmp/diskLast,sdd,w'",
283 def testInvalidFileDisk(self):
285 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
286 physical_id=["#unknown#"]),
290 self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
293 class TestXenHypervisorUnknownCommand(unittest.TestCase):
295 cmd = "#unknown command#"
296 self.assertFalse(cmd in constants.KNOWN_XEN_COMMANDS)
297 hv = hv_xen.XenHypervisor(_cfgdir=NotImplemented,
298 _run_cmd_fn=NotImplemented,
300 self.assertRaises(errors.ProgrammerError, hv._RunXen, [])
303 class TestXenHypervisorWriteConfigFile(unittest.TestCase):
305 self.tmpdir = tempfile.mkdtemp()
308 shutil.rmtree(self.tmpdir)
310 def testWriteError(self):
311 cfgdir = utils.PathJoin(self.tmpdir, "foobar")
313 hv = hv_xen.XenHypervisor(_cfgdir=cfgdir,
314 _run_cmd_fn=NotImplemented,
317 self.assertFalse(os.path.exists(cfgdir))
320 hv._WriteConfigFile("name", "data")
321 except errors.HypervisorError, err:
322 self.assertTrue(str(err).startswith("Cannot write Xen instance"))
324 self.fail("Exception was not raised")
327 if __name__ == "__main__":
328 testutils.GanetiTestProgram()