4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
39 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
40 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
41 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
42 SetEtcHostsEntry, RemoveEtcHostsEntry
43 from ganeti.errors import LockError, UnitParseError
46 class GanetiTestCase(unittest.TestCase):
47 def assertFileContent(self, file_name, content):
48 """Checks the content of a file.
51 handle = open(file_name, 'r')
53 self.assertEqual(handle.read(), content)
58 class TestIsProcessAlive(unittest.TestCase):
59 """Testing case for IsProcessAlive"""
61 # create a zombie and a (hopefully) non-existing process id
62 self.pid_zombie = os.fork()
63 if self.pid_zombie == 0:
65 elif self.pid_zombie < 0:
66 raise SystemError("can't fork")
67 self.pid_non_existing = os.fork()
68 if self.pid_non_existing == 0:
70 elif self.pid_non_existing > 0:
71 os.waitpid(self.pid_non_existing, 0)
73 raise SystemError("can't fork")
78 self.assert_(IsProcessAlive(mypid),
79 "can't find myself running")
82 self.assert_(not IsProcessAlive(self.pid_zombie),
83 "zombie not detected as zombie")
def testNotExisting(self):
    """Check that a pid expected not to exist is reported as not alive.

    Reads ``self.pid_non_existing``, which the test fixture prepares.
    """
    # assertFalse replaces the deprecated ``assert_(not ...)`` spelling.
    self.assertFalse(IsProcessAlive(self.pid_non_existing),
                     "non-existing process detected")
91 class TestLocking(unittest.TestCase):
92 """Testing case for the Lock/Unlock functions"""
95 lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
97 self.old_lock_dir = constants.LOCK_DIR
98 constants.LOCK_DIR = lock_dir
102 ganeti.utils.Unlock("unittest")
105 shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
106 constants.LOCK_DIR = self.old_lock_dir
108 def clean_lock(self, name):
110 ganeti.utils.Unlock("unittest")
116 self.clean_lock("unittest")
117 self.assertEqual(None, Lock("unittest"))
def testUnlock(self):
    """Check that releasing a held lock returns None."""
    self.clean_lock("unittest")
    # Call the directly imported Lock, like the Unlock call below, rather
    # than reaching through the ``ganeti.utils`` attribute path.
    Lock("unittest")
    self.assertEqual(None, Unlock("unittest"))
def testDoubleLock(self):
    """Check that taking an already held lock raises LockError."""
    self.clean_lock("unittest")
    # Call the directly imported Lock for the first acquisition too, so
    # both acquisitions go through the same name.
    Lock("unittest")
    self.assertRaises(LockError, Lock, "unittest")
131 class TestRunCmd(unittest.TestCase):
132 """Testing case for the RunCmd function"""
135 self.magic = time.ctime() + " ganeti test"
138 """Test successful exit code"""
139 result = RunCmd("/bin/sh -c 'exit 0'")
140 self.assertEqual(result.exit_code, 0)
143 """Test fail exit code"""
144 result = RunCmd("/bin/sh -c 'exit 1'")
145 self.assertEqual(result.exit_code, 1)
def testStdout(self):
    """Check that RunCmd reports the command's standard output."""
    shell_snippet = 'echo -n "%s"' % self.magic
    outcome = RunCmd("/bin/sh -c '%s'" % shell_snippet)
    self.assertEqual(outcome.stdout, self.magic)
def testStderr(self):
    """Check that RunCmd reports the command's standard error."""
    shell_snippet = 'echo -n "%s"' % self.magic
    # The 1>&2 redirection follows the quoted snippet in the command line.
    outcome = RunCmd("/bin/sh -c '%s' 1>&2" % shell_snippet)
    self.assertEqual(outcome.stderr, self.magic)
def testCombined(self):
    """Check RunCmd's combined output of stdout and stderr.

    The command writes "A" + marker to stdout, then "B" + marker to stderr;
    the combined output is expected to hold both pieces in that order.
    """
    marker = self.magic
    shell_snippet = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (marker, marker)
    outcome = RunCmd("/bin/sh -c '%s'" % shell_snippet)
    wanted = "".join(["A", marker, "B", marker])
    self.assertEqual(outcome.output, wanted)
def testSignal(self):
    """Check that RunCmd reports the signal that ended the command."""
    # The shell sends signal 15 to its own pid ($$).
    outcome = RunCmd("/bin/sh -c 'kill -15 $$'")
    self.assertEqual(outcome.signal, 15)
def testListRun(self):
    """Check RunCmd when the command is passed as an argument list."""
    # Each case: (argument list, expected exit code, expected stdout or
    # None when stdout is not checked).
    cases = (
        (["true"], 0, None),
        (["/bin/sh", "-c", "exit 1"], 1, None),
        (["echo", "-n", self.magic], 0, self.magic),
    )
    for argv, want_code, want_stdout in cases:
        outcome = RunCmd(argv)
        self.assertEqual(outcome.signal, None)
        self.assertEqual(outcome.exit_code, want_code)
        if want_stdout is not None:
            self.assertEqual(outcome.stdout, want_stdout)
187 """Test locale environment"""
188 old_env = os.environ.copy()
190 os.environ["LANG"] = "en_US.UTF-8"
191 os.environ["LC_ALL"] = "en_US.UTF-8"
192 result = RunCmd(["locale"])
193 for line in result.output.splitlines():
194 key, value = line.split("=", 1)
195 # Ignore these variables, they're overridden by LC_ALL
196 if key == "LANG" or key == "LANGUAGE":
198 self.failIf(value and value != "C" and value != '"C"',
199 "Variable %s is set to the invalid value '%s'" % (key, value))
204 class TestRemoveFile(unittest.TestCase):
205 """Test case for the RemoveFile function"""
208 """Create a temp dir and file for each case"""
209 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
210 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
214 if os.path.exists(self.tmpfile):
215 os.unlink(self.tmpfile)
216 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
    """Check that RemoveFile() returns None when given a directory."""
    outcome = RemoveFile(self.tmpdir)
    self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
    """Check that RemoveFile() can be called twice on the same path."""
    # The second call targets a path the first call is expected to have
    # removed; the test passes as long as neither call raises.
    for _attempt in range(2):
        RemoveFile(self.tmpfile)
def testRemoveFile(self):
    """Check that the temporary file is gone after RemoveFile()."""
    RemoveFile(self.tmpfile)
    if not os.path.exists(self.tmpfile):
        return
    self.fail("File '%s' not removed" % self.tmpfile)
237 def testRemoveSymlink(self):
238 """Test that RemoveFile does remove symlinks"""
239 symlink = self.tmpdir + "/symlink"
240 os.symlink("no-such-file", symlink)
242 if os.path.exists(symlink):
243 self.fail("File '%s' not removed" % symlink)
244 os.symlink(self.tmpfile, symlink)
246 if os.path.exists(symlink):
247 self.fail("File '%s' not removed" % symlink)
250 class TestCheckdict(unittest.TestCase):
251 """Test case for the CheckDict function"""
254 """Test that CheckDict adds a missing key with the correct value"""
259 if 'b' not in tgt or tgt['b'] != 2:
260 self.fail("Failed to update dict")
263 def testNoUpdate(self):
264 """Test that CheckDict does not overwrite an existing key"""
265 tgt = {'a':1, 'b': 3}
268 self.failUnlessEqual(tgt['b'], 3)
class TestMatchNameComponent(unittest.TestCase):
    """Test case for the MatchNameComponent function.

    Uses assertEqual throughout instead of the deprecated failUnlessEqual
    alias.
    """

    def testEmptyList(self):
        """Check that nothing matches against an empty candidate list."""
        self.assertEqual(MatchNameComponent("", []), None)
        self.assertEqual(MatchNameComponent("test", []), None)

    def testSingleMatch(self):
        """Check that an unambiguous key resolves to the one matching name.

        The short name, a partial domain and the full name must all yield
        the second list entry.
        """
        mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
        for key in ("test2", "test2.example", "test2.example.com"):
            self.assertEqual(MatchNameComponent(key, mlist), mlist[1])

    def testMultipleMatches(self):
        """Check that a key matching several names yields None."""
        mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
        for key in ("test1", "test1.example"):
            self.assertEqual(MatchNameComponent(key, mlist), None)
293 class TestFormatUnit(unittest.TestCase):
294 """Test case for the FormatUnit function"""
297 self.assertEqual(FormatUnit(1), '1M')
298 self.assertEqual(FormatUnit(100), '100M')
299 self.assertEqual(FormatUnit(1023), '1023M')
302 self.assertEqual(FormatUnit(1024), '1.0G')
303 self.assertEqual(FormatUnit(1536), '1.5G')
304 self.assertEqual(FormatUnit(17133), '16.7G')
305 self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
308 self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
309 self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
310 self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
313 class TestParseUnit(unittest.TestCase):
314 """Test case for the ParseUnit function"""
317 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
318 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
319 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
    """Check ParseUnit on plain integer strings.

    The expected values show results rounded up to the next multiple of 4
    (e.g. '1' -> 4, '125' -> 128), with exact multiples left unchanged.
    """
    cases = (
        ('0', 0),
        ('1', 4),
        ('2', 4),
        ('3', 4),
        ('124', 124),
        ('125', 128),
        ('126', 128),
        ('127', 128),
        ('128', 128),
        ('129', 132),
        ('130', 132),
    )
    for text, wanted in cases:
        self.assertEqual(ParseUnit(text), wanted)
def testFloating(self):
    """Check ParseUnit on fractional values, with and without a suffix."""
    cases = (
        ('0', 0),
        ('0.5', 4),
        ('1.75', 4),
        ('1.99', 4),
        ('2.00', 4),
        ('2.01', 4),
        ('3.99', 4),
        ('4.00', 4),
        ('4.01', 8),
        ('1.5G', 1536),
        ('1.8G', 1844),
        ('8.28T', 8682212),
    )
    for text, wanted in cases:
        self.assertEqual(ParseUnit(text), wanted)
349 def testSuffixes(self):
350 for sep in ('', ' ', ' ', "\t", "\t "):
351 for suffix, scale in TestParseUnit.SCALES:
352 for func in (lambda x: x, str.lower, str.upper):
353 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
    """Check that malformed size strings raise UnitParseError.

    Covers a bad separator between number and suffix, and a comma used
    inside the number.
    """
    malformed = ['1' + sep + suffix
                 for sep in ('-', '_', ',', 'a')
                 for suffix, _unused in TestParseUnit.SCALES]
    malformed += ['1,3' + suffix
                  for suffix, _unused in TestParseUnit.SCALES]

    for text in malformed:
        self.assertRaises(UnitParseError, ParseUnit, text)
365 class TestSshKeys(GanetiTestCase):
366 """Test case for the AddAuthorizedKey function"""
368 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
369 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
370 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
373 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
375 handle = os.fdopen(fd, 'w')
377 handle.write("%s\n" % TestSshKeys.KEY_A)
378 handle.write("%s\n" % TestSshKeys.KEY_B)
382 utils.RemoveFile(self.tmpname)
386 utils.RemoveFile(self.tmpname)
def testAddingNewKey(self):
    """Check that a key not yet in the file ends up as a new last line."""
    new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
    AddAuthorizedKey(self.tmpname, new_key)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
        "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
    self.assertFileContent(self.tmpname, expected)
def testAddingAlmostButNotCompletelyTheSameKey(self):
    """Check that a key differing only in its comment is still added.

    The key material equals that of the first fixture key; only the
    trailing "root@test" differs.
    """
    near_duplicate = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
    AddAuthorizedKey(self.tmpname, near_duplicate)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
    self.assertFileContent(self.tmpname, expected)
def testAddingExistingKeyWithSomeMoreSpaces(self):
    """Check that an existing key with extra spaces is not added again.

    The key passed in matches the first fixture key except for additional
    spaces between its fields; the file must stay unchanged.
    """
    # The extra spaces are the point of this test: without them the key
    # would be byte-identical to the fixture line and the whitespace
    # handling would go unexercised.
    # NOTE(review): assumes AddAuthorizedKey compares keys field-wise,
    # ignoring the amount of whitespace -- confirm against utils.
    AddAuthorizedKey(self.tmpname,
                     'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')

    self.assertFileContent(self.tmpname,
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingExistingKeyWithSomeMoreSpaces(self):
    """Check that an existing key is removed despite extra spaces.

    The key passed in matches the first fixture key except for additional
    spaces between its fields; only the second fixture line must remain.
    """
    # The extra spaces are the point of this test: without them the key
    # would be byte-identical to the fixture line and the whitespace
    # handling would go unexercised.
    # NOTE(review): assumes RemoveAuthorizedKey compares keys field-wise,
    # ignoring the amount of whitespace -- confirm against utils.
    RemoveAuthorizedKey(self.tmpname,
                        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')

    self.assertFileContent(self.tmpname,
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingNonExistingKey(self):
    """Check that removing an unknown key leaves the file unchanged."""
    unknown_key = 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test'
    RemoveAuthorizedKey(self.tmpname, unknown_key)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)
435 class TestEtcHosts(GanetiTestCase):
436 """Test functions modifying /etc/hosts"""
439 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
441 handle = os.fdopen(fd, 'w')
443 handle.write('# This is a test file for /etc/hosts\n')
444 handle.write('127.0.0.1\tlocalhost\n')
445 handle.write('192.168.1.1 router gw\n')
449 utils.RemoveFile(self.tmpname)
453 utils.RemoveFile(self.tmpname)
def testSettingNewIp(self):
    """Check that an entry for a new IP address is appended to the file."""
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])

    expected = "".join([
        "# This is a test file for /etc/hosts\n",
        "127.0.0.1\tlocalhost\n",
        "192.168.1.1 router gw\n",
        "1.2.3.4\tmyhost.domain.tld myhost\n",
    ])
    self.assertFileContent(self.tmpname, expected)
465 def testSettingExistingIp(self):
466 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
469 self.assertFileContent(self.tmpname,
470 "# This is a test file for /etc/hosts\n"
471 "127.0.0.1\tlocalhost\n"
472 "192.168.1.1\tmyhost.domain.tld myhost\n")
474 def testSettingDuplicateName(self):
475 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
477 self.assertFileContent(self.tmpname,
478 "# This is a test file for /etc/hosts\n"
479 "127.0.0.1\tlocalhost\n"
480 "192.168.1.1 router gw\n"
483 def testRemovingExistingHost(self):
484 RemoveEtcHostsEntry(self.tmpname, 'router')
486 self.assertFileContent(self.tmpname,
487 "# This is a test file for /etc/hosts\n"
488 "127.0.0.1\tlocalhost\n"
def testRemovingSingleExistingHost(self):
    """Check that removing a line's only name drops the whole line."""
    RemoveEtcHostsEntry(self.tmpname, 'localhost')

    expected = "".join([
        "# This is a test file for /etc/hosts\n",
        "192.168.1.1 router gw\n",
    ])
    self.assertFileContent(self.tmpname, expected)
def testRemovingNonExistingHost(self):
    """Check that removing an unknown host name leaves the file unchanged."""
    RemoveEtcHostsEntry(self.tmpname, 'myhost')

    expected = "".join([
        "# This is a test file for /etc/hosts\n",
        "127.0.0.1\tlocalhost\n",
        "192.168.1.1 router gw\n",
    ])
    self.assertFileContent(self.tmpname, expected)
def testRemovingAlias(self):
    """Check that removing an alias keeps the rest of its line."""
    RemoveEtcHostsEntry(self.tmpname, 'gw')

    expected = "".join([
        "# This is a test file for /etc/hosts\n",
        "127.0.0.1\tlocalhost\n",
        "192.168.1.1 router\n",
    ])
    self.assertFileContent(self.tmpname, expected)
class TestShellQuoting(unittest.TestCase):
    """Test case for the ShellQuote and ShellQuoteArgs functions."""

    def testShellQuote(self):
        """Check ShellQuote on a table of (input, expected output) pairs."""
        samples = (
            ('abc', "abc"),
            ('ab"c', "'ab\"c'"),
            ("a'bc", "'a'\\''bc'"),
            ("a b c", "'a b c'"),
            ("a b\\ c", "'a b\\ c'"),
        )
        for raw, quoted in samples:
            self.assertEqual(ShellQuote(raw), quoted)

    def testShellQuoteArgs(self):
        """Check ShellQuoteArgs on a table of (argument list, expected)."""
        samples = (
            (['a', 'b', 'c'], "a b c"),
            (['a', 'b"', 'c'], "a 'b\"' c"),
            (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
        )
        for argv, quoted in samples:
            self.assertEqual(ShellQuoteArgs(argv), quoted)
531 class TestTcpPing(unittest.TestCase):
532 """Testcase for TCP version of ping - against listen(2)ing port"""
535 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
536 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
537 self.listenerport = self.listener.getsockname()[1]
538 self.listener.listen(1)
541 self.listener.shutdown(socket.SHUT_RDWR)
543 del self.listenerport
545 def testTcpPingToLocalHostAccept(self):
546 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
547 constants.LOCALHOST_IP_ADDRESS,
550 live_port_needed=True),
551 "failed to connect to test listener")
554 class TestTcpPingDeaf(unittest.TestCase):
555 """Testcase for TCP version of ping - against non listen(2)ing port"""
558 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
559 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
560 self.deaflistenerport = self.deaflistener.getsockname()[1]
563 del self.deaflistener
564 del self.deaflistenerport
def testTcpPingToLocalHostAcceptDeaf(self):
    """Check that TcpPing fails when a successful connect(2) is required.

    The target is ``self.deaflistenerport``, a port the fixture binds on the
    local host address.
    """
    reachable = TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True)  # need successful connect(2)
    # assertFalse replaces the deprecated failIf alias.
    self.assertFalse(reachable, "successfully connected to deaf listener")
def testTcpPingToLocalHostNoAccept(self):
    """Check that TcpPing succeeds when no live port is required.

    The target is ``self.deaflistenerport``, a port the fixture binds on the
    local host address.
    """
    reachable = TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=False)  # ECONNREFUSED is OK
    # assertTrue replaces the deprecated assert_ alias.
    self.assertTrue(reachable, "failed to ping alive host on deaf port")
583 class TestListVisibleFiles(unittest.TestCase):
584 """Test case for ListVisibleFiles"""
587 self.path = tempfile.mkdtemp()
590 shutil.rmtree(self.path)
592 def _test(self, files, expected):
594 expected = expected[:]
598 f = open(os.path.join(self.path, name), 'w')
604 found = ListVisibleFiles(self.path)
607 self.assertEqual(found, expected)
609 def testAllVisible(self):
610 files = ["a", "b", "c"]
612 self._test(files, expected)
614 def testNoneVisible(self):
615 files = [".a", ".b", ".c"]
617 self._test(files, expected)
def testSomeVisible(self):
    """Check a mix of regular names and one dot-prefixed name."""
    created = ["a", "b", ".c"]
    visible = ["a", "b"]
    self._test(created, visible)
625 class TestNewUUID(unittest.TestCase):
626 """Test case for NewUUID"""
628 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
629 '[a-f0-9]{4}-[a-f0-9]{12}$')
632 self.failUnless(self._re_uuid.match(utils.NewUUID()))
635 class TestUniqueSequence(unittest.TestCase):
636 """Test case for UniqueSequence"""
def _test(self, input, expected):
    """Assert that utils.UniqueSequence(input) equals expected."""
    actual = utils.UniqueSequence(input)
    self.assertEqual(actual, expected)
643 self._test([1, 2, 3], [1, 2, 3])
644 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
645 self._test([1, 2, 2, 3], [1, 2, 3])
646 self._test([1, 2, 3, 3], [1, 2, 3])
649 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
650 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
653 self._test(["a", "a"], ["a"])
654 self._test(["a", "b"], ["a", "b"])
655 self._test(["a", "b", "a"], ["a", "b"])
658 if __name__ == '__main__':