Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.cmdlib_unittest.py @ 65e183af

History | View | Annotate | Download (5.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the cmdlib module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import tempfile
29
import shutil
30

    
31
from ganeti import constants
32
from ganeti import mcpu
33
from ganeti import cmdlib
34
from ganeti import opcodes
35
from ganeti import errors
36
from ganeti import utils
37
from ganeti import luxi
38
from ganeti import ht
39

    
40
import testutils
41
import mocks
42

    
43

    
44
class TestCertVerification(testutils.GanetiTestCase):
45
  def setUp(self):
46
    testutils.GanetiTestCase.setUp(self)
47

    
48
    self.tmpdir = tempfile.mkdtemp()
49

    
50
  def tearDown(self):
51
    shutil.rmtree(self.tmpdir)
52

    
53
  def testVerifyCertificate(self):
54
    cmdlib._VerifyCertificate(self._TestDataFilename("cert1.pem"))
55

    
56
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
57

    
58
    (errcode, msg) = cmdlib._VerifyCertificate(nonexist_filename)
59
    self.assertEqual(errcode, cmdlib.LUVerifyCluster.ETYPE_ERROR)
60

    
61
    # Try to load non-certificate file
62
    invalid_cert = self._TestDataFilename("bdev-net.txt")
63
    (errcode, msg) = cmdlib._VerifyCertificate(invalid_cert)
64
    self.assertEqual(errcode, cmdlib.LUVerifyCluster.ETYPE_ERROR)
65

    
66

    
67
class TestOpcodeParams(testutils.GanetiTestCase):
68
  def testParamsStructures(self):
69
    for op in sorted(mcpu.Processor.DISPATCH_TABLE):
70
      lu = mcpu.Processor.DISPATCH_TABLE[op]
71
      lu_name = lu.__name__
72
      self.failIf(hasattr(lu, "_OP_REQP"), "LU '%s' has old-style _OP_REQP" %
73
                  lu_name)
74
      self.failIf(hasattr(lu, "_OP_DEFS"), "LU '%s' has old-style _OP_DEFS" %
75
                  lu_name)
76
      # this needs to remain a list!
77
      defined_params = [v[0] for v in lu._OP_PARAMS]
78
      for row in lu._OP_PARAMS:
79
        # this relies on there being at least one element
80
        param_name = row[0]
81
        self.failIf(len(row) != 3, "LU '%s' parameter %s has invalid length" %
82
                    (lu_name, param_name))
83
        self.failIf(defined_params.count(param_name) > 1, "LU '%s' parameter"
84
                    " '%s' is defined multiple times" % (lu_name, param_name))
85

    
86
  def testParamsDefined(self):
87
    for op in sorted(mcpu.Processor.DISPATCH_TABLE):
88
      lu = mcpu.Processor.DISPATCH_TABLE[op]
89
      lu_name = lu.__name__
90
      # TODO: this doesn't deal with recursive slots definitions
91
      all_params = set(op.__slots__)
92
      defined_params = set(v[0] for v in lu._OP_PARAMS)
93
      missing = all_params.difference(defined_params)
94
      self.failIf(missing, "Undeclared parameter types for LU '%s': %s" %
95
                  (lu_name, utils.CommaJoin(missing)))
96
      extra = defined_params.difference(all_params)
97
      self.failIf(extra, "Extra parameter types for LU '%s': %s" %
98
                  (lu_name, utils.CommaJoin(extra)))
99

    
100

    
101
class TestIAllocatorChecks(testutils.GanetiTestCase):
102
  def testFunction(self):
103
    class TestLU(object):
104
      def __init__(self, opcode):
105
        self.cfg = mocks.FakeConfig()
106
        self.op = opcode
107

    
108
    class TestOpcode(opcodes.OpCode):
109
      OP_ID = "OP_TEST"
110
      OP_PARAMS = [
111
        ("iallocator", None, ht.NoType),
112
        ("node", None, ht.NoType),
113
        ]
114

    
115
    default_iallocator = mocks.FakeConfig().GetDefaultIAllocator()
116
    other_iallocator = default_iallocator + "_not"
117

    
118
    op = TestOpcode()
119
    lu = TestLU(op)
120

    
121
    c_i = lambda: cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node")
122

    
123
    # Neither node nor iallocator given
124
    op.iallocator = None
125
    op.node = None
126
    c_i()
127
    self.assertEqual(lu.op.iallocator, default_iallocator)
128
    self.assertEqual(lu.op.node, None)
129

    
130
    # Both, iallocator and node given
131
    op.iallocator = "test"
132
    op.node = "test"
133
    self.assertRaises(errors.OpPrereqError, c_i)
134

    
135
    # Only iallocator given
136
    op.iallocator = other_iallocator
137
    op.node = None
138
    c_i()
139
    self.assertEqual(lu.op.iallocator, other_iallocator)
140
    self.assertEqual(lu.op.node, None)
141

    
142
    # Only node given
143
    op.iallocator = None
144
    op.node = "node"
145
    c_i()
146
    self.assertEqual(lu.op.iallocator, None)
147
    self.assertEqual(lu.op.node, "node")
148

    
149
    # No node, iallocator or default iallocator
150
    op.iallocator = None
151
    op.node = None
152
    lu.cfg.GetDefaultIAllocator = lambda: None
153
    self.assertRaises(errors.OpPrereqError, c_i)
154

    
155

    
156
class TestLUTestJobqueue(unittest.TestCase):
157
  def test(self):
158
    self.assert_(cmdlib.LUTestJobqueue._CLIENT_CONNECT_TIMEOUT <
159
                 (luxi.WFJC_TIMEOUT * 0.75),
160
                 msg=("Client timeout too high, might not notice bugs"
161
                      " in WaitForJobChange"))
162

    
163

    
164
class TestLUQuery(unittest.TestCase):
165
  def test(self):
166
    self.assertEqual(sorted(cmdlib._QUERY_IMPL.keys()),
167
                     sorted(constants.QR_OP_QUERY))
168

    
169
    assert constants.QR_NODE in constants.QR_OP_QUERY
170
    assert constants.QR_INSTANCE in constants.QR_OP_QUERY
171

    
172
    for i in constants.QR_OP_QUERY:
173
      self.assert_(cmdlib._GetQueryImplementation(i))
174

    
175
    self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation, "")
176
    self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation,
177
                      "xyz")
178

    
179

    
180
if __name__ == "__main__":
181
  testutils.GanetiTestProgram()