4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
24 import distutils.version
43 from cStringIO import StringIO
46 from ganeti import constants
47 from ganeti import compat
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
51 ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
52 TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
53 ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
56 class TestIsProcessAlive(unittest.TestCase):
57 """Testing case for IsProcessAlive"""
61 self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
63 def testNotExisting(self):
64 pid_non_existing = os.fork()
65 if pid_non_existing == 0:
67 elif pid_non_existing < 0:
68 raise SystemError("can't fork")
69 os.waitpid(pid_non_existing, 0)
70 self.assertFalse(utils.IsProcessAlive(pid_non_existing),
71 "nonexisting process detected")
74 class TestGetProcStatusPath(unittest.TestCase):
76 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
77 self.assertNotEqual(utils._GetProcStatusPath(1),
78 utils._GetProcStatusPath(2))
81 class TestIsProcessHandlingSignal(unittest.TestCase):
83 self.tmpdir = tempfile.mkdtemp()
86 shutil.rmtree(self.tmpdir)
88 def testParseSigsetT(self):
89 self.assertEqual(len(utils._ParseSigsetT("0")), 0)
90 self.assertEqual(utils._ParseSigsetT("1"), set([1]))
91 self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
92 self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
93 self.assertEqual(utils._ParseSigsetT("0000000180000202"),
95 self.assertEqual(utils._ParseSigsetT("0000000180000002"),
97 self.assertEqual(utils._ParseSigsetT("0000000188000002"),
99 self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
100 set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
101 24, 25, 26, 28, 31]))
102 self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
104 def testGetProcStatusField(self):
105 for field in ["SigCgt", "Name", "FDSize"]:
106 for value in ["", "0", "cat", " 1234 KB"]:
107 pstatus = "\n".join([
109 "%s: %s" % (field, value),
112 result = utils._GetProcStatusField(pstatus, field)
113 self.assertEqual(result, value.strip())
116 sp = PathJoin(self.tmpdir, "status")
118 utils.WriteFile(sp, data="\n".join([
120 "State: S (sleeping)",
125 "SigBlk: 0000000000010000",
126 "SigIgn: 0000000000384004",
127 "SigCgt: 000000004b813efb",
128 "CapEff: 0000000000000000",
131 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
133 def testNoSigCgt(self):
134 sp = PathJoin(self.tmpdir, "status")
136 utils.WriteFile(sp, data="\n".join([
140 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
141 1234, 10, status_path=sp)
143 def testNoSuchFile(self):
144 sp = PathJoin(self.tmpdir, "notexist")
146 self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
def _TestRealProcess():
  """Check signal-handling detection against the running process.

  Cycles SIGUSR1 through the default disposition, a custom handler, the
  ignored disposition and the default again, asking
  utils.IsProcessHandlingSignal about our own PID after each change.
  Takes no arguments; testRealProcess passes it as a callable to
  utils.RunInSeparateProcess.

  @raise Exception: if the reported state does not match the disposition
      that was just installed

  """
  pid = os.getpid()

  # Default disposition: nothing is installed, so not "handled"
  signal.signal(signal.SIGUSR1, signal.SIG_DFL)
  if utils.IsProcessHandlingSignal(pid, signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")

  # A Python-level handler must be reported as handling the signal
  signal.signal(signal.SIGUSR1, lambda signum, frame: None)
  if not utils.IsProcessHandlingSignal(pid, signal.SIGUSR1):
    raise Exception("SIGUSR1 is not handled when it should be")

  # An ignored signal must not be reported as handled; the original message
  # here was copied from the previous check and described the opposite case
  signal.signal(signal.SIGUSR1, signal.SIG_IGN)
  if utils.IsProcessHandlingSignal(pid, signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should be ignored")

  # Back to the default disposition
  signal.signal(signal.SIGUSR1, signal.SIG_DFL)
  if utils.IsProcessHandlingSignal(pid, signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")
def testRealProcess(self):
  """Run the live signal-disposition checks via RunInSeparateProcess"""
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
172 class TestPidFileFunctions(unittest.TestCase):
173 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
176 self.dir = tempfile.mkdtemp()
177 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
178 utils.DaemonPidFileName = self.f_dpn
180 def testPidFileFunctions(self):
181 pid_file = self.f_dpn('test')
182 fd = utils.WritePidFile(self.f_dpn('test'))
183 self.failUnless(os.path.exists(pid_file),
184 "PID file should have been created")
185 read_pid = utils.ReadPidFile(pid_file)
186 self.failUnlessEqual(read_pid, os.getpid())
187 self.failUnless(utils.IsProcessAlive(read_pid))
188 self.failUnlessRaises(errors.LockError, utils.WritePidFile,
191 utils.RemovePidFile('test')
192 self.failIf(os.path.exists(pid_file),
193 "PID file should not exist anymore")
194 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
195 "ReadPidFile should return 0 for missing pid file")
196 fh = open(pid_file, "w")
199 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
200 "ReadPidFile should return 0 for invalid pid file")
201 # but now, even with the file existing, we should be able to lock it
202 fd = utils.WritePidFile(self.f_dpn('test'))
204 utils.RemovePidFile('test')
205 self.failIf(os.path.exists(pid_file),
206 "PID file should not exist anymore")
209 pid_file = self.f_dpn('child')
210 r_fd, w_fd = os.pipe()
212 if new_pid == 0: #child
213 utils.WritePidFile(self.f_dpn('child'))
218 # else we are in the parent
219 # wait until the child has written the pid file
221 read_pid = utils.ReadPidFile(pid_file)
222 self.failUnlessEqual(read_pid, new_pid)
223 self.failUnless(utils.IsProcessAlive(new_pid))
224 utils.KillProcess(new_pid, waitpid=True)
225 self.failIf(utils.IsProcessAlive(new_pid))
226 utils.RemovePidFile('child')
227 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
230 for name in os.listdir(self.dir):
231 os.unlink(os.path.join(self.dir, name))
235 class TestRunCmd(testutils.GanetiTestCase):
236 """Testing case for the RunCmd function"""
239 testutils.GanetiTestCase.setUp(self)
240 self.magic = time.ctime() + " ganeti test"
241 self.fname = self._CreateTempFile()
242 self.fifo_tmpdir = tempfile.mkdtemp()
243 self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
244 os.mkfifo(self.fifo_file)
247 shutil.rmtree(self.fifo_tmpdir)
248 testutils.GanetiTestCase.tearDown(self)
251 """Test successful exit code"""
252 result = RunCmd("/bin/sh -c 'exit 0'")
253 self.assertEqual(result.exit_code, 0)
254 self.assertEqual(result.output, "")
257 """Test fail exit code"""
258 result = RunCmd("/bin/sh -c 'exit 1'")
259 self.assertEqual(result.exit_code, 1)
260 self.assertEqual(result.output, "")
def testStdout(self):
  """Check stdout capture, in memory and when redirected to a file"""
  shell_cmd = "/bin/sh -c '%s'" % ('echo -n "%s"' % self.magic)

  # Captured in the result object
  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.stdout, self.magic)

  # Redirected to a file: nothing in the result, everything in the file
  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testStderr(self):
  """Check stderr capture, in memory and when redirected to a file"""
  shell_cmd = "/bin/sh -c '%s' 1>&2" % ('echo -n "%s"' % self.magic)

  # Captured in the result object
  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.stderr, self.magic)

  # Redirected to a file: nothing in the result, everything in the file
  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testCombined(self):
  """Check that stdout and stderr are merged into the combined output"""
  magic = self.magic
  script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (magic, magic)
  shell_cmd = "/bin/sh -c '%s'" % script
  expected = "A" + magic + "B" + magic

  in_memory = RunCmd(shell_cmd)
  self.assertEqual(in_memory.output, expected)

  to_file = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(to_file.output, "")
  self.assertFileContent(self.fname, expected)
290 def testSignal(self):
292 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
293 self.assertEqual(result.signal, 15)
294 self.assertEqual(result.output, "")
def testTimeoutClean(self):
  """A command whose TERM trap exits 0 must report exit code 0"""
  # The shell waits reading from the test FIFO until the 0.2s timeout hits
  script = "trap 'exit 0' TERM; read < %s" % self.fifo_file
  argv = ["/bin/sh", "-c", script]
  result = RunCmd(argv, timeout=0.2)
  self.assertEqual(result.exit_code, 0)
301 def testTimeoutKill(self):
302 cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
304 out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
305 timeout, _linger_timeout=0.2)
306 self.assert_(status < 0)
307 self.assertEqual(-status, signal.SIGKILL)
def testTimeoutOutputAfterTerm(self):
  """Output written by the command's TERM trap must still be captured"""
  script = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
  result = RunCmd(["/bin/sh", "-c", script], timeout=0.2)
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(result.failed)
  self.assertEqual(result.stdout, "sigtermed\n")
315 def testListRun(self):
317 result = RunCmd(["true"])
318 self.assertEqual(result.signal, None)
319 self.assertEqual(result.exit_code, 0)
320 result = RunCmd(["/bin/sh", "-c", "exit 1"])
321 self.assertEqual(result.signal, None)
322 self.assertEqual(result.exit_code, 1)
323 result = RunCmd(["echo", "-n", self.magic])
324 self.assertEqual(result.signal, None)
325 self.assertEqual(result.exit_code, 0)
326 self.assertEqual(result.stdout, self.magic)
def testFileEmptyOutput(self):
  """A silent command redirected to a file leaves that file empty"""
  outcome = RunCmd(["true"], output=self.fname)
  self.assertEqual(outcome.signal, None)
  self.assertEqual(outcome.exit_code, 0)
  self.assertFileContent(self.fname, "")
336 """Test locale environment"""
337 old_env = os.environ.copy()
339 os.environ["LANG"] = "en_US.UTF-8"
340 os.environ["LC_ALL"] = "en_US.UTF-8"
341 result = RunCmd(["locale"])
342 for line in result.output.splitlines():
343 key, value = line.split("=", 1)
344 # Ignore these variables, they're overridden by LC_ALL
345 if key == "LANG" or key == "LANGUAGE":
347 self.failIf(value and value != "C" and value != '"C"',
348 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
  """Without an explicit cwd, commands are expected to run in "/" """
  # assertEqual replaces the deprecated failUnlessEqual alias
  reported = RunCmd(["pwd"]).stdout.strip()
  self.assertEqual(reported, "/")
357 """Test default working directory"""
358 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
359 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
361 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
def testResetEnv(self):
  """Test environment reset functionality"""
  # assertEqual replaces the deprecated failUnlessEqual alias.
  # With reset_env and no env, "env" is expected to print nothing
  bare = RunCmd(["env"], reset_env=True)
  self.assertEqual(bare.stdout.strip(), "")

  # With reset_env plus env, only the given variable is expected
  custom = RunCmd(["env"], reset_env=True, env={"FOO": "bar"})
  self.assertEqual(custom.stdout.strip(), "FOO=bar")
369 def testNoFork(self):
370 """Test that nofork raise an error"""
371 assert not utils.no_fork
374 self.assertRaises(errors.ProgrammerError, RunCmd, ["true"])
376 utils.no_fork = False
def testWrongParams(self):
  """Combining file output with interactive mode must be rejected"""
  conflicting = {"output": "/dev/null", "interactive": True}
  self.assertRaises(errors.ProgrammerError, RunCmd, ["true"], **conflicting)
384 class TestRunParts(testutils.GanetiTestCase):
385 """Testing case for the RunParts function"""
388 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
391 shutil.rmtree(self.rundir)
394 """Test on an empty dir"""
395 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
def testSkipWrongName(self):
  """Test that wrong files are skipped"""
  # The name contains a dot; the file is executable, so only the name can
  # be the reason for the expected skip
  relname = "00test.dot"
  fname = os.path.join(self.rundir, relname)
  utils.WriteFile(fname, data="")
  os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)

  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(RunParts(self.rundir, reset_env=True),
                   [(relname, constants.RUNPARTS_SKIP, None)])
def testSkipNonExec(self):
  """Test that non executable files are skipped"""
  # No chmod here: the file is left without the executable bit
  relname = "00test"
  utils.WriteFile(os.path.join(self.rundir, relname), data="")

  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(RunParts(self.rundir, reset_env=True),
                   [(relname, constants.RUNPARTS_SKIP, None)])
415 """Test error on a broken executable"""
416 fname = os.path.join(self.rundir, "00test")
417 utils.WriteFile(fname, data="")
418 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
419 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
420 self.failUnlessEqual(relname, os.path.basename(fname))
421 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
422 self.failUnless(error)
424 def testSorted(self):
425 """Test executions are sorted"""
427 files.append(os.path.join(self.rundir, "64test"))
428 files.append(os.path.join(self.rundir, "00test"))
429 files.append(os.path.join(self.rundir, "42test"))
432 utils.WriteFile(fname, data="")
434 results = RunParts(self.rundir, reset_env=True)
436 for fname in sorted(files):
437 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
440 """Test correct execution"""
441 fname = os.path.join(self.rundir, "00test")
442 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
443 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
444 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
445 self.failUnlessEqual(relname, os.path.basename(fname))
446 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
447 self.failUnlessEqual(runresult.stdout, "ciao")
def testRunFail(self):
  """Test correct execution, with run failure"""
  relname = "00test"
  fname = os.path.join(self.rundir, relname)
  utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
  os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)

  (name, status, runresult) = RunParts(self.rundir, reset_env=True)[0]

  # The script ran (RUNPARTS_RUN), but its own exit status marks a failure.
  # assertEqual/assertTrue replace the deprecated failUnless* aliases
  self.assertEqual(name, relname)
  self.assertEqual(status, constants.RUNPARTS_RUN)
  self.assertEqual(runresult.exit_code, 1)
  self.assertTrue(runresult.failed)
460 def testRunMix(self):
462 files.append(os.path.join(self.rundir, "00test"))
463 files.append(os.path.join(self.rundir, "42test"))
464 files.append(os.path.join(self.rundir, "64test"))
465 files.append(os.path.join(self.rundir, "99test"))
469 # 1st has errors in execution
470 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
471 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
474 utils.WriteFile(files[1], data="")
476 # 3rd cannot execute properly
477 utils.WriteFile(files[2], data="")
478 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
481 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
482 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
484 results = RunParts(self.rundir, reset_env=True)
486 (relname, status, runresult) = results[0]
487 self.failUnlessEqual(relname, os.path.basename(files[0]))
488 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
489 self.failUnlessEqual(runresult.exit_code, 1)
490 self.failUnless(runresult.failed)
492 (relname, status, runresult) = results[1]
493 self.failUnlessEqual(relname, os.path.basename(files[1]))
494 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
495 self.failUnlessEqual(runresult, None)
497 (relname, status, runresult) = results[2]
498 self.failUnlessEqual(relname, os.path.basename(files[2]))
499 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
500 self.failUnless(runresult)
502 (relname, status, runresult) = results[3]
503 self.failUnlessEqual(relname, os.path.basename(files[3]))
504 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
505 self.failUnlessEqual(runresult.output, "ciao")
506 self.failUnlessEqual(runresult.exit_code, 0)
507 self.failUnless(not runresult.failed)
def testMissingDirectory(self):
  """A directory that does not exist yields an empty result list"""
  missing = utils.PathJoin(self.rundir, "no/such/directory")
  results = RunParts(missing)
  self.assertEqual(results, [])
514 class TestStartDaemon(testutils.GanetiTestCase):
516 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
517 self.tmpfile = os.path.join(self.tmpdir, "test")
520 shutil.rmtree(self.tmpdir)
523 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
524 self._wait(self.tmpfile, 60.0, "Hello World")
def testShellOutput(self):
  """Shell-string command whose output goes to a file via output="""
  expected = "Hello World"
  utils.StartDaemon("echo Hello World", output=self.tmpfile)
  self._wait(self.tmpfile, 60.0, expected)
def testNoShellNoOutput(self):
  """Argument-list command without output redirection; must not raise"""
  argv = ["pwd"]
  utils.StartDaemon(argv)
def testNoShellNoOutputTouch(self):
  """Argument-list daemon must create the file given to touch"""
  marker = os.path.join(self.tmpdir, "check")
  # assertFalse replaces the deprecated failIf alias
  self.assertFalse(os.path.exists(marker))
  utils.StartDaemon(["touch", marker])
  # touch creates an empty file, hence the empty expected content
  self._wait(marker, 60.0, "")
def testNoShellOutput(self):
  """Argument-list command with file output; pwd is expected to print "/" """
  argv = ["pwd"]
  utils.StartDaemon(argv, output=self.tmpfile)
  self._wait(self.tmpfile, 60.0, "/")
def testNoShellOutputCwd(self):
  """An explicit cwd must be what the daemon's pwd prints"""
  here = os.getcwd()
  utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=here)
  self._wait(self.tmpfile, 60.0, here)
def testShellEnv(self):
  """A variable passed via env must be expandable by the shell command"""
  environment = {"GNT_TEST_VAR": "Hello World"}
  utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
                    env=environment)
  self._wait(self.tmpfile, 60.0, "Hello World")
def testNoShellEnv(self):
  """A variable passed via env must be visible to an argument-list command"""
  environment = {"GNT_TEST_VAR": "Hello World"}
  utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
                    env=environment)
  self._wait(self.tmpfile, 60.0, "Hello World")
557 def testOutputFd(self):
558 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
560 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
563 self._wait(self.tmpfile, 60.0, os.getcwd())
566 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
567 self._wait(self.tmpfile, 60.0, str(pid))
569 def testPidFile(self):
570 pidfile = os.path.join(self.tmpdir, "pid")
571 checkfile = os.path.join(self.tmpdir, "abort")
573 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
576 fd = os.open(pidfile, os.O_RDONLY)
578 # Check file is locked
579 self.assertRaises(errors.LockError, utils.LockFile, fd)
581 pidtext = os.read(fd, 100)
585 self.assertEqual(int(pidtext.strip()), pid)
587 self.assert_(utils.IsProcessAlive(pid))
589 # No matter what happens, kill daemon
590 utils.KillProcess(pid, timeout=5.0, waitpid=False)
591 self.failIf(utils.IsProcessAlive(pid))
593 self.assertEqual(utils.ReadFile(self.tmpfile), "")
595 def _wait(self, path, timeout, expected):
596 # Due to the asynchronous nature of daemon processes, polling is necessary.
597 # A timeout makes sure the test doesn't hang forever.
599 if not (os.path.isfile(path) and
600 utils.ReadFile(path).strip() == expected):
601 raise utils.RetryAgain()
604 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
605 except utils.RetryTimeout:
606 self.fail("Apparently the daemon didn't run in %s seconds and/or"
607 " didn't write the correct output" % timeout)
610 self.assertRaises(errors.OpExecError, utils.StartDaemon,
611 ["./does-NOT-EXIST/here/0123456789"])
612 self.assertRaises(errors.OpExecError, utils.StartDaemon,
613 ["./does-NOT-EXIST/here/0123456789"],
614 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
615 self.assertRaises(errors.OpExecError, utils.StartDaemon,
616 ["./does-NOT-EXIST/here/0123456789"],
617 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
618 self.assertRaises(errors.OpExecError, utils.StartDaemon,
619 ["./does-NOT-EXIST/here/0123456789"],
620 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
622 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
624 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
625 ["./does-NOT-EXIST/here/0123456789"],
626 output=self.tmpfile, output_fd=fd)
631 class TestSetCloseOnExecFlag(unittest.TestCase):
632 """Tests for SetCloseOnExecFlag"""
635 self.tmpfile = tempfile.TemporaryFile()
637 def testEnable(self):
638 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
639 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
642 def testDisable(self):
643 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
644 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
648 class TestSetNonblockFlag(unittest.TestCase):
650 self.tmpfile = tempfile.TemporaryFile()
652 def testEnable(self):
653 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
654 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
657 def testDisable(self):
658 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
659 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
663 class TestRemoveFile(unittest.TestCase):
664 """Test case for the RemoveFile function"""
667 """Create a temp dir and file for each case"""
668 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
669 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
673 if os.path.exists(self.tmpfile):
674 os.unlink(self.tmpfile)
675 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
  """RemoveFile() on a directory must return None"""
  outcome = RemoveFile(self.tmpdir)
  self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
  """RemoveFile() must not fail when the file is already gone"""
  # The second pass targets a file the first pass has already been asked
  # to remove
  for _ in range(2):
    RemoveFile(self.tmpfile)
def testRemoveFile(self):
  """RemoveFile() must actually delete a regular file"""
  target = self.tmpfile
  RemoveFile(target)
  still_there = os.path.exists(target)
  if still_there:
    self.fail("File '%s' not removed" % target)
692 def testRemoveSymlink(self):
693 """Test that RemoveFile does remove symlinks"""
694 symlink = self.tmpdir + "/symlink"
695 os.symlink("no-such-file", symlink)
697 if os.path.exists(symlink):
698 self.fail("File '%s' not removed" % symlink)
699 os.symlink(self.tmpfile, symlink)
701 if os.path.exists(symlink):
702 self.fail("File '%s' not removed" % symlink)
705 class TestRemoveDir(unittest.TestCase):
707 self.tmpdir = tempfile.mkdtemp()
711 shutil.rmtree(self.tmpdir)
712 except EnvironmentError:
def testEmptyDir(self):
  """RemoveDir() must delete an empty directory"""
  target = self.tmpdir
  utils.RemoveDir(target)
  self.assertFalse(os.path.isdir(target))
def testNonEmptyDir(self):
  """RemoveDir() must raise on a directory that still contains a file"""
  self.tmpfile = os.path.join(self.tmpdir, "test1")
  # Create an empty file so the directory is not empty
  handle = open(self.tmpfile, "w")
  handle.close()
  self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
725 class TestRename(unittest.TestCase):
726 """Test case for RenameFile"""
729 """Create a temporary directory"""
730 self.tmpdir = tempfile.mkdtemp()
731 self.tmpfile = os.path.join(self.tmpdir, "test1")
734 open(self.tmpfile, "w").close()
737 """Remove temporary directory"""
738 shutil.rmtree(self.tmpdir)
def testSimpleRename1(self):
  """Simple rename 1"""
  target = os.path.join(self.tmpdir, "xyz")
  utils.RenameFile(self.tmpfile, target)
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(os.path.isfile(target))
745 def testSimpleRename2(self):
746 """Simple rename 2"""
747 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
749 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
751 def testRenameMkdir(self):
752 """Rename with mkdir"""
753 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
755 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
756 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
758 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
759 os.path.join(self.tmpdir, "test/foo/bar/baz"),
761 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
762 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
763 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
766 class TestMatchNameComponent(unittest.TestCase):
767 """Test case for the MatchNameComponent function"""
def testEmptyList(self):
  """Test that there is no match against an empty list"""
  # assertEqual replaces the deprecated failUnlessEqual alias
  for key in ("", "test"):
    self.assertEqual(MatchNameComponent(key, []), None)
def testSingleMatch(self):
  """Test that a single match is performed correctly"""
  mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
  expected = mlist[1]
  # Short name, partial name and full name must all resolve to one entry.
  # assertEqual replaces the deprecated failUnlessEqual alias
  for key in ("test2", "test2.example", "test2.example.com"):
    self.assertEqual(MatchNameComponent(key, mlist), expected)
def testMultipleMatches(self):
  """Test that a multiple match is returned as None"""
  mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
  # Both keys are prefixes of every entry, so neither is unambiguous.
  # assertEqual replaces the deprecated failUnlessEqual alias
  for key in ("test1", "test1.example"):
    self.assertEqual(MatchNameComponent(key, mlist), None)
787 def testFullMatch(self):
788 """Test that a full match is returned correctly"""
790 key2 = "test1.example"
791 mlist = [key2, key2 + ".com"]
792 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
793 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
795 def testCaseInsensitivePartialMatch(self):
796 """Test for the case_insensitive keyword"""
797 mlist = ["test1.example.com", "test2.example.net"]
798 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
800 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
802 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
804 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
808 def testCaseInsensitiveFullMatch(self):
809 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
810 # Between the two ts1 a full string match non-case insensitive should work
811 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
813 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
815 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
817 # Between the two ts2 only case differs, so only case-match works
818 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
820 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
822 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
826 class TestReadFile(testutils.GanetiTestCase):
828 def testReadAll(self):
829 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
830 self.assertEqual(len(data), 814)
832 h = compat.md5_hash()
834 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
836 def testReadSize(self):
837 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
839 self.assertEqual(len(data), 100)
841 h = compat.md5_hash()
843 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
846 self.assertRaises(EnvironmentError, utils.ReadFile,
847 "/dev/null/does-not-exist")
850 class TestReadOneLineFile(testutils.GanetiTestCase):
853 testutils.GanetiTestCase.setUp(self)
def testDefault(self):
  """With default arguments the first line of the certificate is returned"""
  cert_path = self._TestDataFilename("cert1.pem")
  first_line = ReadOneLineFile(cert_path)
  self.assertEqual(len(first_line), 27)
  self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testNotStrict(self):
  """Non-strict mode returns the first line of a multi-line file"""
  cert_path = self._TestDataFilename("cert1.pem")
  first_line = ReadOneLineFile(cert_path, strict=False)
  self.assertEqual(len(first_line), 27)
  self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testStrictFailure(self):
  """Strict mode must raise GenericError for the multi-line certificate"""
  cert_path = self._TestDataFilename("cert1.pem")
  self.assertRaises(errors.GenericError, ReadOneLineFile, cert_path,
                    strict=True)
def testLongLine(self):
  """One long line without newline is returned whole in both modes"""
  payload = "Hello World! " * 1024
  path = self._CreateTempFile()
  utils.WriteFile(path, data=payload)

  # Read strict first, then lax, before asserting on either
  strict_result = ReadOneLineFile(path, strict=True)
  lax_result = ReadOneLineFile(path, strict=False)

  self.assertEqual(payload, strict_result)
  self.assertEqual(payload, lax_result)
878 def testNewline(self):
879 myfile = self._CreateTempFile()
881 for nl in ["", "\n", "\r\n"]:
882 dummydata = "%s%s" % (myline, nl)
883 utils.WriteFile(myfile, data=dummydata)
884 datalax = ReadOneLineFile(myfile, strict=False)
885 self.assertEqual(myline, datalax)
886 datastrict = ReadOneLineFile(myfile, strict=True)
887 self.assertEqual(myline, datastrict)
889 def testWhitespaceAndMultipleLines(self):
890 myfile = self._CreateTempFile()
891 for nl in ["", "\n", "\r\n"]:
892 for ws in [" ", "\t", "\t\t \t", "\t "]:
893 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
894 utils.WriteFile(myfile, data=dummydata)
895 datalax = ReadOneLineFile(myfile, strict=False)
897 self.assert_(set("\r\n") & set(dummydata))
898 self.assertRaises(errors.GenericError, ReadOneLineFile,
900 explen = len("Foo bar baz ") + len(ws)
901 self.assertEqual(len(datalax), explen)
902 self.assertEqual(datalax, dummydata[:explen])
903 self.assertFalse(set("\r\n") & set(datalax))
905 datastrict = ReadOneLineFile(myfile, strict=True)
906 self.assertEqual(dummydata, datastrict)
907 self.assertEqual(dummydata, datalax)
909 def testEmptylines(self):
910 myfile = self._CreateTempFile()
912 for nl in ["\n", "\r\n"]:
913 for ol in ["", "otherline"]:
914 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
915 utils.WriteFile(myfile, data=dummydata)
916 self.assert_(set("\r\n") & set(dummydata))
917 datalax = ReadOneLineFile(myfile, strict=False)
918 self.assertEqual(myline, datalax)
920 self.assertRaises(errors.GenericError, ReadOneLineFile,
923 datastrict = ReadOneLineFile(myfile, strict=True)
924 self.assertEqual(myline, datastrict)
926 def testEmptyfile(self):
927 myfile = self._CreateTempFile()
928 self.assertRaises(errors.GenericError, ReadOneLineFile, myfile)
931 class TestTimestampForFilename(unittest.TestCase):
933 self.assert_("." not in utils.TimestampForFilename())
934 self.assert_(":" not in utils.TimestampForFilename())
937 class TestCreateBackup(testutils.GanetiTestCase):
939 testutils.GanetiTestCase.setUp(self)
941 self.tmpdir = tempfile.mkdtemp()
944 testutils.GanetiTestCase.tearDown(self)
946 shutil.rmtree(self.tmpdir)
949 filename = PathJoin(self.tmpdir, "config.data")
950 utils.WriteFile(filename, data="")
951 bname = utils.CreateBackup(filename)
952 self.assertFileContent(bname, "")
953 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
954 utils.CreateBackup(filename)
955 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
956 utils.CreateBackup(filename)
957 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
959 fifoname = PathJoin(self.tmpdir, "fifo")
961 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
963 def testContent(self):
965 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
966 for rep in [1, 2, 10, 127]:
967 testdata = data * rep
969 filename = PathJoin(self.tmpdir, "test.data_")
970 utils.WriteFile(filename, data=testdata)
971 self.assertFileContent(filename, testdata)
974 bname = utils.CreateBackup(filename)
976 self.assertFileContent(bname, testdata)
977 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
980 class TestFormatUnit(unittest.TestCase):
981 """Test case for the FormatUnit function"""
984 self.assertEqual(FormatUnit(1, 'h'), '1M')
985 self.assertEqual(FormatUnit(100, 'h'), '100M')
986 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
988 self.assertEqual(FormatUnit(1, 'm'), '1')
989 self.assertEqual(FormatUnit(100, 'm'), '100')
990 self.assertEqual(FormatUnit(1023, 'm'), '1023')
992 self.assertEqual(FormatUnit(1024, 'm'), '1024')
993 self.assertEqual(FormatUnit(1536, 'm'), '1536')
994 self.assertEqual(FormatUnit(17133, 'm'), '17133')
995 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
998 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
999 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
1000 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
1001 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
1003 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
1004 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
1005 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
1006 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
1008 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
1009 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
1010 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
1013 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
1014 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
1015 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
1017 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
1018 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
1019 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
def testErrors(self):
  """FormatUnit must raise ProgrammerError when given the unit "a"."""
  value, bad_unit = 1, "a"
  self.assertRaises(errors.ProgrammerError, FormatUnit, value, bad_unit)
1025 class TestParseUnit(unittest.TestCase):
1026 """Test case for the ParseUnit function"""
1029 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
1030 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
1031 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
  """Check ParseUnit results for plain integer strings."""
  # (input, expected) pairs; every expected value is a multiple of four.
  cases = (
    ('0', 0),
    ('1', 4),
    ('2', 4),
    ('3', 4),
    ('124', 124),
    ('125', 128),
    ('126', 128),
    ('127', 128),
    ('128', 128),
    ('129', 132),
    ('130', 132),
    )
  for (text, expected) in cases:
    self.assertEqual(ParseUnit(text), expected)
def testFloating(self):
  """Check ParseUnit results for fractional values, with and without suffix."""
  # (input, expected) pairs, evaluated in this order.
  cases = (
    ('0', 0),
    ('0.5', 4),
    ('1.75', 4),
    ('1.99', 4),
    ('2.00', 4),
    ('2.01', 4),
    ('3.99', 4),
    ('4.00', 4),
    ('4.01', 8),
    ('1.5G', 1536),
    ('1.8G', 1844),
    ('8.28T', 8682212),
    )
  for (text, expected) in cases:
    self.assertEqual(ParseUnit(text), expected)
1061 def testSuffixes(self):
1062 for sep in ('', ' ', ' ', "\t", "\t "):
1063 for suffix, scale in TestParseUnit.SCALES:
1064 for func in (lambda x: x, str.lower, str.upper):
1065 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
  """Malformed value/suffix combinations must raise UnitParseError."""
  suffixes = [entry[0] for entry in TestParseUnit.SCALES]

  # First "1<sep><suffix>" for each bad separator, then "1,3<suffix>".
  malformed = []
  for sep in ('-', '_', ',', 'a'):
    malformed.extend(['1' + sep + suffix for suffix in suffixes])
  malformed.extend(['1,3' + suffix for suffix in suffixes])

  for text in malformed:
    self.assertRaises(errors.UnitParseError, ParseUnit, text)
class TestParseCpuMask(unittest.TestCase):
  """Test case for the ParseCpuMask function."""

  def testWellFormed(self):
    """Valid masks are parsed into the expected list of CPU numbers."""
    cases = [
      ("", []),
      ("1", [1]),
      ("0-2,4,5-5", [0, 1, 2, 4, 5]),
      ]
    for (mask, expected) in cases:
      self.assertEqual(utils.ParseCpuMask(mask), expected)

  def testInvalidInput(self):
    """Each malformed mask must raise ParseError."""
    malformed = ["garbage", "0,", "0-1-2", "2-1", "1-a"]
    for mask in malformed:
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, mask)
1090 class TestSshKeys(testutils.GanetiTestCase):
1091 """Test case for the AddAuthorizedKey function"""
1093 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1094 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
1095 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1098 testutils.GanetiTestCase.setUp(self)
1099 self.tmpname = self._CreateTempFile()
1100 handle = open(self.tmpname, 'w')
1102 handle.write("%s\n" % TestSshKeys.KEY_A)
1103 handle.write("%s\n" % TestSshKeys.KEY_B)
def testAddingNewKey(self):
  """A key not yet in the file ends up as an additional last line."""
  new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
  expected = (
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
    "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")

  utils.AddAuthorizedKey(self.tmpname, new_key)
  self.assertFileContent(self.tmpname, expected)
def testAddingAlmostButNotCompletelyTheSameKey(self):
  """Same key material with a different comment is added as a new line."""
  similar_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
  expected = (
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")

  utils.AddAuthorizedKey(self.tmpname, similar_key)
  self.assertFileContent(self.tmpname, expected)
def testAddingExistingKeyWithSomeMoreSpaces(self):
  """Adding a key that is already present leaves the file unchanged."""
  existing_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  expected = (
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  utils.AddAuthorizedKey(self.tmpname, existing_key)
  self.assertFileContent(self.tmpname, expected)
def testRemovingExistingKeyWithSomeMoreSpaces(self):
  """Removing the first key leaves only the second key's line."""
  existing_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  expected = (
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  utils.RemoveAuthorizedKey(self.tmpname, existing_key)
  self.assertFileContent(self.tmpname, expected)
def testRemovingNonExistingKey(self):
  """Removing a key that is not in the file keeps both original lines."""
  unknown_key = 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test'
  expected = (
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  utils.RemoveAuthorizedKey(self.tmpname, unknown_key)
  self.assertFileContent(self.tmpname, expected)
1154 class TestEtcHosts(testutils.GanetiTestCase):
1155 """Test functions modifying /etc/hosts"""
1158 testutils.GanetiTestCase.setUp(self)
1159 self.tmpname = self._CreateTempFile()
1160 handle = open(self.tmpname, 'w')
1162 handle.write('# This is a test file for /etc/hosts\n')
1163 handle.write('127.0.0.1\tlocalhost\n')
1164 handle.write('192.0.2.1 router gw\n')
1168 def testSettingNewIp(self):
1169 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1172 self.assertFileContent(self.tmpname,
1173 "# This is a test file for /etc/hosts\n"
1174 "127.0.0.1\tlocalhost\n"
1175 "192.0.2.1 router gw\n"
1176 "198.51.100.4\tmyhost.example.com myhost\n")
1177 self.assertFileMode(self.tmpname, 0644)
1179 def testSettingExistingIp(self):
1180 SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1183 self.assertFileContent(self.tmpname,
1184 "# This is a test file for /etc/hosts\n"
1185 "127.0.0.1\tlocalhost\n"
1186 "192.0.2.1\tmyhost.example.com myhost\n")
1187 self.assertFileMode(self.tmpname, 0644)
1189 def testSettingDuplicateName(self):
1190 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1192 self.assertFileContent(self.tmpname,
1193 "# This is a test file for /etc/hosts\n"
1194 "127.0.0.1\tlocalhost\n"
1195 "192.0.2.1 router gw\n"
1196 "198.51.100.4\tmyhost\n")
1197 self.assertFileMode(self.tmpname, 0644)
1199 def testRemovingExistingHost(self):
1200 RemoveEtcHostsEntry(self.tmpname, 'router')
1202 self.assertFileContent(self.tmpname,
1203 "# This is a test file for /etc/hosts\n"
1204 "127.0.0.1\tlocalhost\n"
1206 self.assertFileMode(self.tmpname, 0644)
1208 def testRemovingSingleExistingHost(self):
1209 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1211 self.assertFileContent(self.tmpname,
1212 "# This is a test file for /etc/hosts\n"
1213 "192.0.2.1 router gw\n")
1214 self.assertFileMode(self.tmpname, 0644)
1216 def testRemovingNonExistingHost(self):
1217 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1219 self.assertFileContent(self.tmpname,
1220 "# This is a test file for /etc/hosts\n"
1221 "127.0.0.1\tlocalhost\n"
1222 "192.0.2.1 router gw\n")
1223 self.assertFileMode(self.tmpname, 0644)
1225 def testRemovingAlias(self):
1226 RemoveEtcHostsEntry(self.tmpname, 'gw')
1228 self.assertFileContent(self.tmpname,
1229 "# This is a test file for /etc/hosts\n"
1230 "127.0.0.1\tlocalhost\n"
1231 "192.0.2.1 router\n")
1232 self.assertFileMode(self.tmpname, 0644)
1235 class TestGetMounts(unittest.TestCase):
1236 """Test case for GetMounts()."""
1239 "rootfs / rootfs rw 0 0\n"
1240 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
1241 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
1244 self.tmpfile = tempfile.NamedTemporaryFile()
1245 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1247 def testGetMounts(self):
1248 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1250 ("rootfs", "/", "rootfs", "rw"),
1251 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1252 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Check ShellQuote on plain, quoted, spaced and backslashed input."""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for (raw, quoted) in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Check ShellQuoteArgs joins the individually quoted arguments."""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for (argv, quoted) in cases:
      self.assertEqual(ShellQuoteArgs(argv), quoted)
1272 class TestListVisibleFiles(unittest.TestCase):
1273 """Test case for ListVisibleFiles"""
1276 self.path = tempfile.mkdtemp()
1279 shutil.rmtree(self.path)
1281 def _CreateFiles(self, files):
1283 utils.WriteFile(os.path.join(self.path, name), data="test")
def _test(self, files, expected):
  """Create the given files and compare ListVisibleFiles against expected.

  Both sides are compared as sets, so ordering does not matter.

  """
  self._CreateFiles(files)
  visible = set(ListVisibleFiles(self.path))
  self.assertEqual(visible, set(expected))
1290 def testAllVisible(self):
1291 files = ["a", "b", "c"]
1293 self._test(files, expected)
1295 def testNoneVisible(self):
1296 files = [".a", ".b", ".c"]
1298 self._test(files, expected)
def testSomeVisible(self):
  """Of "a", "b" and ".c", only "a" and "b" are expected back."""
  created = ["a", "b", ".c"]
  self._test(created, ["a", "b"])
def testNonAbsolutePath(self):
  """ListVisibleFiles must raise ProgrammerError for the path "abc"."""
  # assertRaises replaces the deprecated failUnlessRaises alias.
  self.assertRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1308 def testNonNormalizedPath(self):
1309 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1313 class TestNewUUID(unittest.TestCase):
1314 """Test case for NewUUID"""
1317 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
1320 class TestUniqueSequence(unittest.TestCase):
1321 """Test case for UniqueSequence"""
def _test(self, input, expected):
  """Assert that utils.UniqueSequence(input) equals expected."""
  unique = utils.UniqueSequence(input)
  self.assertEqual(unique, expected)
1328 self._test([1, 2, 3], [1, 2, 3])
1329 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1330 self._test([1, 2, 2, 3], [1, 2, 3])
1331 self._test([1, 2, 3, 3], [1, 2, 3])
1334 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1335 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1338 self._test(["a", "a"], ["a"])
1339 self._test(["a", "b"], ["a", "b"])
1340 self._test(["a", "b", "a"], ["a", "b"])
1343 class TestFindDuplicates(unittest.TestCase):
1344 """Test case for FindDuplicates"""
def _Test(self, seq, expected):
  """Check utils.FindDuplicates(seq) against the expected duplicates.

  The result must not repeat any entry itself, and must contain the same
  elements as C{expected} (order is not compared).

  """
  duplicates = utils.FindDuplicates(seq)
  deduplicated = utils.UniqueSequence(duplicates)
  self.assertEqual(duplicates, deduplicated)
  self.assertEqual(set(duplicates), set(expected))
1353 self._Test([1, 2, 3], [])
1354 self._Test([9, 8, 8, 0, 5, 1, 7, 0, 6, 7], [8, 0, 7])
1355 for exp in [[1, 2, 3], [3, 2, 1]]:
1356 self._Test([1, 1, 2, 2, 3, 3], exp)
1358 self._Test(["A", "a", "B"], [])
1359 self._Test(["a", "A", "a", "B"], ["a"])
1360 self._Test("Hello World out there!", ["e", " ", "o", "r", "t", "l"])
1362 self._Test(self._Gen(False), [])
1363 self._Test(self._Gen(True), range(1, 10))
1374 class TestFirstFree(unittest.TestCase):
1375 """Test case for the FirstFree function"""
1378 """Test FirstFree"""
1379 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1380 self.failUnlessEqual(FirstFree([]), None)
1381 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1382 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1383 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1386 class TestTailFile(testutils.GanetiTestCase):
1387 """Test case for the TailFile function"""
def testEmpty(self):
  """TailFile on a freshly created temporary file returns an empty list."""
  fname = self._CreateTempFile()
  # assertEqual replaces the deprecated failUnlessEqual alias.
  self.assertEqual(TailFile(fname), [])
  self.assertEqual(TailFile(fname, lines=25), [])
1394 def testAllLines(self):
1395 data = ["test %d" % i for i in range(30)]
1397 fname = self._CreateTempFile()
1398 fd = open(fname, "w")
1399 fd.write("\n".join(data[:i]))
1403 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1405 def testPartialLines(self):
1406 data = ["test %d" % i for i in range(30)]
1407 fname = self._CreateTempFile()
1408 fd = open(fname, "w")
1409 fd.write("\n".join(data))
1412 for i in range(1, 30):
1413 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1415 def testBigFile(self):
1416 data = ["test %d" % i for i in range(30)]
1417 fname = self._CreateTempFile()
1418 fd = open(fname, "w")
1419 fd.write("X" * 1048576)
1421 fd.write("\n".join(data))
1424 for i in range(1, 30):
1425 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1428 class _BaseFileLockTest:
1429 """Test case for the FileLock class"""
1431 def testSharedNonblocking(self):
1432 self.lock.Shared(blocking=False)
1435 def testExclusiveNonblocking(self):
1436 self.lock.Exclusive(blocking=False)
1439 def testUnlockNonblocking(self):
1440 self.lock.Unlock(blocking=False)
1443 def testSharedBlocking(self):
1444 self.lock.Shared(blocking=True)
1447 def testExclusiveBlocking(self):
1448 self.lock.Exclusive(blocking=True)
1451 def testUnlockBlocking(self):
1452 self.lock.Unlock(blocking=True)
1455 def testSharedExclusiveUnlock(self):
1456 self.lock.Shared(blocking=False)
1457 self.lock.Exclusive(blocking=False)
1458 self.lock.Unlock(blocking=False)
1461 def testExclusiveSharedUnlock(self):
1462 self.lock.Exclusive(blocking=False)
1463 self.lock.Shared(blocking=False)
1464 self.lock.Unlock(blocking=False)
1467 def testSimpleTimeout(self):
1468 # These will succeed on the first attempt, hence a short timeout
1469 self.lock.Shared(blocking=True, timeout=10.0)
1470 self.lock.Exclusive(blocking=False, timeout=10.0)
1471 self.lock.Unlock(blocking=True, timeout=10.0)
1475 def _TryLockInner(filename, shared, blocking):
1476 lock = utils.FileLock.Open(filename)
1484 # The timeout doesn't really matter as the parent process waits for us to
1486 fn(blocking=blocking, timeout=0.01)
1487 except errors.LockError, err:
1492 def _TryLock(self, *args):
1493 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
def testTimeout(self):
  """Check what _TryLock reports while this process holds the lock.

  _TryLock is called as (shared, blocking). The deprecated failIf/assert_
  aliases are replaced by assertFalse/assertTrue.

  """
  # NOTE(review): the source indentation was not available; both halves are
  # assumed to belong to the loop body -- confirm against the original file.
  for blocking in [True, False]:
    # Holding the lock exclusively: neither kind of attempt may succeed
    self.lock.Exclusive(blocking=True)
    self.assertFalse(self._TryLock(False, blocking))
    self.assertFalse(self._TryLock(True, blocking))

    # Holding the lock shared: only another shared attempt may succeed
    self.lock.Shared(blocking=True)
    self.assertTrue(self._TryLock(True, blocking))
    self.assertFalse(self._TryLock(False, blocking))
1506 def testCloseShared(self):
1508 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1510 def testCloseExclusive(self):
1512 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1514 def testCloseUnlock(self):
1516 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1519 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1520 TESTDATA = "Hello World\n" * 10
1523 testutils.GanetiTestCase.setUp(self)
1525 self.tmpfile = tempfile.NamedTemporaryFile()
1526 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1527 self.lock = utils.FileLock.Open(self.tmpfile.name)
1529 # Ensure "Open" didn't truncate file
1530 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1533 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1535 testutils.GanetiTestCase.tearDown(self)
1538 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1540 self.tmpfile = tempfile.NamedTemporaryFile()
1541 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1544 class TestTimeFunctions(unittest.TestCase):
1545 """Test case for time functions"""
1548 self.assertEqual(utils.SplitTime(1), (1, 0))
1549 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1550 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1551 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1552 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1553 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1554 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1555 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1557 self.assertRaises(AssertionError, utils.SplitTime, -1)
1559 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1560 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1561 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1563 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1565 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1567 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1568 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1569 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1570 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1571 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    """Literal field names match only as whole strings."""
    f = utils.FieldSet("a", "b", "c", "def")
    self.assertTrue(f.Matches("a"))
    self.assertFalse(f.Matches("d"), "Substring matched")
    self.assertFalse(f.Matches("defghi"), "Prefix string matched")
    # NonMatching is expected to be false-ish when every name is known
    self.assertFalse(f.NonMatching(["b", "c"]))
    self.assertFalse(f.NonMatching(["a", "b", "c", "def"]))
    self.assertTrue(f.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Field definitions may be regular expressions such as b([0-9]+)."""
    f = utils.FieldSet("a", "b([0-9]+)", "c")
    self.assertTrue(f.Matches("b1"))
    self.assertTrue(f.Matches("b99"))
    self.assertFalse(f.Matches("b/1"))
    self.assertFalse(f.NonMatching(["b12", "c"]))
    self.assertTrue(f.NonMatching(["a", "1"]))
1594 class TestForceDictType(unittest.TestCase):
1595 """Test case for ForceDictType"""
1597 "a": constants.VTYPE_INT,
1598 "b": constants.VTYPE_BOOL,
1599 "c": constants.VTYPE_STRING,
1600 "d": constants.VTYPE_SIZE,
1601 "e": constants.VTYPE_MAYBE_STRING,
1604 def _fdt(self, dict, allowed_values=None):
1605 if allowed_values is None:
1606 utils.ForceDictType(dict, self.KEY_TYPES)
1608 utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
def testSimpleDict(self):
  """Check the values self._fdt returns for a series of valid inputs."""
  # Each entry is (arguments for self._fdt, expected result).
  cases = [
    (({}, ), {}),
    (({'a': 1}, ), {'a': 1}),
    (({'a': '1'}, ), {'a': 1}),
    (({'a': 1, 'b': 1}, ), {'a': 1, 'b': True}),
    (({'b': 1, 'c': 'foo'}, ), {'b': True, 'c': 'foo'}),
    (({'b': 1, 'c': False}, ), {'b': True, 'c': ''}),
    (({'b': 'false'}, ), {'b': False}),
    (({'b': 'False'}, ), {'b': False}),
    (({'b': False}, ), {'b': False}),
    (({'b': 'true'}, ), {'b': True}),
    (({'b': 'True'}, ), {'b': True}),
    (({'d': '4'}, ), {'d': 4}),
    (({'d': '4M'}, ), {'d': 4}),
    (({"e": None, }, ), {"e": None, }),
    (({"e": "Hello World", }, ), {"e": "Hello World", }),
    (({"e": False, }, ), {"e": '', }),
    # With "hello" passed as allowed value it is kept as-is
    (({"b": "hello", }, ["hello"]), {"b": "hello"}),
    ]
  for (args, expected) in cases:
    self.assertEqual(self._fdt(*args), expected)
def testErrors(self):
  """Invalid values raise TypeEnforcementError; a bad type ProgrammerError."""
  rejected = [
    {'a': 'astring'},
    {"b": "hello"},
    {'c': True},
    {'d': 'astring'},
    {'d': '4 L'},
    {"e": object(), },
    {"e": [], },
    {"x": None, },
    [],
    ]
  for value in rejected:
    self.assertRaises(errors.TypeEnforcementError, self._fdt, value)

  # Here the key type table itself names a type that does not exist
  target = {"b": "hello"}
  bad_key_types = {"b": "no-such-type"}
  self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
                    target, bad_key_types)
1645 class TestIsNormAbsPath(unittest.TestCase):
1646 """Testing case for IsNormAbsPath"""
1648 def _pathTestHelper(self, path, result):
1650 self.assert_(utils.IsNormAbsPath(path),
1651 "Path %s should result absolute and normalized" % path)
1653 self.assertFalse(utils.IsNormAbsPath(path),
1654 "Path %s should not result absolute and normalized" % path)
1657 self._pathTestHelper('/etc', True)
1658 self._pathTestHelper('/srv', True)
1659 self._pathTestHelper('etc', False)
1660 self._pathTestHelper('/etc/../root', False)
1661 self._pathTestHelper('/etc/', False)
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    """Digits, letters and punctuation pass through SafeEncode unchanged."""
    for txt in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    """Encoding an already encoded byte must not change it again."""
    for i in range(255):
      txt = SafeEncode(chr(i))
      self.assertEqual(txt, SafeEncode(txt))

  def testUnicode(self):
    """Encoding an already encoded unicode character must be stable."""
    # 1024 is high enough to catch non-direct ASCII mappings
    for i in range(1024):
      txt = SafeEncode(unichr(i))
      self.assertEqual(txt, SafeEncode(txt))
1683 class TestFormatTime(unittest.TestCase):
1684 """Testing case for FormatTime"""
1687 def _TestInProcess(tz, timestamp, expected):
1688 os.environ["TZ"] = tz
1690 return utils.FormatTime(timestamp) == expected
def _Test(self, *args):
  """Run _TestInProcess with the given arguments and require a true result."""
  # Need to use separate process as we want to change TZ
  result = utils.RunInSeparateProcess(self._TestInProcess, *args)
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(result)
1697 self._Test("UTC", 0, "1970-01-01 00:00:00")
1698 self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
1699 self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
1700 self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
1701 self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
1704 self.failUnlessEqual(FormatTime(None), "N/A")
def testInvalid(self):
  """FormatTime returns "N/A" when given an empty tuple."""
  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(FormatTime(()), "N/A")
1710 # tests that we accept time.time input
1711 FormatTime(time.time())
1712 # tests that we accept int input
1713 FormatTime(int(time.time()))
1716 class RunInSeparateProcess(unittest.TestCase):
1718 for exp in [True, False]:
1722 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1725 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1726 def _child(carg1, carg2):
1727 return carg1 == "Foo" and carg2 == arg
1729 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1732 parent_pid = os.getpid()
1735 return os.getpid() == parent_pid
1737 self.failIf(utils.RunInSeparateProcess(_check))
1739 def testSignal(self):
1741 os.kill(os.getpid(), signal.SIGTERM)
1743 self.assertRaises(errors.GenericError,
1744 utils.RunInSeparateProcess, _kill)
1746 def testException(self):
1748 raise errors.GenericError("This is a test")
1750 self.assertRaises(errors.GenericError,
1751 utils.RunInSeparateProcess, _exc)
1754 class TestFingerprintFiles(unittest.TestCase):
1756 self.tmpfile = tempfile.NamedTemporaryFile()
1757 self.tmpfile2 = tempfile.NamedTemporaryFile()
1758 utils.WriteFile(self.tmpfile2.name, data="Hello World\n")
1760 self.tmpfile.name: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
1761 self.tmpfile2.name: "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a",
def testSingleFile(self):
  """Fingerprint one known file, then a path that does not exist."""
  known_path = self.tmpfile.name
  known_digest = utils._FingerprintFile(known_path)
  self.assertEqual(known_digest, self.results[known_path])

  # A missing file yields None
  missing_digest = utils._FingerprintFile("/no/such/file")
  self.assertEqual(missing_digest, None)
def testBigFile(self):
  """Fingerprint the temporary file after writing 8192 "A" characters."""
  payload = "A" * 8192
  self.tmpfile.write(payload)
  # Flush so the data is on disk before the file is read by name
  self.tmpfile.flush()

  digest = utils._FingerprintFile(self.tmpfile.name)
  self.assertEqual(digest, "35b6795ca20d6dc0aff8c7c110c96cd1070b8c38")
def testMultiple(self):
  """Fingerprint all known files plus one path that does not exist.

  The result is expected to contain exactly the known files.

  """
  all_files = list(self.results.keys())
  all_files.append("/no/such/file")
  # Pass the extended list: previously it was built but never used, so the
  # nonexistent path was not exercised at all.
  # NOTE(review): relies on FingerprintFiles leaving out files for which
  # _FingerprintFile returns None -- confirm against utils.FingerprintFiles.
  self.assertEqual(utils.FingerprintFiles(all_files), self.results)
1782 class TestUnescapeAndSplit(unittest.TestCase):
1783 """Testing case for UnescapeAndSplit"""
1786 # testing more than one separator for regexp safety
1787 self._seps = [",", "+", "."]
def testSimple(self):
  """Joining with a separator and splitting again gives the same list."""
  parts = ["a", "b", "c", "d"]
  for sep in self._seps:
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(UnescapeAndSplit(sep.join(parts), sep=sep), parts)
def testEscape(self):
  """A backslash-escaped separator stays inside its element, unescaped."""
  for sep in self._seps:
    escaped = ["a", "b\\" + sep + "c", "d"]
    expected = ["a", "b" + sep + "c", "d"]
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(UnescapeAndSplit(sep.join(escaped), sep=sep), expected)
def testDoubleEscape(self):
  """Two backslashes before a separator give one backslash and a split."""
  for sep in self._seps:
    escaped = ["a", "b\\\\", "c", "d"]
    expected = ["a", "b\\", "c", "d"]
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(UnescapeAndSplit(sep.join(escaped), sep=sep), expected)
def testThreeEscape(self):
  """Three backslashes give a literal backslash plus an escaped separator."""
  for sep in self._seps:
    escaped = ["a", "b\\\\\\" + sep + "c", "d"]
    expected = ["a", "b\\" + sep + "c", "d"]
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(UnescapeAndSplit(sep.join(escaped), sep=sep), expected)
1813 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1815 self.tmpdir = tempfile.mkdtemp()
1818 shutil.rmtree(self.tmpdir)
1820 def _checkRsaPrivateKey(self, key):
1821 lines = key.splitlines()
1822 return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1823 "-----END RSA PRIVATE KEY-----" in lines)
1825 def _checkCertificate(self, cert):
1826 lines = cert.splitlines()
1827 return ("-----BEGIN CERTIFICATE-----" in lines and
1828 "-----END CERTIFICATE-----" in lines)
1831 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1832 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1833 self._checkRsaPrivateKey(key_pem)
1834 self._checkCertificate(cert_pem)
1836 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1838 self.assert_(key.bits() >= 1024)
1839 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1840 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1842 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1844 self.failIf(x509.has_expired())
1845 self.assertEqual(x509.get_issuer().CN, common_name)
1846 self.assertEqual(x509.get_subject().CN, common_name)
1847 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
def testLegacy(self):
  """GenerateSelfSignedSslCert writes key and certificate into one file."""
  cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

  utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

  cert1 = utils.ReadFile(cert1_filename)

  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(self._checkRsaPrivateKey(cert1))
  self.assertTrue(self._checkCertificate(cert1))
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    """An absolute prefix plus plain components is joined with "/"."""
    mlist = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*mlist), "/".join(mlist))

  def testNonAbsPrefix(self):
    """A relative first component must raise ValueError."""
    self.assertRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    """A component containing ".." must raise ValueError."""
    self.assertRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    """An absolute component after the first must raise ValueError."""
    self.assertRaises(ValueError, PathJoin, "/a", "/b")
1877 class TestValidateServiceName(unittest.TestCase):
1878 def testValid(self):
1880 0, 1, 2, 3, 1024, 65000, 65534, 65535,
1885 "0", "80", "1111", "65535",
1888 for name in testnames:
1889 self.assertEqual(utils.ValidateServiceName(name), name)
1891 def testInvalid(self):
1893 -15756, -1, 65536, 133428083,
1894 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1895 "-8546", "-1", "65536",
1899 for name in testnames:
1900 self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1903 class TestParseAsn1Generalizedtime(unittest.TestCase):
1906 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1907 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1909 self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1913 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1915 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1917 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1919 self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1921 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1924 # Leap seconds are not supported by datetime.datetime
1925 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1926 "19841231235960+0000")
1927 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1928 "19920630235960+0000")
1931 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1932 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1933 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1935 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1936 "Mon Feb 22 17:47:02 UTC 2010")
1937 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1938 "2010-02-22 17:42:02")
1941 class TestGetX509CertValidity(testutils.GanetiTestCase):
1943 testutils.GanetiTestCase.setUp(self)
1945 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1947 # Test whether we have pyOpenSSL 0.7 or above
1948 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1950 if not self.pyopenssl0_7:
1951 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1952 " function correctly")
def _LoadCert(self, name):
  """Load the test data file C{name} as a PEM-encoded X509 certificate."""
  pem_data = self._ReadTestData(name)
  return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                         pem_data)
1959 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1960 if self.pyopenssl0_7:
1961 self.assertEqual(validity, (1266919967, 1267524767))
1963 self.assertEqual(validity, (None, None))
1966 class TestSignX509Certificate(unittest.TestCase):
1967 KEY = "My private key!"
1968 KEY_OTHER = "Another key"
1971 # Generate certificate valid for 5 minutes
1972 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1974 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1977 # No signature at all
1978 self.assertRaises(errors.GenericError,
1979 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1982 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1984 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1985 "X-Ganeti-Signature: \n", self.KEY)
1986 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1987 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1988 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1989 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1990 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1991 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1994 for salt in list("-_@$,:;/\\ \t\n"):
1995 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1996 cert_pem, self.KEY, "foo%sbar" % salt)
1998 for salt in ["HelloWorld", "salt", string.letters, string.digits,
1999 utils.GenerateSecret(numbytes=4),
2000 utils.GenerateSecret(numbytes=16),
2001 "{123:456}".encode("hex")]:
2002 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
2004 self._Check(cert, salt, signed_pem)
2006 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
2007 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
2008 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
2009 "lines----\n------ at\nthe end!"))
2011 def _Check(self, cert, salt, pem):
2012 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
2013 self.assertEqual(salt, salt2)
2014 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
2017 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2018 pem, self.KEY_OTHER)
2021 class TestMakedirs(unittest.TestCase):
2023 self.tmpdir = tempfile.mkdtemp()
2026 shutil.rmtree(self.tmpdir)
def testNonExisting(self):
  """Makedirs creates a directory directly below the temporary directory."""
  path = PathJoin(self.tmpdir, "foo")
  utils.Makedirs(path)
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(os.path.isdir(path))
2033 def testExisting(self):
2034 path = PathJoin(self.tmpdir, "foo")
2036 utils.Makedirs(path)
2037 self.assert_(os.path.isdir(path))
def testRecursiveNonExisting(self):
  """Makedirs creates a path several levels deep in one call."""
  path = PathJoin(self.tmpdir, "foo/bar/baz")
  utils.Makedirs(path)
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(os.path.isdir(path))
def testRecursiveExisting(self):
  """Makedirs completes a path whose first component already exists."""
  path = PathJoin(self.tmpdir, "B/moo/xyz")
  self.assertFalse(os.path.exists(path))
  # Only the top-most component is created beforehand
  os.mkdir(PathJoin(self.tmpdir, "B"))
  utils.Makedirs(path)
  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(os.path.isdir(path))
# Tests for utils.Retry and the utils.RetryAgain / utils.RetryTimeout
# exceptions.
# NOTE(review): several short lines of this class (fixture header, the
# initialisation/increment of self.retries, decorators, try/else keywords)
# are not visible in this listing -- confirm against the file.
2052 class TestRetry(testutils.GanetiTestCase):
2054 testutils.GanetiTestCase.setUp(self)
# Helper that always asks for another attempt. It takes no "self";
# presumably declared as a staticmethod (decorator not visible).
2058 def _RaiseRetryAgain():
2059 raise utils.RetryAgain()
# Helper that always asks for another attempt, passing the items of "args"
# on to the RetryAgain exception.
2062 def _RaiseRetryAgainWithArg(args):
2063 raise utils.RetryAgain(*args)
# Helper that runs a second utils.Retry loop whose function never succeeds.
2065 def _WrongNestedLoop(self):
2066 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
# Helper that requests a retry while self.retries is below "retries".
2068 def _RetryAndSucceed(self, retries):
2069 if self.retries < retries:
2071 raise utils.RetryAgain()
# A function that never succeeds must end in RetryTimeout.
2075 def testRaiseTimeout(self):
2076 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2077 self._RaiseRetryAgain, 0.01, 0.02)
2078 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2079 self._RetryAndSucceed, 0.01, 0, args=[1])
2080 self.failUnlessEqual(self.retries, 1)
# Retry hands back the result of the wrapped function once it succeeds.
2082 def testComplete(self):
2083 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
2084 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
2086 self.failUnlessEqual(self.retries, 2)
# A Retry loop nested inside another one is expected to surface as
# ProgrammerError; a RetryTimeout reaching the test is reported as failure.
2088 def testNestedLoop(self):
2090 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
2091 self._WrongNestedLoop, 0, 1)
2092 except utils.RetryTimeout:
2093 self.fail("Didn't detect inner loop's exception")
# Arguments given to RetryAgain must show up as args of the RetryTimeout.
2095 def testTimeoutArgument(self):
2096 retry_arg="my_important_debugging_message"
2098 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
2099 except utils.RetryTimeout, err:
2100 self.failUnlessEqual(err.args, (retry_arg, ))
2102 self.fail("Expected timeout didn't happen")
# RetryAgain is given an exception instance here; the outer handler expects
# a GenericError carrying the original arguments.
# NOTE(review): the statement inside the inner RetryTimeout handler is not
# visible in this listing -- confirm how the inner exception gets re-raised.
2104 def testRaiseInnerWithExc(self):
2105 retry_arg="my_important_debugging_message"
2108 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2109 args=[[errors.GenericError(retry_arg, retry_arg)]])
2110 except utils.RetryTimeout, err:
2113 self.fail("Expected timeout didn't happen")
2114 except errors.GenericError, err:
2115 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2117 self.fail("Expected GenericError didn't happen")
# Same shape as above, but RetryAgain only gets plain message arguments, so
# the outer handler expects a RetryTimeout with those arguments.
2119 def testRaiseInnerWithMsg(self):
2120 retry_arg="my_important_debugging_message"
2123 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2124 args=[[retry_arg, retry_arg]])
2125 except utils.RetryTimeout, err:
2128 self.fail("Expected timeout didn't happen")
2129 except utils.RetryTimeout, err:
2130 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2132 self.fail("Expected RetryTimeout didn't happen")
# Tests for utils.LineSplitter: data is fed in through write() and complete
# lines are expected to reach the callback without their line terminator.
# NOTE(review): the first test's header and several short statements
# (list initialisation, additional writes, the calls that trigger delivery)
# are not visible in this listing -- confirm against the file.
2135 class TestLineSplitter(unittest.TestCase):
2138 ls = utils.LineSplitter(lines.append)
2139 ls.write("Hello World\n")
# Nothing has been handed to the callback right after write()
2140 self.assertEqual(lines, [])
2141 ls.write("Foo\n Bar\r\n ")
2144 self.assertEqual(lines, [])
2146 self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
2148 self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
# Callback used by testExtraArgsNoFlush: checks the two extra positional
# arguments and collects the line in the list passed alongside them.
2150 def _testExtra(self, line, all_lines, p1, p2):
2151 self.assertEqual(p1, 999)
2152 self.assertEqual(p2, "extra")
2153 all_lines.append(line)
# Extra arguments given to LineSplitter are passed through to the callback.
2155 def testExtraArgsNoFlush(self):
2157 ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
2158 ls.write("\n\nHello World\n")
2159 ls.write("Foo\n Bar\r\n ")
2162 ls.write("Moo\n\nx\n")
2163 self.assertEqual(lines, [])
2165 self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
# Tests for utils.ReadLockedPidFile using files below a temporary directory.
# NOTE(review): the fixture method headers and the try/finally lines around
# the lock in testLocked are not visible in this listing -- confirm.
2169 class TestReadLockedPidFile(unittest.TestCase):
2171 self.tmpdir = tempfile.mkdtemp()
2174 shutil.rmtree(self.tmpdir)
# A path that does not exist yields None.
2176 def testNonExistent(self):
2177 path = PathJoin(self.tmpdir, "nonexist")
2178 self.assert_(utils.ReadLockedPidFile(path) is None)
# An existing PID file that nobody holds a lock on yields None.
2180 def testUnlocked(self):
2181 path = PathJoin(self.tmpdir, "pid")
2182 utils.WriteFile(path, data="123")
2183 self.assert_(utils.ReadLockedPidFile(path) is None)
# While an exclusive lock is held on the file its content is returned as an
# integer; the final assertion expects None again (presumably after the lock
# has been released -- that statement is not visible here).
2185 def testLocked(self):
2186 path = PathJoin(self.tmpdir, "pid")
2187 utils.WriteFile(path, data="123")
2189 fl = utils.FileLock.Open(path)
2191 fl.Exclusive(blocking=True)
2193 self.assertEqual(utils.ReadLockedPidFile(path), 123)
2197 self.assert_(utils.ReadLockedPidFile(path) is None)
# "foobar" is created as a regular file, so a path below it cannot be opened
# and the error is expected to propagate as EnvironmentError.
2199 def testError(self):
2200 path = PathJoin(self.tmpdir, "foobar", "pid")
2201 utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2202 # open(2) should return ENOTDIR
2203 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
# Smoke test for utils.VerifyX509Certificate with the "cert1.pem" test data.
# NOTE(review): the fixture method headers and the last argument of the
# load_certificate() call are not visible in this listing -- confirm.
2206 class TestCertVerification(testutils.GanetiTestCase):
2208 testutils.GanetiTestCase.setUp(self)
2210 self.tmpdir = tempfile.mkdtemp()
2213 shutil.rmtree(self.tmpdir)
# Only checks that verification runs; the result is deliberately ignored.
2215 def testVerifyCertificate(self):
2216 cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2217 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2220 # Not checking return value as this certificate is expired
2221 utils.VerifyX509Certificate(cert, 30, 7)
# Tests for utils._VerifyCertificateInner, which returns an (errcode, msg)
# pair; errcode is compared against None, utils.CERT_WARNING and
# utils.CERT_ERROR below.
# NOTE(review): the arguments are presumably (expired, not_before, not_after,
# now, warn_days, error_days) with Unix timestamps -- TODO confirm against
# the function. The test method header, the expected value of the first
# assertion and the per-case comments are not visible in this listing.
2224 class TestVerifyCertificateInner(unittest.TestCase):
2226 vci = utils._VerifyCertificateInner
2229 self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2233 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2234 self.assertEqual(errcode, utils.CERT_WARNING)
# Same timestamps twice; only the last argument differs (7 vs. 1) and turns
# the expected result from an error into a warning.
2237 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2238 self.assertEqual(errcode, utils.CERT_ERROR)
2240 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2241 self.assertEqual(errcode, utils.CERT_WARNING)
# With None as third argument no error code is expected.
2243 (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2244 self.assertEqual(errcode, None)
# With True as first argument an error is expected regardless of which of
# the second and third arguments are None.
2247 (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2248 self.assertEqual(errcode, utils.CERT_ERROR)
2250 (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2251 self.assertEqual(errcode, utils.CERT_ERROR)
2253 (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2254 self.assertEqual(errcode, utils.CERT_ERROR)
2256 (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2257 self.assertEqual(errcode, utils.CERT_ERROR)
# Tests for utils.Sha1Hmac and utils.VerifySha1Hmac against fixed digests
# given as 40-character hexadecimal strings.
# NOTE(review): the trailing arguments of several VerifySha1Hmac calls
# (after "Hello World", and after the digests in testVerifySha1HmacSalt) are
# not visible in this listing -- confirm against the file.
2260 class TestHmacFunctions(unittest.TestCase):
2261 # Digests can be checked with "openssl sha1 -hmac $key"
# Known digests for empty and non-empty key/text combinations.
2262 def testSha1Hmac(self):
2263 self.assertEqual(utils.Sha1Hmac("", ""),
2264 "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2265 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2266 "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2267 self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2268 "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
# Longer input: the sentence repeated 1500 times
2270 longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2271 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2272 "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
# Passing a salt changes the digest; different salts give different digests.
2274 def testSha1HmacSalt(self):
2275 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2276 "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2277 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2278 "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2279 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2280 "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
# Verification accepts the digests asserted in testSha1Hmac.
2282 def testVerifySha1Hmac(self):
2283 self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2284 "7d64b71fb76370690e1d")))
2285 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2286 ("f904c2476527c6d3e660"
2287 "9ab683c66fa0652cb1dc")))
2289 digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2290 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2291 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2293 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2295 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
# Verification of the salted digests asserted in testSha1HmacSalt.
2298 def testVerifySha1HmacSalt(self):
2299 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2300 ("17a4adc34d69c0d367d4"
2301 "ffbef96fd41d4df7a6e8"),
2303 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2304 ("7f264f8114c9066afc9b"
2305 "b7636e1786d996d3cc0d"),
# NOTE(review): the bodies of the _Raise and _Return helpers (and the header
# of _Return) are not visible in this listing; going by their names and use
# they presumably raise/return their argument -- confirm against the file.
2309 class TestIgnoreSignals(unittest.TestCase):
2310 """Test the IgnoreSignals decorator"""
2313 def _Raise(exception):
# Exercises utils.IgnoreSignals with socket.error and EnvironmentError
# instances carrying EINTR and EINVAL.
2320 def testIgnoreSignals(self):
2321 sock_err_intr = socket.error(errno.EINTR, "Message")
2322 sock_err_inval = socket.error(errno.EINVAL, "Message")
2324 env_err_intr = EnvironmentError(errno.EINTR, "Message")
2325 env_err_inval = EnvironmentError(errno.EINVAL, "Message")
# Without the wrapper all four exceptions propagate
2327 self.assertRaises(socket.error, self._Raise, sock_err_intr)
2328 self.assertRaises(socket.error, self._Raise, sock_err_inval)
2329 self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2330 self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
# With the wrapper the EINTR variants are swallowed and None is returned
2332 self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2333 self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2334 self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2336 self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
# Return values of the wrapped function are passed through
2339 self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2340 self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
# NOTE(review): the fixture method headers and the opening/closing lines of
# the call that receives the (path, mode) pairs are not visible in this
# listing -- confirm against the file.
2343 class TestEnsureDirs(unittest.TestCase):
2344 """Tests for EnsureDirs"""
2347 self.dir = tempfile.mkdtemp()
# The process umask is set to 0777 for the test; the previous value is kept
# so it can be restored via os.umask(self.old_umask) further down.
2348 self.old_umask = os.umask(0777)
# Two directories with modes 0777 and 0000 are requested and the permission
# bits reported by os.stat() must match exactly.
2350 def testEnsureDirs(self):
2352 (PathJoin(self.dir, "foo"), 0777),
2353 (PathJoin(self.dir, "bar"), 0000),
2355 self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2356 self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
# Cleanup: remove both directories and restore the saved umask
2359 os.rmdir(PathJoin(self.dir, "foo"))
2360 os.rmdir(PathJoin(self.dir, "bar"))
2362 os.umask(self.old_umask)
# Tests for utils.FormatSeconds, which renders a duration as text.
# NOTE(review): the header of the first test method is not visible in this
# listing -- confirm its name against the file.
2365 class TestFormatSeconds(unittest.TestCase):
# Integer input: larger units (m, h, d) appear only once they are reached,
# and all smaller units are then printed even when zero.
2367 self.assertEqual(utils.FormatSeconds(1), "1s")
2368 self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2369 self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2370 self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2371 self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2372 self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2373 self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2374 self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
# Negative values are not broken down into units
2375 self.assertEqual(utils.FormatSeconds(-1), "-1s")
2376 self.assertEqual(utils.FormatSeconds(-282), "-282s")
2377 self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
# Float input is rounded to whole seconds (1.3 -> "1s", 1.9 -> "2s").
2379 def testFloat(self):
2380 self.assertEqual(utils.FormatSeconds(1.3), "1s")
2381 self.assertEqual(utils.FormatSeconds(1.9), "2s")
2382 self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2383 self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
# Test for utils.IgnoreProcessNotFound using the PID of a process that has
# already exited.
# NOTE(review): the header of the helper writing the PID (referenced below as
# self._WritePid) and the header of the test method are not visible in this
# listing -- confirm against the file.
2386 class TestIgnoreProcessNotFound(unittest.TestCase):
# Helper body: write the PID of the current process to the descriptor
2389 os.write(fd, str(os.getpid()))
# The pipe carries the child's PID back to the test process
2394 (pid_read_fd, pid_write_fd) = os.pipe()
2396 # Start short-lived process which writes its PID to pipe
2397 self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2398 os.close(pid_write_fd)
2400 # Read PID from pipe
2401 pid = int(os.read(pid_read_fd, 1024))
2402 os.close(pid_read_fd)
2404 # Try to send signal to process which exited recently
2405 self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
# Tests for utils.ShellWriter, which writes shell script lines to a
# file-like buffer and tracks an indentation level in "_indent".
# NOTE(review): many short statements of this class (buffer creation, the
# indentation changes and several Write calls) are not visible in this
# listing -- confirm against the file.
2408 class TestShellWriter(unittest.TestCase):
2411 sw = utils.ShellWriter(buf)
2412 sw.Write("#!/bin/bash")
2413 sw.Write("if true; then")
2416 sw.Write("echo true")
2418 sw.Write("for i in 1 2 3")
# At this point two indentation levels are expected to be active
2422 self.assertEqual(sw._indent, 2)
# Write() accepts a format string followed by its arguments
2429 sw.Write("echo %s", utils.ShellQuote("Hello World"))
2432 self.assertEqual(sw._indent, 0)
2434 output = buf.getvalue()
# The output must end with a newline and consist of nine lines
2436 self.assert_(output.endswith("\n"))
2438 lines = output.splitlines()
2439 self.assertEqual(len(lines), 9)
2440 self.assertEqual(lines[0], "#!/bin/bash")
# Line 5 carries leading whitespace, line 7 is written without indentation
2441 self.assert_(re.match(r"^\s+date$", lines[5]))
2442 self.assertEqual(lines[7], "echo 'Hello World'")
# A writer that is created but gets no visible Write() call leaves the
# buffer empty.
2444 def testEmpty(self):
2446 sw = utils.ShellWriter(buf)
2448 self.assertEqual(buf.getvalue(), "")
# Tests for utils.CommaJoin: items are joined with ", " and non-string items
# (integers here) are accepted.
# NOTE(review): the test method header and the expected value of the last
# assertion are not visible in this listing -- confirm against the file.
2451 class TestCommaJoin(unittest.TestCase):
2453 self.assertEqual(utils.CommaJoin([]), "")
2454 self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
2455 self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
2456 self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
2457 self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
# Tests for utils.FindMatch, which looks a name up in a dictionary whose keys
# are plain strings or compiled regular expressions. A hit is returned as a
# (value, groups) pair, a miss as None.
# NOTE(review): the dictionary delimiters, some entries and the first test
# method header are not visible in this listing -- confirm against the file.
2461 class TestFindMatch(unittest.TestCase):
2465 "bb": {"Two B": True},
2466 re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
# String keys match exactly and produce an empty group list
2469 self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
2470 self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
# Regular expression keys produce the captured groups as strings
2472 for i in ["foo", "bar", "bazX"]:
2473 for j in range(1, 100, 7):
2474 self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
2475 ((1, 2, 3), [i, str(j)]))
# Lookups without a matching key return None, also for an empty dictionary.
2477 def testNoMatch(self):
2478 self.assert_(utils.FindMatch({}, "") is None)
2479 self.assert_(utils.FindMatch({}, "foo") is None)
2480 self.assert_(utils.FindMatch({}, 1234) is None)
2484 re.compile("^(something)$"): "Hello World",
# A dictionary value ("Hello World") must not be treated as a key
2487 self.assert_(utils.FindMatch(data, "") is None)
2488 self.assert_(utils.FindMatch(data, "Hello World") is None)
# Tests for utils.GetFileID, utils.VerifyFileID and utils.SafeWriteFile.
# NOTE(review): the try/finally lines around the file descriptor, the
# assignment of "mtime" and the remaining arguments of the first
# SafeWriteFile check are not visible in this listing -- confirm.
2491 class TestFileID(testutils.GanetiTestCase):
# A file ID verifies against itself.
2492 def testEquality(self):
2493 name = self._CreateTempFile()
2494 oldi = utils.GetFileID(path=name)
2495 self.failUnless(utils.VerifyFileID(oldi, oldi))
# An ID taken by path and one taken through an open descriptor after
# os.utime(name, None) are expected to verify in both directions.
2497 def testUpdate(self):
2498 name = self._CreateTempFile()
2499 oldi = utils.GetFileID(path=name)
2500 os.utime(name, None)
2501 fd = os.open(name, os.O_RDWR)
2503 newi = utils.GetFileID(fd=fd)
2504 self.failUnless(utils.VerifyFileID(oldi, newi))
2505 self.failUnless(utils.VerifyFileID(newi, oldi))
# SafeWriteFile raises LockError after the file time was moved forward, and
# writes normally after it was moved backward or when None is passed as ID.
2509 def testWriteFile(self):
2510 name = self._CreateTempFile()
2511 oldi = utils.GetFileID(path=name)
2513 os.utime(name, (mtime + 10, mtime + 10))
2514 self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
2516 os.utime(name, (mtime - 10, mtime - 10))
2517 utils.SafeWriteFile(name, oldi, data="")
2518 oldi = utils.GetFileID(path=name)
2520 os.utime(name, (mtime + 10, mtime + 10))
2521 # this doesn't raise, since we passed None
2522 utils.SafeWriteFile(name, None, data="")
# Passing both a path and a descriptor is rejected with ProgrammerError.
2524 def testError(self):
2525 t = tempfile.NamedTemporaryFile()
2526 self.assertRaises(errors.ProgrammerError, utils.GetFileID,
2527 path=t.name, fd=t.fileno())
# Fake clock helper: stores a list of values and hands them out one by one,
# removing each value from the front of the list as it is returned.
# NOTE(review): the class header and the header of the method containing the
# "return" are not visible in this listing; presumably this is the TimeMock
# class used as "_time_fn" by TestRunningTimeout -- confirm against the file.
2531 def __init__(self, values):
2532 self.values = values
2535 return self.values.pop(0)
class TestRunningTimeout(unittest.TestCase):
  """Checks utils.RunningTimeout against a scripted time function."""

  def setUp(self):
    # Successive readings handed out by the fake time function
    readings = [0.0, 0.3, 4.6, 6.5]
    self.time_fn = TimeMock(readings)

  def testRemainingFloat(self):
    """Float timeout; the remaining time may drop below zero."""
    running = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
    for expected in (4.7, 0.4, -1.5):
      self.assertAlmostEqual(running.Remaining(), expected)

  def testRemaining(self):
    """Integer timeout driven by integer readings."""
    self.time_fn = TimeMock([0, 2, 4, 5, 6])
    running = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
    for expected in (3, 1, 0, -1):
      self.assertEqual(running.Remaining(), expected)

  def testRemainingNonNegative(self):
    """With False as second argument the result stops at 0.0."""
    running = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
    for expected in (4.7, 0.4):
      self.assertAlmostEqual(running.Remaining(), expected)
    self.assertEqual(running.Remaining(), 0.0)

  def testNegativeTimeout(self):
    """A negative timeout value is refused with ValueError."""
    constructor = utils.RunningTimeout
    self.assertRaises(ValueError, constructor, -1.0, True)
# Table-driven test for utils.TryConvert: each entry supplies a source value,
# a conversion function and the expected result of TryConvert(fn, src).
# NOTE(review): the test method header and the table entries are not visible
# in this listing -- confirm against the file.
2566 class TestTryConvert(unittest.TestCase):
2568 for src, fn, result in [
2574 self.assertEqual(utils.TryConvert(fn, src), result)
# Table-driven test for utils.IsValidShellParam: each entry pairs an input
# value with the expected return value.
# NOTE(review): the test method header and the table entries are not visible
# in this listing -- confirm against the file.
2577 class TestIsValidShellParam(unittest.TestCase):
2579 for val, result in [
2583 self.assertEqual(utils.IsValidShellParam(val), result)
# Tests for utils.BuildShellCmd: one call is expected to raise
# ProgrammerError, and a template with a plain argument is filled in as is.
# NOTE(review): the test method header and the arguments of the failing call
# are not visible in this listing -- confirm against the file.
2586 class TestBuildShellCmd(unittest.TestCase):
2588 self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
2590 self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
# Tests for utils.WriteFile, writing to a tempfile.NamedTemporaryFile.
# NOTE(review): the fixture header and several short statements (test data
# assignments, the body of markPre, stat/seek/close calls) are not visible in
# this listing -- confirm against the file.
2593 class TestWriteFile(unittest.TestCase):
2595 self.tfile = tempfile.NamedTemporaryFile()
# Flags flipped by the callbacks below so testCalls can see what ran
2596 self.did_pre = False
2597 self.did_post = False
2598 self.did_write = False
# Callback passed as "prewrite" in testCalls
2600 def markPre(self, fd):
# Callback passed as "postwrite" in testCalls
2603 def markPost(self, fd):
2604 self.did_post = True
# Callback passed as "fn" in testCalls
2606 def markWrite(self, fd):
2607 self.did_write = True
# Data written with WriteFile can be read back unchanged via ReadFile.
2609 def testWrite(self):
2611 utils.WriteFile(self.tfile.name, data=data)
2612 self.assertEqual(utils.ReadFile(self.tfile.name), data)
# Invalid argument combinations raise ProgrammerError: both data and fn,
# neither of them, and atime without mtime.
2614 def testErrors(self):
2615 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
2616 self.tfile.name, data="test", fn=lambda fd: None)
2617 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
2618 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
2619 self.tfile.name, data="test", atime=0)
# All three callbacks must have been invoked by a single WriteFile call.
2621 def testCalls(self):
2622 utils.WriteFile(self.tfile.name, fn=self.markWrite,
2623 prewrite=self.markPre, postwrite=self.markPost)
2624 self.assertTrue(self.did_pre)
2625 self.assertTrue(self.did_post)
2626 self.assertTrue(self.did_write)
# With dry_run=True the existing file content stays untouched.
2628 def testDryRun(self):
2630 self.tfile.write(orig)
2632 utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
2633 self.assertEqual(utils.ReadFile(self.tfile.name), orig)
# atime/mtime given to WriteFile must show up in the file's stat result.
2635 def testTimes(self):
2637 for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
2638 (int(time.time()), 5000)]:
2639 utils.WriteFile(f, data="hello", atime=at, mtime=mt)
2641 self.assertEqual(st.st_atime, at)
2642 self.assertEqual(st.st_mtime, mt)
# By default WriteFile returns None; with close=False it returns a file
# descriptor from which the written data is read back here.
2645 def testNoClose(self):
2647 self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
2648 fd = utils.WriteFile(self.tfile.name, data=data, close=False)
2651 self.assertEqual(os.read(fd, 4096), data)
class TestNormalizeAndValidateMac(unittest.TestCase):
  """Checks for utils.NormalizeAndValidateMac."""

  def testInvalid(self):
    """A string that is no MAC address is refused with OpPrereqError."""
    normalize = utils.NormalizeAndValidateMac
    self.assertRaises(errors.OpPrereqError, normalize, "xxx")

  def testNormalization(self):
    """Valid addresses come back in lower case."""
    addresses = ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]
    for address in addresses:
      normalized = utils.NormalizeAndValidateMac(address)
      self.assertEqual(normalized, address.lower())
# Tests for utils.NiceSort, which sorts strings so that embedded numbers are
# compared by value (see the expected orders below, e.g. "a2" before "a20").
# NOTE(review): list delimiters, the headers of the nested helper classes
# (referenced below as self._CallCount and self._TestData), their short
# attribute assignments and a few other short statements are not visible in
# this listing -- confirm against the file.
2666 class TestNiceSort(unittest.TestCase):
# Basic cases: empty and single-element input, punctuation, mixed case
2668 self.assertEqual(utils.NiceSort([]), [])
2669 self.assertEqual(utils.NiceSort(["foo"]), ["foo"])
2670 self.assertEqual(utils.NiceSort(["bar", ""]), ["", "bar"])
2671 self.assertEqual(utils.NiceSort([",", "."]), [",", "."])
2672 self.assertEqual(utils.NiceSort(["0.1", "0.2"]), ["0.1", "0.2"])
2673 self.assertEqual(utils.NiceSort(["0;099", "0,099", "0.1", "0.2"]),
2674 ["0,099", "0.1", "0.2", "0;099"])
2676 data = ["a0", "a1", "a99", "a20", "a2", "b10", "b70", "b00", "0000"]
2677 self.assertEqual(utils.NiceSort(data),
2678 ["0000", "a0", "a1", "a2", "a20", "a99",
2679 "b00", "b10", "b70"])
2681 data = ["a0-0", "a1-0", "a99-10", "a20-3", "a0-4", "a99-3", "a09-2",
2682 "Z", "a9-1", "A", "b"]
# Without a key function upper-case letters sort before lower-case ones
2683 self.assertEqual(utils.NiceSort(data),
2684 ["A", "Z", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
2685 "a20-3", "a99-3", "a99-10", "b"])
# With a case-folding key function "Z" moves to the end
2686 self.assertEqual(utils.NiceSort(data, key=str.lower),
2687 ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
2688 "a20-3", "a99-3", "a99-10", "b", "Z"])
2689 self.assertEqual(utils.NiceSort(data, key=str.upper),
2690 ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
2691 "a20-3", "a99-3", "a99-10", "b", "Z"])
# Fixed list of random-looking identifiers and its expected order.
2693 def testLargeA(self):
2695 "Eegah9ei", "xij88brTulHYAv8IEOyU", "3jTwJPtrXOY22bwL2YoW",
2696 "Z8Ljf1Pf5eBfNg171wJR", "WvNJd91OoXvLzdEiEXa6", "uHXAyYYftCSG1o7qcCqe",
2697 "xpIUJeVT1Rp", "KOt7vn1dWXi", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
2698 "cPRi0lM7HLnSuWA2G9", "KVQqLPDjcPjf8T3oyzjcOsfkb",
2699 "guKJkXnkULealVC8CyF1xefym", "pqF8dkU5B1cMnyZuREaSOADYx",
2701 self.assertEqual(utils.NiceSort(data), [
2702 "3jTwJPtrXOY22bwL2YoW", "Eegah9ei", "KOt7vn1dWXi",
2703 "KVQqLPDjcPjf8T3oyzjcOsfkb", "WvNJd91OoXvLzdEiEXa6",
2704 "Z8Ljf1Pf5eBfNg171wJR", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
2705 "cPRi0lM7HLnSuWA2G9", "guKJkXnkULealVC8CyF1xefym",
2706 "pqF8dkU5B1cMnyZuREaSOADYx", "uHXAyYYftCSG1o7qcCqe",
2707 "xij88brTulHYAv8IEOyU", "xpIUJeVT1Rp"
# The strings below are the expected order; the data is shuffled with a
# fixed seed and NiceSort has to restore that order.
2710 def testLargeB(self):
2712 "inst-0.0.0.0-0.0.0.0",
2713 "inst-0.1.0.0-0.0.0.0",
2714 "inst-0.2.0.0-0.0.0.0",
2715 "inst-0.2.1.0-0.0.0.0",
2716 "inst-0.2.2.0-0.0.0.0",
2717 "inst-0.2.2.0-0.0.0.9",
2718 "inst-0.2.2.0-0.0.3.9",
2719 "inst-0.2.2.0-0.2.0.9",
2720 "inst-0.2.2.0-0.9.0.9",
2721 "inst-0.20.2.0-0.0.0.0",
2722 "inst-0.20.2.0-0.9.0.9",
2723 "inst-10.020.2.0-0.9.0.10",
2724 "inst-15.020.2.0-0.9.1.00",
2725 "inst-100.020.2.0-0.9.0.9",
2727 # Only the last group, not converted to a number anymore, differs
2728 "inst-100.020.2.0a999",
2729 "inst-100.020.2.0b000",
2730 "inst-100.020.2.0c10",
2731 "inst-100.020.2.0c101",
2732 "inst-100.020.2.0c2",
2733 "inst-100.020.2.0c20",
2734 "inst-100.020.2.0c3",
2735 "inst-100.020.2.0c39123",
# Fixed seed keeps the shuffle reproducible
2738 rnd = random.Random(16205)
2741 rnd.shuffle(testdata)
2742 assert testdata != data
2743 self.assertEqual(utils.NiceSort(testdata), data)
# Callable wrapper around "fn" that forwards all positional arguments; the
# tests read a "count" attribute from it (its upkeep is not visible here).
2746 def __init__(self, fn):
2750 def __call__(self, *args):
2752 return self.fn(*args)
# Sorting hexadecimal strings with a key function that converts them to
# decimal strings must give numeric order, leave the input unmodified and
# call the key function exactly once per element.
2754 def testKeyfuncA(self):
2755 # Generate some random numbers
2756 rnd = random.Random(21131)
2757 numbers = [rnd.randint(0, 10000) for _ in range(999)]
2758 assert numbers != sorted(numbers)
2761 data = [hex(i) for i in numbers]
2764 keyfn = self._CallCount(lambda value: str(int(value, 16)))
2766 # Sort with key function converting hex to decimal
2767 result = utils.NiceSort(data, key=keyfn)
2769 self.assertEqual([hex(i) for i in sorted(numbers)], result)
2770 self.assertEqual(data, datacopy, msg="Input data was modified in NiceSort")
2771 self.assertEqual(keyfn.count, len(numbers),
2772 msg="Key function was not called once per value")
# Simple record with a name and a value, used by testKeyfuncB
2775 def __init__(self, name, value):
# Objects are sorted by their "name" attribute through a key function; the
# result must equal sorting by the "value" attribute, again with exactly one
# key function call per element.
2779 def testKeyfuncB(self):
2780 rnd = random.Random(27396)
2782 for i in range(123):
2783 v1 = rnd.randint(0, 5)
2784 v2 = rnd.randint(0, 5)
2785 data.append(self._TestData("inst-%s-%s-%s" % (v1, v2, i),
2788 assert data != sorted(data, key=operator.attrgetter("name"))
2790 keyfn = self._CallCount(operator.attrgetter("name"))
2793 result = utils.NiceSort(data, key=keyfn)
2795 self.assertEqual(result, sorted(data, key=operator.attrgetter("value")))
2796 self.assertEqual(keyfn.count, len(data),
2797 msg="Key function was not called once per value")
# Script entry point: hand control to the test program from testutils when
# this file is executed directly rather than imported.
2800 if __name__ == '__main__':
2801 testutils.GanetiTestProgram()