Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.ssconf_unittest.py @ cc7f5bfc

History | View | Annotate | Download (7.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.ssconf"""
23

    
24
import os
25
import unittest
26
import tempfile
27
import shutil
28
import errno
29

    
30
from ganeti import utils
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import ssconf
34

    
35
import testutils
36

    
37

    
38
class TestReadSsconfFile(unittest.TestCase):
39
  def setUp(self):
40
    self.tmpdir = tempfile.mkdtemp()
41

    
42
  def tearDown(self):
43
    shutil.rmtree(self.tmpdir)
44

    
45
  def testReadDirectory(self):
46
    self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
47

    
48
  def testNonExistantFile(self):
49
    testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
50

    
51
    self.assertFalse(os.path.exists(testfile))
52

    
53
    try:
54
      ssconf.ReadSsconfFile(testfile)
55
    except EnvironmentError, err:
56
      self.assertEqual(err.errno, errno.ENOENT)
57
    else:
58
      self.fail("Exception was not raised")
59

    
60
  def testEmptyFile(self):
61
    testfile = utils.PathJoin(self.tmpdir, "empty")
62

    
63
    utils.WriteFile(testfile, data="")
64

    
65
    self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
66

    
67
  def testSingleLine(self):
68
    testfile = utils.PathJoin(self.tmpdir, "data")
69

    
70
    for nl in range(0, 10):
71
      utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
72

    
73
      self.assertEqual(ssconf.ReadSsconfFile(testfile),
74
                       "Hello World")
75

    
76
  def testExactlyMaxSize(self):
77
    testfile = utils.PathJoin(self.tmpdir, "data")
78

    
79
    data = "A" * ssconf._MAX_SIZE
80
    utils.WriteFile(testfile, data=data)
81

    
82
    self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
83

    
84
    self.assertEqual(ssconf.ReadSsconfFile(testfile),
85
                     data)
86

    
87
  def testLargeFile(self):
88
    testfile = utils.PathJoin(self.tmpdir, "data")
89

    
90
    for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
91
      utils.WriteFile(testfile, data="A" * size)
92
      self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
93
      self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
94

    
95

    
96
class TestSimpleStore(unittest.TestCase):
97
  def setUp(self):
98
    self._tmpdir = tempfile.mkdtemp()
99
    self.ssdir = utils.PathJoin(self._tmpdir, "files")
100
    lockfile = utils.PathJoin(self._tmpdir, "lock")
101

    
102
    os.mkdir(self.ssdir)
103

    
104
    self.sstore = ssconf.SimpleStore(cfg_location=self.ssdir,
105
                                     _lockfile=lockfile)
106

    
107
  def tearDown(self):
108
    shutil.rmtree(self._tmpdir)
109

    
110
  def _ReadSsFile(self, filename):
111
    return utils.ReadFile(utils.PathJoin(self.ssdir, "ssconf_%s" % filename))
112

    
113
  def testInvalidKey(self):
114
    self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
115
                      "not a valid key")
116
    self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
117
                      "not a valid key")
118

    
119
  def testKeyToFilename(self):
120
    for key in ssconf._VALID_KEYS:
121
      result = self.sstore.KeyToFilename(key)
122
      self.assertTrue(utils.IsBelowDir(self.ssdir, result))
123
      self.assertTrue(os.path.basename(result).startswith("ssconf_"))
124

    
125
  def testReadFileNonExistingFile(self):
126
    filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
127

    
128
    self.assertFalse(os.path.exists(filename))
129
    try:
130
      self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
131
    except errors.ConfigurationError, err:
132
      self.assertTrue(str(err).startswith("Can't read ssconf file"))
133
    else:
134
      self.fail("Exception was not raised")
135

    
136
    for default in ["", "Hello World", 0, 100]:
137
      self.assertFalse(os.path.exists(filename))
138
      result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
139
      self.assertEqual(result, default)
140

    
141
  def testReadFile(self):
142
    utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
143
                    data="cluster.example.com")
144

    
145
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
146
                     "cluster.example.com")
147

    
148
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
149
                                           default="something.example.com"),
150
                     "cluster.example.com")
151

    
152
  def testWriteFiles(self):
153
    values = {
154
      constants.SS_CLUSTER_NAME: "cluster.example.com",
155
      constants.SS_CLUSTER_TAGS: "value\nwith\nnewlines\n",
156
      constants.SS_INSTANCE_LIST: "",
157
      }
158

    
159
    self.sstore.WriteFiles(values)
160

    
161
    self.assertEqual(sorted(os.listdir(self.ssdir)), sorted([
162
      "ssconf_cluster_name",
163
      "ssconf_cluster_tags",
164
      "ssconf_instance_list",
165
      ]))
166

    
167
    self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_NAME),
168
                     "cluster.example.com\n")
169
    self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_TAGS),
170
                     "value\nwith\nnewlines\n")
171
    self.assertEqual(self._ReadSsFile(constants.SS_INSTANCE_LIST), "")
172

    
173
  def testWriteFilesUnknownKey(self):
174
    values = {
175
      "unknown key": "value",
176
      }
177

    
178
    self.assertRaises(errors.ProgrammerError, self.sstore.WriteFiles,
179
                      values, dry_run=True)
180

    
181
    self.assertEqual(os.listdir(self.ssdir), [])
182

    
183
  def testWriteFilesDryRun(self):
184
    values = {
185
      constants.SS_CLUSTER_NAME: "cluster.example.com",
186
      }
187

    
188
    self.sstore.WriteFiles(values, dry_run=True)
189

    
190
    self.assertEqual(os.listdir(self.ssdir), [])
191

    
192
  def testWriteFilesNoValues(self):
193
    for dry_run in [False, True]:
194
      self.sstore.WriteFiles({}, dry_run=dry_run)
195

    
196
      self.assertEqual(os.listdir(self.ssdir), [])
197

    
198
  def testWriteFilesTooLong(self):
199
    values = {
200
      constants.SS_INSTANCE_LIST: "A" * ssconf._MAX_SIZE,
201
      }
202

    
203
    for dry_run in [False, True]:
204
      try:
205
        self.sstore.WriteFiles(values, dry_run=dry_run)
206
      except errors.ConfigurationError, err:
207
        self.assertTrue(str(err).startswith("Value 'instance_list' has"))
208
      else:
209
        self.fail("Exception was not raised")
210

    
211
      self.assertEqual(os.listdir(self.ssdir), [])
212

    
213

    
214
class TestVerifyClusterName(unittest.TestCase):
215
  def setUp(self):
216
    self.tmpdir = tempfile.mkdtemp()
217

    
218
  def tearDown(self):
219
    shutil.rmtree(self.tmpdir)
220

    
221
  def testMissingFile(self):
222
    tmploc = utils.PathJoin(self.tmpdir, "does-not-exist")
223
    ssconf.VerifyClusterName(NotImplemented, _cfg_location=tmploc)
224

    
225
  def testMatchingName(self):
226
    tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
227

    
228
    for content in ["cluster.example.com", "cluster.example.com\n\n"]:
229
      utils.WriteFile(tmpfile, data=content)
230
      ssconf.VerifyClusterName("cluster.example.com",
231
                               _cfg_location=self.tmpdir)
232

    
233
  def testNameMismatch(self):
234
    tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
235

    
236
    for content in ["something.example.com", "foobar\n\ncluster.example.com"]:
237
      utils.WriteFile(tmpfile, data=content)
238
      self.assertRaises(errors.GenericError, ssconf.VerifyClusterName,
239
                        "cluster.example.com", _cfg_location=self.tmpdir)
240

    
241

    
242
class TestVerifyKeys(unittest.TestCase):
243
  def testNoKeys(self):
244
    ssconf.VerifyKeys({})
245

    
246
  def testValidKeys(self):
247
    ssconf.VerifyKeys(ssconf._VALID_KEYS)
248

    
249
    for key in ssconf._VALID_KEYS:
250
      ssconf.VerifyKeys([key])
251

    
252
  def testInvalidKeys(self):
253
    for key in ["", ".", " ", "foo", "bar", "HelloWorld"]:
254
      self.assertRaises(errors.GenericError, ssconf.VerifyKeys, [key])
255

    
256

    
257
if __name__ == "__main__":
258
  testutils.GanetiTestProgram()