Merge branch 'stable-2.8' into master
[ganeti-local] / test / py / ganeti.client.gnt_instance_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.client.gnt_instance"""
23
24 import unittest
25
26 from ganeti import constants
27 from ganeti import utils
28 from ganeti import errors
29 from ganeti import objects
30 from ganeti.client import gnt_instance
31
32 import testutils
33
34
35 class TestConsole(unittest.TestCase):
36   def setUp(self):
37     self._output = []
38     self._cmds = []
39     self._next_cmd_exitcode = 0
40
41   def _Test(self, console, show_command, cluster_name):
42     return gnt_instance._DoConsole(console, show_command, cluster_name,
43                                    feedback_fn=self._Feedback,
44                                    _runcmd_fn=self._FakeRunCmd)
45
46   def _Feedback(self, msg, *args):
47     if args:
48       msg = msg % args
49     self._output.append(msg)
50
51   def _FakeRunCmd(self, cmd, interactive=None):
52     self.assertTrue(interactive)
53     self.assertTrue(isinstance(cmd, list))
54     self._cmds.append(cmd)
55     return utils.RunResult(self._next_cmd_exitcode, None, "", "", "cmd",
56                            utils.process._TIMEOUT_NONE, 5)
57
58   def testMessage(self):
59     cons = objects.InstanceConsole(instance="inst98.example.com",
60                                    kind=constants.CONS_MESSAGE,
61                                    message="Hello World")
62     self.assertEqual(self._Test(cons, False, "cluster.example.com"),
63                      constants.EXIT_SUCCESS)
64     self.assertEqual(len(self._cmds), 0)
65     self.assertEqual(self._output, ["Hello World"])
66
67   def testVnc(self):
68     cons = objects.InstanceConsole(instance="inst1.example.com",
69                                    kind=constants.CONS_VNC,
70                                    host="node1.example.com",
71                                    port=5901,
72                                    display=1)
73     self.assertEqual(self._Test(cons, False, "cluster.example.com"),
74                      constants.EXIT_SUCCESS)
75     self.assertEqual(len(self._cmds), 0)
76     self.assertEqual(len(self._output), 1)
77     self.assertTrue(" inst1.example.com " in self._output[0])
78     self.assertTrue(" node1.example.com:5901 " in self._output[0])
79     self.assertTrue("vnc://node1.example.com:5901/" in self._output[0])
80
81   def testSshShow(self):
82     cons = objects.InstanceConsole(instance="inst31.example.com",
83                                    kind=constants.CONS_SSH,
84                                    host="node93.example.com",
85                                    user="user_abc",
86                                    command="xm console x.y.z")
87     self.assertEqual(self._Test(cons, True, "cluster.example.com"),
88                      constants.EXIT_SUCCESS)
89     self.assertEqual(len(self._cmds), 0)
90     self.assertEqual(len(self._output), 1)
91     self.assertTrue(" user_abc@node93.example.com " in self._output[0])
92     self.assertTrue("'xm console x.y.z'" in self._output[0])
93
94   def testSshRun(self):
95     cons = objects.InstanceConsole(instance="inst31.example.com",
96                                    kind=constants.CONS_SSH,
97                                    host="node93.example.com",
98                                    user="user_abc",
99                                    command=["xm", "console", "x.y.z"])
100     self.assertEqual(self._Test(cons, False, "cluster.example.com"),
101                      constants.EXIT_SUCCESS)
102     self.assertEqual(len(self._cmds), 1)
103     self.assertEqual(len(self._output), 0)
104
105     # This is very important to prevent escapes from the console
106     self.assertTrue("-oEscapeChar=none" in self._cmds[0])
107
108   def testSshRunFail(self):
109     cons = objects.InstanceConsole(instance="inst31.example.com",
110                                    kind=constants.CONS_SSH,
111                                    host="node93.example.com",
112                                    user="user_abc",
113                                    command=["xm", "console", "x.y.z"])
114
115     self._next_cmd_exitcode = 100
116     self.assertRaises(errors.OpExecError, self._Test,
117                       cons, False, "cluster.example.com")
118     self.assertEqual(len(self._cmds), 1)
119     self.assertEqual(len(self._output), 0)
120
121
122 class TestConvertNicDiskModifications(unittest.TestCase):
123   def test(self):
124     fn = gnt_instance._ConvertNicDiskModifications
125
126     self.assertEqual(fn([]), [])
127
128     # Error cases
129     self.assertRaises(errors.OpPrereqError, fn, [
130       (constants.DDM_REMOVE, { "param": "value", }),
131       ])
132     self.assertRaises(errors.OpPrereqError, fn, [
133       (0, { constants.DDM_REMOVE: True, "param": "value", }),
134       ])
135     self.assertRaises(errors.OpPrereqError, fn, [
136       (0, {
137         constants.DDM_REMOVE: True,
138         constants.DDM_ADD: True,
139         }),
140       ])
141
142     # Legacy calls
143     for action in constants.DDMS_VALUES:
144       self.assertEqual(fn([
145         (action, {}),
146         ]), [
147         (action, -1, {}),
148         ])
149       self.assertRaises(errors.OpPrereqError, fn, [
150         (0, {
151           action: True,
152           constants.DDM_MODIFY: True,
153           }),
154         ])
155
156     self.assertEqual(fn([
157       (constants.DDM_ADD, {
158         constants.IDISK_SIZE: 1024,
159         }),
160       ]), [
161       (constants.DDM_ADD, -1, {
162         constants.IDISK_SIZE: 1024,
163         }),
164       ])
165
166     # New-style calls
167     self.assertEqual(fn([
168       (2, {
169         constants.IDISK_MODE: constants.DISK_RDWR,
170         }),
171       ]), [
172       (constants.DDM_MODIFY, 2, {
173         constants.IDISK_MODE: constants.DISK_RDWR,
174         }),
175       ])
176
177     self.assertEqual(fn([
178       (0, {
179         constants.DDM_ADD: True,
180         constants.IDISK_SIZE: 4096,
181         }),
182       ]), [
183       (constants.DDM_ADD, 0, {
184         constants.IDISK_SIZE: 4096,
185         }),
186       ])
187
188     self.assertEqual(fn([
189       (-1, {
190         constants.DDM_REMOVE: True,
191         }),
192       ]), [
193       (constants.DDM_REMOVE, -1, {}),
194       ])
195
196     self.assertEqual(fn([
197       (-1, {
198         constants.DDM_MODIFY: True,
199         constants.IDISK_SIZE: 1024,
200         }),
201       ]), [
202       (constants.DDM_MODIFY, -1, {
203         constants.IDISK_SIZE: 1024,
204         }),
205       ])
206
207     # Names and UUIDs
208     self.assertEqual(fn([
209       ('name', {
210         constants.IDISK_MODE: constants.DISK_RDWR,
211         constants.IDISK_NAME: "rename",
212         }),
213       ]), [
214       (constants.DDM_MODIFY, 'name', {
215         constants.IDISK_MODE: constants.DISK_RDWR,
216         constants.IDISK_NAME: "rename",
217         }),
218       ])
219     self.assertEqual(fn([
220       ('024ef14d-4879-400e-8767-d61c051950bf', {
221         constants.DDM_MODIFY: True,
222         constants.IDISK_SIZE: 1024,
223         constants.IDISK_NAME: "name",
224         }),
225       ]), [
226       (constants.DDM_MODIFY, '024ef14d-4879-400e-8767-d61c051950bf', {
227         constants.IDISK_SIZE: 1024,
228         constants.IDISK_NAME: "name",
229         }),
230       ])
231     self.assertEqual(fn([
232       ('name', {
233         constants.DDM_REMOVE: True,
234         }),
235       ]), [
236       (constants.DDM_REMOVE, 'name', {}),
237       ])
238
239
240 class TestParseDiskSizes(unittest.TestCase):
241   def test(self):
242     fn = gnt_instance._ParseDiskSizes
243
244     self.assertEqual(fn([]), [])
245
246     # Missing size parameter
247     self.assertRaises(errors.OpPrereqError, fn, [
248       (constants.DDM_ADD, 0, {}),
249       ])
250
251     # Converting disk size
252     self.assertEqual(fn([
253       (constants.DDM_ADD, 11, {
254         constants.IDISK_SIZE: "9G",
255         }),
256       ]), [
257         (constants.DDM_ADD, 11, {
258           constants.IDISK_SIZE: 9216,
259           }),
260         ])
261
262     # No size parameter
263     self.assertEqual(fn([
264       (constants.DDM_REMOVE, 11, {
265         "other": "24M",
266         }),
267       ]), [
268         (constants.DDM_REMOVE, 11, {
269           "other": "24M",
270           }),
271         ])
272
273
274 if __name__ == "__main__":
275   testutils.GanetiTestProgram()