4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
34 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
35 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
36 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
37 ShellQuote, ShellQuoteArgs, _ParseIpOutput, TcpPing
38 from ganeti.errors import LockError, UnitParseError
41 class TestIsProcessAlive(unittest.TestCase):
42 """Testing case for IsProcessAlive"""
44 # create a zombie and a (hopefully) non-existing process id
45 self.pid_zombie = os.fork()
46 if self.pid_zombie == 0:
48 elif self.pid_zombie < 0:
49 raise SystemError("can't fork")
50 self.pid_non_existing = os.fork()
51 if self.pid_non_existing == 0:
53 elif self.pid_non_existing > 0:
54 os.waitpid(self.pid_non_existing, 0)
56 raise SystemError("can't fork")
61 self.assert_(IsProcessAlive(mypid),
62 "can't find myself running")
65 self.assert_(not IsProcessAlive(self.pid_zombie),
66 "zombie not detected as zombie")
69 def testNotExisting(self):
70 self.assert_(not IsProcessAlive(self.pid_non_existing),
71 "noexisting process detected")
74 class TestLocking(unittest.TestCase):
75 """Testing case for the Lock/Unlock functions"""
76 def clean_lock(self, name):
78 ganeti.utils.Unlock("unittest")
84 self.clean_lock("unittest")
85 self.assertEqual(None, Lock("unittest"))
89 self.clean_lock("unittest")
90 ganeti.utils.Lock("unittest")
91 self.assertEqual(None, Unlock("unittest"))
94 def testDoubleLock(self):
95 self.clean_lock("unittest")
96 ganeti.utils.Lock("unittest")
97 self.assertRaises(LockError, Lock, "unittest")
100 class TestRunCmd(unittest.TestCase):
101 """Testing case for the RunCmd function"""
104 self.magic = time.ctime() + " ganeti test"
107 """Test successful exit code"""
108 result = RunCmd("/bin/sh -c 'exit 0'")
109 self.assertEqual(result.exit_code, 0)
112 """Test fail exit code"""
113 result = RunCmd("/bin/sh -c 'exit 1'")
114 self.assertEqual(result.exit_code, 1)
117 def testStdout(self):
118 """Test standard output"""
119 cmd = 'echo -n "%s"' % self.magic
120 result = RunCmd("/bin/sh -c '%s'" % cmd)
121 self.assertEqual(result.stdout, self.magic)
124 def testStderr(self):
125 """Test standard error"""
126 cmd = 'echo -n "%s"' % self.magic
127 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
128 self.assertEqual(result.stderr, self.magic)
131 def testCombined(self):
132 """Test combined output"""
133 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
134 result = RunCmd("/bin/sh -c '%s'" % cmd)
135 self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
137 def testSignal(self):
138 """Test standard error"""
139 result = RunCmd("/bin/sh -c 'kill -15 $$'")
140 self.assertEqual(result.signal, 15)
142 def testListRun(self):
144 result = RunCmd(["true"])
145 self.assertEqual(result.signal, None)
146 self.assertEqual(result.exit_code, 0)
147 result = RunCmd(["/bin/sh", "-c", "exit 1"])
148 self.assertEqual(result.signal, None)
149 self.assertEqual(result.exit_code, 1)
150 result = RunCmd(["echo", "-n", self.magic])
151 self.assertEqual(result.signal, None)
152 self.assertEqual(result.exit_code, 0)
153 self.assertEqual(result.stdout, self.magic)
156 """Test locale environment"""
157 old_env = os.environ.copy()
159 os.environ["LANG"] = "en_US.UTF-8"
160 os.environ["LC_ALL"] = "en_US.UTF-8"
161 result = RunCmd(["locale"])
162 for line in result.output.splitlines():
163 key, value = line.split("=", 1)
164 # Ignore these variables, they're overridden by LC_ALL
165 if key == "LANG" or key == "LANGUAGE":
167 self.failIf(value and value != "C" and value != '"C"',
168 "Variable %s is set to the invalid value '%s'" % (key, value))
173 class TestRemoveFile(unittest.TestCase):
174 """Test case for the RemoveFile function"""
177 """Create a temp dir and file for each case"""
178 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
179 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
183 if os.path.exists(self.tmpfile):
184 os.unlink(self.tmpfile)
185 os.rmdir(self.tmpdir)
188 def testIgnoreDirs(self):
189 """Test that RemoveFile() ignores directories"""
190 self.assertEqual(None, RemoveFile(self.tmpdir))
193 def testIgnoreNotExisting(self):
194 """Test that RemoveFile() ignores non-existing files"""
195 RemoveFile(self.tmpfile)
196 RemoveFile(self.tmpfile)
199 def testRemoveFile(self):
200 """Test that RemoveFile does remove a file"""
201 RemoveFile(self.tmpfile)
202 if os.path.exists(self.tmpfile):
203 self.fail("File '%s' not removed" % self.tmpfile)
206 def testRemoveSymlink(self):
207 """Test that RemoveFile does remove symlinks"""
208 symlink = self.tmpdir + "/symlink"
209 os.symlink("no-such-file", symlink)
211 if os.path.exists(symlink):
212 self.fail("File '%s' not removed" % symlink)
213 os.symlink(self.tmpfile, symlink)
215 if os.path.exists(symlink):
216 self.fail("File '%s' not removed" % symlink)
219 class TestCheckdict(unittest.TestCase):
220 """Test case for the CheckDict function"""
223 """Test that CheckDict adds a missing key with the correct value"""
228 if 'b' not in tgt or tgt['b'] != 2:
229 self.fail("Failed to update dict")
232 def testNoUpdate(self):
233 """Test that CheckDict does not overwrite an existing key"""
234 tgt = {'a':1, 'b': 3}
237 self.failUnlessEqual(tgt['b'], 3)
240 class TestMatchNameComponent(unittest.TestCase):
241 """Test case for the MatchNameComponent function"""
243 def testEmptyList(self):
244 """Test that there is no match against an empty list"""
246 self.failUnlessEqual(MatchNameComponent("", []), None)
247 self.failUnlessEqual(MatchNameComponent("test", []), None)
249 def testSingleMatch(self):
250 """Test that a single match is performed correctly"""
251 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
252 for key in "test2", "test2.example", "test2.example.com":
253 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
255 def testMultipleMatches(self):
256 """Test that a multiple match is returned as None"""
257 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
258 for key in "test1", "test1.example":
259 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
262 class TestFormatUnit(unittest.TestCase):
263 """Test case for the FormatUnit function"""
266 self.assertEqual(FormatUnit(1), '1M')
267 self.assertEqual(FormatUnit(100), '100M')
268 self.assertEqual(FormatUnit(1023), '1023M')
271 self.assertEqual(FormatUnit(1024), '1.0G')
272 self.assertEqual(FormatUnit(1536), '1.5G')
273 self.assertEqual(FormatUnit(17133), '16.7G')
274 self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
277 self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
278 self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
279 self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
282 class TestParseUnit(unittest.TestCase):
283 """Test case for the ParseUnit function"""
286 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
287 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
288 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
290 def testRounding(self):
291 self.assertEqual(ParseUnit('0'), 0)
292 self.assertEqual(ParseUnit('1'), 4)
293 self.assertEqual(ParseUnit('2'), 4)
294 self.assertEqual(ParseUnit('3'), 4)
296 self.assertEqual(ParseUnit('124'), 124)
297 self.assertEqual(ParseUnit('125'), 128)
298 self.assertEqual(ParseUnit('126'), 128)
299 self.assertEqual(ParseUnit('127'), 128)
300 self.assertEqual(ParseUnit('128'), 128)
301 self.assertEqual(ParseUnit('129'), 132)
302 self.assertEqual(ParseUnit('130'), 132)
304 def testFloating(self):
305 self.assertEqual(ParseUnit('0'), 0)
306 self.assertEqual(ParseUnit('0.5'), 4)
307 self.assertEqual(ParseUnit('1.75'), 4)
308 self.assertEqual(ParseUnit('1.99'), 4)
309 self.assertEqual(ParseUnit('2.00'), 4)
310 self.assertEqual(ParseUnit('2.01'), 4)
311 self.assertEqual(ParseUnit('3.99'), 4)
312 self.assertEqual(ParseUnit('4.00'), 4)
313 self.assertEqual(ParseUnit('4.01'), 8)
314 self.assertEqual(ParseUnit('1.5G'), 1536)
315 self.assertEqual(ParseUnit('1.8G'), 1844)
316 self.assertEqual(ParseUnit('8.28T'), 8682212)
318 def testSuffixes(self):
319 for sep in ('', ' ', ' ', "\t", "\t "):
320 for suffix, scale in TestParseUnit.SCALES:
321 for func in (lambda x: x, str.lower, str.upper):
322 self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
324 def testInvalidInput(self):
325 for sep in ('-', '_', ',', 'a'):
326 for suffix, _ in TestParseUnit.SCALES:
327 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
329 for suffix, _ in TestParseUnit.SCALES:
330 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
333 class TestSshKeys(unittest.TestCase):
334 """Test case for the AddAuthorizedKey function"""
336 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
337 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
338 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
340 # NOTE: The MD5 sums below were calculated after manually
341 # checking the output files.
343 def writeTestFile(self):
344 (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
345 f = os.fdopen(fd, 'w')
347 f.write(TestSshKeys.KEY_A)
349 f.write(TestSshKeys.KEY_B)
356 def testAddingNewKey(self):
357 tmpname = self.writeTestFile()
359 AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
361 f = open(tmpname, 'r')
363 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
364 'ccc71523108ca6e9d0343797dc3e9f16')
370 def testAddingAlmostButNotCompletlyTheSameKey(self):
371 tmpname = self.writeTestFile()
373 AddAuthorizedKey(tmpname,
374 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
376 f = open(tmpname, 'r')
378 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
379 'f2c939d57addb5b3a6846884be896b46')
385 def testAddingExistingKeyWithSomeMoreSpaces(self):
386 tmpname = self.writeTestFile()
388 AddAuthorizedKey(tmpname,
389 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
391 f = open(tmpname, 'r')
393 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
394 '4e612764808bd46337eb0f575415fc30')
400 def testRemovingExistingKeyWithSomeMoreSpaces(self):
401 tmpname = self.writeTestFile()
403 RemoveAuthorizedKey(tmpname,
404 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
406 f = open(tmpname, 'r')
408 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
409 '77516d987fca07f70e30b830b3e4f2ed')
415 def testRemovingNonExistingKey(self):
416 tmpname = self.writeTestFile()
418 RemoveAuthorizedKey(tmpname,
419 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
421 f = open(tmpname, 'r')
423 self.assertEqual(md5.new(f.read(8192)).hexdigest(),
424 '4e612764808bd46337eb0f575415fc30')
431 class TestShellQuoting(unittest.TestCase):
432 """Test case for shell quoting functions"""
434 def testShellQuote(self):
435 self.assertEqual(ShellQuote('abc'), "abc")
436 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
437 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
438 self.assertEqual(ShellQuote("a b c"), "'a b c'")
439 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
441 def testShellQuoteArgs(self):
442 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
443 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
444 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
447 class TestIpAdressList(unittest.TestCase):
448 """Test case for local IP addresses"""
450 def _test(self, output, required):
451 ips = _ParseIpOutput(output)
453 # Sort the output, so our check below works in all cases
457 self.assertEqual(required, ips)
459 def testSingleIpAddress(self):
461 ("3: lo inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
462 "5: eth0 inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n")
463 self._test(output, ['127.0.0.1', '10.0.0.1'])
465 def testMultipleIpAddresses(self):
467 ("3: lo inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
468 "5: eth0 inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n"
469 "5: eth0 inet 1.2.3.4/8 brd 1.255.255.255 scope global eth0:test\n")
470 self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
473 class TestTcpPing(unittest.TestCase):
474 """Testcase for TCP version of ping - against listen(2)ing port"""
477 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
478 self.listener.bind(("127.0.0.1", 0))
479 self.listenerport = self.listener.getsockname()[1]
480 self.listener.listen(1)
483 self.listener.shutdown(socket.SHUT_RDWR)
485 del self.listenerport
487 def testTcpPingToLocalHostAccept(self):
488 self.assert_(TcpPing("127.0.0.1",
492 live_port_needed=True),
493 "failed to connect to test listener")
496 class TestTcpPingDeaf(unittest.TestCase):
497 """Testcase for TCP version of ping - against non listen(2)ing port"""
500 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
501 self.deaflistener.bind(("127.0.0.1", 0))
502 self.deaflistenerport = self.deaflistener.getsockname()[1]
505 del self.deaflistener
506 del self.deaflistenerport
508 def testTcpPingToLocalHostAcceptDeaf(self):
509 self.failIf(TcpPing("127.0.0.1",
511 self.deaflistenerport,
512 timeout=10, # timeout for blocking operations
513 live_port_needed=True), # need successful connect(2)
514 "successfully connected to deaf listener")
516 def testTcpPingToLocalHostNoAccept(self):
517 self.assert_(TcpPing("127.0.0.1",
519 self.deaflistenerport,
520 timeout=10, # timeout for blocking operations
521 live_port_needed=False), # ECONNREFUSED is OK
522 "failed to ping alive host on deaf port")
525 if __name__ == '__main__':