Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.config_unittest.py @ 02691904

History | View | Annotate | Download (4.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the config module"""
23

    
24

    
25
import unittest
26
import os
27
import time
28
import tempfile
29
import os.path
30
import socket
31

    
32
from ganeti import bootstrap
33
from ganeti import config
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import objects
37
from ganeti import utils
38

    
39

    
40
class TestConfigRunner(unittest.TestCase):
41
  """Testing case for HooksRunner"""
42
  def setUp(self):
43
    fd, self.cfg_file = tempfile.mkstemp()
44
    os.close(fd)
45
    self._init_cluster(self.cfg_file)
46

    
47
  def tearDown(self):
48
    try:
49
      os.unlink(self.cfg_file)
50
    except OSError:
51
      pass
52

    
53
  def _get_object(self):
54
    """Returns a instance of ConfigWriter"""
55
    cfg = config.ConfigWriter(cfg_file=self.cfg_file, offline=True)
56
    return cfg
57

    
58
  def _init_cluster(self, cfg):
59
    """Initializes the cfg object"""
60
    me = utils.HostInfo()
61
    ip = constants.LOCALHOST_IP_ADDRESS
62

    
63
    cluster_config = objects.Cluster(
64
      serial_no=1,
65
      rsahostkeypub="",
66
      highest_used_port=(constants.FIRST_DRBD_PORT - 1),
67
      mac_prefix="aa:00:00",
68
      volume_group_name="xenvg",
69
      default_bridge=constants.DEFAULT_BRIDGE,
70
      tcpudp_port_pool=set(),
71
      default_hypervisor=constants.HT_FAKE,
72
      master_node=me.name,
73
      master_ip="127.0.0.1",
74
      master_netdev=constants.DEFAULT_BRIDGE,
75
      cluster_name="cluster.local",
76
      file_storage_dir="/tmp",
77
      )
78

    
79
    master_node_config = objects.Node(name=me.name,
80
                                      primary_ip=me.ip,
81
                                      secondary_ip=ip,
82
                                      serial_no=1)
83

    
84
    bootstrap.InitConfig(constants.CONFIG_VERSION,
85
                         cluster_config, master_node_config, self.cfg_file)
86

    
87
  def _create_instance(self):
88
    """Create and return an instance object"""
89
    inst = objects.Instance(name="test.example.com", disks=[],
90
                            disk_template=constants.DT_DISKLESS)
91
    return inst
92

    
93
  def testEmpty(self):
94
    """Test instantiate config object"""
95
    self._get_object()
96

    
97
  def testInit(self):
98
    """Test initialize the config file"""
99
    cfg = self._get_object()
100
    self.failUnlessEqual(1, len(cfg.GetNodeList()))
101
    self.failUnlessEqual(0, len(cfg.GetInstanceList()))
102

    
103
  def testUpdateCluster(self):
104
    """Test updates on the cluster object"""
105
    cfg = self._get_object()
106
    # construct a fake cluster object
107
    fake_cl = objects.Cluster()
108
    # fail if we didn't read the config
109
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
110

    
111
    cl = cfg.GetClusterInfo()
112
    # first pass, must not fail
113
    cfg.Update(cl)
114
    # second pass, also must not fail (after the config has been written)
115
    cfg.Update(cl)
116
    # but the fake_cl update should still fail
117
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
118

    
119
  def testUpdateNode(self):
120
    """Test updates on one node object"""
121
    cfg = self._get_object()
122
    # construct a fake node
123
    fake_node = objects.Node()
124
    # fail if we didn't read the config
125
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
126

    
127
    node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
128
    # first pass, must not fail
129
    cfg.Update(node)
130
    # second pass, also must not fail (after the config has been written)
131
    cfg.Update(node)
132
    # but the fake_node update should still fail
133
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
134

    
135
  def testUpdateInstance(self):
136
    """Test updates on one instance object"""
137
    cfg = self._get_object()
138
    # construct a fake instance
139
    inst = self._create_instance()
140
    fake_instance = objects.Instance()
141
    # fail if we didn't read the config
142
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
143

    
144
    cfg.AddInstance(inst)
145
    instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
146
    # first pass, must not fail
147
    cfg.Update(instance)
148
    # second pass, also must not fail (after the config has been written)
149
    cfg.Update(instance)
150
    # but the fake_instance update should still fail
151
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
152

    
153

    
154
if __name__ == '__main__':
155
  unittest.main()