4 # Copyright (C) 2011, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Script for testing ganeti.hypervisor.hv_xen"""
24 import string # pylint: disable=W0402
27 from ganeti import constants
28 from ganeti import objects
29 from ganeti import hypervisor
30 from ganeti import utils
31 from ganeti import errors
32 from ganeti import compat
34 from ganeti.hypervisor import hv_xen
39 class TestConsole(unittest.TestCase):
41 for cls in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
42 instance = objects.Instance(name="xen.example.com",
43 primary_node="node24828")
44 cons = cls.GetInstanceConsole(instance, {}, {})
45 self.assertTrue(cons.Validate())
46 self.assertEqual(cons.kind, constants.CONS_SSH)
47 self.assertEqual(cons.host, instance.primary_node)
48 self.assertEqual(cons.command[-1], instance.name)
51 class TestCreateConfigCpus(unittest.TestCase):
53 for cpu_mask in [None, ""]:
54 self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask),
58 self.assertEqual(hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL),
62 self.assertEqual(hv_xen._CreateConfigCpus("9"), "cpu = \"9\"")
def testMultiple(self):
  """Check a mask with three colon-separated parts gives a three-entry list.

  The last part, "all", is expected to be rendered as
  C{constants.CPU_PINNING_ALL_XEN}.

  """
  actual = hv_xen._CreateConfigCpus("0-2,4,5-5:3:all")
  expected = ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
              constants.CPU_PINNING_ALL_XEN)
  self.assertEqual(actual, expected)
70 class TestParseXmList(testutils.GanetiTestCase):
72 data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
75 self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
78 result = hv_xen._ParseXmList(data.splitlines(), True)
79 self.assertEqual(len(result), 1)
80 self.assertEqual(len(result[0]), 6)
83 self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
86 self.assertEqual(result[0][1], 0)
89 self.assertEqual(result[0][2], 1023)
92 self.assertEqual(result[0][3], 1)
95 self.assertEqual(result[0][4], "r-----")
98 self.assertAlmostEqual(result[0][5], 121152.6)
100 def testWrongLineFormat(self):
102 ["three fields only"],
103 ["name InvalidID 128 1 r----- 12345"],
108 hv_xen._ParseXmList(["Header would be here"] + lines, False)
109 except errors.HypervisorError, err:
110 self.assertTrue("Can't parse output of xm list" in str(err))
112 self.fail("Exception was not raised")
115 class TestGetXmList(testutils.GanetiTestCase):
117 return utils.RunResult(constants.EXIT_FAILURE, None,
118 "stdout", "stderr", None,
119 NotImplemented, NotImplemented)
121 def testTimeout(self):
122 fn = testutils.CallCounter(self._Fail)
124 hv_xen._GetXmList(fn, False, _timeout=0.1)
125 except errors.HypervisorError, err:
126 self.assertTrue("timeout exceeded" in str(err))
128 self.fail("Exception was not raised")
130 self.assertTrue(fn.Count() < 10,
131 msg="'xm list' was called too many times")
def _Success(self, stdout):
  """Return a C{utils.RunResult} built with C{constants.EXIT_SUCCESS}.

  @param stdout: value passed as the third positional argument of the result

  """
  # Arguments keep their original positional order; slots this helper does
  # not care about are filled with None or NotImplemented.
  run_args = (constants.EXIT_SUCCESS, None, stdout, "", None,
              NotImplemented, NotImplemented)
  return utils.RunResult(*run_args)
137 def testSuccess(self):
138 data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
140 fn = testutils.CallCounter(compat.partial(self._Success, data))
142 result = hv_xen._GetXmList(fn, True, _timeout=0.1)
144 self.assertEqual(len(result), 4)
146 self.assertEqual(map(compat.fst, result), [
148 "server01.example.com",
149 "web3106215069.example.com",
150 "testinstance.example.com",
153 self.assertEqual(fn.Count(), 1)
156 class TestParseNodeInfo(testutils.GanetiTestCase):
158 self.assertEqual(hv_xen._ParseNodeInfo(""), {})
160 def testUnknownInput(self):
163 "something else goes",
166 self.assertEqual(hv_xen._ParseNodeInfo(data), {})
168 def testBasicInfo(self):
169 data = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
170 result = hv_xen._ParseNodeInfo(data)
171 self.assertEqual(result, {
175 "hv_version": (4, 0),
177 "memory_total": 16378,
181 class TestMergeInstanceInfo(testutils.GanetiTestCase):
183 self.assertEqual(hv_xen._MergeInstanceInfo({}, lambda _: []), {})
185 def _FakeXmList(self, include_node):
186 self.assertTrue(include_node)
188 (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
190 ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
194 def testMissingNodeInfo(self):
195 result = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
196 self.assertEqual(result, {
201 def testWithNodeInfo(self):
202 info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
203 result = hv_xen._GetNodeInfo(info, self._FakeXmList)
204 self.assertEqual(result, {
209 "hv_version": (4, 0),
213 "memory_total": 16378,
217 class TestGetConfigFileDiskData(unittest.TestCase):
def testLetterCount(self):
  """Verify that C{hv_xen._DISK_LETTERS} holds exactly 26 entries."""
  letter_count = len(hv_xen._DISK_LETTERS)
  self.assertEqual(letter_count, 26)
def testNoDisks(self):
  """Verify that an empty disk list produces an empty result list."""
  result = hv_xen._GetConfigFileDiskData([], "hd")
  self.assertEqual(result, [])
224 def testManyDisks(self):
225 for offset in [0, 1, 10]:
226 disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
227 for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
230 result = hv_xen._GetConfigFileDiskData(disks, "hd")
231 self.assertEqual(result, [
232 "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
233 for idx in range(len(hv_xen._DISK_LETTERS) + offset)
237 hv_xen._GetConfigFileDiskData(disks, "hd")
238 except errors.HypervisorError, err:
239 self.assertEqual(str(err), "Too many disks")
241 self.fail("Exception was not raised")
243 def testTwoLvDisksWithMode(self):
245 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
247 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
251 result = hv_xen._GetConfigFileDiskData(disks, "hd")
252 self.assertEqual(result, [
253 "'phy:/tmp/diskFirst,hda,w'",
254 "'phy:/tmp/diskLast,hdb,r'",
257 def testFileDisks(self):
259 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
260 physical_id=[constants.FD_LOOP]),
262 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
263 physical_id=[constants.FD_BLKTAP]),
265 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
266 physical_id=[constants.FD_LOOP]),
268 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
269 physical_id=[constants.FD_BLKTAP]),
273 result = hv_xen._GetConfigFileDiskData(disks, "sd")
274 self.assertEqual(result, [
275 "'file:/tmp/diskFirst,sda,w'",
276 "'tap:aio:/tmp/diskTwo,sdb,r'",
277 "'file:/tmp/diskThree,sdc,w'",
278 "'tap:aio:/tmp/diskLast,sdd,w'",
281 def testInvalidFileDisk(self):
283 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
284 physical_id=["#unknown#"]),
288 self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
# Script entry point: call testutils.GanetiTestProgram() only when this
# module is executed directly, not when it is imported.
if __name__ == "__main__":
  testutils.GanetiTestProgram()