4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
32 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
33 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
34 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
35 ShellQuote, ShellQuoteArgs
36 from ganeti.errors import LockError, UnitParseError
38 class TestIsProcessAlive(unittest.TestCase):
39 """Testing case for IsProcessAlive"""
41 # create a zombie and a (hopefully) non-existing process id
42 self.pid_zombie = os.fork()
43 if self.pid_zombie == 0:
45 elif self.pid_zombie < 0:
46 raise SystemError("can't fork")
47 self.pid_non_existing = os.fork()
48 if self.pid_non_existing == 0:
50 elif self.pid_non_existing > 0:
51 os.waitpid(self.pid_non_existing, 0)
53 raise SystemError("can't fork")
58 self.assert_(IsProcessAlive(mypid),
59 "can't find myself running")
62 self.assert_(not IsProcessAlive(self.pid_zombie),
63 "zombie not detected as zombie")
def testNotExisting(self):
    """Check that a PID which should no longer exist is reported as dead.

    Reads ``self.pid_non_existing``, which the test setup obtains by forking
    a child and reaping it with ``os.waitpid``.
    """
    # assertFalse is the supported spelling of the deprecated
    # ``assert_(not ...)`` idiom and behaves the same way.
    self.assertFalse(IsProcessAlive(self.pid_non_existing),
                     "noexisting process detected")
71 class TestLocking(unittest.TestCase):
72 """Testing case for the Lock/Unlock functions"""
73 def clean_lock(self, name):
75 ganeti.utils.Unlock("unittest")
81 self.clean_lock("unittest")
82 self.assertEqual(None, Lock("unittest"))
86 self.clean_lock("unittest")
87 ganeti.utils.Lock("unittest")
88 self.assertEqual(None, Unlock("unittest"))
def testDoubleLock(self):
    """Taking the "unittest" lock a second time must raise LockError."""
    lock_name = "unittest"
    # Start from a clean state, then take the lock once so that the
    # second attempt below collides with it.
    self.clean_lock(lock_name)
    ganeti.utils.Lock(lock_name)
    self.assertRaises(LockError, Lock, lock_name)
97 class TestRunCmd(unittest.TestCase):
98 """Testing case for the RunCmd function"""
101 self.magic = time.ctime() + " ganeti test"
104 """Test successful exit code"""
105 result = RunCmd("/bin/sh -c 'exit 0'")
106 self.assertEqual(result.exit_code, 0)
109 """Test fail exit code"""
110 result = RunCmd("/bin/sh -c 'exit 1'")
111 self.assertEqual(result.exit_code, 1)
def testStdout(self):
    """Check that text echoed by the command shows up in ``stdout``."""
    echo_cmd = 'echo -n "%s"' % self.magic
    shell_line = "/bin/sh -c '%s'" % echo_cmd
    outcome = RunCmd(shell_line)
    self.assertEqual(outcome.stdout, self.magic)
def testStderr(self):
    """Check that output redirected to stderr shows up in ``stderr``."""
    echo_cmd = 'echo -n "%s"' % self.magic
    # The trailing 1>&2 sends the shell's standard output to standard error.
    shell_line = "/bin/sh -c '%s' 1>&2" % echo_cmd
    outcome = RunCmd(shell_line)
    self.assertEqual(outcome.stderr, self.magic)
def testCombined(self):
    """Check that ``output`` holds the stdout text followed by the stderr text."""
    marker = self.magic
    # "A<marker>" goes to stdout, "B<marker>" goes to stderr.
    script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (marker, marker)
    outcome = RunCmd("/bin/sh -c '%s'" % script)
    expected = "".join(["A", marker, "B", marker])
    self.assertEqual(outcome.output, expected)
def testSignal(self):
    """Check that a command killed by signal 15 reports it in ``signal``."""
    # The shell sends signal 15 to itself ($$ is its own PID).
    outcome = RunCmd("/bin/sh -c 'kill -15 $$'")
    self.assertEqual(outcome.signal, 15)
140 class TestRemoveFile(unittest.TestCase):
141 """Test case for the RemoveFile function"""
144 """Create a temp dir and file for each case"""
145 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
146 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
150 if os.path.exists(self.tmpfile):
151 os.unlink(self.tmpfile)
152 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
    """RemoveFile() called on a directory is expected to return None."""
    outcome = RemoveFile(self.tmpdir)
    self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
    """RemoveFile() must not raise when the file is already gone."""
    # First pass targets the existing temp file; the second pass repeats
    # the call on the same path and must complete without an exception.
    for _ in range(2):
        RemoveFile(self.tmpfile)
def testRemoveFile(self):
    """RemoveFile() must actually delete a regular file."""
    target = self.tmpfile
    RemoveFile(target)
    still_present = os.path.exists(target)
    if still_present:
        self.fail("File '%s' not removed" % target)
173 def testRemoveSymlink(self):
174 """Test that RemoveFile does remove symlinks"""
175 symlink = self.tmpdir + "/symlink"
176 os.symlink("no-such-file", symlink)
178 if os.path.exists(symlink):
179 self.fail("File '%s' not removed" % symlink)
180 os.symlink(self.tmpfile, symlink)
182 if os.path.exists(symlink):
183 self.fail("File '%s' not removed" % symlink)
186 class TestCheckdict(unittest.TestCase):
187 """Test case for the CheckDict function"""
190 """Test that CheckDict adds a missing key with the correct value"""
195 if 'b' not in tgt or tgt['b'] != 2:
196 self.fail("Failed to update dict")
199 def testNoUpdate(self):
200 """Test that CheckDict does not overwrite an existing key"""
201 tgt = {'a':1, 'b': 3}
204 self.failUnlessEqual(tgt['b'], 3)
class TestMatchNameComponent(unittest.TestCase):
    """Test case for the MatchNameComponent function"""

    def testEmptyList(self):
        """An empty candidate list is expected never to yield a match."""
        for key in ("", "test"):
            self.assertEqual(MatchNameComponent(key, []), None)

    def testSingleMatch(self):
        """A key matching exactly one candidate returns that candidate.

        The key is tried as the bare host name, as a partial domain prefix
        and as the complete name.
        """
        mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
        for key in ("test2", "test2.example", "test2.example.com"):
            self.assertEqual(MatchNameComponent(key, mlist), mlist[1])

    def testMultipleMatches(self):
        """A key matching several candidates is expected to return None."""
        mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
        for key in ("test1", "test1.example"):
            self.assertEqual(MatchNameComponent(key, mlist), None)
229 class TestFormatUnit(unittest.TestCase):
230 """Test case for the FormatUnit function"""
233 self.assertEqual(FormatUnit(1), '1M')
234 self.assertEqual(FormatUnit(100), '100M')
235 self.assertEqual(FormatUnit(1023), '1023M')
238 self.assertEqual(FormatUnit(1024), '1.0G')
239 self.assertEqual(FormatUnit(1536), '1.5G')
240 self.assertEqual(FormatUnit(17133), '16.7G')
241 self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
244 self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
245 self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
246 self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
249 class TestParseUnit(unittest.TestCase):
250 """Test case for the ParseUnit function"""
253 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
254 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
255 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
    """Integer inputs are expected to round up to the next multiple of four."""
    expectations = (
        ('0', 0),
        ('1', 4),
        ('2', 4),
        ('3', 4),
        ('124', 124),
        ('125', 128),
        ('126', 128),
        ('127', 128),
        ('128', 128),
        ('129', 132),
        ('130', 132),
    )
    for text, expected in expectations:
        self.assertEqual(ParseUnit(text), expected)
def testFloating(self):
    """Fractional inputs, with or without a unit suffix, parse to the listed sizes."""
    expectations = (
        ('0', 0),
        ('0.5', 4),
        ('1.75', 4),
        ('1.99', 4),
        ('2.00', 4),
        ('2.01', 4),
        ('3.99', 4),
        ('4.00', 4),
        ('4.01', 8),
        ('1.5G', 1536),
        ('1.8G', 1844),
        ('8.28T', 8682212),
    )
    for text, expected in expectations:
        self.assertEqual(ParseUnit(text), expected)
def testSuffixes(self):
    """Every suffix in SCALES must scale '1024' by its factor.

    Each suffix is tried as written, lower-cased and upper-cased, after
    each of the whitespace separators below.
    """
    case_variants = (lambda x: x, str.lower, str.upper)
    # NOTE(review): the second and third separators are identical as
    # written; presumably one was meant to be a longer run of spaces --
    # confirm against the upstream file.
    for sep in ('', ' ', ' ', "\t", "\t "):
        for suffix, scale in TestParseUnit.SCALES:
            for func in case_variants:
                text = '1024' + sep + func(suffix)
                self.assertEqual(ParseUnit(text), 1024 * scale)
def testInvalidInput(self):
    """Invalid separators and comma decimals must raise UnitParseError."""
    suffixes = [suffix for suffix, _ in TestParseUnit.SCALES]

    # A non-whitespace character between the number and the suffix.
    for sep in ('-', '_', ',', 'a'):
        for suffix in suffixes:
            self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)

    # A comma used as the decimal separator.
    for suffix in suffixes:
        self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
300 class TestSshKeys(unittest.TestCase):
301 """Test case for the AddAuthorizedKey function"""
303 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
304 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
305 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
307 # NOTE: The MD5 sums below were calculated after manually
308 # checking the output files.
310 def writeTestFile(self):
311 (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
312 f = os.fdopen(fd, 'w')
314 f.write(TestSshKeys.KEY_A)
316 f.write(TestSshKeys.KEY_B)
323 def testAddingNewKey(self):
324 tmpname = self.writeTestFile()
326 AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
328 f = open(tmpname, 'r')
330 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
331 'ccc71523108ca6e9d0343797dc3e9f16')
337 def testAddingAlmostButNotCompletlyTheSameKey(self):
338 tmpname = self.writeTestFile()
340 AddAuthorizedKey(tmpname,
341 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
343 f = open(tmpname, 'r')
345 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
346 'f2c939d57addb5b3a6846884be896b46')
352 def testAddingExistingKeyWithSomeMoreSpaces(self):
353 tmpname = self.writeTestFile()
355 AddAuthorizedKey(tmpname,
356 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
358 f = open(tmpname, 'r')
360 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
361 '4e612764808bd46337eb0f575415fc30')
367 def testRemovingExistingKeyWithSomeMoreSpaces(self):
368 tmpname = self.writeTestFile()
370 RemoveAuthorizedKey(tmpname,
371 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
373 f = open(tmpname, 'r')
375 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
376 '77516d987fca07f70e30b830b3e4f2ed')
382 def testRemovingNonExistingKey(self):
383 tmpname = self.writeTestFile()
385 RemoveAuthorizedKey(tmpname,
386 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
388 f = open(tmpname, 'r')
390 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
391 '4e612764808bd46337eb0f575415fc30')
class TestShellQuoting(unittest.TestCase):
    """Checks for the ShellQuote and ShellQuoteArgs helpers"""

    def testShellQuote(self):
        """Compare ShellQuote output with the expected quoted form.

        A plain word is expected back unchanged; values containing quotes,
        spaces or backslashes are expected wrapped in single quotes.
        """
        expectations = (
            ('abc', "abc"),
            ('ab"c', "'ab\"c'"),
            ("a'bc", "'a'\\''bc'"),
            ("a b c", "'a b c'"),
            ("a b\\ c", "'a b\\ c'"),
        )
        for raw, quoted in expectations:
            self.assertEqual(ShellQuote(raw), quoted)

    def testShellQuoteArgs(self):
        """Compare ShellQuoteArgs output with the expected command line.

        Each argument is expected quoted on its own and the results joined
        with single spaces.
        """
        expectations = (
            (['a', 'b', 'c'], "a b c"),
            (['a', 'b"', 'c'], "a 'b\"' c"),
            (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
        )
        for args, joined in expectations:
            self.assertEqual(ShellQuoteArgs(args), joined)
414 if __name__ == '__main__':