Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.client.gnt_instance_unittest.py @ b21d488b

History | View | Annotate | Download (8.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.client.gnt_instance"""
23

    
24
import unittest
25

    
26
from ganeti import constants
27
from ganeti import utils
28
from ganeti import errors
29
from ganeti import objects
30
from ganeti.client import gnt_instance
31

    
32
import testutils
33

    
34

    
35
class TestConsole(unittest.TestCase):
36
  def setUp(self):
37
    self._output = []
38
    self._cmds = []
39
    self._next_cmd_exitcode = 0
40

    
41
  def _Test(self, console, show_command, cluster_name):
42
    return gnt_instance._DoConsole(console, show_command, cluster_name,
43
                                   feedback_fn=self._Feedback,
44
                                   _runcmd_fn=self._FakeRunCmd)
45

    
46
  def _Feedback(self, msg, *args):
47
    if args:
48
      msg = msg % args
49
    self._output.append(msg)
50

    
51
  def _FakeRunCmd(self, cmd, interactive=None):
52
    self.assertTrue(interactive)
53
    self.assertTrue(isinstance(cmd, list))
54
    self._cmds.append(cmd)
55
    return utils.RunResult(self._next_cmd_exitcode, None, "", "", "cmd",
56
                           utils.process._TIMEOUT_NONE, 5)
57

    
58
  def testMessage(self):
59
    cons = objects.InstanceConsole(instance="inst98.example.com",
60
                                   kind=constants.CONS_MESSAGE,
61
                                   message="Hello World")
62
    self.assertEqual(self._Test(cons, False, "cluster.example.com"),
63
                     constants.EXIT_SUCCESS)
64
    self.assertEqual(len(self._cmds), 0)
65
    self.assertEqual(self._output, ["Hello World"])
66

    
67
  def testVnc(self):
68
    cons = objects.InstanceConsole(instance="inst1.example.com",
69
                                   kind=constants.CONS_VNC,
70
                                   host="node1.example.com",
71
                                   port=5901,
72
                                   display=1)
73
    self.assertEqual(self._Test(cons, False, "cluster.example.com"),
74
                     constants.EXIT_SUCCESS)
75
    self.assertEqual(len(self._cmds), 0)
76
    self.assertEqual(len(self._output), 1)
77
    self.assertTrue(" inst1.example.com " in self._output[0])
78
    self.assertTrue(" node1.example.com:5901 " in self._output[0])
79
    self.assertTrue("vnc://node1.example.com:5901/" in self._output[0])
80

    
81
  def testSshShow(self):
82
    cons = objects.InstanceConsole(instance="inst31.example.com",
83
                                   kind=constants.CONS_SSH,
84
                                   host="node93.example.com",
85
                                   user="user_abc",
86
                                   command="xm console x.y.z")
87
    self.assertEqual(self._Test(cons, True, "cluster.example.com"),
88
                     constants.EXIT_SUCCESS)
89
    self.assertEqual(len(self._cmds), 0)
90
    self.assertEqual(len(self._output), 1)
91
    self.assertTrue(" user_abc@node93.example.com " in self._output[0])
92
    self.assertTrue("'xm console x.y.z'" in self._output[0])
93

    
94
  def testSshRun(self):
95
    cons = objects.InstanceConsole(instance="inst31.example.com",
96
                                   kind=constants.CONS_SSH,
97
                                   host="node93.example.com",
98
                                   user="user_abc",
99
                                   command=["xm", "console", "x.y.z"])
100
    self.assertEqual(self._Test(cons, False, "cluster.example.com"),
101
                     constants.EXIT_SUCCESS)
102
    self.assertEqual(len(self._cmds), 1)
103
    self.assertEqual(len(self._output), 0)
104

    
105
    # This is very important to prevent escapes from the console
106
    self.assertTrue("-oEscapeChar=none" in self._cmds[0])
107

    
108
  def testSshRunFail(self):
109
    cons = objects.InstanceConsole(instance="inst31.example.com",
110
                                   kind=constants.CONS_SSH,
111
                                   host="node93.example.com",
112
                                   user="user_abc",
113
                                   command=["xm", "console", "x.y.z"])
114

    
115
    self._next_cmd_exitcode = 100
116
    self.assertRaises(errors.OpExecError, self._Test,
117
                      cons, False, "cluster.example.com")
118
    self.assertEqual(len(self._cmds), 1)
119
    self.assertEqual(len(self._output), 0)
120

    
121

    
122
class TestConvertNicDiskModifications(unittest.TestCase):
123
  def test(self):
124
    fn = gnt_instance._ConvertNicDiskModifications
125

    
126
    self.assertEqual(fn([]), [])
127

    
128
    # Error cases
129
    self.assertRaises(errors.OpPrereqError, fn, [
130
      (constants.DDM_REMOVE, { "param": "value", }),
131
      ])
132
    self.assertRaises(errors.OpPrereqError, fn, [
133
      (0, { constants.DDM_REMOVE: True, "param": "value", }),
134
      ])
135
    self.assertRaises(errors.OpPrereqError, fn, [
136
      (0, {
137
        constants.DDM_REMOVE: True,
138
        constants.DDM_ADD: True,
139
        }),
140
      ])
141

    
142
    # Legacy calls
143
    for action in constants.DDMS_VALUES:
144
      self.assertEqual(fn([
145
        (action, {}),
146
        ]), [
147
        (action, -1, {}),
148
        ])
149
      self.assertRaises(errors.OpPrereqError, fn, [
150
        (0, {
151
          action: True,
152
          constants.DDM_MODIFY: True,
153
          }),
154
        ])
155

    
156
    self.assertEqual(fn([
157
      (constants.DDM_ADD, {
158
        constants.IDISK_SIZE: 1024,
159
        }),
160
      ]), [
161
      (constants.DDM_ADD, -1, {
162
        constants.IDISK_SIZE: 1024,
163
        }),
164
      ])
165

    
166
    # New-style calls
167
    self.assertEqual(fn([
168
      (2, {
169
        constants.IDISK_MODE: constants.DISK_RDWR,
170
        }),
171
      ]), [
172
      (constants.DDM_MODIFY, 2, {
173
        constants.IDISK_MODE: constants.DISK_RDWR,
174
        }),
175
      ])
176

    
177
    self.assertEqual(fn([
178
      (0, {
179
        constants.DDM_ADD: True,
180
        constants.IDISK_SIZE: 4096,
181
        }),
182
      ]), [
183
      (constants.DDM_ADD, 0, {
184
        constants.IDISK_SIZE: 4096,
185
        }),
186
      ])
187

    
188
    self.assertEqual(fn([
189
      (-1, {
190
        constants.DDM_REMOVE: True,
191
        }),
192
      ]), [
193
      (constants.DDM_REMOVE, -1, {}),
194
      ])
195

    
196
    self.assertEqual(fn([
197
      (-1, {
198
        constants.DDM_MODIFY: True,
199
        constants.IDISK_SIZE: 1024,
200
        }),
201
      ]), [
202
      (constants.DDM_MODIFY, -1, {
203
        constants.IDISK_SIZE: 1024,
204
        }),
205
      ])
206

    
207
    # Names and UUIDs
208
    self.assertEqual(fn([
209
      ('name', {
210
        constants.IDISK_MODE: constants.DISK_RDWR,
211
        constants.IDISK_NAME: "rename",
212
        }),
213
      ]), [
214
      (constants.DDM_MODIFY, 'name', {
215
        constants.IDISK_MODE: constants.DISK_RDWR,
216
        constants.IDISK_NAME: "rename",
217
        }),
218
      ])
219
    self.assertEqual(fn([
220
      ('024ef14d-4879-400e-8767-d61c051950bf', {
221
        constants.DDM_MODIFY: True,
222
        constants.IDISK_SIZE: 1024,
223
        constants.IDISK_NAME: "name",
224
        }),
225
      ]), [
226
      (constants.DDM_MODIFY, '024ef14d-4879-400e-8767-d61c051950bf', {
227
        constants.IDISK_SIZE: 1024,
228
        constants.IDISK_NAME: "name",
229
        }),
230
      ])
231
    self.assertEqual(fn([
232
      ('name', {
233
        constants.DDM_REMOVE: True,
234
        }),
235
      ]), [
236
      (constants.DDM_REMOVE, 'name', {}),
237
      ])
238

    
239

    
240
class TestParseDiskSizes(unittest.TestCase):
241
  def test(self):
242
    fn = gnt_instance._ParseDiskSizes
243

    
244
    self.assertEqual(fn([]), [])
245

    
246
    # Missing size parameter
247
    self.assertRaises(errors.OpPrereqError, fn, [
248
      (constants.DDM_ADD, 0, {}),
249
      ])
250

    
251
    # Converting disk size
252
    self.assertEqual(fn([
253
      (constants.DDM_ADD, 11, {
254
        constants.IDISK_SIZE: "9G",
255
        }),
256
      ]), [
257
        (constants.DDM_ADD, 11, {
258
          constants.IDISK_SIZE: 9216,
259
          }),
260
        ])
261

    
262
    # No size parameter
263
    self.assertEqual(fn([
264
      (constants.DDM_REMOVE, 11, {
265
        "other": "24M",
266
        }),
267
      ]), [
268
        (constants.DDM_REMOVE, 11, {
269
          "other": "24M",
270
          }),
271
        ])
272

    
273

    
274
if __name__ == "__main__":
275
  testutils.GanetiTestProgram()