Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.ssconf_unittest.py @ 7352d33b

History | View | Annotate | Download (8.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.ssconf"""
23

    
24
import os
25
import unittest
26
import tempfile
27
import shutil
28
import errno
29

    
30
from ganeti import utils
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import ssconf
34

    
35
import testutils
36

    
37

    
38
class TestReadSsconfFile(unittest.TestCase):
39
  def setUp(self):
40
    self.tmpdir = tempfile.mkdtemp()
41

    
42
  def tearDown(self):
43
    shutil.rmtree(self.tmpdir)
44

    
45
  def testReadDirectory(self):
46
    self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
47

    
48
  def testNonExistantFile(self):
49
    testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
50

    
51
    self.assertFalse(os.path.exists(testfile))
52

    
53
    try:
54
      ssconf.ReadSsconfFile(testfile)
55
    except EnvironmentError, err:
56
      self.assertEqual(err.errno, errno.ENOENT)
57
    else:
58
      self.fail("Exception was not raised")
59

    
60
  def testEmptyFile(self):
61
    testfile = utils.PathJoin(self.tmpdir, "empty")
62

    
63
    utils.WriteFile(testfile, data="")
64

    
65
    self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
66

    
67
  def testSingleLine(self):
68
    testfile = utils.PathJoin(self.tmpdir, "data")
69

    
70
    for nl in range(0, 10):
71
      utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
72

    
73
      self.assertEqual(ssconf.ReadSsconfFile(testfile),
74
                       "Hello World")
75

    
76
  def testExactlyMaxSize(self):
77
    testfile = utils.PathJoin(self.tmpdir, "data")
78

    
79
    data = "A" * ssconf._MAX_SIZE
80
    utils.WriteFile(testfile, data=data)
81

    
82
    self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
83

    
84
    self.assertEqual(ssconf.ReadSsconfFile(testfile),
85
                     data)
86

    
87
  def testLargeFile(self):
88
    testfile = utils.PathJoin(self.tmpdir, "data")
89

    
90
    for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
91
      utils.WriteFile(testfile, data="A" * size)
92
      self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
93
      self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
94

    
95

    
96
class TestSimpleStore(unittest.TestCase):
97
  def setUp(self):
98
    self._tmpdir = tempfile.mkdtemp()
99
    self.ssdir = utils.PathJoin(self._tmpdir, "files")
100
    lockfile = utils.PathJoin(self._tmpdir, "lock")
101

    
102
    os.mkdir(self.ssdir)
103

    
104
    self.sstore = ssconf.SimpleStore(cfg_location=self.ssdir,
105
                                     _lockfile=lockfile)
106

    
107
  def tearDown(self):
108
    shutil.rmtree(self._tmpdir)
109

    
110
  def _ReadSsFile(self, filename):
111
    return utils.ReadFile(utils.PathJoin(self.ssdir, "ssconf_%s" % filename))
112

    
113
  def testInvalidKey(self):
114
    self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
115
                      "not a valid key")
116
    self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
117
                      "not a valid key")
118

    
119
  def testKeyToFilename(self):
120
    for key in ssconf._VALID_KEYS:
121
      result = self.sstore.KeyToFilename(key)
122
      self.assertTrue(utils.IsBelowDir(self.ssdir, result))
123
      self.assertTrue(os.path.basename(result).startswith("ssconf_"))
124

    
125
  def testReadFileNonExistingFile(self):
126
    filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
127

    
128
    self.assertFalse(os.path.exists(filename))
129
    try:
130
      self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
131
    except errors.ConfigurationError, err:
132
      self.assertTrue(str(err).startswith("Can't read ssconf file"))
133
    else:
134
      self.fail("Exception was not raised")
135

    
136
    for default in ["", "Hello World", 0, 100]:
137
      self.assertFalse(os.path.exists(filename))
138
      result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
139
      self.assertEqual(result, default)
140

    
141
  def testReadFile(self):
142
    utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
143
                    data="cluster.example.com")
144

    
145
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
146
                     "cluster.example.com")
147

    
148
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
149
                                           default="something.example.com"),
150
                     "cluster.example.com")
151

    
152
  def testReadAllNoFiles(self):
153
    self.assertEqual(self.sstore.ReadAll(), {})
154

    
155
  def testReadAllSingleFile(self):
156
    utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
157
                    data="cluster.example.com")
158
    self.assertEqual(self.sstore.ReadAll(), {
159
      constants.SS_CLUSTER_NAME: "cluster.example.com",
160
      })
161

    
162
  def testWriteFiles(self):
163
    values = {
164
      constants.SS_CLUSTER_NAME: "cluster.example.com",
165
      constants.SS_CLUSTER_TAGS: "value\nwith\nnewlines\n",
166
      constants.SS_INSTANCE_LIST: "",
167
      }
168

    
169
    self.sstore.WriteFiles(values)
170

    
171
    self.assertEqual(sorted(os.listdir(self.ssdir)), sorted([
172
      "ssconf_cluster_name",
173
      "ssconf_cluster_tags",
174
      "ssconf_instance_list",
175
      ]))
176

    
177
    self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_NAME),
178
                     "cluster.example.com\n")
179
    self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_TAGS),
180
                     "value\nwith\nnewlines\n")
181
    self.assertEqual(self._ReadSsFile(constants.SS_INSTANCE_LIST), "")
182

    
183
  def testWriteFilesUnknownKey(self):
184
    values = {
185
      "unknown key": "value",
186
      }
187

    
188
    self.assertRaises(errors.ProgrammerError, self.sstore.WriteFiles,
189
                      values, dry_run=True)
190

    
191
    self.assertEqual(os.listdir(self.ssdir), [])
192

    
193
  def testWriteFilesDryRun(self):
194
    values = {
195
      constants.SS_CLUSTER_NAME: "cluster.example.com",
196
      }
197

    
198
    self.sstore.WriteFiles(values, dry_run=True)
199

    
200
    self.assertEqual(os.listdir(self.ssdir), [])
201

    
202
  def testWriteFilesNoValues(self):
203
    for dry_run in [False, True]:
204
      self.sstore.WriteFiles({}, dry_run=dry_run)
205

    
206
      self.assertEqual(os.listdir(self.ssdir), [])
207

    
208
  def testWriteFilesTooLong(self):
209
    values = {
210
      constants.SS_INSTANCE_LIST: "A" * ssconf._MAX_SIZE,
211
      }
212

    
213
    for dry_run in [False, True]:
214
      try:
215
        self.sstore.WriteFiles(values, dry_run=dry_run)
216
      except errors.ConfigurationError, err:
217
        self.assertTrue(str(err).startswith("Value 'instance_list' has"))
218
      else:
219
        self.fail("Exception was not raised")
220

    
221
      self.assertEqual(os.listdir(self.ssdir), [])
222

    
223

    
224
class TestVerifyClusterName(unittest.TestCase):
225
  def setUp(self):
226
    self.tmpdir = tempfile.mkdtemp()
227

    
228
  def tearDown(self):
229
    shutil.rmtree(self.tmpdir)
230

    
231
  def testMissingFile(self):
232
    tmploc = utils.PathJoin(self.tmpdir, "does-not-exist")
233
    ssconf.VerifyClusterName(NotImplemented, _cfg_location=tmploc)
234

    
235
  def testMatchingName(self):
236
    tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
237

    
238
    for content in ["cluster.example.com", "cluster.example.com\n\n"]:
239
      utils.WriteFile(tmpfile, data=content)
240
      ssconf.VerifyClusterName("cluster.example.com",
241
                               _cfg_location=self.tmpdir)
242

    
243
  def testNameMismatch(self):
244
    tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
245

    
246
    for content in ["something.example.com", "foobar\n\ncluster.example.com"]:
247
      utils.WriteFile(tmpfile, data=content)
248
      self.assertRaises(errors.GenericError, ssconf.VerifyClusterName,
249
                        "cluster.example.com", _cfg_location=self.tmpdir)
250

    
251

    
252
class TestVerifyKeys(unittest.TestCase):
253
  def testNoKeys(self):
254
    ssconf.VerifyKeys({})
255

    
256
  def testValidKeys(self):
257
    ssconf.VerifyKeys(ssconf._VALID_KEYS)
258

    
259
    for key in ssconf._VALID_KEYS:
260
      ssconf.VerifyKeys([key])
261

    
262
  def testInvalidKeys(self):
263
    for key in ["", ".", " ", "foo", "bar", "HelloWorld"]:
264
      self.assertRaises(errors.GenericError, ssconf.VerifyKeys, [key])
265

    
266

    
267
if __name__ == "__main__":
268
  testutils.GanetiTestProgram()