4 # Copyright (C) 2006, 2007, 2008, 2010, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the objects module"""
28 from ganeti import constants
29 from ganeti import objects
30 from ganeti import errors
35 class SimpleObject(objects.ConfigObject):
36 __slots__ = ["a", "b"]
39 class TestDictState(unittest.TestCase):
40 """Simple dict tansformation tests"""
42 def testSimpleObjectToDict(self):
43 o1 = SimpleObject(a="1")
44 self.assertEquals(o1.ToDict(), {"a": "1"})
45 self.assertEquals(o1.__getstate__(), {"a": "1"})
46 self.assertEquals(o1.__getstate__(), o1.ToDict())
49 self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
50 o2 = SimpleObject.FromDict(o1.ToDict())
51 self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
54 class TestClusterObject(unittest.TestCase):
55 """Tests done on a L{objects.Cluster}"""
73 constants.HT_XEN_PVM: {
74 "root_path": "/dev/sda5",
82 constants.ND_OOB_PROGRAM: "/bin/cluster-oob",
83 constants.ND_SPINDLE_COUNT: 1,
84 constants.ND_EXCLUSIVE_STORAGE: False,
87 self.fake_cl = objects.Cluster(hvparams=hvparams, os_hvp=os_hvp,
89 self.fake_cl.UpgradeConfig()
91 def testGetHVDefaults(self):
93 self.failUnlessEqual(cl.GetHVDefaults(constants.HT_FAKE),
94 cl.hvparams[constants.HT_FAKE])
95 self.failUnlessEqual(cl.GetHVDefaults(None), {})
96 defaults = cl.GetHVDefaults(constants.HT_XEN_PVM,
97 os_name="lenny-image")
98 for param, value in cl.os_hvp["lenny-image"][constants.HT_XEN_PVM].items():
99 self.assertEqual(value, defaults[param])
101 def testFillHvFullMerge(self):
106 fake_dict = constants.HVC_DEFAULTS[constants.HT_FAKE].copy()
114 fake_inst = objects.Instance(name="foobar",
116 hypervisor=constants.HT_FAKE,
117 hvparams=inst_hvparams)
118 self.assertEqual(fake_dict, self.fake_cl.FillHV(fake_inst))
120 def testFillHvGlobalParams(self):
121 fake_inst = objects.Instance(name="foobar",
123 hypervisor=constants.HT_FAKE,
125 self.assertEqual(self.fake_cl.hvparams[constants.HT_FAKE],
126 self.fake_cl.FillHV(fake_inst))
128 def testFillHvInstParams(self):
132 fake_inst = objects.Instance(name="foobar",
134 hypervisor=constants.HT_XEN_PVM,
135 hvparams=inst_hvparams)
136 filled_conf = self.fake_cl.FillHV(fake_inst)
137 for param, value in constants.HVC_DEFAULTS[constants.HT_XEN_PVM].items():
140 self.assertEqual(value, filled_conf[param])
142 def testFillHvDefaultParams(self):
143 fake_inst = objects.Instance(name="foobar",
145 hypervisor=constants.HT_XEN_PVM,
147 self.assertEqual(constants.HVC_DEFAULTS[constants.HT_XEN_PVM],
148 self.fake_cl.FillHV(fake_inst))
150 def testFillHvPartialParams(self):
152 fake_inst = objects.Instance(name="foobar",
154 hypervisor=constants.HT_XEN_PVM,
156 filled_conf = self.fake_cl.FillHV(fake_inst)
157 for param, value in self.fake_cl.os_hvp[os][constants.HT_XEN_PVM].items():
158 self.assertEqual(value, filled_conf[param])
160 def testFillNdParamsCluster(self):
161 fake_node = objects.Node(name="test",
164 fake_group = objects.NodeGroup(name="testgroup",
166 self.assertEqual(self.fake_cl.ndparams,
167 self.fake_cl.FillND(fake_node, fake_group))
169 def testFillNdParamsNodeGroup(self):
170 fake_node = objects.Node(name="test",
174 constants.ND_OOB_PROGRAM: "/bin/group-oob",
175 constants.ND_SPINDLE_COUNT: 10,
176 constants.ND_EXCLUSIVE_STORAGE: True,
177 constants.ND_OVS: True,
178 constants.ND_OVS_LINK: "eth2",
179 constants.ND_OVS_NAME: "openvswitch",
181 fake_group = objects.NodeGroup(name="testgroup",
182 ndparams=group_ndparams)
183 self.assertEqual(group_ndparams,
184 self.fake_cl.FillND(fake_node, fake_group))
186 def testFillNdParamsNode(self):
188 constants.ND_OOB_PROGRAM: "/bin/node-oob",
189 constants.ND_SPINDLE_COUNT: 2,
190 constants.ND_EXCLUSIVE_STORAGE: True,
191 constants.ND_OVS: True,
192 constants.ND_OVS_LINK: "eth2",
193 constants.ND_OVS_NAME: "openvswitch",
195 fake_node = objects.Node(name="test",
196 ndparams=node_ndparams,
198 fake_group = objects.NodeGroup(name="testgroup",
200 self.assertEqual(node_ndparams,
201 self.fake_cl.FillND(fake_node, fake_group))
203 def testFillNdParamsAll(self):
205 constants.ND_OOB_PROGRAM: "/bin/node-oob",
206 constants.ND_SPINDLE_COUNT: 5,
207 constants.ND_EXCLUSIVE_STORAGE: True,
208 constants.ND_OVS: True,
209 constants.ND_OVS_LINK: "eth2",
210 constants.ND_OVS_NAME: "openvswitch",
212 fake_node = objects.Node(name="test",
213 ndparams=node_ndparams,
216 constants.ND_OOB_PROGRAM: "/bin/group-oob",
217 constants.ND_SPINDLE_COUNT: 4,
219 fake_group = objects.NodeGroup(name="testgroup",
220 ndparams=group_ndparams)
221 self.assertEqual(node_ndparams,
222 self.fake_cl.FillND(fake_node, fake_group))
224 def testPrimaryHypervisor(self):
225 assert self.fake_cl.enabled_hypervisors is None
226 self.fake_cl.enabled_hypervisors = [constants.HT_XEN_HVM]
227 self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_HVM)
229 self.fake_cl.enabled_hypervisors = [constants.HT_XEN_PVM, constants.HT_KVM]
230 self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_PVM)
232 self.fake_cl.enabled_hypervisors = sorted(constants.HYPER_TYPES)
233 self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_CHROOT)
235 def testUpgradeConfig(self):
236 # FIXME: This test is incomplete
237 cluster = objects.Cluster()
238 cluster.UpgradeConfig()
239 cluster = objects.Cluster(ipolicy={"unknown_key": None})
240 self.assertRaises(errors.ConfigurationError, cluster.UpgradeConfig)
242 def testUpgradeEnabledDiskTemplates(self):
243 cfg = objects.ConfigData()
244 cfg.cluster = objects.Cluster()
245 cfg.cluster.volume_group_name = "myvg"
246 instance1 = objects.Instance()
247 instance1.disk_template = constants.DT_DISKLESS
248 instance2 = objects.Instance()
249 instance2.disk_template = constants.DT_RBD
250 cfg.instances = { "myinstance1": instance1, "myinstance2": instance2 }
251 nodegroup = objects.NodeGroup()
252 nodegroup.ipolicy = {}
253 nodegroup.ipolicy[constants.IPOLICY_DTS] = [instance1.disk_template, \
255 cfg.cluster.ipolicy = {}
256 cfg.cluster.ipolicy[constants.IPOLICY_DTS] = \
257 [constants.DT_EXT, constants.DT_DISKLESS]
258 cfg.nodegroups = { "mynodegroup": nodegroup }
259 cfg._UpgradeEnabledDiskTemplates()
260 expected_disk_templates = [constants.DT_DRBD8,
262 instance1.disk_template,
263 instance2.disk_template]
264 self.assertEqual(set(expected_disk_templates),
265 set(cfg.cluster.enabled_disk_templates))
266 self.assertEqual(set([instance1.disk_template]),
267 set(cfg.cluster.ipolicy[constants.IPOLICY_DTS]))
270 class TestClusterObjectTcpUdpPortPool(unittest.TestCase):
271 def testNewCluster(self):
272 self.assertTrue(objects.Cluster().tcpudp_port_pool is None)
274 def testSerializingEmpty(self):
275 self.assertEqual(objects.Cluster().ToDict(), {
276 "tcpudp_port_pool": [],
279 def testSerializing(self):
280 cluster = objects.Cluster.FromDict({})
281 self.assertEqual(cluster.tcpudp_port_pool, set())
283 cluster.tcpudp_port_pool.add(3546)
284 cluster.tcpudp_port_pool.add(62511)
286 data = cluster.ToDict()
287 self.assertEqual(data.keys(), ["tcpudp_port_pool"])
288 self.assertEqual(sorted(data["tcpudp_port_pool"]), sorted([3546, 62511]))
290 def testDeserializingEmpty(self):
291 cluster = objects.Cluster.FromDict({})
292 self.assertEqual(cluster.tcpudp_port_pool, set())
294 def testDeserialize(self):
295 cluster = objects.Cluster.FromDict({
296 "tcpudp_port_pool": [26214, 10039, 267],
298 self.assertEqual(cluster.tcpudp_port_pool, set([26214, 10039, 267]))
301 class TestOS(unittest.TestCase):
304 "debootstrap+default",
305 "debootstrap++default",
308 def testSplitNameVariant(self):
309 for name in self.ALL_DATA:
310 self.assertEqual(len(objects.OS.SplitNameVariant(name)), 2)
312 def testVariant(self):
313 self.assertEqual(objects.OS.GetVariant("debootstrap"), "")
314 self.assertEqual(objects.OS.GetVariant("debootstrap+default"), "default")
317 class TestInstance(unittest.TestCase):
318 def _GenericCheck(self, inst):
319 for i in [inst.all_nodes, inst.secondary_nodes]:
320 self.assertTrue(isinstance(inst.all_nodes, (list, tuple)),
321 msg="Data type doesn't guarantee order")
323 self.assertTrue(inst.primary_node not in inst.secondary_nodes)
324 self.assertEqual(inst.all_nodes[0], inst.primary_node,
325 msg="Primary node not first node in list")
327 def testNodesNoDisks(self):
328 inst = objects.Instance(name="fakeinst.example.com",
329 primary_node="pnode.example.com",
333 self._GenericCheck(inst)
334 self.assertEqual(len(inst.secondary_nodes), 0)
335 self.assertEqual(set(inst.all_nodes), set([inst.primary_node]))
336 self.assertEqual(inst.MapLVsByNode(), {
337 inst.primary_node: [],
340 def testNodesPlainDisks(self):
341 inst = objects.Instance(name="fakeinstplain.example.com",
342 primary_node="node3.example.com",
344 objects.Disk(dev_type=constants.DT_PLAIN, size=128,
345 logical_id=("myxenvg", "disk25494")),
346 objects.Disk(dev_type=constants.DT_PLAIN, size=512,
347 logical_id=("myxenvg", "disk29071")),
350 self._GenericCheck(inst)
351 self.assertEqual(len(inst.secondary_nodes), 0)
352 self.assertEqual(set(inst.all_nodes), set([inst.primary_node]))
353 self.assertEqual(inst.MapLVsByNode(), {
354 inst.primary_node: ["myxenvg/disk25494", "myxenvg/disk29071"],
357 def testNodesDrbdDisks(self):
358 inst = objects.Instance(name="fakeinstdrbd.example.com",
359 primary_node="node10.example.com",
361 objects.Disk(dev_type=constants.DT_DRBD8, size=786432,
362 logical_id=("node10.example.com", "node15.example.com",
363 12300, 0, 0, "secret"),
365 objects.Disk(dev_type=constants.DT_PLAIN, size=786432,
366 logical_id=("myxenvg", "disk0")),
367 objects.Disk(dev_type=constants.DT_PLAIN, size=128,
368 logical_id=("myxenvg", "meta0"))
373 self._GenericCheck(inst)
374 self.assertEqual(set(inst.secondary_nodes), set(["node15.example.com"]))
375 self.assertEqual(set(inst.all_nodes),
376 set([inst.primary_node, "node15.example.com"]))
377 self.assertEqual(inst.MapLVsByNode(), {
378 inst.primary_node: ["myxenvg/disk0", "myxenvg/meta0"],
379 "node15.example.com": ["myxenvg/disk0", "myxenvg/meta0"],
382 self.assertEqual(inst.FindDisk(0), inst.disks[0])
383 self.assertRaises(errors.OpPrereqError, inst.FindDisk, "hello")
384 self.assertRaises(errors.OpPrereqError, inst.FindDisk, 100)
385 self.assertRaises(errors.OpPrereqError, inst.FindDisk, 1)
388 class TestNode(unittest.TestCase):
390 self.assertEqual(objects.Node().ToDict(), {})
391 self.assertTrue(isinstance(objects.Node.FromDict({}), objects.Node))
393 def testHvState(self):
394 node = objects.Node(name="node18157.example.com", hv_state={
395 constants.HT_XEN_HVM: objects.NodeHvState(cpu_total=64),
396 constants.HT_KVM: objects.NodeHvState(cpu_node=1),
399 node2 = objects.Node.FromDict(node.ToDict())
401 # Make sure nothing can reference it anymore
404 self.assertEqual(node2.name, "node18157.example.com")
405 self.assertEqual(frozenset(node2.hv_state), frozenset([
406 constants.HT_XEN_HVM,
409 self.assertEqual(node2.hv_state[constants.HT_KVM].cpu_node, 1)
410 self.assertEqual(node2.hv_state[constants.HT_XEN_HVM].cpu_total, 64)
412 def testDiskState(self):
413 node = objects.Node(name="node32087.example.com", disk_state={
414 constants.DT_PLAIN: {
415 "lv32352": objects.NodeDiskState(total=128),
416 "lv2082": objects.NodeDiskState(total=512),
420 node2 = objects.Node.FromDict(node.ToDict())
422 # Make sure nothing can reference it anymore
425 self.assertEqual(node2.name, "node32087.example.com")
426 self.assertEqual(frozenset(node2.disk_state), frozenset([
429 self.assertEqual(frozenset(node2.disk_state[constants.DT_PLAIN]),
430 frozenset(["lv32352", "lv2082"]))
431 self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv2082"].total, 512)
432 self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv32352"].total, 128)
434 def testFilterEsNdp(self):
435 node1 = objects.Node(name="node11673.example.com", ndparams={
436 constants.ND_EXCLUSIVE_STORAGE: True,
438 node2 = objects.Node(name="node11674.example.com", ndparams={
439 constants.ND_SPINDLE_COUNT: 3,
440 constants.ND_EXCLUSIVE_STORAGE: False,
442 self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
443 node1.UpgradeConfig()
444 self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
445 self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
446 self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
447 node2.UpgradeConfig()
448 self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
449 self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
452 class TestInstancePolicy(unittest.TestCase):
454 # Policies are big, and we want to see the difference in case of an error
457 def _AssertIPolicyIsFull(self, policy):
458 self.assertEqual(frozenset(policy.keys()), constants.IPOLICY_ALL_KEYS)
459 self.assertTrue(len(policy[constants.ISPECS_MINMAX]) > 0)
460 for minmax in policy[constants.ISPECS_MINMAX]:
461 self.assertEqual(frozenset(minmax.keys()), constants.ISPECS_MINMAX_KEYS)
462 for key in constants.ISPECS_MINMAX_KEYS:
463 self.assertEqual(frozenset(minmax[key].keys()),
464 constants.ISPECS_PARAMETERS)
465 self.assertEqual(frozenset(policy[constants.ISPECS_STD].keys()),
466 constants.ISPECS_PARAMETERS)
468 def testDefaultIPolicy(self):
469 objects.InstancePolicy.CheckParameterSyntax(constants.IPOLICY_DEFAULTS,
471 self._AssertIPolicyIsFull(constants.IPOLICY_DEFAULTS)
473 def _AssertPolicyIsBad(self, ipolicy, do_check_std=None):
474 if do_check_std is None:
475 check_std_vals = [False, True]
477 check_std_vals = [do_check_std]
478 for check_std in check_std_vals:
479 self.assertRaises(errors.ConfigurationError,
480 objects.InstancePolicy.CheckISpecSyntax,
483 def testCheckISpecSyntax(self):
484 default_stdspec = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD]
485 incomplete_ipolicies = [
487 constants.ISPECS_MINMAX: [],
488 constants.ISPECS_STD: default_stdspec,
491 constants.ISPECS_MINMAX: [{}],
492 constants.ISPECS_STD: default_stdspec,
495 constants.ISPECS_MINMAX: [{
496 constants.ISPECS_MIN: NotImplemented,
498 constants.ISPECS_STD: default_stdspec,
501 constants.ISPECS_MINMAX: [{
502 constants.ISPECS_MAX: NotImplemented,
504 constants.ISPECS_STD: default_stdspec,
507 constants.ISPECS_MINMAX: [{
508 constants.ISPECS_MIN: NotImplemented,
509 constants.ISPECS_MAX: NotImplemented,
513 for ipol in incomplete_ipolicies:
514 self.assertRaises(errors.ConfigurationError,
515 objects.InstancePolicy.CheckISpecSyntax,
517 oldminmax = ipol[constants.ISPECS_MINMAX]
519 # Prepending valid specs shouldn't change the error
520 ipol[constants.ISPECS_MINMAX] = ([constants.ISPECS_MINMAX_DEFAULTS] +
522 self.assertRaises(errors.ConfigurationError,
523 objects.InstancePolicy.CheckISpecSyntax,
527 constants.ISPECS_MINMAX: [
529 constants.ISPECS_MIN: {
530 constants.ISPEC_MEM_SIZE: 64,
531 constants.ISPEC_CPU_COUNT: 1,
532 constants.ISPEC_DISK_COUNT: 2,
533 constants.ISPEC_DISK_SIZE: 64,
534 constants.ISPEC_NIC_COUNT: 1,
535 constants.ISPEC_SPINDLE_USE: 1,
537 constants.ISPECS_MAX: {
538 constants.ISPEC_MEM_SIZE: 16384,
539 constants.ISPEC_CPU_COUNT: 5,
540 constants.ISPEC_DISK_COUNT: 12,
541 constants.ISPEC_DISK_SIZE: 1024,
542 constants.ISPEC_NIC_COUNT: 9,
543 constants.ISPEC_SPINDLE_USE: 18,
547 constants.ISPECS_MIN: {
548 constants.ISPEC_MEM_SIZE: 32768,
549 constants.ISPEC_CPU_COUNT: 8,
550 constants.ISPEC_DISK_COUNT: 1,
551 constants.ISPEC_DISK_SIZE: 1024,
552 constants.ISPEC_NIC_COUNT: 1,
553 constants.ISPEC_SPINDLE_USE: 1,
555 constants.ISPECS_MAX: {
556 constants.ISPEC_MEM_SIZE: 65536,
557 constants.ISPEC_CPU_COUNT: 10,
558 constants.ISPEC_DISK_COUNT: 5,
559 constants.ISPEC_DISK_SIZE: 1024 * 1024,
560 constants.ISPEC_NIC_COUNT: 3,
561 constants.ISPEC_SPINDLE_USE: 12,
566 good_ipolicy[constants.ISPECS_STD] = copy.deepcopy(
567 good_ipolicy[constants.ISPECS_MINMAX][0][constants.ISPECS_MAX])
568 # Check that it's really good before making it bad
569 objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
571 bad_ipolicy = copy.deepcopy(good_ipolicy)
572 for minmax in bad_ipolicy[constants.ISPECS_MINMAX]:
573 for (key, spec) in minmax.items():
577 self._AssertPolicyIsBad(bad_ipolicy)
578 if key == constants.ISPECS_MIN:
579 spec[param] = minmax[constants.ISPECS_MAX][param] + 1
580 self._AssertPolicyIsBad(bad_ipolicy)
582 assert bad_ipolicy == good_ipolicy
584 stdspec = bad_ipolicy[constants.ISPECS_STD]
585 for param in stdspec:
586 oldv = stdspec[param]
588 self._AssertPolicyIsBad(bad_ipolicy, True)
589 # Note that std spec is the same as a max spec
590 stdspec[param] = oldv + 1
591 self._AssertPolicyIsBad(bad_ipolicy, True)
592 stdspec[param] = oldv
593 assert bad_ipolicy == good_ipolicy
595 for minmax in good_ipolicy[constants.ISPECS_MINMAX]:
596 for spec in minmax.values():
597 good_ipolicy[constants.ISPECS_STD] = spec
598 objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
600 def testCheckISpecParamSyntax(self):
602 for check_std in [True, False]:
604 good_values = [(11, 11), (11, 40), (0, 0)]
605 for (mn, mx) in good_values:
606 minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
607 minmax[constants.ISPECS_MIN][par] = mn
608 minmax[constants.ISPECS_MAX][par] = mx
609 objects.InstancePolicy._CheckISpecParamSyntax(minmax, {}, par,
611 minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
612 minmax[constants.ISPECS_MIN][par] = 11
613 minmax[constants.ISPECS_MAX][par] = 5
614 self.assertRaises(errors.ConfigurationError,
615 objects.InstancePolicy._CheckISpecParamSyntax,
616 minmax, {}, par, check_std)
623 for (mn, st, mx) in good_values:
625 constants.ISPECS_MIN: {par: mn},
626 constants.ISPECS_MAX: {par: mx},
629 objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec, par, True)
638 for (mn, st, mx, excp) in bad_values:
640 constants.ISPECS_MIN: {par: mn},
641 constants.ISPECS_MAX: {par: mx},
645 self.assertRaises(errors.ConfigurationError,
646 objects.InstancePolicy._CheckISpecParamSyntax,
647 minmax, stdspec, par, True)
649 ret = objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec,
651 self.assertFalse(ret)
653 def testCheckDiskTemplates(self):
654 invalid = "this_is_not_a_good_template"
655 for dt in constants.DISK_TEMPLATES:
656 objects.InstancePolicy.CheckDiskTemplates([dt])
657 objects.InstancePolicy.CheckDiskTemplates(list(constants.DISK_TEMPLATES))
660 [constants.DT_DRBD8, invalid],
661 list(constants.DISK_TEMPLATES) + [invalid],
665 for dtl in bad_examples:
666 self.assertRaises(errors.ConfigurationError,
667 objects.InstancePolicy.CheckDiskTemplates,
670 def testCheckParameterSyntax(self):
671 invalid = "this_key_shouldnt_be_here"
672 for check_std in [True, False]:
673 objects.InstancePolicy.CheckParameterSyntax({}, check_std)
674 policy = {invalid: None}
675 self.assertRaises(errors.ConfigurationError,
676 objects.InstancePolicy.CheckParameterSyntax,
678 for par in constants.IPOLICY_PARAMETERS:
679 for val in ("blah", None, {}, [42]):
681 self.assertRaises(errors.ConfigurationError,
682 objects.InstancePolicy.CheckParameterSyntax,
685 def testFillIPolicyEmpty(self):
686 policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, {})
687 objects.InstancePolicy.CheckParameterSyntax(policy, True)
688 self.assertEqual(policy, constants.IPOLICY_DEFAULTS)
690 def _AssertISpecsMerged(self, default_spec, diff_spec, merged_spec):
691 for (param, value) in merged_spec.items():
692 if param in diff_spec:
693 self.assertEqual(value, diff_spec[param])
695 self.assertEqual(value, default_spec[param])
697 def _AssertIPolicyMerged(self, default_pol, diff_pol, merged_pol):
698 for (key, value) in merged_pol.items():
700 if key == constants.ISPECS_STD:
701 self._AssertISpecsMerged(default_pol[key], diff_pol[key], value)
703 self.assertEqual(value, diff_pol[key])
705 self.assertEqual(value, default_pol[key])
707 def testFillIPolicy(self):
709 {constants.IPOLICY_VCPU_RATIO: 3.14},
710 {constants.IPOLICY_SPINDLE_RATIO: 2.72},
711 {constants.IPOLICY_DTS: [constants.DT_FILE]},
712 {constants.ISPECS_STD: {constants.ISPEC_DISK_COUNT: 3}},
713 {constants.ISPECS_MINMAX: [constants.ISPECS_MINMAX_DEFAULTS,
714 constants.ISPECS_MINMAX_DEFAULTS]}
716 for diff_pol in partial_policies:
717 policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
718 objects.InstancePolicy.CheckParameterSyntax(policy, True)
719 self._AssertIPolicyIsFull(policy)
720 self._AssertIPolicyMerged(constants.IPOLICY_DEFAULTS, diff_pol, policy)
722 def testFillIPolicyKeepsUnknown(self):
723 INVALID_KEY = "invalid_ipolicy_key"
727 policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
728 self.assertTrue(INVALID_KEY in policy)
731 class TestDisk(unittest.TestCase):
732 def addChild(self, disk):
733 """Adds a child of the same device type as the parent."""
735 child = objects.Disk()
736 child.dev_type = disk.dev_type
737 disk.children.append(child)
739 def testUpgradeConfigDevTypeLegacy(self):
740 for old, new in [("drbd8", constants.DT_DRBD8),
741 ("lvm", constants.DT_PLAIN)]:
742 disk = objects.Disk()
746 self.assertEqual(new, disk.dev_type)
747 self.assertEqual(new, disk.children[0].dev_type)
749 def testUpgradeConfigDevTypeLegacyUnchanged(self):
750 dev_types = [constants.DT_FILE, constants.DT_SHARED_FILE,
751 constants.DT_BLOCK, constants.DT_EXT,
753 for dev_type in dev_types:
754 disk = objects.Disk()
755 disk.dev_type = dev_type
758 self.assertEqual(dev_type, disk.dev_type)
759 self.assertEqual(dev_type, disk.children[0].dev_type)
762 if __name__ == "__main__":
763 testutils.GanetiTestProgram()