Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.cmdlib_unittest.py @ 26d3fd2f

History | View | Annotate | Download (5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the cmdlib module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import tempfile
29
import shutil
30

    
31
from ganeti import mcpu
32
from ganeti import cmdlib
33
from ganeti import opcodes
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import luxi
37

    
38
import testutils
39
import mocks
40

    
41

    
42
class TestCertVerification(testutils.GanetiTestCase):
43
  def setUp(self):
44
    testutils.GanetiTestCase.setUp(self)
45

    
46
    self.tmpdir = tempfile.mkdtemp()
47

    
48
  def tearDown(self):
49
    shutil.rmtree(self.tmpdir)
50

    
51
  def testVerifyCertificate(self):
52
    cmdlib._VerifyCertificate(self._TestDataFilename("cert1.pem"))
53

    
54
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
55

    
56
    (errcode, msg) = cmdlib._VerifyCertificate(nonexist_filename)
57
    self.assertEqual(errcode, cmdlib.LUVerifyCluster.ETYPE_ERROR)
58

    
59
    # Try to load non-certificate file
60
    invalid_cert = self._TestDataFilename("bdev-net.txt")
61
    (errcode, msg) = cmdlib._VerifyCertificate(invalid_cert)
62
    self.assertEqual(errcode, cmdlib.LUVerifyCluster.ETYPE_ERROR)
63

    
64

    
65
class TestOpcodeParams(testutils.GanetiTestCase):
66
  def testParamsStructures(self):
67
    for op in sorted(mcpu.Processor.DISPATCH_TABLE):
68
      lu = mcpu.Processor.DISPATCH_TABLE[op]
69
      lu_name = lu.__name__
70
      self.failIf(hasattr(lu, "_OP_REQP"), "LU '%s' has old-style _OP_REQP" %
71
                  lu_name)
72
      self.failIf(hasattr(lu, "_OP_DEFS"), "LU '%s' has old-style _OP_DEFS" %
73
                  lu_name)
74
      # this needs to remain a list!
75
      defined_params = [v[0] for v in lu._OP_PARAMS]
76
      for row in lu._OP_PARAMS:
77
        # this relies on there being at least one element
78
        param_name = row[0]
79
        self.failIf(len(row) != 3, "LU '%s' parameter %s has invalid length" %
80
                    (lu_name, param_name))
81
        self.failIf(defined_params.count(param_name) > 1, "LU '%s' parameter"
82
                    " '%s' is defined multiple times" % (lu_name, param_name))
83

    
84
  def testParamsDefined(self):
85
    for op in sorted(mcpu.Processor.DISPATCH_TABLE):
86
      lu = mcpu.Processor.DISPATCH_TABLE[op]
87
      lu_name = lu.__name__
88
      # TODO: this doesn't deal with recursive slots definitions
89
      all_params = set(op.__slots__)
90
      defined_params = set(v[0] for v in lu._OP_PARAMS)
91
      missing = all_params.difference(defined_params)
92
      self.failIf(missing, "Undeclared parameter types for LU '%s': %s" %
93
                  (lu_name, utils.CommaJoin(missing)))
94
      extra = defined_params.difference(all_params)
95
      self.failIf(extra, "Extra parameter types for LU '%s': %s" %
96
                  (lu_name, utils.CommaJoin(extra)))
97

    
98

    
99
class TestIAllocatorChecks(testutils.GanetiTestCase):
100
  def testFunction(self):
101
    class TestLU(object):
102
      def __init__(self, opcode):
103
        self.cfg = mocks.FakeConfig()
104
        self.op = opcode
105

    
106
    class TestOpcode(opcodes.OpCode):
107
      OP_ID = "OP_TEST"
108
      __slots__ = ["iallocator", "node"]
109

    
110
    default_iallocator = mocks.FakeConfig().GetDefaultIAllocator()
111
    other_iallocator = default_iallocator + "_not"
112

    
113
    op = TestOpcode()
114
    lu = TestLU(op)
115

    
116
    c_i = lambda: cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node")
117

    
118
    # Neither node nor iallocator given
119
    op.iallocator = None
120
    op.node = None
121
    c_i()
122
    self.assertEqual(lu.op.iallocator, default_iallocator)
123
    self.assertEqual(lu.op.node, None)
124

    
125
    # Both, iallocator and node given
126
    op.iallocator = "test"
127
    op.node = "test"
128
    self.assertRaises(errors.OpPrereqError, c_i)
129

    
130
    # Only iallocator given
131
    op.iallocator = other_iallocator
132
    op.node = None
133
    c_i()
134
    self.assertEqual(lu.op.iallocator, other_iallocator)
135
    self.assertEqual(lu.op.node, None)
136

    
137
    # Only node given
138
    op.iallocator = None
139
    op.node = "node"
140
    c_i()
141
    self.assertEqual(lu.op.iallocator, None)
142
    self.assertEqual(lu.op.node, "node")
143

    
144
    # No node, iallocator or default iallocator
145
    op.iallocator = None
146
    op.node = None
147
    lu.cfg.GetDefaultIAllocator = lambda: None
148
    self.assertRaises(errors.OpPrereqError, c_i)
149

    
150

    
151
class TestLUTestJobqueue(unittest.TestCase):
152
  def test(self):
153
    self.assert_(cmdlib.LUTestJobqueue._CLIENT_CONNECT_TIMEOUT <
154
                 (luxi.WFJC_TIMEOUT * 0.75),
155
                 msg=("Client timeout too high, might not notice bugs"
156
                      " in WaitForJobChange"))
157

    
158

    
159
if __name__ == "__main__":
160
  testutils.GanetiTestProgram()