Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.client.gnt_instance_unittest.py @ 7a70541e

History | View | Annotate | Download (7.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.client.gnt_instance"""
23

    
24
import unittest
25

    
26
from ganeti import constants
27
from ganeti import utils
28
from ganeti import errors
29
from ganeti import objects
30
from ganeti.client import gnt_instance
31

    
32
import testutils
33

    
34

    
35
class TestConsole(unittest.TestCase):
36
  def setUp(self):
37
    self._output = []
38
    self._cmds = []
39
    self._next_cmd_exitcode = 0
40

    
41
  def _Test(self, console, show_command, cluster_name):
42
    return gnt_instance._DoConsole(console, show_command, cluster_name,
43
                                   feedback_fn=self._Feedback,
44
                                   _runcmd_fn=self._FakeRunCmd)
45

    
46
  def _Feedback(self, msg, *args):
47
    if args:
48
      msg = msg % args
49
    self._output.append(msg)
50

    
51
  def _FakeRunCmd(self, cmd, interactive=None):
52
    self.assertTrue(interactive)
53
    self.assertTrue(isinstance(cmd, list))
54
    self._cmds.append(cmd)
55
    return utils.RunResult(self._next_cmd_exitcode, None, "", "", "cmd",
56
                           utils.process._TIMEOUT_NONE, 5)
57

    
58
  def testMessage(self):
59
    cons = objects.InstanceConsole(instance="inst98.example.com",
60
                                   kind=constants.CONS_MESSAGE,
61
                                   message="Hello World")
62
    self.assertEqual(self._Test(cons, False, "cluster.example.com"),
63
                     constants.EXIT_SUCCESS)
64
    self.assertEqual(len(self._cmds), 0)
65
    self.assertEqual(self._output, ["Hello World"])
66

    
67
  def testVnc(self):
68
    cons = objects.InstanceConsole(instance="inst1.example.com",
69
                                   kind=constants.CONS_VNC,
70
                                   host="node1.example.com",
71
                                   port=5901,
72
                                   display=1)
73
    self.assertEqual(self._Test(cons, False, "cluster.example.com"),
74
                     constants.EXIT_SUCCESS)
75
    self.assertEqual(len(self._cmds), 0)
76
    self.assertEqual(len(self._output), 1)
77
    self.assertTrue(" inst1.example.com " in self._output[0])
78
    self.assertTrue(" node1.example.com:5901 " in self._output[0])
79
    self.assertTrue("vnc://node1.example.com:5901/" in self._output[0])
80

    
81
  def testSshShow(self):
82
    cons = objects.InstanceConsole(instance="inst31.example.com",
83
                                   kind=constants.CONS_SSH,
84
                                   host="node93.example.com",
85
                                   user="user_abc",
86
                                   command="xm console x.y.z")
87
    self.assertEqual(self._Test(cons, True, "cluster.example.com"),
88
                     constants.EXIT_SUCCESS)
89
    self.assertEqual(len(self._cmds), 0)
90
    self.assertEqual(len(self._output), 1)
91
    self.assertTrue(" user_abc@node93.example.com " in self._output[0])
92
    self.assertTrue("'xm console x.y.z'" in self._output[0])
93

    
94
  def testSshRun(self):
95
    cons = objects.InstanceConsole(instance="inst31.example.com",
96
                                   kind=constants.CONS_SSH,
97
                                   host="node93.example.com",
98
                                   user="user_abc",
99
                                   command=["xm", "console", "x.y.z"])
100
    self.assertEqual(self._Test(cons, False, "cluster.example.com"),
101
                     constants.EXIT_SUCCESS)
102
    self.assertEqual(len(self._cmds), 1)
103
    self.assertEqual(len(self._output), 0)
104

    
105
    # This is very important to prevent escapes from the console
106
    self.assertTrue("-oEscapeChar=none" in self._cmds[0])
107

    
108
  def testSshRunFail(self):
109
    cons = objects.InstanceConsole(instance="inst31.example.com",
110
                                   kind=constants.CONS_SSH,
111
                                   host="node93.example.com",
112
                                   user="user_abc",
113
                                   command=["xm", "console", "x.y.z"])
114

    
115
    self._next_cmd_exitcode = 100
116
    self.assertRaises(errors.OpExecError, self._Test,
117
                      cons, False, "cluster.example.com")
118
    self.assertEqual(len(self._cmds), 1)
119
    self.assertEqual(len(self._output), 0)
120

    
121

    
122
class TestConvertNicDiskModifications(unittest.TestCase):
123
  def test(self):
124
    fn = gnt_instance._ConvertNicDiskModifications
125

    
126
    self.assertEqual(fn([]), [])
127

    
128
    # Error cases
129
    self.assertRaises(errors.OpPrereqError, fn, [
130
      (constants.DDM_REMOVE, { "param": "value", }),
131
      ])
132
    self.assertRaises(errors.OpPrereqError, fn, [
133
      (0, { constants.DDM_REMOVE: True, "param": "value", }),
134
      ])
135
    self.assertRaises(errors.OpPrereqError, fn, [
136
      ("Hello", {}),
137
      ])
138
    self.assertRaises(errors.OpPrereqError, fn, [
139
      (0, {
140
        constants.DDM_REMOVE: True,
141
        constants.DDM_ADD: True,
142
        }),
143
      ])
144

    
145
    # Legacy calls
146
    for action in constants.DDMS_VALUES:
147
      self.assertEqual(fn([
148
        (action, {}),
149
        ]), [
150
        (action, -1, {}),
151
        ])
152
      self.assertRaises(errors.OpPrereqError, fn, [
153
        (0, {
154
          action: True,
155
          constants.DDM_MODIFY: True,
156
          }),
157
        ])
158

    
159
    self.assertEqual(fn([
160
      (constants.DDM_ADD, {
161
        constants.IDISK_SIZE: 1024,
162
        }),
163
      ]), [
164
      (constants.DDM_ADD, -1, {
165
        constants.IDISK_SIZE: 1024,
166
        }),
167
      ])
168

    
169
    # New-style calls
170
    self.assertEqual(fn([
171
      (2, {
172
        constants.IDISK_MODE: constants.DISK_RDWR,
173
        }),
174
      ]), [
175
      (constants.DDM_MODIFY, 2, {
176
        constants.IDISK_MODE: constants.DISK_RDWR,
177
        }),
178
      ])
179

    
180
    self.assertEqual(fn([
181
      (0, {
182
        constants.DDM_ADD: True,
183
        constants.IDISK_SIZE: 4096,
184
        }),
185
      ]), [
186
      (constants.DDM_ADD, 0, {
187
        constants.IDISK_SIZE: 4096,
188
        }),
189
      ])
190

    
191
    self.assertEqual(fn([
192
      (-1, {
193
        constants.DDM_REMOVE: True,
194
        }),
195
      ]), [
196
      (constants.DDM_REMOVE, -1, {}),
197
      ])
198

    
199
    self.assertEqual(fn([
200
      (-1, {
201
        constants.DDM_MODIFY: True,
202
        constants.IDISK_SIZE: 1024,
203
        }),
204
      ]), [
205
      (constants.DDM_MODIFY, -1, {
206
        constants.IDISK_SIZE: 1024,
207
        }),
208
      ])
209

    
210

    
211
class TestParseDiskSizes(unittest.TestCase):
212
  def test(self):
213
    fn = gnt_instance._ParseDiskSizes
214

    
215
    self.assertEqual(fn([]), [])
216

    
217
    # Missing size parameter
218
    self.assertRaises(errors.OpPrereqError, fn, [
219
      (constants.DDM_ADD, 0, {}),
220
      ])
221

    
222
    # Converting disk size
223
    self.assertEqual(fn([
224
      (constants.DDM_ADD, 11, {
225
        constants.IDISK_SIZE: "9G",
226
        }),
227
      ]), [
228
        (constants.DDM_ADD, 11, {
229
          constants.IDISK_SIZE: 9216,
230
          }),
231
        ])
232

    
233
    # No size parameter
234
    self.assertEqual(fn([
235
      (constants.DDM_REMOVE, 11, {
236
        "other": "24M",
237
        }),
238
      ]), [
239
        (constants.DDM_REMOVE, 11, {
240
          "other": "24M",
241
          }),
242
        ])
243

    
244

    
245
if __name__ == "__main__":
246
  testutils.GanetiTestProgram()