Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.config_unittest.py @ f6bd6e98

History | View | Annotate | Download (4.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the config module"""
23

    
24

    
25
import unittest
26
import os
27
import time
28
import tempfile
29
import os.path
30
import socket
31

    
32
from ganeti import errors
33
from ganeti import constants
34
from ganeti import config
35
from ganeti import objects
36
from ganeti import utils
37

    
38

    
39
class TestConfigRunner(unittest.TestCase):
40
  """Testing case for HooksRunner"""
41
  def setUp(self):
42
    fd, self.cfg_file = tempfile.mkstemp()
43
    os.close(fd)
44

    
45
  def tearDown(self):
46
    try:
47
      os.unlink(self.cfg_file)
48
    except OSError:
49
      pass
50

    
51
  def _get_object(self):
52
    """Returns a instance of ConfigWriter"""
53
    cfg = config.ConfigWriter(cfg_file=self.cfg_file, offline=True)
54
    return cfg
55

    
56
  def _init_cluster(self, cfg):
57
    """Initializes the cfg object"""
58
    me = utils.HostInfo()
59
    ip = constants.LOCALHOST_IP_ADDRESS
60

    
61
    cluster_config = objects.Cluster(
62
      serial_no=1,
63
      rsahostkeypub="",
64
      highest_used_port=(constants.FIRST_DRBD_PORT - 1),
65
      mac_prefix="aa:00:00",
66
      volume_group_name="xenvg",
67
      default_bridge=constants.DEFAULT_BRIDGE,
68
      tcpudp_port_pool=set(),
69
      hypervisor=constants.HT_FAKE,
70
      master_node=me.name,
71
      master_ip="127.0.0.1",
72
      master_netdev=constants.DEFAULT_BRIDGE,
73
      cluster_name="cluster.local",
74
      file_storage_dir="/tmp",
75
      )
76

    
77
    master_node_config = objects.Node(name=me.name,
78
                                      primary_ip=me.ip,
79
                                      secondary_ip=ip)
80

    
81
    cfg.InitConfig(constants.CONFIG_VERSION,
82
                   cluster_config, master_node_config)
83

    
84
  def _create_instance(self):
85
    """Create and return an instance object"""
86
    inst = objects.Instance(name="test.example.com", disks=[],
87
                            disk_template=constants.DT_DISKLESS)
88
    return inst
89

    
90
  def testEmpty(self):
91
    """Test instantiate config object"""
92
    self._get_object()
93

    
94
  def testInit(self):
95
    """Test initialize the config file"""
96
    cfg = self._get_object()
97
    self._init_cluster(cfg)
98
    self.failUnlessEqual(1, len(cfg.GetNodeList()))
99
    self.failUnlessEqual(0, len(cfg.GetInstanceList()))
100

    
101
  def testUpdateCluster(self):
102
    """Test updates on the cluster object"""
103
    cfg = self._get_object()
104
    # construct a fake cluster object
105
    fake_cl = objects.Cluster()
106
    # fail if we didn't read the config
107
    self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_cl)
108

    
109
    self._init_cluster(cfg)
110
    cl = cfg.GetClusterInfo()
111
    # first pass, must not fail
112
    cfg.Update(cl)
113
    # second pass, also must not fail (after the config has been written)
114
    cfg.Update(cl)
115
    # but the fake_cl update should still fail
116
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
117

    
118
  def testUpdateNode(self):
119
    """Test updates on one node object"""
120
    cfg = self._get_object()
121
    # construct a fake node
122
    fake_node = objects.Node()
123
    # fail if we didn't read the config
124
    self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_node)
125

    
126
    self._init_cluster(cfg)
127
    node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
128
    # first pass, must not fail
129
    cfg.Update(node)
130
    # second pass, also must not fail (after the config has been written)
131
    cfg.Update(node)
132
    # but the fake_node update should still fail
133
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
134

    
135
  def testUpdateInstance(self):
136
    """Test updates on one instance object"""
137
    cfg = self._get_object()
138
    # construct a fake instance
139
    inst = self._create_instance()
140
    fake_instance = objects.Instance()
141
    # fail if we didn't read the config
142
    self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_instance)
143

    
144
    self._init_cluster(cfg)
145
    cfg.AddInstance(inst)
146
    instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
147
    # first pass, must not fail
148
    cfg.Update(instance)
149
    # second pass, also must not fail (after the config has been written)
150
    cfg.Update(instance)
151
    # but the fake_instance update should still fail
152
    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
153

    
154

    
155
if __name__ == '__main__':
156
  unittest.main()