bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.client.gnt_cluster_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.client.gnt_cluster"""
23
24 import unittest
25
26 from ganeti.client import gnt_cluster
27 from ganeti import utils
28 from ganeti import compat
29
30 import testutils
31
32
33 class TestEpo(unittest.TestCase):
34   def setUp(self):
35     self.nodes2ip = dict(("node%s" % i, "192.0.2.%s" % i) for i in range(1, 10))
36     self.nodes = set(self.nodes2ip.keys())
37     self.ips2node = dict((v, k) for (k, v) in self.nodes2ip.items())
38
39   def _FakeAction(*args):
40     return True
41
42   def _FakePing(ip, port, live_port_needed=False):
43     self.assert_(live_port_needed)
44     self.assertEqual(port, 0)
45     return True
46
47   def _FakeSleep(secs):
48     self.assert_(secs >= 0 and secs <= 5)
49     return
50
51   def _NoopFeedback(self, text):
52     return
53
54   def testPingFnRemoveHostsUp(self):
55     seen = set()
56     def _FakeSeenPing(ip, *args, **kwargs):
57       node = self.ips2node[ip]
58       self.assertFalse(node in seen)
59       seen.add(node)
60       return True
61
62     helper = gnt_cluster._RunWhenNodesReachableHelper(self.nodes,
63                                                       self._FakeAction,
64                                                       self.nodes2ip, 0,
65                                                       self._NoopFeedback,
66                                                       _ping_fn=_FakeSeenPing,
67                                                       _sleep_fn=self._FakeSleep)
68
69     nodes_len = len(self.nodes)
70     for (num, _) in enumerate(self.nodes):
71       helper.Wait(5)
72       if num < nodes_len - 1:
73         self.assertRaises(utils.RetryAgain, helper)
74       else:
75         helper()
76
77     self.assertEqual(seen, self.nodes)
78     self.assertFalse(helper.down)
79     self.assertEqual(helper.up, self.nodes)
80
81   def testActionReturnFalseSetsHelperFalse(self):
82     called = False
83     def _FalseAction(*args):
84       return called
85
86     helper = gnt_cluster._RunWhenNodesReachableHelper(self.nodes, _FalseAction,
87                                                       self.nodes2ip, 0,
88                                                       self._NoopFeedback,
89                                                       _ping_fn=self._FakePing,
90                                                       _sleep_fn=self._FakeSleep)
91     for _ in self.nodes:
92       try:
93         helper()
94       except utils.RetryAgain:
95         called = True
96
97     self.assertFalse(helper.success)
98
99   def testMaybeInstanceStartup(self):
100     instances_arg = []
101     def _FakeInstanceStart(opts, instances, start):
102       instances_arg.append(set(instances))
103       return None
104
105     inst_map = {
106       "inst1": set(["node1", "node2"]),
107       "inst2": set(["node1", "node3"]),
108       "inst3": set(["node2", "node1"]),
109       "inst4": set(["node2", "node1", "node3"]),
110       "inst5": set(["node4"]),
111       }
112
113     fn = _FakeInstanceStart
114     self.assert_(gnt_cluster._MaybeInstanceStartup(None, inst_map, set(),
115                                                    _instance_start_fn=fn))
116     self.assertFalse(instances_arg)
117     result = gnt_cluster._MaybeInstanceStartup(None, inst_map, set(["node1"]),
118                                                _instance_start_fn=fn)
119     self.assert_(result)
120     self.assertFalse(instances_arg)
121     result = gnt_cluster._MaybeInstanceStartup(None, inst_map,
122                                                set(["node1", "node3"]),
123                                                _instance_start_fn=fn)
124     self.assert_(result is None)
125     self.assertEqual(instances_arg.pop(0), set(["inst2"]))
126     self.assertFalse("inst2" in inst_map)
127     result = gnt_cluster._MaybeInstanceStartup(None, inst_map,
128                                                set(["node1", "node3"]),
129                                                _instance_start_fn=fn)
130     self.assert_(result)
131     self.assertFalse(instances_arg)
132     result = gnt_cluster._MaybeInstanceStartup(None, inst_map,
133                                                set(["node1", "node3", "node2"]),
134                                                _instance_start_fn=fn)
135     self.assertEqual(instances_arg.pop(0), set(["inst1", "inst3", "inst4"]))
136     self.assert_(result is None)
137     result = gnt_cluster._MaybeInstanceStartup(None, inst_map,
138                                                set(["node1", "node3", "node2",
139                                                     "node4"]),
140                                                _instance_start_fn=fn)
141     self.assert_(result is None)
142     self.assertEqual(instances_arg.pop(0), set(["inst5"]))
143     self.assertFalse(inst_map)
144
145
# When executed as a script, hand control to the runner provided by testutils
if __name__ == "__main__":
  testutils.GanetiTestProgram()