Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.config_unittest.py @ b9eeeb02

History | View | Annotate | Download (4.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the config module"""
23

    
24

    
25
import unittest
26
import os
27
import time
28
import tempfile
29
import os.path
30
import socket
31

    
32
from ganeti import errors
33
from ganeti import constants
34
from ganeti import config
35
from ganeti import objects
36
from ganeti import utils
37

    
38

    
39
class TestConfigRunner(unittest.TestCase):
40
  """Testing case for HooksRunner"""
41
  def setUp(self):
42
    fd, self.cfg_file = tempfile.mkstemp()
43
    os.close(fd)
44

    
45
  def tearDown(self):
46
    try:
47
      os.unlink(self.cfg_file)
48
    except OSError:
49
      pass
50

    
51
  def _get_object(self):
52
    """Returns a instance of ConfigWriter"""
53
    cfg = config.ConfigWriter(cfg_file=self.cfg_file, offline=True)
54
    return cfg
55

    
56
  def _init_cluster(self, cfg):
57
    """Initializes the cfg object"""
58
    cluster_config = objects.Cluster(
59
      serial_no=1,
60
      rsahostkeypub="",
61
      highest_used_port=(constants.FIRST_DRBD_PORT - 1),
62
      mac_prefix="aa:00:00",
63
      volume_group_name="xenvg",
64
      default_bridge=constants.DEFAULT_BRIDGE,
65
      tcpudp_port_pool=set(),
66
      )
67
    ip = constants.LOCALHOST_IP_ADDRESS
68
    master_node_config = objects.Node(name=utils.HostInfo().name,
69
                                      primary_ip=ip,
70
                                      secondary_ip=ip)
71
    cfg.InitConfig(cluster_config, master_node_config)
72

    
73
  def _create_instance(self):
74
    """Create and return an instance object"""
75
    inst = objects.Instance(name="test.example.com", disks=[],
76
                            disk_template=constants.DT_DISKLESS)
77
    return inst
78

    
79
  def testEmpty(self):
80
    """Test instantiate config object"""
81
    self._get_object()
82

    
83
  def testInit(self):
84
    """Test initialize the config file"""
85
    cfg = self._get_object()
86
    self._init_cluster(cfg)
87
    self.failUnlessEqual(1, len(cfg.GetNodeList()))
88
    self.failUnlessEqual(0, len(cfg.GetInstanceList()))
89

    
90
  def testUpdateCluster(self):
91
    """Test updates on the cluster object"""
92
    cfg = self._get_object()
93
    # construct a fake cluster object
94
    fake_cl = objects.Cluster()
95
    # fail if we didn't read the config
96
    self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_cl)
97

    
98
    self._init_cluster(cfg)
99
    cl = cfg.GetClusterInfo()
100
    # first pass, must not fail
101
    cfg.Update(cl)
102
    # second pass, also must not fail (after the config has been written)
103
    cfg.Update(cl)
104
    # but the fake_cl update should still fail
105
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
106

    
107
  def testUpdateNode(self):
108
    """Test updates on one node object"""
109
    cfg = self._get_object()
110
    # construct a fake node
111
    fake_node = objects.Node()
112
    # fail if we didn't read the config
113
    self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_node)
114

    
115
    self._init_cluster(cfg)
116
    node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
117
    # first pass, must not fail
118
    cfg.Update(node)
119
    # second pass, also must not fail (after the config has been written)
120
    cfg.Update(node)
121
    # but the fake_node update should still fail
122
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
123

    
124
  def testUpdateInstance(self):
125
    """Test updates on one instance object"""
126
    cfg = self._get_object()
127
    # construct a fake instance
128
    inst = self._create_instance()
129
    fake_instance = objects.Instance()
130
    # fail if we didn't read the config
131
    self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_instance)
132

    
133
    self._init_cluster(cfg)
134
    cfg.AddInstance(inst)
135
    instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
136
    # first pass, must not fail
137
    cfg.Update(instance)
138
    # second pass, also must not fail (after the config has been written)
139
    cfg.Update(instance)
140
    # but the fake_instance update should still fail
141
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
142

    
143

    
144
if __name__ == '__main__':
145
  unittest.main()