4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
37 from ganeti import constants
38 from ganeti import utils
39 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
40 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
41 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
42 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
43 SetEtcHostsEntry, RemoveEtcHostsEntry
44 from ganeti.errors import LockError, UnitParseError
46 def _ChildHandler(signal, stack):
50 class GanetiTestCase(unittest.TestCase):
51 def assertFileContent(self, file_name, content):
52 """Checks the content of a file.
55 handle = open(file_name, 'r')
57 self.assertEqual(handle.read(), content)
62 class TestIsProcessAlive(unittest.TestCase):
63 """Testing case for IsProcessAlive"""
67 # create a (most probably) non-existing process-id
68 self.pid_non_existing = os.fork()
69 if self.pid_non_existing == 0:
71 elif self.pid_non_existing > 0:
72 os.waitpid(self.pid_non_existing, 0)
74 raise SystemError("can't fork")
76 # Use _ChildHandler for SIGCHLD
77 self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
79 self.pid_zombie = os.fork()
80 if self.pid_zombie == 0:
82 elif self.pid_zombie < 0:
83 raise SystemError("can't fork")
86 signal.signal(signal.SIGCHLD, self.chldOrig)
90 self.assert_(IsProcessAlive(mypid),
91 "can't find myself running")
102 self.fail("timed out waiting for child's signal")
103 break # not executed...
105 self.assert_(not IsProcessAlive(self.pid_zombie),
106 "zombie not detected as zombie")
108 def testNotExisting(self):
109 self.assert_(not IsProcessAlive(self.pid_non_existing),
110 "noexisting process detected")
113 class TestLocking(unittest.TestCase):
114 """Testing case for the Lock/Unlock functions"""
117 lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
119 self.old_lock_dir = constants.LOCK_DIR
120 constants.LOCK_DIR = lock_dir
124 ganeti.utils.Unlock("unittest")
127 shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
128 constants.LOCK_DIR = self.old_lock_dir
130 def clean_lock(self, name):
132 ganeti.utils.Unlock("unittest")
138 self.clean_lock("unittest")
139 self.assertEqual(None, Lock("unittest"))
142 def testUnlock(self):
143 self.clean_lock("unittest")
144 ganeti.utils.Lock("unittest")
145 self.assertEqual(None, Unlock("unittest"))
147 def testDoubleLock(self):
148 self.clean_lock("unittest")
149 ganeti.utils.Lock("unittest")
150 self.assertRaises(LockError, Lock, "unittest")
153 class TestRunCmd(unittest.TestCase):
154 """Testing case for the RunCmd function"""
157 self.magic = time.ctime() + " ganeti test"
160 """Test successful exit code"""
161 result = RunCmd("/bin/sh -c 'exit 0'")
162 self.assertEqual(result.exit_code, 0)
165 """Test fail exit code"""
166 result = RunCmd("/bin/sh -c 'exit 1'")
167 self.assertEqual(result.exit_code, 1)
170 def testStdout(self):
171 """Test standard output"""
172 cmd = 'echo -n "%s"' % self.magic
173 result = RunCmd("/bin/sh -c '%s'" % cmd)
174 self.assertEqual(result.stdout, self.magic)
177 def testStderr(self):
178 """Test standard error"""
179 cmd = 'echo -n "%s"' % self.magic
180 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
181 self.assertEqual(result.stderr, self.magic)
184 def testCombined(self):
185 """Test combined output"""
186 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
187 result = RunCmd("/bin/sh -c '%s'" % cmd)
188 self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
190 def testSignal(self):
192 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
193 self.assertEqual(result.signal, 15)
195 def testListRun(self):
197 result = RunCmd(["true"])
198 self.assertEqual(result.signal, None)
199 self.assertEqual(result.exit_code, 0)
200 result = RunCmd(["/bin/sh", "-c", "exit 1"])
201 self.assertEqual(result.signal, None)
202 self.assertEqual(result.exit_code, 1)
203 result = RunCmd(["echo", "-n", self.magic])
204 self.assertEqual(result.signal, None)
205 self.assertEqual(result.exit_code, 0)
206 self.assertEqual(result.stdout, self.magic)
209 """Test locale environment"""
210 old_env = os.environ.copy()
212 os.environ["LANG"] = "en_US.UTF-8"
213 os.environ["LC_ALL"] = "en_US.UTF-8"
214 result = RunCmd(["locale"])
215 for line in result.output.splitlines():
216 key, value = line.split("=", 1)
217 # Ignore these variables, they're overridden by LC_ALL
218 if key == "LANG" or key == "LANGUAGE":
220 self.failIf(value and value != "C" and value != '"C"',
221 "Variable %s is set to the invalid value '%s'" % (key, value))
226 class TestRemoveFile(unittest.TestCase):
227 """Test case for the RemoveFile function"""
230 """Create a temp dir and file for each case"""
231 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
232 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
236 if os.path.exists(self.tmpfile):
237 os.unlink(self.tmpfile)
238 os.rmdir(self.tmpdir)
241 def testIgnoreDirs(self):
242 """Test that RemoveFile() ignores directories"""
243 self.assertEqual(None, RemoveFile(self.tmpdir))
246 def testIgnoreNotExisting(self):
247 """Test that RemoveFile() ignores non-existing files"""
248 RemoveFile(self.tmpfile)
249 RemoveFile(self.tmpfile)
252 def testRemoveFile(self):
253 """Test that RemoveFile does remove a file"""
254 RemoveFile(self.tmpfile)
255 if os.path.exists(self.tmpfile):
256 self.fail("File '%s' not removed" % self.tmpfile)
259 def testRemoveSymlink(self):
260 """Test that RemoveFile does remove symlinks"""
261 symlink = self.tmpdir + "/symlink"
262 os.symlink("no-such-file", symlink)
264 if os.path.exists(symlink):
265 self.fail("File '%s' not removed" % symlink)
266 os.symlink(self.tmpfile, symlink)
268 if os.path.exists(symlink):
269 self.fail("File '%s' not removed" % symlink)
272 class TestCheckdict(unittest.TestCase):
273 """Test case for the CheckDict function"""
276 """Test that CheckDict adds a missing key with the correct value"""
281 if 'b' not in tgt or tgt['b'] != 2:
282 self.fail("Failed to update dict")
285 def testNoUpdate(self):
286 """Test that CheckDict does not overwrite an existing key"""
287 tgt = {'a':1, 'b': 3}
290 self.failUnlessEqual(tgt['b'], 3)
293 class TestMatchNameComponent(unittest.TestCase):
294 """Test case for the MatchNameComponent function"""
296 def testEmptyList(self):
297 """Test that there is no match against an empty list"""
299 self.failUnlessEqual(MatchNameComponent("", []), None)
300 self.failUnlessEqual(MatchNameComponent("test", []), None)
302 def testSingleMatch(self):
303 """Test that a single match is performed correctly"""
304 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
305 for key in "test2", "test2.example", "test2.example.com":
306 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
308 def testMultipleMatches(self):
309 """Test that a multiple match is returned as None"""
310 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
311 for key in "test1", "test1.example":
312 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
315 class TestFormatUnit(unittest.TestCase):
316 """Test case for the FormatUnit function"""
319 self.assertEqual(FormatUnit(1), '1M')
320 self.assertEqual(FormatUnit(100), '100M')
321 self.assertEqual(FormatUnit(1023), '1023M')
324 self.assertEqual(FormatUnit(1024), '1.0G')
325 self.assertEqual(FormatUnit(1536), '1.5G')
326 self.assertEqual(FormatUnit(17133), '16.7G')
327 self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
330 self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
331 self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
332 self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
335 class TestParseUnit(unittest.TestCase):
336 """Test case for the ParseUnit function"""
339 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
340 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
341 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
343 def testRounding(self):
344 self.assertEqual(ParseUnit('0'), 0)
345 self.assertEqual(ParseUnit('1'), 4)
346 self.assertEqual(ParseUnit('2'), 4)
347 self.assertEqual(ParseUnit('3'), 4)
349 self.assertEqual(ParseUnit('124'), 124)
350 self.assertEqual(ParseUnit('125'), 128)
351 self.assertEqual(ParseUnit('126'), 128)
352 self.assertEqual(ParseUnit('127'), 128)
353 self.assertEqual(ParseUnit('128'), 128)
354 self.assertEqual(ParseUnit('129'), 132)
355 self.assertEqual(ParseUnit('130'), 132)
357 def testFloating(self):
358 self.assertEqual(ParseUnit('0'), 0)
359 self.assertEqual(ParseUnit('0.5'), 4)
360 self.assertEqual(ParseUnit('1.75'), 4)
361 self.assertEqual(ParseUnit('1.99'), 4)
362 self.assertEqual(ParseUnit('2.00'), 4)
363 self.assertEqual(ParseUnit('2.01'), 4)
364 self.assertEqual(ParseUnit('3.99'), 4)
365 self.assertEqual(ParseUnit('4.00'), 4)
366 self.assertEqual(ParseUnit('4.01'), 8)
367 self.assertEqual(ParseUnit('1.5G'), 1536)
368 self.assertEqual(ParseUnit('1.8G'), 1844)
369 self.assertEqual(ParseUnit('8.28T'), 8682212)
371 def testSuffixes(self):
372 for sep in ('', ' ', ' ', "\t", "\t "):
373 for suffix, scale in TestParseUnit.SCALES:
374 for func in (lambda x: x, str.lower, str.upper):
375 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
378 def testInvalidInput(self):
379 for sep in ('-', '_', ',', 'a'):
380 for suffix, _ in TestParseUnit.SCALES:
381 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
383 for suffix, _ in TestParseUnit.SCALES:
384 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
387 class TestSshKeys(GanetiTestCase):
388 """Test case for the AddAuthorizedKey function"""
390 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
391 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
392 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
395 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
397 handle = os.fdopen(fd, 'w')
399 handle.write("%s\n" % TestSshKeys.KEY_A)
400 handle.write("%s\n" % TestSshKeys.KEY_B)
404 utils.RemoveFile(self.tmpname)
408 utils.RemoveFile(self.tmpname)
411 def testAddingNewKey(self):
412 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
414 self.assertFileContent(self.tmpname,
415 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
416 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
417 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
418 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
420 def testAddingAlmostButNotCompletelyTheSameKey(self):
421 AddAuthorizedKey(self.tmpname,
422 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
424 self.assertFileContent(self.tmpname,
425 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
426 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
427 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
428 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
430 def testAddingExistingKeyWithSomeMoreSpaces(self):
431 AddAuthorizedKey(self.tmpname,
432 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
434 self.assertFileContent(self.tmpname,
435 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
436 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
437 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
439 def testRemovingExistingKeyWithSomeMoreSpaces(self):
440 RemoveAuthorizedKey(self.tmpname,
441 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
443 self.assertFileContent(self.tmpname,
444 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
445 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
447 def testRemovingNonExistingKey(self):
448 RemoveAuthorizedKey(self.tmpname,
449 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
451 self.assertFileContent(self.tmpname,
452 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
453 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
454 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
457 class TestEtcHosts(GanetiTestCase):
458 """Test functions modifying /etc/hosts"""
461 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
463 handle = os.fdopen(fd, 'w')
465 handle.write('# This is a test file for /etc/hosts\n')
466 handle.write('127.0.0.1\tlocalhost\n')
467 handle.write('192.168.1.1 router gw\n')
471 utils.RemoveFile(self.tmpname)
475 utils.RemoveFile(self.tmpname)
478 def testSettingNewIp(self):
479 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
481 self.assertFileContent(self.tmpname,
482 "# This is a test file for /etc/hosts\n"
483 "127.0.0.1\tlocalhost\n"
484 "192.168.1.1 router gw\n"
485 "1.2.3.4\tmyhost.domain.tld myhost\n")
487 def testSettingExistingIp(self):
488 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
491 self.assertFileContent(self.tmpname,
492 "# This is a test file for /etc/hosts\n"
493 "127.0.0.1\tlocalhost\n"
494 "192.168.1.1\tmyhost.domain.tld myhost\n")
496 def testSettingDuplicateName(self):
497 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
499 self.assertFileContent(self.tmpname,
500 "# This is a test file for /etc/hosts\n"
501 "127.0.0.1\tlocalhost\n"
502 "192.168.1.1 router gw\n"
505 def testRemovingExistingHost(self):
506 RemoveEtcHostsEntry(self.tmpname, 'router')
508 self.assertFileContent(self.tmpname,
509 "# This is a test file for /etc/hosts\n"
510 "127.0.0.1\tlocalhost\n"
513 def testRemovingSingleExistingHost(self):
514 RemoveEtcHostsEntry(self.tmpname, 'localhost')
516 self.assertFileContent(self.tmpname,
517 "# This is a test file for /etc/hosts\n"
518 "192.168.1.1 router gw\n")
520 def testRemovingNonExistingHost(self):
521 RemoveEtcHostsEntry(self.tmpname, 'myhost')
523 self.assertFileContent(self.tmpname,
524 "# This is a test file for /etc/hosts\n"
525 "127.0.0.1\tlocalhost\n"
526 "192.168.1.1 router gw\n")
528 def testRemovingAlias(self):
529 RemoveEtcHostsEntry(self.tmpname, 'gw')
531 self.assertFileContent(self.tmpname,
532 "# This is a test file for /etc/hosts\n"
533 "127.0.0.1\tlocalhost\n"
534 "192.168.1.1 router\n")
537 class TestShellQuoting(unittest.TestCase):
538 """Test case for shell quoting functions"""
540 def testShellQuote(self):
541 self.assertEqual(ShellQuote('abc'), "abc")
542 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
543 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
544 self.assertEqual(ShellQuote("a b c"), "'a b c'")
545 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
547 def testShellQuoteArgs(self):
548 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
549 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
550 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
553 class TestTcpPing(unittest.TestCase):
554 """Testcase for TCP version of ping - against listen(2)ing port"""
557 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
558 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
559 self.listenerport = self.listener.getsockname()[1]
560 self.listener.listen(1)
563 self.listener.shutdown(socket.SHUT_RDWR)
565 del self.listenerport
567 def testTcpPingToLocalHostAccept(self):
568 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
571 live_port_needed=True,
572 source=constants.LOCALHOST_IP_ADDRESS,
574 "failed to connect to test listener")
576 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
579 live_port_needed=True,
581 "failed to connect to test listener (no source)")
584 class TestTcpPingDeaf(unittest.TestCase):
585 """Testcase for TCP version of ping - against non listen(2)ing port"""
588 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
589 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
590 self.deaflistenerport = self.deaflistener.getsockname()[1]
593 del self.deaflistener
594 del self.deaflistenerport
596 def testTcpPingToLocalHostAcceptDeaf(self):
597 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
598 self.deaflistenerport,
599 timeout=constants.TCP_PING_TIMEOUT,
600 live_port_needed=True,
601 source=constants.LOCALHOST_IP_ADDRESS,
602 ), # need successful connect(2)
603 "successfully connected to deaf listener")
605 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606 self.deaflistenerport,
607 timeout=constants.TCP_PING_TIMEOUT,
608 live_port_needed=True,
609 ), # need successful connect(2)
610 "successfully connected to deaf listener (no source addr)")
612 def testTcpPingToLocalHostNoAccept(self):
613 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
614 self.deaflistenerport,
615 timeout=constants.TCP_PING_TIMEOUT,
616 live_port_needed=False,
617 source=constants.LOCALHOST_IP_ADDRESS,
618 ), # ECONNREFUSED is OK
619 "failed to ping alive host on deaf port")
621 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
622 self.deaflistenerport,
623 timeout=constants.TCP_PING_TIMEOUT,
624 live_port_needed=False,
625 ), # ECONNREFUSED is OK
626 "failed to ping alive host on deaf port (no source addr)")
629 class TestListVisibleFiles(unittest.TestCase):
630 """Test case for ListVisibleFiles"""
633 self.path = tempfile.mkdtemp()
636 shutil.rmtree(self.path)
638 def _test(self, files, expected):
640 expected = expected[:]
644 f = open(os.path.join(self.path, name), 'w')
650 found = ListVisibleFiles(self.path)
653 self.assertEqual(found, expected)
655 def testAllVisible(self):
656 files = ["a", "b", "c"]
658 self._test(files, expected)
660 def testNoneVisible(self):
661 files = [".a", ".b", ".c"]
663 self._test(files, expected)
665 def testSomeVisible(self):
666 files = ["a", "b", ".c"]
667 expected = ["a", "b"]
668 self._test(files, expected)
671 class TestNewUUID(unittest.TestCase):
672 """Test case for NewUUID"""
674 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
675 '[a-f0-9]{4}-[a-f0-9]{12}$')
678 self.failUnless(self._re_uuid.match(utils.NewUUID()))
681 class TestUniqueSequence(unittest.TestCase):
682 """Test case for UniqueSequence"""
684 def _test(self, input, expected):
685 self.assertEqual(utils.UniqueSequence(input), expected)
689 self._test([1, 2, 3], [1, 2, 3])
690 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
691 self._test([1, 2, 2, 3], [1, 2, 3])
692 self._test([1, 2, 3, 3], [1, 2, 3])
695 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
696 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
699 self._test(["a", "a"], ["a"])
700 self._test(["a", "b"], ["a", "b"])
701 self._test(["a", "b", "a"], ["a", "b"])
704 if __name__ == '__main__':