4 # Copyright (C) 2006, 2007, 2008, 2010, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the objects module"""
28 from ganeti import constants
29 from ganeti import objects
30 from ganeti import errors
class SimpleObject(objects.ConfigObject):
  """Minimal L{objects.ConfigObject} subclass with two slots for the tests."""
  __slots__ = [
    "a",
    "b",
    ]
39 class TestDictState(unittest.TestCase):
"""Simple dict transformation tests"""
42 def testSimpleObjectToDict(self):
43 o1 = SimpleObject(a="1")
44 self.assertEquals(o1.ToDict(), {"a": "1"})
45 self.assertEquals(o1.__getstate__(), {"a": "1"})
46 self.assertEquals(o1.__getstate__(), o1.ToDict())
49 self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
50 o2 = SimpleObject.FromDict(o1.ToDict())
51 self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
54 class TestClusterObject(unittest.TestCase):
55 """Tests done on a L{objects.Cluster}"""
73 constants.HT_XEN_PVM: {
74 "root_path": "/dev/sda5",
82 constants.ND_OOB_PROGRAM: "/bin/cluster-oob",
83 constants.ND_SPINDLE_COUNT: 1,
84 constants.ND_EXCLUSIVE_STORAGE: False,
87 self.fake_cl = objects.Cluster(hvparams=hvparams, os_hvp=os_hvp,
89 self.fake_cl.UpgradeConfig()
91 def testGetHVDefaults(self):
93 self.failUnlessEqual(cl.GetHVDefaults(constants.HT_FAKE),
94 cl.hvparams[constants.HT_FAKE])
95 self.failUnlessEqual(cl.GetHVDefaults(None), {})
96 self.failUnlessEqual(cl.GetHVDefaults(constants.HT_XEN_PVM,
97 os_name="lenny-image"),
98 cl.os_hvp["lenny-image"][constants.HT_XEN_PVM])
100 def testFillHvFullMerge(self):
105 fake_dict = constants.HVC_DEFAULTS[constants.HT_FAKE].copy()
113 fake_inst = objects.Instance(name="foobar",
115 hypervisor=constants.HT_FAKE,
116 hvparams=inst_hvparams)
117 self.assertEqual(fake_dict, self.fake_cl.FillHV(fake_inst))
119 def testFillHvGlobalParams(self):
120 fake_inst = objects.Instance(name="foobar",
122 hypervisor=constants.HT_FAKE,
124 self.assertEqual(self.fake_cl.hvparams[constants.HT_FAKE],
125 self.fake_cl.FillHV(fake_inst))
127 def testFillHvInstParams(self):
131 fake_inst = objects.Instance(name="foobar",
133 hypervisor=constants.HT_XEN_PVM,
134 hvparams=inst_hvparams)
135 self.assertEqual(inst_hvparams, self.fake_cl.FillHV(fake_inst))
137 def testFillHvEmptyParams(self):
138 fake_inst = objects.Instance(name="foobar",
140 hypervisor=constants.HT_XEN_PVM,
142 self.assertEqual({}, self.fake_cl.FillHV(fake_inst))
144 def testFillHvPartialParams(self):
146 fake_inst = objects.Instance(name="foobar",
148 hypervisor=constants.HT_XEN_PVM,
150 self.assertEqual(self.fake_cl.os_hvp[os][constants.HT_XEN_PVM],
151 self.fake_cl.FillHV(fake_inst))
153 def testFillNdParamsCluster(self):
154 fake_node = objects.Node(name="test",
157 fake_group = objects.NodeGroup(name="testgroup",
159 self.assertEqual(self.fake_cl.ndparams,
160 self.fake_cl.FillND(fake_node, fake_group))
162 def testFillNdParamsNodeGroup(self):
163 fake_node = objects.Node(name="test",
167 constants.ND_OOB_PROGRAM: "/bin/group-oob",
168 constants.ND_SPINDLE_COUNT: 10,
169 constants.ND_EXCLUSIVE_STORAGE: True,
171 fake_group = objects.NodeGroup(name="testgroup",
172 ndparams=group_ndparams)
173 self.assertEqual(group_ndparams,
174 self.fake_cl.FillND(fake_node, fake_group))
176 def testFillNdParamsNode(self):
178 constants.ND_OOB_PROGRAM: "/bin/node-oob",
179 constants.ND_SPINDLE_COUNT: 2,
180 constants.ND_EXCLUSIVE_STORAGE: True,
182 fake_node = objects.Node(name="test",
183 ndparams=node_ndparams,
185 fake_group = objects.NodeGroup(name="testgroup",
187 self.assertEqual(node_ndparams,
188 self.fake_cl.FillND(fake_node, fake_group))
190 def testFillNdParamsAll(self):
192 constants.ND_OOB_PROGRAM: "/bin/node-oob",
193 constants.ND_SPINDLE_COUNT: 5,
194 constants.ND_EXCLUSIVE_STORAGE: True,
196 fake_node = objects.Node(name="test",
197 ndparams=node_ndparams,
200 constants.ND_OOB_PROGRAM: "/bin/group-oob",
201 constants.ND_SPINDLE_COUNT: 4,
203 fake_group = objects.NodeGroup(name="testgroup",
204 ndparams=group_ndparams)
205 self.assertEqual(node_ndparams,
206 self.fake_cl.FillND(fake_node, fake_group))
def testPrimaryHypervisor(self):
  """Check C{primary_hypervisor} for several C{enabled_hypervisors} lists.

  In every case the expected value is the first entry of the list assigned
  to C{enabled_hypervisors}.

  """
  cluster = self.fake_cl
  # Precondition on the fixture: no hypervisor has been enabled so far
  assert cluster.enabled_hypervisors is None

  cases = [
    ([constants.HT_XEN_HVM], constants.HT_XEN_HVM),
    ([constants.HT_XEN_PVM, constants.HT_KVM], constants.HT_XEN_PVM),
    (sorted(constants.HYPER_TYPES), constants.HT_CHROOT),
    ]
  for (enabled, expected) in cases:
    cluster.enabled_hypervisors = enabled
    self.assertEqual(cluster.primary_hypervisor, expected)
def testUpgradeConfig(self):
  """Upgrade a default cluster, then one with an unknown ipolicy key.

  The first upgrade must simply succeed; the second one is expected to
  raise L{errors.ConfigurationError}.

  """
  # FIXME: This test is incomplete
  objects.Cluster().UpgradeConfig()

  broken = objects.Cluster(ipolicy={"unknown_key": None})
  self.assertRaises(errors.ConfigurationError, broken.UpgradeConfig)
226 def testUpgradeEnabledDiskTemplates(self):
227 cfg = objects.ConfigData()
228 cfg.cluster = objects.Cluster()
229 cfg.cluster.volume_group_name = "myvg"
230 instance1 = objects.Instance()
231 instance1.disk_template = constants.DT_DISKLESS
232 instance2 = objects.Instance()
233 instance2.disk_template = constants.DT_RBD
234 cfg.instances = { "myinstance1": instance1, "myinstance2": instance2 }
235 nodegroup = objects.NodeGroup()
236 nodegroup.ipolicy = {}
237 nodegroup.ipolicy[constants.IPOLICY_DTS] = [instance1.disk_template, \
239 cfg.cluster.ipolicy = {}
240 cfg.cluster.ipolicy[constants.IPOLICY_DTS] = \
241 [constants.DT_EXT, constants.DT_DISKLESS]
242 cfg.nodegroups = { "mynodegroup": nodegroup }
243 cfg._UpgradeEnabledDiskTemplates()
244 expected_disk_templates = [constants.DT_DRBD8,
246 instance1.disk_template,
247 instance2.disk_template]
248 self.assertEqual(set(expected_disk_templates),
249 set(cfg.cluster.enabled_disk_templates))
250 self.assertEqual(set([instance1.disk_template]),
251 set(cfg.cluster.ipolicy[constants.IPOLICY_DTS]))
254 class TestClusterObjectTcpUdpPortPool(unittest.TestCase):
def testNewCluster(self):
  """A directly constructed cluster has C{None} as its port pool."""
  pool = objects.Cluster().tcpudp_port_pool
  self.assertTrue(pool is None)
258 def testSerializingEmpty(self):
259 self.assertEqual(objects.Cluster().ToDict(), {
260 "tcpudp_port_pool": [],
def testSerializing(self):
  """Serialize a populated TCP/UDP port pool through C{ToDict}.

  A cluster built from an empty dict starts with an empty set as its pool.
  After adding two ports, the serialized form must contain only the
  C{tcpudp_port_pool} key, holding exactly those two ports.

  """
  cluster = objects.Cluster.FromDict({})
  self.assertEqual(cluster.tcpudp_port_pool, set())

  cluster.tcpudp_port_pool.add(3546)
  cluster.tcpudp_port_pool.add(62511)

  data = cluster.ToDict()
  # Materialize the keys first: on Python 3, keys() returns a view object
  # that never compares equal to a list
  self.assertEqual(list(data.keys()), ["tcpudp_port_pool"])
  self.assertEqual(sorted(data["tcpudp_port_pool"]), sorted([3546, 62511]))
def testDeserializingEmpty(self):
  """Loading a cluster from an empty dict yields an empty set as pool."""
  pool = objects.Cluster.FromDict({}).tcpudp_port_pool
  self.assertEqual(pool, set())
278 def testDeserialize(self):
279 cluster = objects.Cluster.FromDict({
280 "tcpudp_port_pool": [26214, 10039, 267],
282 self.assertEqual(cluster.tcpudp_port_pool, set([26214, 10039, 267]))
285 class TestOS(unittest.TestCase):
288 "debootstrap+default",
289 "debootstrap++default",
def testSplitNameVariant(self):
  """Every name in C{ALL_DATA} must split into exactly two parts."""
  for os_name in self.ALL_DATA:
    parts = objects.OS.SplitNameVariant(os_name)
    self.assertEqual(len(parts), 2)
def testVariant(self):
  """Check the variant extracted from a plain and from a suffixed name."""
  expectations = [
    ("debootstrap", ""),
    ("debootstrap+default", "default"),
    ]
  for (os_name, variant) in expectations:
    self.assertEqual(objects.OS.GetVariant(os_name), variant)
301 class TestInstance(unittest.TestCase):
def _GenericCheck(self, inst):
  """Run the node-list sanity checks shared by the instance tests.

  Verifies that C{all_nodes} and C{secondary_nodes} are both lists or
  tuples, that the primary node is not among the secondary nodes, and that
  the primary node is the first entry of C{all_nodes}.

  @param inst: object providing C{primary_node}, C{all_nodes} and
    C{secondary_nodes}

  """
  # Test each list itself, so that secondary_nodes is covered as well
  for nodes in [inst.all_nodes, inst.secondary_nodes]:
    self.assertTrue(isinstance(nodes, (list, tuple)),
                    msg="Data type doesn't guarantee order")

  self.assertTrue(inst.primary_node not in inst.secondary_nodes)
  self.assertEqual(inst.all_nodes[0], inst.primary_node,
                   msg="Primary node not first node in list")
311 def testNodesNoDisks(self):
312 inst = objects.Instance(name="fakeinst.example.com",
313 primary_node="pnode.example.com",
317 self._GenericCheck(inst)
318 self.assertEqual(len(inst.secondary_nodes), 0)
319 self.assertEqual(set(inst.all_nodes), set([inst.primary_node]))
320 self.assertEqual(inst.MapLVsByNode(), {
321 inst.primary_node: [],
324 def testNodesPlainDisks(self):
325 inst = objects.Instance(name="fakeinstplain.example.com",
326 primary_node="node3.example.com",
328 objects.Disk(dev_type=constants.DT_PLAIN, size=128,
329 logical_id=("myxenvg", "disk25494")),
330 objects.Disk(dev_type=constants.DT_PLAIN, size=512,
331 logical_id=("myxenvg", "disk29071")),
334 self._GenericCheck(inst)
335 self.assertEqual(len(inst.secondary_nodes), 0)
336 self.assertEqual(set(inst.all_nodes), set([inst.primary_node]))
337 self.assertEqual(inst.MapLVsByNode(), {
338 inst.primary_node: ["myxenvg/disk25494", "myxenvg/disk29071"],
341 def testNodesDrbdDisks(self):
342 inst = objects.Instance(name="fakeinstdrbd.example.com",
343 primary_node="node10.example.com",
345 objects.Disk(dev_type=constants.DT_DRBD8, size=786432,
346 logical_id=("node10.example.com", "node15.example.com",
347 12300, 0, 0, "secret"),
349 objects.Disk(dev_type=constants.DT_PLAIN, size=786432,
350 logical_id=("myxenvg", "disk0")),
351 objects.Disk(dev_type=constants.DT_PLAIN, size=128,
352 logical_id=("myxenvg", "meta0"))
357 self._GenericCheck(inst)
358 self.assertEqual(set(inst.secondary_nodes), set(["node15.example.com"]))
359 self.assertEqual(set(inst.all_nodes),
360 set([inst.primary_node, "node15.example.com"]))
361 self.assertEqual(inst.MapLVsByNode(), {
362 inst.primary_node: ["myxenvg/disk0", "myxenvg/meta0"],
363 "node15.example.com": ["myxenvg/disk0", "myxenvg/meta0"],
366 self.assertEqual(inst.FindDisk(0), inst.disks[0])
367 self.assertRaises(errors.OpPrereqError, inst.FindDisk, "hello")
368 self.assertRaises(errors.OpPrereqError, inst.FindDisk, 100)
369 self.assertRaises(errors.OpPrereqError, inst.FindDisk, 1)
372 class TestNode(unittest.TestCase):
374 self.assertEqual(objects.Node().ToDict(), {})
375 self.assertTrue(isinstance(objects.Node.FromDict({}), objects.Node))
377 def testHvState(self):
378 node = objects.Node(name="node18157.example.com", hv_state={
379 constants.HT_XEN_HVM: objects.NodeHvState(cpu_total=64),
380 constants.HT_KVM: objects.NodeHvState(cpu_node=1),
383 node2 = objects.Node.FromDict(node.ToDict())
385 # Make sure nothing can reference it anymore
388 self.assertEqual(node2.name, "node18157.example.com")
389 self.assertEqual(frozenset(node2.hv_state), frozenset([
390 constants.HT_XEN_HVM,
393 self.assertEqual(node2.hv_state[constants.HT_KVM].cpu_node, 1)
394 self.assertEqual(node2.hv_state[constants.HT_XEN_HVM].cpu_total, 64)
396 def testDiskState(self):
397 node = objects.Node(name="node32087.example.com", disk_state={
398 constants.DT_PLAIN: {
399 "lv32352": objects.NodeDiskState(total=128),
400 "lv2082": objects.NodeDiskState(total=512),
404 node2 = objects.Node.FromDict(node.ToDict())
406 # Make sure nothing can reference it anymore
409 self.assertEqual(node2.name, "node32087.example.com")
410 self.assertEqual(frozenset(node2.disk_state), frozenset([
413 self.assertEqual(frozenset(node2.disk_state[constants.DT_PLAIN]),
414 frozenset(["lv32352", "lv2082"]))
415 self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv2082"].total, 512)
416 self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv32352"].total, 128)
418 def testFilterEsNdp(self):
419 node1 = objects.Node(name="node11673.example.com", ndparams={
420 constants.ND_EXCLUSIVE_STORAGE: True,
422 node2 = objects.Node(name="node11674.example.com", ndparams={
423 constants.ND_SPINDLE_COUNT: 3,
424 constants.ND_EXCLUSIVE_STORAGE: False,
426 self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
427 node1.UpgradeConfig()
428 self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
429 self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
430 self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
431 node2.UpgradeConfig()
432 self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
433 self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
436 class TestInstancePolicy(unittest.TestCase):
438 # Policies are big, and we want to see the difference in case of an error
def _AssertIPolicyIsFull(self, policy):
  """Assert that an instance policy defines every expected key.

  The top-level keys must equal C{constants.IPOLICY_ALL_KEYS}, at least one
  min/max pair must be present, every pair must hold exactly the
  C{constants.ISPECS_MINMAX_KEYS} entries with a full set of
  C{constants.ISPECS_PARAMETERS} each, and the standard spec must also
  cover all of C{constants.ISPECS_PARAMETERS}.

  @param policy: dict-like instance policy to verify

  """
  self.assertEqual(frozenset(policy.keys()), constants.IPOLICY_ALL_KEYS)

  minmax_specs = policy[constants.ISPECS_MINMAX]
  self.assertTrue(len(minmax_specs) > 0)
  for pair in minmax_specs:
    self.assertEqual(frozenset(pair.keys()), constants.ISPECS_MINMAX_KEYS)
    for bound in constants.ISPECS_MINMAX_KEYS:
      bound_params = frozenset(pair[bound].keys())
      self.assertEqual(bound_params, constants.ISPECS_PARAMETERS)

  std_params = frozenset(policy[constants.ISPECS_STD].keys())
  self.assertEqual(std_params, constants.ISPECS_PARAMETERS)
452 def testDefaultIPolicy(self):
453 objects.InstancePolicy.CheckParameterSyntax(constants.IPOLICY_DEFAULTS,
455 self._AssertIPolicyIsFull(constants.IPOLICY_DEFAULTS)
457 def _AssertPolicyIsBad(self, ipolicy, do_check_std=None):
458 if do_check_std is None:
459 check_std_vals = [False, True]
461 check_std_vals = [do_check_std]
462 for check_std in check_std_vals:
463 self.assertRaises(errors.ConfigurationError,
464 objects.InstancePolicy.CheckISpecSyntax,
467 def testCheckISpecSyntax(self):
468 default_stdspec = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD]
469 incomplete_ipolicies = [
471 constants.ISPECS_MINMAX: [],
472 constants.ISPECS_STD: default_stdspec,
475 constants.ISPECS_MINMAX: [{}],
476 constants.ISPECS_STD: default_stdspec,
479 constants.ISPECS_MINMAX: [{
480 constants.ISPECS_MIN: NotImplemented,
482 constants.ISPECS_STD: default_stdspec,
485 constants.ISPECS_MINMAX: [{
486 constants.ISPECS_MAX: NotImplemented,
488 constants.ISPECS_STD: default_stdspec,
491 constants.ISPECS_MINMAX: [{
492 constants.ISPECS_MIN: NotImplemented,
493 constants.ISPECS_MAX: NotImplemented,
497 for ipol in incomplete_ipolicies:
498 self.assertRaises(errors.ConfigurationError,
499 objects.InstancePolicy.CheckISpecSyntax,
501 oldminmax = ipol[constants.ISPECS_MINMAX]
503 # Prepending valid specs shouldn't change the error
504 ipol[constants.ISPECS_MINMAX] = ([constants.ISPECS_MINMAX_DEFAULTS] +
506 self.assertRaises(errors.ConfigurationError,
507 objects.InstancePolicy.CheckISpecSyntax,
511 constants.ISPECS_MINMAX: [
513 constants.ISPECS_MIN: {
514 constants.ISPEC_MEM_SIZE: 64,
515 constants.ISPEC_CPU_COUNT: 1,
516 constants.ISPEC_DISK_COUNT: 2,
517 constants.ISPEC_DISK_SIZE: 64,
518 constants.ISPEC_NIC_COUNT: 1,
519 constants.ISPEC_SPINDLE_USE: 1,
521 constants.ISPECS_MAX: {
522 constants.ISPEC_MEM_SIZE: 16384,
523 constants.ISPEC_CPU_COUNT: 5,
524 constants.ISPEC_DISK_COUNT: 12,
525 constants.ISPEC_DISK_SIZE: 1024,
526 constants.ISPEC_NIC_COUNT: 9,
527 constants.ISPEC_SPINDLE_USE: 18,
531 constants.ISPECS_MIN: {
532 constants.ISPEC_MEM_SIZE: 32768,
533 constants.ISPEC_CPU_COUNT: 8,
534 constants.ISPEC_DISK_COUNT: 1,
535 constants.ISPEC_DISK_SIZE: 1024,
536 constants.ISPEC_NIC_COUNT: 1,
537 constants.ISPEC_SPINDLE_USE: 1,
539 constants.ISPECS_MAX: {
540 constants.ISPEC_MEM_SIZE: 65536,
541 constants.ISPEC_CPU_COUNT: 10,
542 constants.ISPEC_DISK_COUNT: 5,
543 constants.ISPEC_DISK_SIZE: 1024 * 1024,
544 constants.ISPEC_NIC_COUNT: 3,
545 constants.ISPEC_SPINDLE_USE: 12,
550 good_ipolicy[constants.ISPECS_STD] = copy.deepcopy(
551 good_ipolicy[constants.ISPECS_MINMAX][0][constants.ISPECS_MAX])
552 # Check that it's really good before making it bad
553 objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
555 bad_ipolicy = copy.deepcopy(good_ipolicy)
556 for minmax in bad_ipolicy[constants.ISPECS_MINMAX]:
557 for (key, spec) in minmax.items():
561 self._AssertPolicyIsBad(bad_ipolicy)
562 if key == constants.ISPECS_MIN:
563 spec[param] = minmax[constants.ISPECS_MAX][param] + 1
564 self._AssertPolicyIsBad(bad_ipolicy)
566 assert bad_ipolicy == good_ipolicy
568 stdspec = bad_ipolicy[constants.ISPECS_STD]
569 for param in stdspec:
570 oldv = stdspec[param]
572 self._AssertPolicyIsBad(bad_ipolicy, True)
573 # Note that std spec is the same as a max spec
574 stdspec[param] = oldv + 1
575 self._AssertPolicyIsBad(bad_ipolicy, True)
576 stdspec[param] = oldv
577 assert bad_ipolicy == good_ipolicy
579 for minmax in good_ipolicy[constants.ISPECS_MINMAX]:
580 for spec in minmax.values():
581 good_ipolicy[constants.ISPECS_STD] = spec
582 objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
584 def testCheckISpecParamSyntax(self):
586 for check_std in [True, False]:
588 good_values = [(11, 11), (11, 40), (0, 0)]
589 for (mn, mx) in good_values:
590 minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
591 minmax[constants.ISPECS_MIN][par] = mn
592 minmax[constants.ISPECS_MAX][par] = mx
593 objects.InstancePolicy._CheckISpecParamSyntax(minmax, {}, par,
595 minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
596 minmax[constants.ISPECS_MIN][par] = 11
597 minmax[constants.ISPECS_MAX][par] = 5
598 self.assertRaises(errors.ConfigurationError,
599 objects.InstancePolicy._CheckISpecParamSyntax,
600 minmax, {}, par, check_std)
607 for (mn, st, mx) in good_values:
609 constants.ISPECS_MIN: {par: mn},
610 constants.ISPECS_MAX: {par: mx},
613 objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec, par, True)
622 for (mn, st, mx, excp) in bad_values:
624 constants.ISPECS_MIN: {par: mn},
625 constants.ISPECS_MAX: {par: mx},
629 self.assertRaises(errors.ConfigurationError,
630 objects.InstancePolicy._CheckISpecParamSyntax,
631 minmax, stdspec, par, True)
633 ret = objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec,
635 self.assertFalse(ret)
637 def testCheckDiskTemplates(self):
638 invalid = "this_is_not_a_good_template"
639 for dt in constants.DISK_TEMPLATES:
640 objects.InstancePolicy.CheckDiskTemplates([dt])
641 objects.InstancePolicy.CheckDiskTemplates(list(constants.DISK_TEMPLATES))
644 [constants.DT_DRBD8, invalid],
645 list(constants.DISK_TEMPLATES) + [invalid],
649 for dtl in bad_examples:
650 self.assertRaises(errors.ConfigurationError,
651 objects.InstancePolicy.CheckDiskTemplates,
654 def testCheckParameterSyntax(self):
655 invalid = "this_key_shouldnt_be_here"
656 for check_std in [True, False]:
657 objects.InstancePolicy.CheckParameterSyntax({}, check_std)
658 policy = {invalid: None}
659 self.assertRaises(errors.ConfigurationError,
660 objects.InstancePolicy.CheckParameterSyntax,
662 for par in constants.IPOLICY_PARAMETERS:
663 for val in ("blah", None, {}, [42]):
665 self.assertRaises(errors.ConfigurationError,
666 objects.InstancePolicy.CheckParameterSyntax,
def testFillIPolicyEmpty(self):
  """Filling the defaults with an empty policy must give the defaults back."""
  defaults = constants.IPOLICY_DEFAULTS
  filled = objects.FillIPolicy(defaults, {})
  objects.InstancePolicy.CheckParameterSyntax(filled, True)
  self.assertEqual(filled, defaults)
674 def _AssertISpecsMerged(self, default_spec, diff_spec, merged_spec):
675 for (param, value) in merged_spec.items():
676 if param in diff_spec:
677 self.assertEqual(value, diff_spec[param])
679 self.assertEqual(value, default_spec[param])
681 def _AssertIPolicyMerged(self, default_pol, diff_pol, merged_pol):
682 for (key, value) in merged_pol.items():
684 if key == constants.ISPECS_STD:
685 self._AssertISpecsMerged(default_pol[key], diff_pol[key], value)
687 self.assertEqual(value, diff_pol[key])
689 self.assertEqual(value, default_pol[key])
691 def testFillIPolicy(self):
693 {constants.IPOLICY_VCPU_RATIO: 3.14},
694 {constants.IPOLICY_SPINDLE_RATIO: 2.72},
695 {constants.IPOLICY_DTS: [constants.DT_FILE]},
696 {constants.ISPECS_STD: {constants.ISPEC_DISK_COUNT: 3}},
697 {constants.ISPECS_MINMAX: [constants.ISPECS_MINMAX_DEFAULTS,
698 constants.ISPECS_MINMAX_DEFAULTS]}
700 for diff_pol in partial_policies:
701 policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
702 objects.InstancePolicy.CheckParameterSyntax(policy, True)
703 self._AssertIPolicyIsFull(policy)
704 self._AssertIPolicyMerged(constants.IPOLICY_DEFAULTS, diff_pol, policy)
706 def testFillIPolicyKeepsUnknown(self):
707 INVALID_KEY = "invalid_ipolicy_key"
711 policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
712 self.assertTrue(INVALID_KEY in policy)
715 class TestDisk(unittest.TestCase):
716 def addChild(self, disk):
717 """Adds a child of the same device type as the parent."""
719 child = objects.Disk()
720 child.dev_type = disk.dev_type
721 disk.children.append(child)
723 def testUpgradeConfigDevTypeLegacy(self):
724 for old, new in [("drbd8", constants.DT_DRBD8),
725 ("lvm", constants.DT_PLAIN)]:
726 disk = objects.Disk()
730 self.assertEqual(new, disk.dev_type)
731 self.assertEqual(new, disk.children[0].dev_type)
733 def testUpgradeConfigDevTypeLegacyUnchanged(self):
734 dev_types = [constants.DT_FILE, constants.DT_SHARED_FILE,
735 constants.DT_BLOCK, constants.DT_EXT,
737 for dev_type in dev_types:
738 disk = objects.Disk()
739 disk.dev_type = dev_type
742 self.assertEqual(dev_type, disk.dev_type)
743 self.assertEqual(dev_type, disk.children[0].dev_type)
# Hand control to testutils.GanetiTestProgram when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()