hv_xen: Factorize and test disk configuration
[ganeti-local] / test / py / ganeti.hypervisor.hv_xen_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.hypervisor.hv_xen"""
23
24 import string # pylint: disable=W0402
25 import unittest
26
27 from ganeti import constants
28 from ganeti import objects
29 from ganeti import hypervisor
30 from ganeti import utils
31 from ganeti import errors
32 from ganeti import compat
33
34 from ganeti.hypervisor import hv_xen
35
36 import testutils
37
38
class TestConsole(unittest.TestCase):
  """Tests for the console object returned by the Xen hypervisor classes.

  """
  def test(self):
    # Both the PVM and the HVM flavour are checked with the same instance data
    hv_classes = (hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor)

    for hv_class in hv_classes:
      inst = objects.Instance(name="xen.example.com",
                              primary_node="node24828")
      console = hv_class.GetInstanceConsole(inst, {}, {})

      self.assertTrue(console.Validate())
      self.assertEqual(console.kind, constants.CONS_SSH)
      self.assertEqual(console.host, inst.primary_node)
      # The instance name must be the last element of the console command
      self.assertEqual(console.command[-1], inst.name)
49
50
class TestCreateConfigCpus(unittest.TestCase):
  """Tests for L{hv_xen._CreateConfigCpus}.

  """
  def testEmpty(self):
    # Both None and the empty string must produce an empty "cpus" list
    for mask in (None, ""):
      config = hv_xen._CreateConfigCpus(mask)
      self.assertEqual(config, "cpus = [  ]")

  def testAll(self):
    # The "all" mask is expected to produce no configuration line at all
    config = hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL)
    self.assertEqual(config, None)

  def testOne(self):
    # A single CPU uses the singular "cpu" setting
    config = hv_xen._CreateConfigCpus("9")
    self.assertEqual(config, "cpu = \"9\"")

  def testMultiple(self):
    # Colon-separated groups become separate entries of the "cpus" list
    expected = ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
                constants.CPU_PINNING_ALL_XEN)
    config = hv_xen._CreateConfigCpus("0-2,4,5-5:3:all")
    self.assertEqual(config, expected)
68
69
70 class TestParseXmList(testutils.GanetiTestCase):
71   def test(self):
72     data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
73
74     # Exclude node
75     self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
76
77     # Include node
78     result = hv_xen._ParseXmList(data.splitlines(), True)
79     self.assertEqual(len(result), 1)
80     self.assertEqual(len(result[0]), 6)
81
82     # Name
83     self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
84
85     # ID
86     self.assertEqual(result[0][1], 0)
87
88     # Memory
89     self.assertEqual(result[0][2], 1023)
90
91     # VCPUs
92     self.assertEqual(result[0][3], 1)
93
94     # State
95     self.assertEqual(result[0][4], "r-----")
96
97     # Time
98     self.assertAlmostEqual(result[0][5], 121152.6)
99
100   def testWrongLineFormat(self):
101     tests = [
102       ["three fields only"],
103       ["name InvalidID 128 1 r----- 12345"],
104       ]
105
106     for lines in tests:
107       try:
108         hv_xen._ParseXmList(["Header would be here"] + lines, False)
109       except errors.HypervisorError, err:
110         self.assertTrue("Can't parse output of xm list" in str(err))
111       else:
112         self.fail("Exception was not raised")
113
114
115 class TestGetXmList(testutils.GanetiTestCase):
116   def _Fail(self):
117     return utils.RunResult(constants.EXIT_FAILURE, None,
118                            "stdout", "stderr", None,
119                            NotImplemented, NotImplemented)
120
121   def testTimeout(self):
122     fn = testutils.CallCounter(self._Fail)
123     try:
124       hv_xen._GetXmList(fn, False, _timeout=0.1)
125     except errors.HypervisorError, err:
126       self.assertTrue("timeout exceeded" in str(err))
127     else:
128       self.fail("Exception was not raised")
129
130     self.assertTrue(fn.Count() < 10,
131                     msg="'xm list' was called too many times")
132
133   def _Success(self, stdout):
134     return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
135                            NotImplemented, NotImplemented)
136
137   def testSuccess(self):
138     data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
139
140     fn = testutils.CallCounter(compat.partial(self._Success, data))
141
142     result = hv_xen._GetXmList(fn, True, _timeout=0.1)
143
144     self.assertEqual(len(result), 4)
145
146     self.assertEqual(map(compat.fst, result), [
147       "Domain-0",
148       "server01.example.com",
149       "web3106215069.example.com",
150       "testinstance.example.com",
151       ])
152
153     self.assertEqual(fn.Count(), 1)
154
155
class TestParseNodeInfo(testutils.GanetiTestCase):
  """Tests for L{hv_xen._ParseNodeInfo}.

  """
  def testEmpty(self):
    parsed = hv_xen._ParseNodeInfo("")
    self.assertEqual(parsed, {})

  def testUnknownInput(self):
    # None of these lines is expected to contribute to the result
    lines = [
      "foo bar",
      "something else goes",
      "here",
      ]
    parsed = hv_xen._ParseNodeInfo("\n".join(lines))
    self.assertEqual(parsed, {})

  def testBasicInfo(self):
    expected = {
      "cpu_nodes": 1,
      "cpu_sockets": 2,
      "cpu_total": 4,
      "hv_version": (4, 0),
      "memory_free": 8004,
      "memory_total": 16378,
      }

    info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
    parsed = hv_xen._ParseNodeInfo(info)
    self.assertEqual(parsed, expected)
179
180
class TestMergeInstanceInfo(testutils.GanetiTestCase):
  """Tests for L{hv_xen._MergeInstanceInfo} and L{hv_xen._GetNodeInfo}.

  """
  def _FakeXmList(self, include_node):
    # Stand-in for the domain listing function; the node itself must always
    # be requested by the code under test
    self.assertTrue(include_node)

    # Fields whose value is not checked by these tests
    placeholder = NotImplemented

    dom0 = (hv_xen._DOM0_NAME, placeholder, 4096, 7, placeholder, placeholder)
    inst1 = ("inst1.example.com", placeholder, 2048, 4, placeholder,
             placeholder)

    return [dom0, inst1]

  def testEmpty(self):
    merged = hv_xen._MergeInstanceInfo({}, lambda _: [])
    self.assertEqual(merged, {})

  def testMissingNodeInfo(self):
    # Without node information only the domain 0 values show up
    expected = {
      "memory_dom0": 4096,
      "dom0_cpus": 7,
      }
    merged = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
    self.assertEqual(merged, expected)

  def testWithNodeInfo(self):
    expected = {
      "cpu_nodes": 1,
      "cpu_sockets": 2,
      "cpu_total": 4,
      "dom0_cpus": 7,
      "hv_version": (4, 0),
      "memory_dom0": 4096,
      "memory_free": 8004,
      "memory_hv": 2230,
      "memory_total": 16378,
      }

    node_info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
    merged = hv_xen._GetNodeInfo(node_info, self._FakeXmList)
    self.assertEqual(merged, expected)
215
216
217 class TestGetConfigFileDiskData(unittest.TestCase):
218   def testLetterCount(self):
219     self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
220
221   def testNoDisks(self):
222     self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
223
224   def testManyDisks(self):
225     for offset in [0, 1, 10]:
226       disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
227                for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
228
229       if offset == 0:
230         result = hv_xen._GetConfigFileDiskData(disks, "hd")
231         self.assertEqual(result, [
232           "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
233           for idx in range(len(hv_xen._DISK_LETTERS) + offset)
234           ])
235       else:
236         try:
237           hv_xen._GetConfigFileDiskData(disks, "hd")
238         except errors.HypervisorError, err:
239           self.assertEqual(str(err), "Too many disks")
240         else:
241           self.fail("Exception was not raised")
242
243   def testTwoLvDisksWithMode(self):
244     disks = [
245       (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
246        "/tmp/diskFirst"),
247       (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
248        "/tmp/diskLast"),
249       ]
250
251     result = hv_xen._GetConfigFileDiskData(disks, "hd")
252     self.assertEqual(result, [
253       "'phy:/tmp/diskFirst,hda,w'",
254       "'phy:/tmp/diskLast,hdb,r'",
255       ])
256
257   def testFileDisks(self):
258     disks = [
259       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
260                     physical_id=[constants.FD_LOOP]),
261        "/tmp/diskFirst"),
262       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
263                     physical_id=[constants.FD_BLKTAP]),
264        "/tmp/diskTwo"),
265       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
266                     physical_id=[constants.FD_LOOP]),
267        "/tmp/diskThree"),
268       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
269                     physical_id=[constants.FD_BLKTAP]),
270        "/tmp/diskLast"),
271       ]
272
273     result = hv_xen._GetConfigFileDiskData(disks, "sd")
274     self.assertEqual(result, [
275       "'file:/tmp/diskFirst,sda,w'",
276       "'tap:aio:/tmp/diskTwo,sdb,r'",
277       "'file:/tmp/diskThree,sdc,w'",
278       "'tap:aio:/tmp/diskLast,sdd,w'",
279       ])
280
281   def testInvalidFileDisk(self):
282     disks = [
283       (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
284                     physical_id=["#unknown#"]),
285        "/tmp/diskinvalid"),
286       ]
287
288     self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
289
290
# Only start the test program when this file is executed as a script, not
# when it is imported as a module
if __name__ == "__main__":
  testutils.GanetiTestProgram()