Unit test for FillIPolicy keeping unknown keys
[ganeti-local] / test / py / ganeti.objects_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the objects module"""
23
24
25 import copy
26 import unittest
27
28 from ganeti import constants
29 from ganeti import objects
30 from ganeti import errors
31
32 import testutils
33
34
35 class SimpleObject(objects.ConfigObject):
36   __slots__ = ["a", "b"]
37
38
39 class TestDictState(unittest.TestCase):
40   """Simple dict tansformation tests"""
41
42   def testSimpleObjectToDict(self):
43     o1 = SimpleObject(a="1")
44     self.assertEquals(o1.ToDict(), {"a": "1"})
45     self.assertEquals(o1.__getstate__(), {"a": "1"})
46     self.assertEquals(o1.__getstate__(), o1.ToDict())
47     o1.a = 2
48     o1.b = 5
49     self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
50     o2 = SimpleObject.FromDict(o1.ToDict())
51     self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
52
53
54 class TestClusterObject(unittest.TestCase):
55   """Tests done on a L{objects.Cluster}"""
56
57   def setUp(self):
58     hvparams = {
59       constants.HT_FAKE: {
60         "foo": "bar",
61         "bar": "foo",
62         "foobar": "barfoo",
63         },
64       }
65     os_hvp = {
66       "lenny-image": {
67         constants.HT_FAKE: {
68           "foo": "baz",
69           "foobar": "foobar",
70           "blah": "blibb",
71           "blubb": "blah",
72           },
73         constants.HT_XEN_PVM: {
74           "root_path": "/dev/sda5",
75           "foo": "foobar",
76           },
77         },
78       "ubuntu-hardy": {
79         },
80       }
81     ndparams = {
82         constants.ND_OOB_PROGRAM: "/bin/cluster-oob",
83         constants.ND_SPINDLE_COUNT: 1,
84         constants.ND_EXCLUSIVE_STORAGE: False,
85         }
86
87     self.fake_cl = objects.Cluster(hvparams=hvparams, os_hvp=os_hvp,
88                                    ndparams=ndparams)
89     self.fake_cl.UpgradeConfig()
90
91   def testGetHVDefaults(self):
92     cl = self.fake_cl
93     self.failUnlessEqual(cl.GetHVDefaults(constants.HT_FAKE),
94                          cl.hvparams[constants.HT_FAKE])
95     self.failUnlessEqual(cl.GetHVDefaults(None), {})
96     self.failUnlessEqual(cl.GetHVDefaults(constants.HT_XEN_PVM,
97                                           os_name="lenny-image"),
98                          cl.os_hvp["lenny-image"][constants.HT_XEN_PVM])
99
100
101   def testFillHvFullMerge(self):
102     inst_hvparams = {
103       "blah": "blubb",
104       }
105
106     fake_dict = {
107       "foo": "baz",
108       "bar": "foo",
109       "foobar": "foobar",
110       "blah": "blubb",
111       "blubb": "blah",
112       }
113     fake_inst = objects.Instance(name="foobar",
114                                  os="lenny-image",
115                                  hypervisor=constants.HT_FAKE,
116                                  hvparams=inst_hvparams)
117     self.assertEqual(fake_dict, self.fake_cl.FillHV(fake_inst))
118
119   def testFillHvGlobalParams(self):
120     fake_inst = objects.Instance(name="foobar",
121                                  os="ubuntu-hardy",
122                                  hypervisor=constants.HT_FAKE,
123                                  hvparams={})
124     self.assertEqual(self.fake_cl.hvparams[constants.HT_FAKE],
125                      self.fake_cl.FillHV(fake_inst))
126
127   def testFillHvInstParams(self):
128     inst_hvparams = {
129       "blah": "blubb",
130       }
131     fake_inst = objects.Instance(name="foobar",
132                                  os="ubuntu-hardy",
133                                  hypervisor=constants.HT_XEN_PVM,
134                                  hvparams=inst_hvparams)
135     self.assertEqual(inst_hvparams, self.fake_cl.FillHV(fake_inst))
136
137   def testFillHvEmptyParams(self):
138     fake_inst = objects.Instance(name="foobar",
139                                  os="ubuntu-hardy",
140                                  hypervisor=constants.HT_XEN_PVM,
141                                  hvparams={})
142     self.assertEqual({}, self.fake_cl.FillHV(fake_inst))
143
144   def testFillHvPartialParams(self):
145     os = "lenny-image"
146     fake_inst = objects.Instance(name="foobar",
147                                  os=os,
148                                  hypervisor=constants.HT_XEN_PVM,
149                                  hvparams={})
150     self.assertEqual(self.fake_cl.os_hvp[os][constants.HT_XEN_PVM],
151                      self.fake_cl.FillHV(fake_inst))
152
153   def testFillNdParamsCluster(self):
154     fake_node = objects.Node(name="test",
155                              ndparams={},
156                              group="testgroup")
157     fake_group = objects.NodeGroup(name="testgroup",
158                                    ndparams={})
159     self.assertEqual(self.fake_cl.ndparams,
160                      self.fake_cl.FillND(fake_node, fake_group))
161
162   def testFillNdParamsNodeGroup(self):
163     fake_node = objects.Node(name="test",
164                              ndparams={},
165                              group="testgroup")
166     group_ndparams = {
167         constants.ND_OOB_PROGRAM: "/bin/group-oob",
168         constants.ND_SPINDLE_COUNT: 10,
169         constants.ND_EXCLUSIVE_STORAGE: True,
170         }
171     fake_group = objects.NodeGroup(name="testgroup",
172                                    ndparams=group_ndparams)
173     self.assertEqual(group_ndparams,
174                      self.fake_cl.FillND(fake_node, fake_group))
175
176   def testFillNdParamsNode(self):
177     node_ndparams = {
178         constants.ND_OOB_PROGRAM: "/bin/node-oob",
179         constants.ND_SPINDLE_COUNT: 2,
180         constants.ND_EXCLUSIVE_STORAGE: True,
181         }
182     fake_node = objects.Node(name="test",
183                              ndparams=node_ndparams,
184                              group="testgroup")
185     fake_group = objects.NodeGroup(name="testgroup",
186                                    ndparams={})
187     self.assertEqual(node_ndparams,
188                      self.fake_cl.FillND(fake_node, fake_group))
189
190   def testFillNdParamsAll(self):
191     node_ndparams = {
192         constants.ND_OOB_PROGRAM: "/bin/node-oob",
193         constants.ND_SPINDLE_COUNT: 5,
194         constants.ND_EXCLUSIVE_STORAGE: True,
195         }
196     fake_node = objects.Node(name="test",
197                              ndparams=node_ndparams,
198                              group="testgroup")
199     group_ndparams = {
200         constants.ND_OOB_PROGRAM: "/bin/group-oob",
201         constants.ND_SPINDLE_COUNT: 4,
202         }
203     fake_group = objects.NodeGroup(name="testgroup",
204                                    ndparams=group_ndparams)
205     self.assertEqual(node_ndparams,
206                      self.fake_cl.FillND(fake_node, fake_group))
207
208   def testPrimaryHypervisor(self):
209     assert self.fake_cl.enabled_hypervisors is None
210     self.fake_cl.enabled_hypervisors = [constants.HT_XEN_HVM]
211     self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_HVM)
212
213     self.fake_cl.enabled_hypervisors = [constants.HT_XEN_PVM, constants.HT_KVM]
214     self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_PVM)
215
216     self.fake_cl.enabled_hypervisors = sorted(constants.HYPER_TYPES)
217     self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_CHROOT)
218
219   def testUpgradeConfig(self):
220     # FIXME: This test is incomplete
221     cluster = objects.Cluster()
222     cluster.UpgradeConfig()
223     cluster = objects.Cluster(ipolicy={"unknown_key": None})
224     self.assertRaises(errors.ConfigurationError, cluster.UpgradeConfig)
225
226
227 class TestClusterObjectTcpUdpPortPool(unittest.TestCase):
228   def testNewCluster(self):
229     self.assertTrue(objects.Cluster().tcpudp_port_pool is None)
230
231   def testSerializingEmpty(self):
232     self.assertEqual(objects.Cluster().ToDict(), {
233       "tcpudp_port_pool": [],
234       })
235
236   def testSerializing(self):
237     cluster = objects.Cluster.FromDict({})
238     self.assertEqual(cluster.tcpudp_port_pool, set())
239
240     cluster.tcpudp_port_pool.add(3546)
241     cluster.tcpudp_port_pool.add(62511)
242
243     data = cluster.ToDict()
244     self.assertEqual(data.keys(), ["tcpudp_port_pool"])
245     self.assertEqual(sorted(data["tcpudp_port_pool"]), sorted([3546, 62511]))
246
247   def testDeserializingEmpty(self):
248     cluster = objects.Cluster.FromDict({})
249     self.assertEqual(cluster.tcpudp_port_pool, set())
250
251   def testDeserialize(self):
252     cluster = objects.Cluster.FromDict({
253       "tcpudp_port_pool": [26214, 10039, 267],
254       })
255     self.assertEqual(cluster.tcpudp_port_pool, set([26214, 10039, 267]))
256
257
258 class TestOS(unittest.TestCase):
259   ALL_DATA = [
260     "debootstrap",
261     "debootstrap+default",
262     "debootstrap++default",
263     ]
264
265   def testSplitNameVariant(self):
266     for name in self.ALL_DATA:
267       self.assertEqual(len(objects.OS.SplitNameVariant(name)), 2)
268
269   def testVariant(self):
270     self.assertEqual(objects.OS.GetVariant("debootstrap"), "")
271     self.assertEqual(objects.OS.GetVariant("debootstrap+default"), "default")
272
273
274 class TestInstance(unittest.TestCase):
275   def _GenericCheck(self, inst):
276     for i in [inst.all_nodes, inst.secondary_nodes]:
277       self.assertTrue(isinstance(inst.all_nodes, (list, tuple)),
278                       msg="Data type doesn't guarantee order")
279
280     self.assertTrue(inst.primary_node not in inst.secondary_nodes)
281     self.assertEqual(inst.all_nodes[0], inst.primary_node,
282                      msg="Primary node not first node in list")
283
284   def testNodesNoDisks(self):
285     inst = objects.Instance(name="fakeinst.example.com",
286       primary_node="pnode.example.com",
287       disks=[
288         ])
289
290     self._GenericCheck(inst)
291     self.assertEqual(len(inst.secondary_nodes), 0)
292     self.assertEqual(set(inst.all_nodes), set([inst.primary_node]))
293     self.assertEqual(inst.MapLVsByNode(), {
294       inst.primary_node: [],
295       })
296
297   def testNodesPlainDisks(self):
298     inst = objects.Instance(name="fakeinstplain.example.com",
299       primary_node="node3.example.com",
300       disks=[
301         objects.Disk(dev_type=constants.LD_LV, size=128,
302                      logical_id=("myxenvg", "disk25494")),
303         objects.Disk(dev_type=constants.LD_LV, size=512,
304                      logical_id=("myxenvg", "disk29071")),
305         ])
306
307     self._GenericCheck(inst)
308     self.assertEqual(len(inst.secondary_nodes), 0)
309     self.assertEqual(set(inst.all_nodes), set([inst.primary_node]))
310     self.assertEqual(inst.MapLVsByNode(), {
311       inst.primary_node: ["myxenvg/disk25494", "myxenvg/disk29071"],
312       })
313
314   def testNodesDrbdDisks(self):
315     inst = objects.Instance(name="fakeinstdrbd.example.com",
316       primary_node="node10.example.com",
317       disks=[
318         objects.Disk(dev_type=constants.LD_DRBD8, size=786432,
319           logical_id=("node10.example.com", "node15.example.com",
320                       12300, 0, 0, "secret"),
321           children=[
322             objects.Disk(dev_type=constants.LD_LV, size=786432,
323                          logical_id=("myxenvg", "disk0")),
324             objects.Disk(dev_type=constants.LD_LV, size=128,
325                          logical_id=("myxenvg", "meta0"))
326           ],
327           iv_name="disk/0")
328         ])
329
330     self._GenericCheck(inst)
331     self.assertEqual(set(inst.secondary_nodes), set(["node15.example.com"]))
332     self.assertEqual(set(inst.all_nodes),
333                      set([inst.primary_node, "node15.example.com"]))
334     self.assertEqual(inst.MapLVsByNode(), {
335       inst.primary_node: ["myxenvg/disk0", "myxenvg/meta0"],
336       "node15.example.com": ["myxenvg/disk0", "myxenvg/meta0"],
337       })
338
339     self.assertEqual(inst.FindDisk(0), inst.disks[0])
340     self.assertRaises(errors.OpPrereqError, inst.FindDisk, "hello")
341     self.assertRaises(errors.OpPrereqError, inst.FindDisk, 100)
342     self.assertRaises(errors.OpPrereqError, inst.FindDisk, 1)
343
344
345 class TestNode(unittest.TestCase):
346   def testEmpty(self):
347     self.assertEqual(objects.Node().ToDict(), {})
348     self.assertTrue(isinstance(objects.Node.FromDict({}), objects.Node))
349
350   def testHvState(self):
351     node = objects.Node(name="node18157.example.com", hv_state={
352       constants.HT_XEN_HVM: objects.NodeHvState(cpu_total=64),
353       constants.HT_KVM: objects.NodeHvState(cpu_node=1),
354       })
355
356     node2 = objects.Node.FromDict(node.ToDict())
357
358     # Make sure nothing can reference it anymore
359     del node
360
361     self.assertEqual(node2.name, "node18157.example.com")
362     self.assertEqual(frozenset(node2.hv_state), frozenset([
363       constants.HT_XEN_HVM,
364       constants.HT_KVM,
365       ]))
366     self.assertEqual(node2.hv_state[constants.HT_KVM].cpu_node, 1)
367     self.assertEqual(node2.hv_state[constants.HT_XEN_HVM].cpu_total, 64)
368
369   def testDiskState(self):
370     node = objects.Node(name="node32087.example.com", disk_state={
371       constants.LD_LV: {
372         "lv32352": objects.NodeDiskState(total=128),
373         "lv2082": objects.NodeDiskState(total=512),
374         },
375       })
376
377     node2 = objects.Node.FromDict(node.ToDict())
378
379     # Make sure nothing can reference it anymore
380     del node
381
382     self.assertEqual(node2.name, "node32087.example.com")
383     self.assertEqual(frozenset(node2.disk_state), frozenset([
384       constants.LD_LV,
385       ]))
386     self.assertEqual(frozenset(node2.disk_state[constants.LD_LV]), frozenset([
387       "lv32352",
388       "lv2082",
389       ]))
390     self.assertEqual(node2.disk_state[constants.LD_LV]["lv2082"].total, 512)
391     self.assertEqual(node2.disk_state[constants.LD_LV]["lv32352"].total, 128)
392
393   def testFilterEsNdp(self):
394     node1 = objects.Node(name="node11673.example.com", ndparams={
395       constants.ND_EXCLUSIVE_STORAGE: True,
396       })
397     node2 = objects.Node(name="node11674.example.com", ndparams={
398       constants.ND_SPINDLE_COUNT: 3,
399       constants.ND_EXCLUSIVE_STORAGE: False,
400       })
401     self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
402     node1.UpgradeConfig()
403     self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
404     self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
405     self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
406     node2.UpgradeConfig()
407     self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
408     self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
409
410
411 class TestInstancePolicy(unittest.TestCase):
412   def setUp(self):
413     # Policies are big, and we want to see the difference in case of an error
414     self.maxDiff = None
415
416   def _AssertIPolicyIsFull(self, policy):
417     self.assertEqual(frozenset(policy.keys()), constants.IPOLICY_ALL_KEYS)
418     self.assertTrue(len(policy[constants.ISPECS_MINMAX]) > 0)
419     for minmax in policy[constants.ISPECS_MINMAX]:
420       self.assertEqual(frozenset(minmax.keys()), constants.ISPECS_MINMAX_KEYS)
421       for key in constants.ISPECS_MINMAX_KEYS:
422         self.assertEqual(frozenset(minmax[key].keys()),
423                          constants.ISPECS_PARAMETERS)
424     self.assertEqual(frozenset(policy[constants.ISPECS_STD].keys()),
425                      constants.ISPECS_PARAMETERS)
426
427   def testDefaultIPolicy(self):
428     objects.InstancePolicy.CheckParameterSyntax(constants.IPOLICY_DEFAULTS,
429                                                 True)
430     self._AssertIPolicyIsFull(constants.IPOLICY_DEFAULTS)
431
432   def _AssertPolicyIsBad(self, ipolicy, do_check_std=None):
433     if do_check_std is None:
434       check_std_vals = [False, True]
435     else:
436       check_std_vals = [do_check_std]
437     for check_std in check_std_vals:
438       self.assertRaises(errors.ConfigurationError,
439                         objects.InstancePolicy.CheckISpecSyntax,
440                         ipolicy, check_std)
441
442   def testCheckISpecSyntax(self):
443     default_stdspec = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD]
444     incomplete_ipolicies = [
445       {
446          constants.ISPECS_MINMAX: [],
447          constants.ISPECS_STD: default_stdspec,
448          },
449       {
450          constants.ISPECS_MINMAX: [{}],
451          constants.ISPECS_STD: default_stdspec,
452          },
453       {
454         constants.ISPECS_MINMAX: [{
455           constants.ISPECS_MIN: NotImplemented,
456           }],
457         constants.ISPECS_STD: default_stdspec,
458         },
459       {
460         constants.ISPECS_MINMAX: [{
461           constants.ISPECS_MAX: NotImplemented,
462           }],
463         constants.ISPECS_STD: default_stdspec,
464         },
465       {
466         constants.ISPECS_MINMAX: [{
467           constants.ISPECS_MIN: NotImplemented,
468           constants.ISPECS_MAX: NotImplemented,
469           }],
470         },
471       ]
472     for ipol in incomplete_ipolicies:
473       self.assertRaises(errors.ConfigurationError,
474                         objects.InstancePolicy.CheckISpecSyntax,
475                         ipol, True)
476       oldminmax = ipol[constants.ISPECS_MINMAX]
477       if oldminmax:
478         # Prepending valid specs shouldn't change the error
479         ipol[constants.ISPECS_MINMAX] = ([constants.ISPECS_MINMAX_DEFAULTS] +
480                                          oldminmax)
481         self.assertRaises(errors.ConfigurationError,
482                           objects.InstancePolicy.CheckISpecSyntax,
483                           ipol, True)
484
485     good_ipolicy = {
486       constants.ISPECS_MINMAX: [
487         {
488           constants.ISPECS_MIN: {
489             constants.ISPEC_MEM_SIZE: 64,
490             constants.ISPEC_CPU_COUNT: 1,
491             constants.ISPEC_DISK_COUNT: 2,
492             constants.ISPEC_DISK_SIZE: 64,
493             constants.ISPEC_NIC_COUNT: 1,
494             constants.ISPEC_SPINDLE_USE: 1,
495             },
496           constants.ISPECS_MAX: {
497             constants.ISPEC_MEM_SIZE: 16384,
498             constants.ISPEC_CPU_COUNT: 5,
499             constants.ISPEC_DISK_COUNT: 12,
500             constants.ISPEC_DISK_SIZE: 1024,
501             constants.ISPEC_NIC_COUNT: 9,
502             constants.ISPEC_SPINDLE_USE: 18,
503             },
504           },
505         {
506           constants.ISPECS_MIN: {
507             constants.ISPEC_MEM_SIZE: 32768,
508             constants.ISPEC_CPU_COUNT: 8,
509             constants.ISPEC_DISK_COUNT: 1,
510             constants.ISPEC_DISK_SIZE: 1024,
511             constants.ISPEC_NIC_COUNT: 1,
512             constants.ISPEC_SPINDLE_USE: 1,
513             },
514           constants.ISPECS_MAX: {
515             constants.ISPEC_MEM_SIZE: 65536,
516             constants.ISPEC_CPU_COUNT: 10,
517             constants.ISPEC_DISK_COUNT: 5,
518             constants.ISPEC_DISK_SIZE: 1024 * 1024,
519             constants.ISPEC_NIC_COUNT: 3,
520             constants.ISPEC_SPINDLE_USE: 12,
521             },
522           },
523         ],
524       }
525     good_ipolicy[constants.ISPECS_STD] = copy.deepcopy(
526       good_ipolicy[constants.ISPECS_MINMAX][0][constants.ISPECS_MAX])
527     # Check that it's really good before making it bad
528     objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
529
530     bad_ipolicy = copy.deepcopy(good_ipolicy)
531     for minmax in bad_ipolicy[constants.ISPECS_MINMAX]:
532       for (key, spec) in minmax.items():
533         for param in spec:
534           oldv = spec[param]
535           del spec[param]
536           self._AssertPolicyIsBad(bad_ipolicy)
537           if key == constants.ISPECS_MIN:
538             spec[param] = minmax[constants.ISPECS_MAX][param] + 1
539           self._AssertPolicyIsBad(bad_ipolicy)
540           spec[param] = oldv
541     assert bad_ipolicy == good_ipolicy
542
543     stdspec = bad_ipolicy[constants.ISPECS_STD]
544     for param in stdspec:
545       oldv = stdspec[param]
546       del stdspec[param]
547       self._AssertPolicyIsBad(bad_ipolicy, True)
548       # Note that std spec is the same as a max spec
549       stdspec[param] = oldv + 1
550       self._AssertPolicyIsBad(bad_ipolicy, True)
551       stdspec[param] = oldv
552     assert bad_ipolicy == good_ipolicy
553
554     for minmax in good_ipolicy[constants.ISPECS_MINMAX]:
555       for spec in minmax.values():
556         good_ipolicy[constants.ISPECS_STD] = spec
557         objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
558
559   def testCheckISpecParamSyntax(self):
560     par = "my_parameter"
561     for check_std in [True, False]:
562       # Min and max only
563       good_values = [(11, 11), (11, 40), (0, 0)]
564       for (mn, mx) in good_values:
565         minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
566         minmax[constants.ISPECS_MIN][par] = mn
567         minmax[constants.ISPECS_MAX][par] = mx
568         objects.InstancePolicy._CheckISpecParamSyntax(minmax, {}, par,
569                                                      check_std)
570       minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
571       minmax[constants.ISPECS_MIN][par] = 11
572       minmax[constants.ISPECS_MAX][par] = 5
573       self.assertRaises(errors.ConfigurationError,
574                         objects.InstancePolicy._CheckISpecParamSyntax,
575                         minmax, {}, par, check_std)
576     # Min, std, max
577     good_values = [
578       (11, 11, 11),
579       (11, 11, 40),
580       (11, 40, 40),
581       ]
582     for (mn, st, mx) in good_values:
583       minmax = {
584         constants.ISPECS_MIN: {par: mn},
585         constants.ISPECS_MAX: {par: mx},
586         }
587       stdspec = {par: st}
588       objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec, par, True)
589     bad_values = [
590       (11, 11,  5, True),
591       (40, 11, 11, True),
592       (11, 80, 40, False),
593       (11,  5, 40, False,),
594       (11,  5,  5, True),
595       (40, 40, 11, True),
596       ]
597     for (mn, st, mx, excp) in bad_values:
598       minmax = {
599         constants.ISPECS_MIN: {par: mn},
600         constants.ISPECS_MAX: {par: mx},
601         }
602       stdspec = {par: st}
603       if excp:
604         self.assertRaises(errors.ConfigurationError,
605                           objects.InstancePolicy._CheckISpecParamSyntax,
606                           minmax, stdspec, par, True)
607       else:
608         ret = objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec,
609                                                             par, True)
610         self.assertFalse(ret)
611
612   def testCheckDiskTemplates(self):
613     invalid = "this_is_not_a_good_template"
614     for dt in constants.DISK_TEMPLATES:
615       objects.InstancePolicy.CheckDiskTemplates([dt])
616     objects.InstancePolicy.CheckDiskTemplates(list(constants.DISK_TEMPLATES))
617     bad_examples = [
618       [invalid],
619       [constants.DT_DRBD8, invalid],
620       list(constants.DISK_TEMPLATES) + [invalid],
621       [],
622       None,
623       ]
624     for dtl in bad_examples:
625       self.assertRaises(errors.ConfigurationError,
626                         objects.InstancePolicy.CheckDiskTemplates,
627                         dtl)
628
629   def testCheckParameterSyntax(self):
630     invalid = "this_key_shouldnt_be_here"
631     for check_std in [True, False]:
632       objects.InstancePolicy.CheckParameterSyntax({}, check_std)
633       policy = {invalid: None}
634       self.assertRaises(errors.ConfigurationError,
635                         objects.InstancePolicy.CheckParameterSyntax,
636                         policy, check_std)
637       for par in constants.IPOLICY_PARAMETERS:
638         for val in ("blah", None, {}, [42]):
639           policy = {par: val}
640           self.assertRaises(errors.ConfigurationError,
641                             objects.InstancePolicy.CheckParameterSyntax,
642                             policy, check_std)
643
644   def testFillIPolicyEmpty(self):
645     policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, {})
646     objects.InstancePolicy.CheckParameterSyntax(policy, True)
647     self.assertEqual(policy, constants.IPOLICY_DEFAULTS)
648
649   def _AssertISpecsMerged(self, default_spec, diff_spec, merged_spec):
650     for (param, value) in merged_spec.items():
651       if param in diff_spec:
652         self.assertEqual(value, diff_spec[param])
653       else:
654         self.assertEqual(value, default_spec[param])
655
656   def _AssertIPolicyMerged(self, default_pol, diff_pol, merged_pol):
657     for (key, value) in merged_pol.items():
658       if key in diff_pol:
659         if key == constants.ISPECS_STD:
660           self._AssertISpecsMerged(default_pol[key], diff_pol[key], value)
661         else:
662           self.assertEqual(value, diff_pol[key])
663       else:
664         self.assertEqual(value, default_pol[key])
665
666   def testFillIPolicy(self):
667     partial_policies = [
668       {constants.IPOLICY_VCPU_RATIO: 3.14},
669       {constants.IPOLICY_SPINDLE_RATIO: 2.72},
670       {constants.IPOLICY_DTS: [constants.DT_FILE]},
671       {constants.ISPECS_STD: {constants.ISPEC_DISK_COUNT: 3}},
672       {constants.ISPECS_MINMAX: [constants.ISPECS_MINMAX_DEFAULTS,
673                                  constants.ISPECS_MINMAX_DEFAULTS]}
674       ]
675     for diff_pol in partial_policies:
676       policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
677       objects.InstancePolicy.CheckParameterSyntax(policy, True)
678       self._AssertIPolicyIsFull(policy)
679       self._AssertIPolicyMerged(constants.IPOLICY_DEFAULTS, diff_pol, policy)
680
681   def testFillIPolicyKeepsUnknown(self):
682     INVALID_KEY = "invalid_ipolicy_key"
683     diff_pol = {
684       INVALID_KEY: None,
685       }
686     policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
687     self.assertTrue(INVALID_KEY in policy)
688
689
690 if __name__ == "__main__":
691   testutils.GanetiTestProgram()