Revision 911dfc49

b/Makefile.am
958 958
	test/ganeti.runtime_unittest.py \
959 959
	test/ganeti.serializer_unittest.py \
960 960
	test/ganeti.server.rapi_unittest.py \
961
	test/ganeti.ssconf_unittest.py \
961 962
	test/ganeti.ssh_unittest.py \
962 963
	test/ganeti.storage_unittest.py \
963 964
	test/ganeti.tools.ensure_dirs_unittest.py \
b/lib/ssconf.py
69 69
_MAX_SIZE = 128 * 1024
70 70

  
71 71

  
72
def ReadSsconfFile(filename):
73
  """Reads an ssconf file and verifies its size.
74

  
75
  @type filename: string
76
  @param filename: Path to file
77
  @rtype: string
78
  @return: File contents without newlines at the end
79
  @raise RuntimeError: When the file size exceeds L{_MAX_SIZE}
80

  
81
  """
82
  statcb = utils.FileStatHelper()
83

  
84
  data = utils.ReadFile(filename, size=_MAX_SIZE, preread=statcb)
85

  
86
  if statcb.st.st_size > _MAX_SIZE:
87
    msg = ("File '%s' has a size of %s bytes (up to %s allowed)" %
88
           (filename, statcb.st.st_size, _MAX_SIZE))
89
    raise RuntimeError(msg)
90

  
91
  return data.rstrip("\n")
92

  
93

  
72 94
class SimpleStore(object):
73 95
  """Interface to static cluster data.
74 96

  
......
106 128
    """
107 129
    filename = self.KeyToFilename(key)
108 130
    try:
109
      data = utils.ReadFile(filename, size=_MAX_SIZE)
131
      return ReadSsconfFile(filename)
110 132
    except EnvironmentError, err:
111 133
      if err.errno == errno.ENOENT and default is not None:
112 134
        return default
113 135
      raise errors.ConfigurationError("Can't read ssconf file %s: %s" %
114 136
                                      (filename, str(err)))
115 137

  
116
    return data.rstrip("\n")
117

  
118 138
  def WriteFiles(self, values):
119 139
    """Writes ssconf files used by external scripts.
120 140

  
b/test/ganeti.ssconf_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for testing ganeti.ssconf"""
23

  
24
import os
25
import unittest
26
import tempfile
27
import shutil
28
import errno
29

  
30
from ganeti import utils
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import ssconf
34

  
35
import testutils
36

  
37

  
38
class TestReadSsconfFile(unittest.TestCase):
39
  def setUp(self):
40
    self.tmpdir = tempfile.mkdtemp()
41

  
42
  def tearDown(self):
43
    shutil.rmtree(self.tmpdir)
44

  
45
  def testReadDirectory(self):
46
    self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
47

  
48
  def testNonExistantFile(self):
49
    testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
50

  
51
    self.assertFalse(os.path.exists(testfile))
52

  
53
    try:
54
      ssconf.ReadSsconfFile(testfile)
55
    except EnvironmentError, err:
56
      self.assertEqual(err.errno, errno.ENOENT)
57
    else:
58
      self.fail("Exception was not raised")
59

  
60
  def testEmptyFile(self):
61
    testfile = utils.PathJoin(self.tmpdir, "empty")
62

  
63
    utils.WriteFile(testfile, data="")
64

  
65
    self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
66

  
67
  def testSingleLine(self):
68
    testfile = utils.PathJoin(self.tmpdir, "data")
69

  
70
    for nl in range(0, 10):
71
      utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
72

  
73
      self.assertEqual(ssconf.ReadSsconfFile(testfile),
74
                       "Hello World")
75

  
76
  def testExactlyMaxSize(self):
77
    testfile = utils.PathJoin(self.tmpdir, "data")
78

  
79
    data = "A" * ssconf._MAX_SIZE
80
    utils.WriteFile(testfile, data=data)
81

  
82
    self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
83

  
84
    self.assertEqual(ssconf.ReadSsconfFile(testfile),
85
                     data)
86

  
87
  def testLargeFile(self):
88
    testfile = utils.PathJoin(self.tmpdir, "data")
89

  
90
    for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
91
      utils.WriteFile(testfile, data="A" * size)
92
      self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
93
      self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
94

  
95

  
96
class TestSimpleStore(unittest.TestCase):
97
  def setUp(self):
98
    self.tmpdir = tempfile.mkdtemp()
99
    self.sstore = ssconf.SimpleStore(cfg_location=self.tmpdir)
100

  
101
  def tearDown(self):
102
    shutil.rmtree(self.tmpdir)
103

  
104
  def testInvalidKey(self):
105
    self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
106
                      "not a valid key")
107
    self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
108
                      "not a valid key")
109

  
110
  def testKeyToFilename(self):
111
    for key in ssconf._VALID_KEYS:
112
      result = self.sstore.KeyToFilename(key)
113
      self.assertTrue(utils.IsBelowDir(self.tmpdir, result))
114
      self.assertTrue(os.path.basename(result).startswith("ssconf_"))
115

  
116
  def testReadFileNonExistingFile(self):
117
    filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
118

  
119
    self.assertFalse(os.path.exists(filename))
120
    try:
121
      self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
122
    except errors.ConfigurationError, err:
123
      self.assertTrue(str(err).startswith("Can't read ssconf file"))
124
    else:
125
      self.fail("Exception was not raised")
126

  
127
    for default in ["", "Hello World", 0, 100]:
128
      self.assertFalse(os.path.exists(filename))
129
      result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
130
      self.assertEqual(result, default)
131

  
132
  def testReadFile(self):
133
    utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
134
                    data="cluster.example.com")
135

  
136
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
137
                     "cluster.example.com")
138

  
139
    self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
140
                                           default="something.example.com"),
141
                     "cluster.example.com")
142

  
143

  
144
if __name__ == "__main__":
145
  testutils.GanetiTestProgram()

Also available in: Unified diff