Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.ssconf_unittest.py @ 560ef132

History | View | Annotate | Download (8.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.ssconf"""
23

    
24
import os
25
import unittest
26
import tempfile
27
import shutil
28
import errno
29

    
30
from ganeti import utils
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import ssconf
34

    
35
import testutils
36
import mock
37

    
38

    
39
class TestReadSsconfFile(unittest.TestCase):
40
  def setUp(self):
41
    self.tmpdir = tempfile.mkdtemp()
42

    
43
  def tearDown(self):
44
    shutil.rmtree(self.tmpdir)
45

    
46
  def testReadDirectory(self):
47
    self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
48

    
49
  def testNonExistantFile(self):
50
    testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
51

    
52
    self.assertFalse(os.path.exists(testfile))
53

    
54
    try:
55
      ssconf.ReadSsconfFile(testfile)
56
    except EnvironmentError, err:
57
      self.assertEqual(err.errno, errno.ENOENT)
58
    else:
59
      self.fail("Exception was not raised")
60

    
61
  def testEmptyFile(self):
62
    testfile = utils.PathJoin(self.tmpdir, "empty")
63

    
64
    utils.WriteFile(testfile, data="")
65

    
66
    self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
67

    
68
  def testSingleLine(self):
69
    testfile = utils.PathJoin(self.tmpdir, "data")
70

    
71
    for nl in range(0, 10):
72
      utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
73

    
74
      self.assertEqual(ssconf.ReadSsconfFile(testfile),
75
                       "Hello World")
76

    
77
  def testExactlyMaxSize(self):
78
    testfile = utils.PathJoin(self.tmpdir, "data")
79

    
80
    data = "A" * ssconf._MAX_SIZE
81
    utils.WriteFile(testfile, data=data)
82

    
83
    self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
84

    
85
    self.assertEqual(ssconf.ReadSsconfFile(testfile),
86
                     data)
87

    
88
  def testLargeFile(self):
89
    testfile = utils.PathJoin(self.tmpdir, "data")
90

    
91
    for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
92
      utils.WriteFile(testfile, data="A" * size)
93
      self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
94
      self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
95

    
96

    
97
class TestSimpleStore(unittest.TestCase):
98
  def setUp(self):
99
    self._tmpdir = tempfile.mkdtemp()
100
    self.ssdir = utils.PathJoin(self._tmpdir, "files")
101
    lockfile = utils.PathJoin(self._tmpdir, "lock")
102

    
103
    os.mkdir(self.ssdir)
104

    
105
    self.sstore = ssconf.SimpleStore(cfg_location=self.ssdir,
106
                                     _lockfile=lockfile)
107

    
108
  def tearDown(self):
109
    shutil.rmtree(self._tmpdir)
110

    
111
  def _ReadSsFile(self, filename):
112
    return utils.ReadFile(utils.PathJoin(self.ssdir, "ssconf_%s" % filename))
113

    
114
  def testInvalidKey(self):
115
    self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
116
                      "not a valid key")
117
    self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
118
                      "not a valid key")
119

    
120
  def testKeyToFilename(self):
121
    for key in ssconf._VALID_KEYS:
122
      result = self.sstore.KeyToFilename(key)
123
      self.assertTrue(utils.IsBelowDir(self.ssdir, result))
124
      self.assertTrue(os.path.basename(result).startswith("ssconf_"))
125

    
126
  def testReadFileNonExistingFile(self):
127
    filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
128

    
129
    self.assertFalse(os.path.exists(filename))
130
    try:
131
      self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
132
    except errors.ConfigurationError, err:
133
      self.assertTrue(str(err).startswith("Can't read ssconf file"))
134
    else:
135
      self.fail("Exception was not raised")
136

    
137
    for default in ["", "Hello World", 0, 100]:
138
      self.assertFalse(os.path.exists(filename))
139
      result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
140
      self.assertEqual(result, default)
141

    
142
  def testReadFile(self):
143
    utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
144
                    data="cluster.example.com")
145

    
146
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
147
                     "cluster.example.com")
148

    
149
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
150
                                           default="something.example.com"),
151
                     "cluster.example.com")
152

    
153
  def testReadAllNoFiles(self):
154
    self.assertEqual(self.sstore.ReadAll(), {})
155

    
156
  def testReadAllSingleFile(self):
157
    utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
158
                    data="cluster.example.com")
159
    self.assertEqual(self.sstore.ReadAll(), {
160
      constants.SS_CLUSTER_NAME: "cluster.example.com",
161
      })
162

    
163
  def testWriteFiles(self):
164
    values = {
165
      constants.SS_CLUSTER_NAME: "cluster.example.com",
166
      constants.SS_CLUSTER_TAGS: "value\nwith\nnewlines\n",
167
      constants.SS_INSTANCE_LIST: "",
168
      }
169

    
170
    self.sstore.WriteFiles(values)
171

    
172
    self.assertEqual(sorted(os.listdir(self.ssdir)), sorted([
173
      "ssconf_cluster_name",
174
      "ssconf_cluster_tags",
175
      "ssconf_instance_list",
176
      ]))
177

    
178
    self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_NAME),
179
                     "cluster.example.com\n")
180
    self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_TAGS),
181
                     "value\nwith\nnewlines\n")
182
    self.assertEqual(self._ReadSsFile(constants.SS_INSTANCE_LIST), "")
183

    
184
  def testWriteFilesUnknownKey(self):
185
    values = {
186
      "unknown key": "value",
187
      }
188

    
189
    self.assertRaises(errors.ProgrammerError, self.sstore.WriteFiles,
190
                      values, dry_run=True)
191

    
192
    self.assertEqual(os.listdir(self.ssdir), [])
193

    
194
  def testWriteFilesDryRun(self):
195
    values = {
196
      constants.SS_CLUSTER_NAME: "cluster.example.com",
197
      }
198

    
199
    self.sstore.WriteFiles(values, dry_run=True)
200

    
201
    self.assertEqual(os.listdir(self.ssdir), [])
202

    
203
  def testWriteFilesNoValues(self):
204
    for dry_run in [False, True]:
205
      self.sstore.WriteFiles({}, dry_run=dry_run)
206

    
207
      self.assertEqual(os.listdir(self.ssdir), [])
208

    
209
  def testWriteFilesTooLong(self):
210
    values = {
211
      constants.SS_INSTANCE_LIST: "A" * ssconf._MAX_SIZE,
212
      }
213

    
214
    for dry_run in [False, True]:
215
      try:
216
        self.sstore.WriteFiles(values, dry_run=dry_run)
217
      except errors.ConfigurationError, err:
218
        self.assertTrue(str(err).startswith("Value 'instance_list' has"))
219
      else:
220
        self.fail("Exception was not raised")
221

    
222
      self.assertEqual(os.listdir(self.ssdir), [])
223

    
224
  def testGetHvparamsForHypervisor(self):
225
    hvparams = [("a", "A"), ("b", "B"), ("c", "C")]
226
    ssconf_file_content = '\n'.join("%s=%s" % (key, value) for (key, value)
227
                                    in hvparams)
228
    self.sstore._ReadFile = mock.Mock(return_value=ssconf_file_content)
229
    result = self.sstore.GetHvparamsForHypervisor("foo")
230
    for (key, value) in hvparams:
231
      self.assertTrue(key in result)
232
      self.assertEqual(value, result[key])
233

    
234

    
235
class TestVerifyClusterName(unittest.TestCase):
236
  def setUp(self):
237
    self.tmpdir = tempfile.mkdtemp()
238

    
239
  def tearDown(self):
240
    shutil.rmtree(self.tmpdir)
241

    
242
  def testMissingFile(self):
243
    tmploc = utils.PathJoin(self.tmpdir, "does-not-exist")
244
    ssconf.VerifyClusterName(NotImplemented, _cfg_location=tmploc)
245

    
246
  def testMatchingName(self):
247
    tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
248

    
249
    for content in ["cluster.example.com", "cluster.example.com\n\n"]:
250
      utils.WriteFile(tmpfile, data=content)
251
      ssconf.VerifyClusterName("cluster.example.com",
252
                               _cfg_location=self.tmpdir)
253

    
254
  def testNameMismatch(self):
255
    tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
256

    
257
    for content in ["something.example.com", "foobar\n\ncluster.example.com"]:
258
      utils.WriteFile(tmpfile, data=content)
259
      self.assertRaises(errors.GenericError, ssconf.VerifyClusterName,
260
                        "cluster.example.com", _cfg_location=self.tmpdir)
261

    
262

    
263
class TestVerifyKeys(unittest.TestCase):
264
  def testNoKeys(self):
265
    ssconf.VerifyKeys({})
266

    
267
  def testValidKeys(self):
268
    ssconf.VerifyKeys(ssconf._VALID_KEYS)
269

    
270
    for key in ssconf._VALID_KEYS:
271
      ssconf.VerifyKeys([key])
272

    
273
  def testInvalidKeys(self):
274
    for key in ["", ".", " ", "foo", "bar", "HelloWorld"]:
275
      self.assertRaises(errors.GenericError, ssconf.VerifyKeys, [key])
276

    
277

    
278
if __name__ == "__main__":
279
  testutils.GanetiTestProgram()