4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
24 import distutils.version
43 from cStringIO import StringIO
46 from ganeti import constants
47 from ganeti import compat
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
51 ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
52 TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
53 ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
56 class TestIsProcessAlive(unittest.TestCase):
57 """Testing case for IsProcessAlive"""
61 self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
63 def testNotExisting(self):
64 pid_non_existing = os.fork()
65 if pid_non_existing == 0:
67 elif pid_non_existing < 0:
68 raise SystemError("can't fork")
69 os.waitpid(pid_non_existing, 0)
70 self.assertFalse(utils.IsProcessAlive(pid_non_existing),
71 "nonexisting process detected")
74 class TestGetProcStatusPath(unittest.TestCase):
76 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
77 self.assertNotEqual(utils._GetProcStatusPath(1),
78 utils._GetProcStatusPath(2))
81 class TestIsProcessHandlingSignal(unittest.TestCase):
83 self.tmpdir = tempfile.mkdtemp()
86 shutil.rmtree(self.tmpdir)
88 def testParseSigsetT(self):
89 self.assertEqual(len(utils._ParseSigsetT("0")), 0)
90 self.assertEqual(utils._ParseSigsetT("1"), set([1]))
91 self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
92 self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
93 self.assertEqual(utils._ParseSigsetT("0000000180000202"),
95 self.assertEqual(utils._ParseSigsetT("0000000180000002"),
97 self.assertEqual(utils._ParseSigsetT("0000000188000002"),
99 self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
100 set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
101 24, 25, 26, 28, 31]))
102 self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
104 def testGetProcStatusField(self):
105 for field in ["SigCgt", "Name", "FDSize"]:
106 for value in ["", "0", "cat", " 1234 KB"]:
107 pstatus = "\n".join([
109 "%s: %s" % (field, value),
112 result = utils._GetProcStatusField(pstatus, field)
113 self.assertEqual(result, value.strip())
116 sp = PathJoin(self.tmpdir, "status")
118 utils.WriteFile(sp, data="\n".join([
120 "State: S (sleeping)",
125 "SigBlk: 0000000000010000",
126 "SigIgn: 0000000000384004",
127 "SigCgt: 000000004b813efb",
128 "CapEff: 0000000000000000",
131 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
133 def testNoSigCgt(self):
134 sp = PathJoin(self.tmpdir, "status")
136 utils.WriteFile(sp, data="\n".join([
140 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
141 1234, 10, status_path=sp)
143 def testNoSuchFile(self):
144 sp = PathJoin(self.tmpdir, "notexist")
146 self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
149 def _TestRealProcess():
150 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
151 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
152 raise Exception("SIGUSR1 is handled when it should not be")
154 signal.signal(signal.SIGUSR1, lambda signum, frame: None)
155 if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
156 raise Exception("SIGUSR1 is not handled when it should be")
158 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
159 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
160 raise Exception("SIGUSR1 is not handled when it should be")
162 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
163 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
164 raise Exception("SIGUSR1 is handled when it should not be")
168 def testRealProcess(self):
169 self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
172 class TestPidFileFunctions(unittest.TestCase):
173 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
176 self.dir = tempfile.mkdtemp()
177 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
178 utils.DaemonPidFileName = self.f_dpn
180 def testPidFileFunctions(self):
181 pid_file = self.f_dpn('test')
182 fd = utils.WritePidFile(self.f_dpn('test'))
183 self.failUnless(os.path.exists(pid_file),
184 "PID file should have been created")
185 read_pid = utils.ReadPidFile(pid_file)
186 self.failUnlessEqual(read_pid, os.getpid())
187 self.failUnless(utils.IsProcessAlive(read_pid))
188 self.failUnlessRaises(errors.LockError, utils.WritePidFile,
191 utils.RemovePidFile('test')
192 self.failIf(os.path.exists(pid_file),
193 "PID file should not exist anymore")
194 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
195 "ReadPidFile should return 0 for missing pid file")
196 fh = open(pid_file, "w")
199 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
200 "ReadPidFile should return 0 for invalid pid file")
201 # but now, even with the file existing, we should be able to lock it
202 fd = utils.WritePidFile(self.f_dpn('test'))
204 utils.RemovePidFile('test')
205 self.failIf(os.path.exists(pid_file),
206 "PID file should not exist anymore")
209 pid_file = self.f_dpn('child')
210 r_fd, w_fd = os.pipe()
212 if new_pid == 0: #child
213 utils.WritePidFile(self.f_dpn('child'))
218 # else we are in the parent
219 # wait until the child has written the pid file
221 read_pid = utils.ReadPidFile(pid_file)
222 self.failUnlessEqual(read_pid, new_pid)
223 self.failUnless(utils.IsProcessAlive(new_pid))
224 utils.KillProcess(new_pid, waitpid=True)
225 self.failIf(utils.IsProcessAlive(new_pid))
226 utils.RemovePidFile('child')
227 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
230 for name in os.listdir(self.dir):
231 os.unlink(os.path.join(self.dir, name))
235 class TestRunCmd(testutils.GanetiTestCase):
236 """Testing case for the RunCmd function"""
239 testutils.GanetiTestCase.setUp(self)
240 self.magic = time.ctime() + " ganeti test"
241 self.fname = self._CreateTempFile()
242 self.fifo_tmpdir = tempfile.mkdtemp()
243 self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
244 os.mkfifo(self.fifo_file)
247 shutil.rmtree(self.fifo_tmpdir)
250 """Test successful exit code"""
251 result = RunCmd("/bin/sh -c 'exit 0'")
252 self.assertEqual(result.exit_code, 0)
253 self.assertEqual(result.output, "")
256 """Test fail exit code"""
257 result = RunCmd("/bin/sh -c 'exit 1'")
258 self.assertEqual(result.exit_code, 1)
259 self.assertEqual(result.output, "")
261 def testStdout(self):
262 """Test standard output"""
263 cmd = 'echo -n "%s"' % self.magic
264 result = RunCmd("/bin/sh -c '%s'" % cmd)
265 self.assertEqual(result.stdout, self.magic)
266 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
267 self.assertEqual(result.output, "")
268 self.assertFileContent(self.fname, self.magic)
270 def testStderr(self):
271 """Test standard error"""
272 cmd = 'echo -n "%s"' % self.magic
273 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
274 self.assertEqual(result.stderr, self.magic)
275 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
276 self.assertEqual(result.output, "")
277 self.assertFileContent(self.fname, self.magic)
279 def testCombined(self):
280 """Test combined output"""
281 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
282 expected = "A" + self.magic + "B" + self.magic
283 result = RunCmd("/bin/sh -c '%s'" % cmd)
284 self.assertEqual(result.output, expected)
285 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
286 self.assertEqual(result.output, "")
287 self.assertFileContent(self.fname, expected)
289 def testSignal(self):
291 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
292 self.assertEqual(result.signal, 15)
293 self.assertEqual(result.output, "")
295 def testTimeoutClean(self):
296 cmd = "trap 'exit 0' TERM; read < %s" % self.fifo_file
297 result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
298 self.assertEqual(result.exit_code, 0)
300 def testTimeoutKill(self):
301 cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
303 out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
304 timeout, _linger_timeout=0.2)
305 self.assert_(status < 0)
306 self.assertEqual(-status, signal.SIGKILL)
308 def testTimeoutOutputAfterTerm(self):
309 cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
310 result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
311 self.assert_(result.failed)
312 self.assertEqual(result.stdout, "sigtermed\n")
314 def testListRun(self):
316 result = RunCmd(["true"])
317 self.assertEqual(result.signal, None)
318 self.assertEqual(result.exit_code, 0)
319 result = RunCmd(["/bin/sh", "-c", "exit 1"])
320 self.assertEqual(result.signal, None)
321 self.assertEqual(result.exit_code, 1)
322 result = RunCmd(["echo", "-n", self.magic])
323 self.assertEqual(result.signal, None)
324 self.assertEqual(result.exit_code, 0)
325 self.assertEqual(result.stdout, self.magic)
327 def testFileEmptyOutput(self):
328 """Test file output"""
329 result = RunCmd(["true"], output=self.fname)
330 self.assertEqual(result.signal, None)
331 self.assertEqual(result.exit_code, 0)
332 self.assertFileContent(self.fname, "")
335 """Test locale environment"""
336 old_env = os.environ.copy()
338 os.environ["LANG"] = "en_US.UTF-8"
339 os.environ["LC_ALL"] = "en_US.UTF-8"
340 result = RunCmd(["locale"])
341 for line in result.output.splitlines():
342 key, value = line.split("=", 1)
343 # Ignore these variables, they're overridden by LC_ALL
344 if key == "LANG" or key == "LANGUAGE":
346 self.failIf(value and value != "C" and value != '"C"',
347 "Variable %s is set to the invalid value '%s'" % (key, value))
351 def testDefaultCwd(self):
352 """Test default working directory"""
353 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
356 """Test default working directory"""
357 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
358 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
360 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
362 def testResetEnv(self):
363 """Test environment reset functionality"""
364 self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
365 self.failUnlessEqual(RunCmd(["env"], reset_env=True,
366 env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
368 def testNoFork(self):
369 """Test that nofork raise an error"""
370 assert not utils.no_fork
373 self.assertRaises(errors.ProgrammerError, RunCmd, ["true"])
375 utils.no_fork = False
377 def testWrongParams(self):
378 """Test wrong parameters"""
379 self.assertRaises(errors.ProgrammerError, RunCmd, ["true"],
380 output="/dev/null", interactive=True)
383 class TestRunParts(testutils.GanetiTestCase):
384 """Testing case for the RunParts function"""
387 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
390 shutil.rmtree(self.rundir)
393 """Test on an empty dir"""
394 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
396 def testSkipWrongName(self):
397 """Test that wrong files are skipped"""
398 fname = os.path.join(self.rundir, "00test.dot")
399 utils.WriteFile(fname, data="")
400 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
401 relname = os.path.basename(fname)
402 self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
403 [(relname, constants.RUNPARTS_SKIP, None)])
405 def testSkipNonExec(self):
406 """Test that non executable files are skipped"""
407 fname = os.path.join(self.rundir, "00test")
408 utils.WriteFile(fname, data="")
409 relname = os.path.basename(fname)
410 self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
411 [(relname, constants.RUNPARTS_SKIP, None)])
414 """Test error on a broken executable"""
415 fname = os.path.join(self.rundir, "00test")
416 utils.WriteFile(fname, data="")
417 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
418 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
419 self.failUnlessEqual(relname, os.path.basename(fname))
420 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
421 self.failUnless(error)
423 def testSorted(self):
424 """Test executions are sorted"""
426 files.append(os.path.join(self.rundir, "64test"))
427 files.append(os.path.join(self.rundir, "00test"))
428 files.append(os.path.join(self.rundir, "42test"))
431 utils.WriteFile(fname, data="")
433 results = RunParts(self.rundir, reset_env=True)
435 for fname in sorted(files):
436 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
439 """Test correct execution"""
440 fname = os.path.join(self.rundir, "00test")
441 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
442 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
443 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
444 self.failUnlessEqual(relname, os.path.basename(fname))
445 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
446 self.failUnlessEqual(runresult.stdout, "ciao")
448 def testRunFail(self):
449 """Test correct execution, with run failure"""
450 fname = os.path.join(self.rundir, "00test")
451 utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
452 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
453 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
454 self.failUnlessEqual(relname, os.path.basename(fname))
455 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
456 self.failUnlessEqual(runresult.exit_code, 1)
457 self.failUnless(runresult.failed)
459 def testRunMix(self):
461 files.append(os.path.join(self.rundir, "00test"))
462 files.append(os.path.join(self.rundir, "42test"))
463 files.append(os.path.join(self.rundir, "64test"))
464 files.append(os.path.join(self.rundir, "99test"))
468 # 1st has errors in execution
469 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
470 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
473 utils.WriteFile(files[1], data="")
475 # 3rd cannot execute properly
476 utils.WriteFile(files[2], data="")
477 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
480 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
481 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
483 results = RunParts(self.rundir, reset_env=True)
485 (relname, status, runresult) = results[0]
486 self.failUnlessEqual(relname, os.path.basename(files[0]))
487 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
488 self.failUnlessEqual(runresult.exit_code, 1)
489 self.failUnless(runresult.failed)
491 (relname, status, runresult) = results[1]
492 self.failUnlessEqual(relname, os.path.basename(files[1]))
493 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
494 self.failUnlessEqual(runresult, None)
496 (relname, status, runresult) = results[2]
497 self.failUnlessEqual(relname, os.path.basename(files[2]))
498 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
499 self.failUnless(runresult)
501 (relname, status, runresult) = results[3]
502 self.failUnlessEqual(relname, os.path.basename(files[3]))
503 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
504 self.failUnlessEqual(runresult.output, "ciao")
505 self.failUnlessEqual(runresult.exit_code, 0)
506 self.failUnless(not runresult.failed)
508 def testMissingDirectory(self):
509 nosuchdir = utils.PathJoin(self.rundir, "no/such/directory")
510 self.assertEqual(RunParts(nosuchdir), [])
513 class TestStartDaemon(testutils.GanetiTestCase):
515 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
516 self.tmpfile = os.path.join(self.tmpdir, "test")
519 shutil.rmtree(self.tmpdir)
522 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
523 self._wait(self.tmpfile, 60.0, "Hello World")
525 def testShellOutput(self):
526 utils.StartDaemon("echo Hello World", output=self.tmpfile)
527 self._wait(self.tmpfile, 60.0, "Hello World")
529 def testNoShellNoOutput(self):
530 utils.StartDaemon(["pwd"])
532 def testNoShellNoOutputTouch(self):
533 testfile = os.path.join(self.tmpdir, "check")
534 self.failIf(os.path.exists(testfile))
535 utils.StartDaemon(["touch", testfile])
536 self._wait(testfile, 60.0, "")
538 def testNoShellOutput(self):
539 utils.StartDaemon(["pwd"], output=self.tmpfile)
540 self._wait(self.tmpfile, 60.0, "/")
542 def testNoShellOutputCwd(self):
543 utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
544 self._wait(self.tmpfile, 60.0, os.getcwd())
546 def testShellEnv(self):
547 utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
548 env={ "GNT_TEST_VAR": "Hello World", })
549 self._wait(self.tmpfile, 60.0, "Hello World")
551 def testNoShellEnv(self):
552 utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
553 env={ "GNT_TEST_VAR": "Hello World", })
554 self._wait(self.tmpfile, 60.0, "Hello World")
556 def testOutputFd(self):
557 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
559 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
562 self._wait(self.tmpfile, 60.0, os.getcwd())
565 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
566 self._wait(self.tmpfile, 60.0, str(pid))
568 def testPidFile(self):
569 pidfile = os.path.join(self.tmpdir, "pid")
570 checkfile = os.path.join(self.tmpdir, "abort")
572 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
575 fd = os.open(pidfile, os.O_RDONLY)
577 # Check file is locked
578 self.assertRaises(errors.LockError, utils.LockFile, fd)
580 pidtext = os.read(fd, 100)
584 self.assertEqual(int(pidtext.strip()), pid)
586 self.assert_(utils.IsProcessAlive(pid))
588 # No matter what happens, kill daemon
589 utils.KillProcess(pid, timeout=5.0, waitpid=False)
590 self.failIf(utils.IsProcessAlive(pid))
592 self.assertEqual(utils.ReadFile(self.tmpfile), "")
594 def _wait(self, path, timeout, expected):
595 # Due to the asynchronous nature of daemon processes, polling is necessary.
596 # A timeout makes sure the test doesn't hang forever.
598 if not (os.path.isfile(path) and
599 utils.ReadFile(path).strip() == expected):
600 raise utils.RetryAgain()
603 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
604 except utils.RetryTimeout:
605 self.fail("Apparently the daemon didn't run in %s seconds and/or"
606 " didn't write the correct output" % timeout)
609 self.assertRaises(errors.OpExecError, utils.StartDaemon,
610 ["./does-NOT-EXIST/here/0123456789"])
611 self.assertRaises(errors.OpExecError, utils.StartDaemon,
612 ["./does-NOT-EXIST/here/0123456789"],
613 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
614 self.assertRaises(errors.OpExecError, utils.StartDaemon,
615 ["./does-NOT-EXIST/here/0123456789"],
616 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
617 self.assertRaises(errors.OpExecError, utils.StartDaemon,
618 ["./does-NOT-EXIST/here/0123456789"],
619 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
621 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
623 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
624 ["./does-NOT-EXIST/here/0123456789"],
625 output=self.tmpfile, output_fd=fd)
630 class TestSetCloseOnExecFlag(unittest.TestCase):
631 """Tests for SetCloseOnExecFlag"""
634 self.tmpfile = tempfile.TemporaryFile()
636 def testEnable(self):
637 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
638 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
641 def testDisable(self):
642 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
643 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
647 class TestSetNonblockFlag(unittest.TestCase):
649 self.tmpfile = tempfile.TemporaryFile()
651 def testEnable(self):
652 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
653 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
656 def testDisable(self):
657 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
658 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
662 class TestRemoveFile(unittest.TestCase):
663 """Test case for the RemoveFile function"""
666 """Create a temp dir and file for each case"""
667 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
668 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
672 if os.path.exists(self.tmpfile):
673 os.unlink(self.tmpfile)
674 os.rmdir(self.tmpdir)
676 def testIgnoreDirs(self):
677 """Test that RemoveFile() ignores directories"""
678 self.assertEqual(None, RemoveFile(self.tmpdir))
680 def testIgnoreNotExisting(self):
681 """Test that RemoveFile() ignores non-existing files"""
682 RemoveFile(self.tmpfile)
683 RemoveFile(self.tmpfile)
685 def testRemoveFile(self):
686 """Test that RemoveFile does remove a file"""
687 RemoveFile(self.tmpfile)
688 if os.path.exists(self.tmpfile):
689 self.fail("File '%s' not removed" % self.tmpfile)
691 def testRemoveSymlink(self):
692 """Test that RemoveFile does remove symlinks"""
693 symlink = self.tmpdir + "/symlink"
694 os.symlink("no-such-file", symlink)
696 if os.path.exists(symlink):
697 self.fail("File '%s' not removed" % symlink)
698 os.symlink(self.tmpfile, symlink)
700 if os.path.exists(symlink):
701 self.fail("File '%s' not removed" % symlink)
704 class TestRemoveDir(unittest.TestCase):
706 self.tmpdir = tempfile.mkdtemp()
710 shutil.rmtree(self.tmpdir)
711 except EnvironmentError:
714 def testEmptyDir(self):
715 utils.RemoveDir(self.tmpdir)
716 self.assertFalse(os.path.isdir(self.tmpdir))
718 def testNonEmptyDir(self):
719 self.tmpfile = os.path.join(self.tmpdir, "test1")
720 open(self.tmpfile, "w").close()
721 self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
724 class TestRename(unittest.TestCase):
725 """Test case for RenameFile"""
728 """Create a temporary directory"""
729 self.tmpdir = tempfile.mkdtemp()
730 self.tmpfile = os.path.join(self.tmpdir, "test1")
733 open(self.tmpfile, "w").close()
736 """Remove temporary directory"""
737 shutil.rmtree(self.tmpdir)
739 def testSimpleRename1(self):
740 """Simple rename 1"""
741 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
742 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
744 def testSimpleRename2(self):
745 """Simple rename 2"""
746 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
748 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
750 def testRenameMkdir(self):
751 """Rename with mkdir"""
752 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
754 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
755 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
757 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
758 os.path.join(self.tmpdir, "test/foo/bar/baz"),
760 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
761 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
762 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
765 class TestMatchNameComponent(unittest.TestCase):
766 """Test case for the MatchNameComponent function"""
768 def testEmptyList(self):
769 """Test that there is no match against an empty list"""
771 self.failUnlessEqual(MatchNameComponent("", []), None)
772 self.failUnlessEqual(MatchNameComponent("test", []), None)
774 def testSingleMatch(self):
775 """Test that a single match is performed correctly"""
776 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
777 for key in "test2", "test2.example", "test2.example.com":
778 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
780 def testMultipleMatches(self):
781 """Test that a multiple match is returned as None"""
782 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
783 for key in "test1", "test1.example":
784 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
786 def testFullMatch(self):
787 """Test that a full match is returned correctly"""
789 key2 = "test1.example"
790 mlist = [key2, key2 + ".com"]
791 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
792 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
794 def testCaseInsensitivePartialMatch(self):
795 """Test for the case_insensitive keyword"""
796 mlist = ["test1.example.com", "test2.example.net"]
797 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
799 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
801 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
803 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
807 def testCaseInsensitiveFullMatch(self):
808 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
809 # Between the two ts1 a full string match non-case insensitive should work
810 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
812 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
814 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
816 # Between the two ts2 only case differs, so only case-match works
817 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
819 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
821 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
825 class TestReadFile(testutils.GanetiTestCase):
827 def testReadAll(self):
828 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
829 self.assertEqual(len(data), 814)
831 h = compat.md5_hash()
833 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
835 def testReadSize(self):
836 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
838 self.assertEqual(len(data), 100)
840 h = compat.md5_hash()
842 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
845 self.assertRaises(EnvironmentError, utils.ReadFile,
846 "/dev/null/does-not-exist")
849 class TestReadOneLineFile(testutils.GanetiTestCase):
852 testutils.GanetiTestCase.setUp(self)
854 def testDefault(self):
855 data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
856 self.assertEqual(len(data), 27)
857 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
859 def testNotStrict(self):
860 data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
861 self.assertEqual(len(data), 27)
862 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
864 def testStrictFailure(self):
865 self.assertRaises(errors.GenericError, ReadOneLineFile,
866 self._TestDataFilename("cert1.pem"), strict=True)
868 def testLongLine(self):
869 dummydata = (1024 * "Hello World! ")
870 myfile = self._CreateTempFile()
871 utils.WriteFile(myfile, data=dummydata)
872 datastrict = ReadOneLineFile(myfile, strict=True)
873 datalax = ReadOneLineFile(myfile, strict=False)
874 self.assertEqual(dummydata, datastrict)
875 self.assertEqual(dummydata, datalax)
877 def testNewline(self):
878 myfile = self._CreateTempFile()
880 for nl in ["", "\n", "\r\n"]:
881 dummydata = "%s%s" % (myline, nl)
882 utils.WriteFile(myfile, data=dummydata)
883 datalax = ReadOneLineFile(myfile, strict=False)
884 self.assertEqual(myline, datalax)
885 datastrict = ReadOneLineFile(myfile, strict=True)
886 self.assertEqual(myline, datastrict)
888 def testWhitespaceAndMultipleLines(self):
889 myfile = self._CreateTempFile()
890 for nl in ["", "\n", "\r\n"]:
891 for ws in [" ", "\t", "\t\t \t", "\t "]:
892 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
893 utils.WriteFile(myfile, data=dummydata)
894 datalax = ReadOneLineFile(myfile, strict=False)
896 self.assert_(set("\r\n") & set(dummydata))
897 self.assertRaises(errors.GenericError, ReadOneLineFile,
899 explen = len("Foo bar baz ") + len(ws)
900 self.assertEqual(len(datalax), explen)
901 self.assertEqual(datalax, dummydata[:explen])
902 self.assertFalse(set("\r\n") & set(datalax))
904 datastrict = ReadOneLineFile(myfile, strict=True)
905 self.assertEqual(dummydata, datastrict)
906 self.assertEqual(dummydata, datalax)
908 def testEmptylines(self):
909 myfile = self._CreateTempFile()
911 for nl in ["\n", "\r\n"]:
912 for ol in ["", "otherline"]:
913 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
914 utils.WriteFile(myfile, data=dummydata)
915 self.assert_(set("\r\n") & set(dummydata))
916 datalax = ReadOneLineFile(myfile, strict=False)
917 self.assertEqual(myline, datalax)
919 self.assertRaises(errors.GenericError, ReadOneLineFile,
922 datastrict = ReadOneLineFile(myfile, strict=True)
923 self.assertEqual(myline, datastrict)
925 def testEmptyfile(self):
926 myfile = self._CreateTempFile()
927 self.assertRaises(errors.GenericError, ReadOneLineFile, myfile)
930 class TestTimestampForFilename(unittest.TestCase):
932 self.assert_("." not in utils.TimestampForFilename())
933 self.assert_(":" not in utils.TimestampForFilename())
936 class TestCreateBackup(testutils.GanetiTestCase):
938 testutils.GanetiTestCase.setUp(self)
940 self.tmpdir = tempfile.mkdtemp()
943 testutils.GanetiTestCase.tearDown(self)
945 shutil.rmtree(self.tmpdir)
948 filename = PathJoin(self.tmpdir, "config.data")
949 utils.WriteFile(filename, data="")
950 bname = utils.CreateBackup(filename)
951 self.assertFileContent(bname, "")
952 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
953 utils.CreateBackup(filename)
954 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
955 utils.CreateBackup(filename)
956 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
958 fifoname = PathJoin(self.tmpdir, "fifo")
960 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
962 def testContent(self):
964 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
965 for rep in [1, 2, 10, 127]:
966 testdata = data * rep
968 filename = PathJoin(self.tmpdir, "test.data_")
969 utils.WriteFile(filename, data=testdata)
970 self.assertFileContent(filename, testdata)
973 bname = utils.CreateBackup(filename)
975 self.assertFileContent(bname, testdata)
976 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
979 class TestFormatUnit(unittest.TestCase):
980 """Test case for the FormatUnit function"""
983 self.assertEqual(FormatUnit(1, 'h'), '1M')
984 self.assertEqual(FormatUnit(100, 'h'), '100M')
985 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
987 self.assertEqual(FormatUnit(1, 'm'), '1')
988 self.assertEqual(FormatUnit(100, 'm'), '100')
989 self.assertEqual(FormatUnit(1023, 'm'), '1023')
991 self.assertEqual(FormatUnit(1024, 'm'), '1024')
992 self.assertEqual(FormatUnit(1536, 'm'), '1536')
993 self.assertEqual(FormatUnit(17133, 'm'), '17133')
994 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
997 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
998 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
999 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
1000 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
1002 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
1003 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
1004 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
1005 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
1007 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
1008 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
1009 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
1012 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
1013 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
1014 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
1016 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
1017 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
1018 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
1020 def testErrors(self):
1021 self.assertRaises(errors.ProgrammerError, FormatUnit, 1, "a")
1024 class TestParseUnit(unittest.TestCase):
1025 """Test case for the ParseUnit function"""
1028 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
1029 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
1030 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
1032 def testRounding(self):
1033 self.assertEqual(ParseUnit('0'), 0)
1034 self.assertEqual(ParseUnit('1'), 4)
1035 self.assertEqual(ParseUnit('2'), 4)
1036 self.assertEqual(ParseUnit('3'), 4)
1038 self.assertEqual(ParseUnit('124'), 124)
1039 self.assertEqual(ParseUnit('125'), 128)
1040 self.assertEqual(ParseUnit('126'), 128)
1041 self.assertEqual(ParseUnit('127'), 128)
1042 self.assertEqual(ParseUnit('128'), 128)
1043 self.assertEqual(ParseUnit('129'), 132)
1044 self.assertEqual(ParseUnit('130'), 132)
1046 def testFloating(self):
1047 self.assertEqual(ParseUnit('0'), 0)
1048 self.assertEqual(ParseUnit('0.5'), 4)
1049 self.assertEqual(ParseUnit('1.75'), 4)
1050 self.assertEqual(ParseUnit('1.99'), 4)
1051 self.assertEqual(ParseUnit('2.00'), 4)
1052 self.assertEqual(ParseUnit('2.01'), 4)
1053 self.assertEqual(ParseUnit('3.99'), 4)
1054 self.assertEqual(ParseUnit('4.00'), 4)
1055 self.assertEqual(ParseUnit('4.01'), 8)
1056 self.assertEqual(ParseUnit('1.5G'), 1536)
1057 self.assertEqual(ParseUnit('1.8G'), 1844)
1058 self.assertEqual(ParseUnit('8.28T'), 8682212)
1060 def testSuffixes(self):
1061 for sep in ('', ' ', ' ', "\t", "\t "):
1062 for suffix, scale in TestParseUnit.SCALES:
1063 for func in (lambda x: x, str.lower, str.upper):
1064 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
1067 def testInvalidInput(self):
1068 for sep in ('-', '_', ',', 'a'):
1069 for suffix, _ in TestParseUnit.SCALES:
1070 self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
1072 for suffix, _ in TestParseUnit.SCALES:
1073 self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
1076 class TestParseCpuMask(unittest.TestCase):
1077 """Test case for the ParseCpuMask function."""
1079 def testWellFormed(self):
1080 self.assertEqual(utils.ParseCpuMask(""), [])
1081 self.assertEqual(utils.ParseCpuMask("1"), [1])
1082 self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
1084 def testInvalidInput(self):
1085 for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
1086 self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
1089 class TestSshKeys(testutils.GanetiTestCase):
1090 """Test case for the AddAuthorizedKey function"""
1092 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1093 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
1094 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1097 testutils.GanetiTestCase.setUp(self)
1098 self.tmpname = self._CreateTempFile()
1099 handle = open(self.tmpname, 'w')
1101 handle.write("%s\n" % TestSshKeys.KEY_A)
1102 handle.write("%s\n" % TestSshKeys.KEY_B)
1106 def testAddingNewKey(self):
1107 utils.AddAuthorizedKey(self.tmpname,
1108 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1110 self.assertFileContent(self.tmpname,
1111 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1112 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1113 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1114 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1116 def testAddingAlmostButNotCompletelyTheSameKey(self):
1117 utils.AddAuthorizedKey(self.tmpname,
1118 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1120 self.assertFileContent(self.tmpname,
1121 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1122 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1123 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1124 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1126 def testAddingExistingKeyWithSomeMoreSpaces(self):
1127 utils.AddAuthorizedKey(self.tmpname,
1128 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
1130 self.assertFileContent(self.tmpname,
1131 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1132 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1133 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1135 def testRemovingExistingKeyWithSomeMoreSpaces(self):
1136 utils.RemoveAuthorizedKey(self.tmpname,
1137 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
1139 self.assertFileContent(self.tmpname,
1140 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1141 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1143 def testRemovingNonExistingKey(self):
1144 utils.RemoveAuthorizedKey(self.tmpname,
1145 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
1147 self.assertFileContent(self.tmpname,
1148 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1149 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1150 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1153 class TestEtcHosts(testutils.GanetiTestCase):
1154 """Test functions modifying /etc/hosts"""
1157 testutils.GanetiTestCase.setUp(self)
1158 self.tmpname = self._CreateTempFile()
1159 handle = open(self.tmpname, 'w')
1161 handle.write('# This is a test file for /etc/hosts\n')
1162 handle.write('127.0.0.1\tlocalhost\n')
1163 handle.write('192.0.2.1 router gw\n')
1167 def testSettingNewIp(self):
1168 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1171 self.assertFileContent(self.tmpname,
1172 "# This is a test file for /etc/hosts\n"
1173 "127.0.0.1\tlocalhost\n"
1174 "192.0.2.1 router gw\n"
1175 "198.51.100.4\tmyhost.example.com myhost\n")
1176 self.assertFileMode(self.tmpname, 0644)
1178 def testSettingExistingIp(self):
1179 SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1182 self.assertFileContent(self.tmpname,
1183 "# This is a test file for /etc/hosts\n"
1184 "127.0.0.1\tlocalhost\n"
1185 "192.0.2.1\tmyhost.example.com myhost\n")
1186 self.assertFileMode(self.tmpname, 0644)
1188 def testSettingDuplicateName(self):
1189 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1191 self.assertFileContent(self.tmpname,
1192 "# This is a test file for /etc/hosts\n"
1193 "127.0.0.1\tlocalhost\n"
1194 "192.0.2.1 router gw\n"
1195 "198.51.100.4\tmyhost\n")
1196 self.assertFileMode(self.tmpname, 0644)
1198 def testRemovingExistingHost(self):
1199 RemoveEtcHostsEntry(self.tmpname, 'router')
1201 self.assertFileContent(self.tmpname,
1202 "# This is a test file for /etc/hosts\n"
1203 "127.0.0.1\tlocalhost\n"
1205 self.assertFileMode(self.tmpname, 0644)
1207 def testRemovingSingleExistingHost(self):
1208 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1210 self.assertFileContent(self.tmpname,
1211 "# This is a test file for /etc/hosts\n"
1212 "192.0.2.1 router gw\n")
1213 self.assertFileMode(self.tmpname, 0644)
1215 def testRemovingNonExistingHost(self):
1216 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1218 self.assertFileContent(self.tmpname,
1219 "# This is a test file for /etc/hosts\n"
1220 "127.0.0.1\tlocalhost\n"
1221 "192.0.2.1 router gw\n")
1222 self.assertFileMode(self.tmpname, 0644)
1224 def testRemovingAlias(self):
1225 RemoveEtcHostsEntry(self.tmpname, 'gw')
1227 self.assertFileContent(self.tmpname,
1228 "# This is a test file for /etc/hosts\n"
1229 "127.0.0.1\tlocalhost\n"
1230 "192.0.2.1 router\n")
1231 self.assertFileMode(self.tmpname, 0644)
1234 class TestGetMounts(unittest.TestCase):
1235 """Test case for GetMounts()."""
1238 "rootfs / rootfs rw 0 0\n"
1239 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
1240 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
1243 self.tmpfile = tempfile.NamedTemporaryFile()
1244 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1246 def testGetMounts(self):
1247 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1249 ("rootfs", "/", "rootfs", "rw"),
1250 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1251 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1255 class TestShellQuoting(unittest.TestCase):
1256 """Test case for shell quoting functions"""
1258 def testShellQuote(self):
1259 self.assertEqual(ShellQuote('abc'), "abc")
1260 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
1261 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
1262 self.assertEqual(ShellQuote("a b c"), "'a b c'")
1263 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
1265 def testShellQuoteArgs(self):
1266 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
1267 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
1268 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1271 class TestListVisibleFiles(unittest.TestCase):
1272 """Test case for ListVisibleFiles"""
1275 self.path = tempfile.mkdtemp()
1278 shutil.rmtree(self.path)
1280 def _CreateFiles(self, files):
1282 utils.WriteFile(os.path.join(self.path, name), data="test")
1284 def _test(self, files, expected):
1285 self._CreateFiles(files)
1286 found = ListVisibleFiles(self.path)
1287 self.assertEqual(set(found), set(expected))
1289 def testAllVisible(self):
1290 files = ["a", "b", "c"]
1292 self._test(files, expected)
1294 def testNoneVisible(self):
1295 files = [".a", ".b", ".c"]
1297 self._test(files, expected)
1299 def testSomeVisible(self):
1300 files = ["a", "b", ".c"]
1301 expected = ["a", "b"]
1302 self._test(files, expected)
1304 def testNonAbsolutePath(self):
1305 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1307 def testNonNormalizedPath(self):
1308 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1312 class TestNewUUID(unittest.TestCase):
1313 """Test case for NewUUID"""
1316 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
1319 class TestUniqueSequence(unittest.TestCase):
1320 """Test case for UniqueSequence"""
1322 def _test(self, input, expected):
1323 self.assertEqual(utils.UniqueSequence(input), expected)
1327 self._test([1, 2, 3], [1, 2, 3])
1328 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1329 self._test([1, 2, 2, 3], [1, 2, 3])
1330 self._test([1, 2, 3, 3], [1, 2, 3])
1333 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1334 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1337 self._test(["a", "a"], ["a"])
1338 self._test(["a", "b"], ["a", "b"])
1339 self._test(["a", "b", "a"], ["a", "b"])
1342 class TestFindDuplicates(unittest.TestCase):
1343 """Test case for FindDuplicates"""
1345 def _Test(self, seq, expected):
1346 result = utils.FindDuplicates(seq)
1347 self.assertEqual(result, utils.UniqueSequence(result))
1348 self.assertEqual(set(result), set(expected))
1352 self._Test([1, 2, 3], [])
1353 self._Test([9, 8, 8, 0, 5, 1, 7, 0, 6, 7], [8, 0, 7])
1354 for exp in [[1, 2, 3], [3, 2, 1]]:
1355 self._Test([1, 1, 2, 2, 3, 3], exp)
1357 self._Test(["A", "a", "B"], [])
1358 self._Test(["a", "A", "a", "B"], ["a"])
1359 self._Test("Hello World out there!", ["e", " ", "o", "r", "t", "l"])
1361 self._Test(self._Gen(False), [])
1362 self._Test(self._Gen(True), range(1, 10))
1373 class TestFirstFree(unittest.TestCase):
1374 """Test case for the FirstFree function"""
1377 """Test FirstFree"""
1378 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1379 self.failUnlessEqual(FirstFree([]), None)
1380 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1381 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1382 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1385 class TestTailFile(testutils.GanetiTestCase):
1386 """Test case for the TailFile function"""
1388 def testEmpty(self):
1389 fname = self._CreateTempFile()
1390 self.failUnlessEqual(TailFile(fname), [])
1391 self.failUnlessEqual(TailFile(fname, lines=25), [])
1393 def testAllLines(self):
1394 data = ["test %d" % i for i in range(30)]
1396 fname = self._CreateTempFile()
1397 fd = open(fname, "w")
1398 fd.write("\n".join(data[:i]))
1402 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1404 def testPartialLines(self):
1405 data = ["test %d" % i for i in range(30)]
1406 fname = self._CreateTempFile()
1407 fd = open(fname, "w")
1408 fd.write("\n".join(data))
1411 for i in range(1, 30):
1412 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1414 def testBigFile(self):
1415 data = ["test %d" % i for i in range(30)]
1416 fname = self._CreateTempFile()
1417 fd = open(fname, "w")
1418 fd.write("X" * 1048576)
1420 fd.write("\n".join(data))
1423 for i in range(1, 30):
1424 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1427 class _BaseFileLockTest:
1428 """Test case for the FileLock class"""
1430 def testSharedNonblocking(self):
1431 self.lock.Shared(blocking=False)
1434 def testExclusiveNonblocking(self):
1435 self.lock.Exclusive(blocking=False)
1438 def testUnlockNonblocking(self):
1439 self.lock.Unlock(blocking=False)
1442 def testSharedBlocking(self):
1443 self.lock.Shared(blocking=True)
1446 def testExclusiveBlocking(self):
1447 self.lock.Exclusive(blocking=True)
1450 def testUnlockBlocking(self):
1451 self.lock.Unlock(blocking=True)
1454 def testSharedExclusiveUnlock(self):
1455 self.lock.Shared(blocking=False)
1456 self.lock.Exclusive(blocking=False)
1457 self.lock.Unlock(blocking=False)
1460 def testExclusiveSharedUnlock(self):
1461 self.lock.Exclusive(blocking=False)
1462 self.lock.Shared(blocking=False)
1463 self.lock.Unlock(blocking=False)
1466 def testSimpleTimeout(self):
1467 # These will succeed on the first attempt, hence a short timeout
1468 self.lock.Shared(blocking=True, timeout=10.0)
1469 self.lock.Exclusive(blocking=False, timeout=10.0)
1470 self.lock.Unlock(blocking=True, timeout=10.0)
1474 def _TryLockInner(filename, shared, blocking):
1475 lock = utils.FileLock.Open(filename)
1483 # The timeout doesn't really matter as the parent process waits for us to
1485 fn(blocking=blocking, timeout=0.01)
1486 except errors.LockError, err:
1491 def _TryLock(self, *args):
1492 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1495 def testTimeout(self):
1496 for blocking in [True, False]:
1497 self.lock.Exclusive(blocking=True)
1498 self.failIf(self._TryLock(False, blocking))
1499 self.failIf(self._TryLock(True, blocking))
1501 self.lock.Shared(blocking=True)
1502 self.assert_(self._TryLock(True, blocking))
1503 self.failIf(self._TryLock(False, blocking))
1505 def testCloseShared(self):
1507 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1509 def testCloseExclusive(self):
1511 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1513 def testCloseUnlock(self):
1515 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1518 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1519 TESTDATA = "Hello World\n" * 10
1522 testutils.GanetiTestCase.setUp(self)
1524 self.tmpfile = tempfile.NamedTemporaryFile()
1525 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1526 self.lock = utils.FileLock.Open(self.tmpfile.name)
1528 # Ensure "Open" didn't truncate file
1529 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1532 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1534 testutils.GanetiTestCase.tearDown(self)
1537 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1539 self.tmpfile = tempfile.NamedTemporaryFile()
1540 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1543 class TestTimeFunctions(unittest.TestCase):
1544 """Test case for time functions"""
1547 self.assertEqual(utils.SplitTime(1), (1, 0))
1548 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1549 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1550 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1551 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1552 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1553 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1554 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1556 self.assertRaises(AssertionError, utils.SplitTime, -1)
1558 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1559 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1560 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1562 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1564 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1566 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1567 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1568 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1569 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1570 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1573 class FieldSetTestCase(unittest.TestCase):
1574 """Test case for FieldSets"""
1576 def testSimpleMatch(self):
1577 f = utils.FieldSet("a", "b", "c", "def")
1578 self.failUnless(f.Matches("a"))
1579 self.failIf(f.Matches("d"), "Substring matched")
1580 self.failIf(f.Matches("defghi"), "Prefix string matched")
1581 self.failIf(f.NonMatching(["b", "c"]))
1582 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1583 self.failUnless(f.NonMatching(["a", "d"]))
1585 def testRegexMatch(self):
1586 f = utils.FieldSet("a", "b([0-9]+)", "c")
1587 self.failUnless(f.Matches("b1"))
1588 self.failUnless(f.Matches("b99"))
1589 self.failIf(f.Matches("b/1"))
1590 self.failIf(f.NonMatching(["b12", "c"]))
1591 self.failUnless(f.NonMatching(["a", "1"]))
1593 class TestForceDictType(unittest.TestCase):
1594 """Test case for ForceDictType"""
1596 "a": constants.VTYPE_INT,
1597 "b": constants.VTYPE_BOOL,
1598 "c": constants.VTYPE_STRING,
1599 "d": constants.VTYPE_SIZE,
1600 "e": constants.VTYPE_MAYBE_STRING,
1603 def _fdt(self, dict, allowed_values=None):
1604 if allowed_values is None:
1605 utils.ForceDictType(dict, self.KEY_TYPES)
1607 utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
1611 def testSimpleDict(self):
1612 self.assertEqual(self._fdt({}), {})
1613 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1614 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1615 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1616 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1617 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1618 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1619 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1620 self.assertEqual(self._fdt({'b': False}), {'b': False})
1621 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1622 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1623 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1624 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1625 self.assertEqual(self._fdt({"e": None, }), {"e": None, })
1626 self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
1627 self.assertEqual(self._fdt({"e": False, }), {"e": '', })
1628 self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
1630 def testErrors(self):
1631 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1632 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
1633 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1634 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1635 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1636 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
1637 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
1638 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
1639 self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
1640 self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
1641 {"b": "hello"}, {"b": "no-such-type"})
1644 class TestIsNormAbsPath(unittest.TestCase):
1645 """Testing case for IsNormAbsPath"""
1647 def _pathTestHelper(self, path, result):
1649 self.assert_(utils.IsNormAbsPath(path),
1650 "Path %s should result absolute and normalized" % path)
1652 self.assertFalse(utils.IsNormAbsPath(path),
1653 "Path %s should not result absolute and normalized" % path)
1656 self._pathTestHelper('/etc', True)
1657 self._pathTestHelper('/srv', True)
1658 self._pathTestHelper('etc', False)
1659 self._pathTestHelper('/etc/../root', False)
1660 self._pathTestHelper('/etc/', False)
1663 class TestSafeEncode(unittest.TestCase):
1664 """Test case for SafeEncode"""
1666 def testAscii(self):
1667 for txt in [string.digits, string.letters, string.punctuation]:
1668 self.failUnlessEqual(txt, SafeEncode(txt))
1670 def testDoubleEncode(self):
1671 for i in range(255):
1672 txt = SafeEncode(chr(i))
1673 self.failUnlessEqual(txt, SafeEncode(txt))
1675 def testUnicode(self):
1676 # 1024 is high enough to catch non-direct ASCII mappings
1677 for i in range(1024):
1678 txt = SafeEncode(unichr(i))
1679 self.failUnlessEqual(txt, SafeEncode(txt))
1682 class TestFormatTime(unittest.TestCase):
1683 """Testing case for FormatTime"""
1686 def _TestInProcess(tz, timestamp, expected):
1687 os.environ["TZ"] = tz
1689 return utils.FormatTime(timestamp) == expected
1691 def _Test(self, *args):
1692 # Need to use separate process as we want to change TZ
1693 self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
1696 self._Test("UTC", 0, "1970-01-01 00:00:00")
1697 self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
1698 self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
1699 self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
1700 self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
1703 self.failUnlessEqual(FormatTime(None), "N/A")
1705 def testInvalid(self):
1706 self.failUnlessEqual(FormatTime(()), "N/A")
1709 # tests that we accept time.time input
1710 FormatTime(time.time())
1711 # tests that we accept int input
1712 FormatTime(int(time.time()))
1715 class TestFormatTimestampWithTZ(unittest.TestCase):
1717 def _TestInProcess(tz, timestamp, expected):
1718 os.environ["TZ"] = tz
1720 return utils.FormatTimestampWithTZ(timestamp) == expected
1722 def _Test(self, *args):
1723 # Need to use separate process as we want to change TZ
1724 self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
1727 self._Test("UTC", 0, "1970-01-01 00:00:00 UTC")
1728 self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46 BRST")
1729 self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46 GMT")
1730 self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46 CET")
1731 self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46 EST")
1734 class RunInSeparateProcess(unittest.TestCase):
1736 for exp in [True, False]:
1740 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1743 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1744 def _child(carg1, carg2):
1745 return carg1 == "Foo" and carg2 == arg
1747 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1750 parent_pid = os.getpid()
1753 return os.getpid() == parent_pid
1755 self.failIf(utils.RunInSeparateProcess(_check))
1757 def testSignal(self):
1759 os.kill(os.getpid(), signal.SIGTERM)
1761 self.assertRaises(errors.GenericError,
1762 utils.RunInSeparateProcess, _kill)
1764 def testException(self):
1766 raise errors.GenericError("This is a test")
1768 self.assertRaises(errors.GenericError,
1769 utils.RunInSeparateProcess, _exc)
1772 class TestFingerprintFiles(unittest.TestCase):
1774 self.tmpfile = tempfile.NamedTemporaryFile()
1775 self.tmpfile2 = tempfile.NamedTemporaryFile()
1776 utils.WriteFile(self.tmpfile2.name, data="Hello World\n")
1778 self.tmpfile.name: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
1779 self.tmpfile2.name: "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a",
1782 def testSingleFile(self):
1783 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1784 self.results[self.tmpfile.name])
1786 self.assertEqual(utils._FingerprintFile("/no/such/file"), None)
1788 def testBigFile(self):
1789 self.tmpfile.write("A" * 8192)
1790 self.tmpfile.flush()
1791 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1792 "35b6795ca20d6dc0aff8c7c110c96cd1070b8c38")
1794 def testMultiple(self):
1795 all_files = self.results.keys()
1796 all_files.append("/no/such/file")
1797 self.assertEqual(utils.FingerprintFiles(self.results.keys()), self.results)
1800 class TestUnescapeAndSplit(unittest.TestCase):
1801 """Testing case for UnescapeAndSplit"""
1804 # testing more that one separator for regexp safety
1805 self._seps = [",", "+", "."]
1807 def testSimple(self):
1808 a = ["a", "b", "c", "d"]
1809 for sep in self._seps:
1810 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1812 def testEscape(self):
1813 for sep in self._seps:
1814 a = ["a", "b\\" + sep + "c", "d"]
1815 b = ["a", "b" + sep + "c", "d"]
1816 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1818 def testDoubleEscape(self):
1819 for sep in self._seps:
1820 a = ["a", "b\\\\", "c", "d"]
1821 b = ["a", "b\\", "c", "d"]
1822 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1824 def testThreeEscape(self):
1825 for sep in self._seps:
1826 a = ["a", "b\\\\\\" + sep + "c", "d"]
1827 b = ["a", "b\\" + sep + "c", "d"]
1828 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1831 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1833 self.tmpdir = tempfile.mkdtemp()
1836 shutil.rmtree(self.tmpdir)
1838 def _checkRsaPrivateKey(self, key):
1839 lines = key.splitlines()
1840 return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1841 "-----END RSA PRIVATE KEY-----" in lines)
1843 def _checkCertificate(self, cert):
1844 lines = cert.splitlines()
1845 return ("-----BEGIN CERTIFICATE-----" in lines and
1846 "-----END CERTIFICATE-----" in lines)
1849 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1850 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1851 self._checkRsaPrivateKey(key_pem)
1852 self._checkCertificate(cert_pem)
1854 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1856 self.assert_(key.bits() >= 1024)
1857 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1858 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1860 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1862 self.failIf(x509.has_expired())
1863 self.assertEqual(x509.get_issuer().CN, common_name)
1864 self.assertEqual(x509.get_subject().CN, common_name)
1865 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1867 def testLegacy(self):
1868 cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1870 utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1872 cert1 = utils.ReadFile(cert1_filename)
1874 self.assert_(self._checkRsaPrivateKey(cert1))
1875 self.assert_(self._checkCertificate(cert1))
1878 class TestPathJoin(unittest.TestCase):
1879 """Testing case for PathJoin"""
1881 def testBasicItems(self):
1882 mlist = ["/a", "b", "c"]
1883 self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1885 def testNonAbsPrefix(self):
1886 self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1888 def testBackTrack(self):
1889 self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1891 def testMultiAbs(self):
1892 self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1895 class TestValidateServiceName(unittest.TestCase):
1896 def testValid(self):
1898 0, 1, 2, 3, 1024, 65000, 65534, 65535,
1903 "0", "80", "1111", "65535",
1906 for name in testnames:
1907 self.assertEqual(utils.ValidateServiceName(name), name)
1909 def testInvalid(self):
1911 -15756, -1, 65536, 133428083,
1912 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1913 "-8546", "-1", "65536",
1917 for name in testnames:
1918 self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1921 class TestParseAsn1Generalizedtime(unittest.TestCase):
1924 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1925 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1927 self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1931 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1933 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1935 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1937 self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1939 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1942 # Leap seconds are not supported by datetime.datetime
1943 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1944 "19841231235960+0000")
1945 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1946 "19920630235960+0000")
1949 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1950 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1951 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1953 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1954 "Mon Feb 22 17:47:02 UTC 2010")
1955 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1956 "2010-02-22 17:42:02")
1959 class TestGetX509CertValidity(testutils.GanetiTestCase):
1961 testutils.GanetiTestCase.setUp(self)
1963 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1965 # Test whether we have pyOpenSSL 0.7 or above
1966 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1968 if not self.pyopenssl0_7:
1969 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1970 " function correctly")
1972 def _LoadCert(self, name):
1973 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1974 self._ReadTestData(name))
1977 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1978 if self.pyopenssl0_7:
1979 self.assertEqual(validity, (1266919967, 1267524767))
1981 self.assertEqual(validity, (None, None))
1984 class TestSignX509Certificate(unittest.TestCase):
1985 KEY = "My private key!"
1986 KEY_OTHER = "Another key"
1989 # Generate certificate valid for 5 minutes
1990 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1992 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1995 # No signature at all
1996 self.assertRaises(errors.GenericError,
1997 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
2000 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2002 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2003 "X-Ganeti-Signature: \n", self.KEY)
2004 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2005 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
2006 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2007 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
2008 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2009 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
2012 for salt in list("-_@$,:;/\\ \t\n"):
2013 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
2014 cert_pem, self.KEY, "foo%sbar" % salt)
2016 for salt in ["HelloWorld", "salt", string.letters, string.digits,
2017 utils.GenerateSecret(numbytes=4),
2018 utils.GenerateSecret(numbytes=16),
2019 "{123:456}".encode("hex")]:
2020 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
2022 self._Check(cert, salt, signed_pem)
2024 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
2025 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
2026 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
2027 "lines----\n------ at\nthe end!"))
2029 def _Check(self, cert, salt, pem):
2030 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
2031 self.assertEqual(salt, salt2)
2032 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
2035 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2036 pem, self.KEY_OTHER)
2039 class TestMakedirs(unittest.TestCase):
2041 self.tmpdir = tempfile.mkdtemp()
2044 shutil.rmtree(self.tmpdir)
2046 def testNonExisting(self):
2047 path = PathJoin(self.tmpdir, "foo")
2048 utils.Makedirs(path)
2049 self.assert_(os.path.isdir(path))
2051 def testExisting(self):
2052 path = PathJoin(self.tmpdir, "foo")
2054 utils.Makedirs(path)
2055 self.assert_(os.path.isdir(path))
2057 def testRecursiveNonExisting(self):
2058 path = PathJoin(self.tmpdir, "foo/bar/baz")
2059 utils.Makedirs(path)
2060 self.assert_(os.path.isdir(path))
2062 def testRecursiveExisting(self):
2063 path = PathJoin(self.tmpdir, "B/moo/xyz")
2064 self.assertFalse(os.path.exists(path))
2065 os.mkdir(PathJoin(self.tmpdir, "B"))
2066 utils.Makedirs(path)
2067 self.assert_(os.path.isdir(path))
2070 class TestRetry(testutils.GanetiTestCase):
2072 testutils.GanetiTestCase.setUp(self)
2076 def _RaiseRetryAgain():
2077 raise utils.RetryAgain()
2080 def _RaiseRetryAgainWithArg(args):
2081 raise utils.RetryAgain(*args)
2083 def _WrongNestedLoop(self):
2084 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
2086 def _RetryAndSucceed(self, retries):
2087 if self.retries < retries:
2089 raise utils.RetryAgain()
2093 def testRaiseTimeout(self):
2094 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2095 self._RaiseRetryAgain, 0.01, 0.02)
2096 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2097 self._RetryAndSucceed, 0.01, 0, args=[1])
2098 self.failUnlessEqual(self.retries, 1)
2100 def testComplete(self):
2101 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
2102 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
2104 self.failUnlessEqual(self.retries, 2)
2106 def testNestedLoop(self):
2108 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
2109 self._WrongNestedLoop, 0, 1)
2110 except utils.RetryTimeout:
2111 self.fail("Didn't detect inner loop's exception")
2113 def testTimeoutArgument(self):
2114 retry_arg="my_important_debugging_message"
2116 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
2117 except utils.RetryTimeout, err:
2118 self.failUnlessEqual(err.args, (retry_arg, ))
2120 self.fail("Expected timeout didn't happen")
2122 def testRaiseInnerWithExc(self):
2123 retry_arg="my_important_debugging_message"
2126 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2127 args=[[errors.GenericError(retry_arg, retry_arg)]])
2128 except utils.RetryTimeout, err:
2131 self.fail("Expected timeout didn't happen")
2132 except errors.GenericError, err:
2133 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2135 self.fail("Expected GenericError didn't happen")
2137 def testRaiseInnerWithMsg(self):
2138 retry_arg="my_important_debugging_message"
2141 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2142 args=[[retry_arg, retry_arg]])
2143 except utils.RetryTimeout, err:
2146 self.fail("Expected timeout didn't happen")
2147 except utils.RetryTimeout, err:
2148 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2150 self.fail("Expected RetryTimeout didn't happen")
2153 class TestLineSplitter(unittest.TestCase):
2156 ls = utils.LineSplitter(lines.append)
2157 ls.write("Hello World\n")
2158 self.assertEqual(lines, [])
2159 ls.write("Foo\n Bar\r\n ")
2162 self.assertEqual(lines, [])
2164 self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
2166 self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
2168 def _testExtra(self, line, all_lines, p1, p2):
2169 self.assertEqual(p1, 999)
2170 self.assertEqual(p2, "extra")
2171 all_lines.append(line)
2173 def testExtraArgsNoFlush(self):
2175 ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
2176 ls.write("\n\nHello World\n")
2177 ls.write("Foo\n Bar\r\n ")
2180 ls.write("Moo\n\nx\n")
2181 self.assertEqual(lines, [])
2183 self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
2187 class TestReadLockedPidFile(unittest.TestCase):
2189 self.tmpdir = tempfile.mkdtemp()
2192 shutil.rmtree(self.tmpdir)
2194 def testNonExistent(self):
2195 path = PathJoin(self.tmpdir, "nonexist")
2196 self.assert_(utils.ReadLockedPidFile(path) is None)
2198 def testUnlocked(self):
2199 path = PathJoin(self.tmpdir, "pid")
2200 utils.WriteFile(path, data="123")
2201 self.assert_(utils.ReadLockedPidFile(path) is None)
2203 def testLocked(self):
2204 path = PathJoin(self.tmpdir, "pid")
2205 utils.WriteFile(path, data="123")
2207 fl = utils.FileLock.Open(path)
2209 fl.Exclusive(blocking=True)
2211 self.assertEqual(utils.ReadLockedPidFile(path), 123)
2215 self.assert_(utils.ReadLockedPidFile(path) is None)
2217 def testError(self):
2218 path = PathJoin(self.tmpdir, "foobar", "pid")
2219 utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2220 # open(2) should return ENOTDIR
2221 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2224 class TestCertVerification(testutils.GanetiTestCase):
2226 testutils.GanetiTestCase.setUp(self)
2228 self.tmpdir = tempfile.mkdtemp()
2231 shutil.rmtree(self.tmpdir)
2233 def testVerifyCertificate(self):
2234 cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2235 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2238 # Not checking return value as this certificate is expired
2239 utils.VerifyX509Certificate(cert, 30, 7)
2242 class TestVerifyCertificateInner(unittest.TestCase):
2244 vci = utils._VerifyCertificateInner
2247 self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2251 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2252 self.assertEqual(errcode, utils.CERT_WARNING)
2255 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2256 self.assertEqual(errcode, utils.CERT_ERROR)
2258 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2259 self.assertEqual(errcode, utils.CERT_WARNING)
2261 (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2262 self.assertEqual(errcode, None)
2265 (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2266 self.assertEqual(errcode, utils.CERT_ERROR)
2268 (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2269 self.assertEqual(errcode, utils.CERT_ERROR)
2271 (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2272 self.assertEqual(errcode, utils.CERT_ERROR)
2274 (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2275 self.assertEqual(errcode, utils.CERT_ERROR)
2278 class TestHmacFunctions(unittest.TestCase):
2279 # Digests can be checked with "openssl sha1 -hmac $key"
2280 def testSha1Hmac(self):
2281 self.assertEqual(utils.Sha1Hmac("", ""),
2282 "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2283 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2284 "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2285 self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2286 "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2288 longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2289 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2290 "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
2292 def testSha1HmacSalt(self):
2293 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2294 "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2295 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2296 "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2297 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2298 "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
2300 def testVerifySha1Hmac(self):
2301 self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2302 "7d64b71fb76370690e1d")))
2303 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2304 ("f904c2476527c6d3e660"
2305 "9ab683c66fa0652cb1dc")))
2307 digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2308 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2309 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2311 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2313 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2316 def testVerifySha1HmacSalt(self):
2317 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2318 ("17a4adc34d69c0d367d4"
2319 "ffbef96fd41d4df7a6e8"),
2321 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2322 ("7f264f8114c9066afc9b"
2323 "b7636e1786d996d3cc0d"),
2327 class TestIgnoreSignals(unittest.TestCase):
2328 """Test the IgnoreSignals decorator"""
2331 def _Raise(exception):
2338 def testIgnoreSignals(self):
2339 sock_err_intr = socket.error(errno.EINTR, "Message")
2340 sock_err_inval = socket.error(errno.EINVAL, "Message")
2342 env_err_intr = EnvironmentError(errno.EINTR, "Message")
2343 env_err_inval = EnvironmentError(errno.EINVAL, "Message")
2345 self.assertRaises(socket.error, self._Raise, sock_err_intr)
2346 self.assertRaises(socket.error, self._Raise, sock_err_inval)
2347 self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2348 self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
2350 self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2351 self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2352 self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2354 self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
2357 self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2358 self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
2361 class TestEnsureDirs(unittest.TestCase):
2362 """Tests for EnsureDirs"""
2365 self.dir = tempfile.mkdtemp()
2366 self.old_umask = os.umask(0777)
2368 def testEnsureDirs(self):
2370 (PathJoin(self.dir, "foo"), 0777),
2371 (PathJoin(self.dir, "bar"), 0000),
2373 self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2374 self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2377 os.rmdir(PathJoin(self.dir, "foo"))
2378 os.rmdir(PathJoin(self.dir, "bar"))
2380 os.umask(self.old_umask)
2383 class TestFormatSeconds(unittest.TestCase):
2385 self.assertEqual(utils.FormatSeconds(1), "1s")
2386 self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2387 self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2388 self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2389 self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2390 self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2391 self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2392 self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2393 self.assertEqual(utils.FormatSeconds(-1), "-1s")
2394 self.assertEqual(utils.FormatSeconds(-282), "-282s")
2395 self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
2397 def testFloat(self):
2398 self.assertEqual(utils.FormatSeconds(1.3), "1s")
2399 self.assertEqual(utils.FormatSeconds(1.9), "2s")
2400 self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2401 self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
2404 class TestIgnoreProcessNotFound(unittest.TestCase):
2407 os.write(fd, str(os.getpid()))
2412 (pid_read_fd, pid_write_fd) = os.pipe()
2414 # Start short-lived process which writes its PID to pipe
2415 self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2416 os.close(pid_write_fd)
2418 # Read PID from pipe
2419 pid = int(os.read(pid_read_fd, 1024))
2420 os.close(pid_read_fd)
2422 # Try to send signal to process which exited recently
2423 self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2426 class TestShellWriter(unittest.TestCase):
2429 sw = utils.ShellWriter(buf)
2430 sw.Write("#!/bin/bash")
2431 sw.Write("if true; then")
2434 sw.Write("echo true")
2436 sw.Write("for i in 1 2 3")
2440 self.assertEqual(sw._indent, 2)
2447 sw.Write("echo %s", utils.ShellQuote("Hello World"))
2450 self.assertEqual(sw._indent, 0)
2452 output = buf.getvalue()
2454 self.assert_(output.endswith("\n"))
2456 lines = output.splitlines()
2457 self.assertEqual(len(lines), 9)
2458 self.assertEqual(lines[0], "#!/bin/bash")
2459 self.assert_(re.match(r"^\s+date$", lines[5]))
2460 self.assertEqual(lines[7], "echo 'Hello World'")
2462 def testEmpty(self):
2464 sw = utils.ShellWriter(buf)
2466 self.assertEqual(buf.getvalue(), "")
2469 class TestCommaJoin(unittest.TestCase):
2471 self.assertEqual(utils.CommaJoin([]), "")
2472 self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
2473 self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
2474 self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
2475 self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
2479 class TestFindMatch(unittest.TestCase):
2483 "bb": {"Two B": True},
2484 re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
2487 self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
2488 self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
2490 for i in ["foo", "bar", "bazX"]:
2491 for j in range(1, 100, 7):
2492 self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
2493 ((1, 2, 3), [i, str(j)]))
2495 def testNoMatch(self):
2496 self.assert_(utils.FindMatch({}, "") is None)
2497 self.assert_(utils.FindMatch({}, "foo") is None)
2498 self.assert_(utils.FindMatch({}, 1234) is None)
2502 re.compile("^(something)$"): "Hello World",
2505 self.assert_(utils.FindMatch(data, "") is None)
2506 self.assert_(utils.FindMatch(data, "Hello World") is None)
2509 class TestFileID(testutils.GanetiTestCase):
2510 def testEquality(self):
2511 name = self._CreateTempFile()
2512 oldi = utils.GetFileID(path=name)
2513 self.failUnless(utils.VerifyFileID(oldi, oldi))
2515 def testUpdate(self):
2516 name = self._CreateTempFile()
2517 oldi = utils.GetFileID(path=name)
2518 os.utime(name, None)
2519 fd = os.open(name, os.O_RDWR)
2521 newi = utils.GetFileID(fd=fd)
2522 self.failUnless(utils.VerifyFileID(oldi, newi))
2523 self.failUnless(utils.VerifyFileID(newi, oldi))
2527 def testWriteFile(self):
2528 name = self._CreateTempFile()
2529 oldi = utils.GetFileID(path=name)
2531 os.utime(name, (mtime + 10, mtime + 10))
2532 self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
2534 os.utime(name, (mtime - 10, mtime - 10))
2535 utils.SafeWriteFile(name, oldi, data="")
2536 oldi = utils.GetFileID(path=name)
2538 os.utime(name, (mtime + 10, mtime + 10))
2539 # this doesn't raise, since we passed None
2540 utils.SafeWriteFile(name, None, data="")
2542 def testError(self):
2543 t = tempfile.NamedTemporaryFile()
2544 self.assertRaises(errors.ProgrammerError, utils.GetFileID,
2545 path=t.name, fd=t.fileno())
2549 def __init__(self, values):
2550 self.values = values
2553 return self.values.pop(0)
2556 class TestRunningTimeout(unittest.TestCase):
2558 self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])
2560 def testRemainingFloat(self):
2561 timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
2562 self.assertAlmostEqual(timeout.Remaining(), 4.7)
2563 self.assertAlmostEqual(timeout.Remaining(), 0.4)
2564 self.assertAlmostEqual(timeout.Remaining(), -1.5)
2566 def testRemaining(self):
2567 self.time_fn = TimeMock([0, 2, 4, 5, 6])
2568 timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
2569 self.assertEqual(timeout.Remaining(), 3)
2570 self.assertEqual(timeout.Remaining(), 1)
2571 self.assertEqual(timeout.Remaining(), 0)
2572 self.assertEqual(timeout.Remaining(), -1)
2574 def testRemainingNonNegative(self):
2575 timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
2576 self.assertAlmostEqual(timeout.Remaining(), 4.7)
2577 self.assertAlmostEqual(timeout.Remaining(), 0.4)
2578 self.assertEqual(timeout.Remaining(), 0.0)
2580 def testNegativeTimeout(self):
2581 self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True)
2584 class TestTryConvert(unittest.TestCase):
2586 for src, fn, result in [
2592 self.assertEqual(utils.TryConvert(fn, src), result)
2595 class TestIsValidShellParam(unittest.TestCase):
2597 for val, result in [
2601 self.assertEqual(utils.IsValidShellParam(val), result)
2604 class TestBuildShellCmd(unittest.TestCase):
2606 self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
2608 self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
2611 class TestWriteFile(unittest.TestCase):
2613 self.tfile = tempfile.NamedTemporaryFile()
2614 self.did_pre = False
2615 self.did_post = False
2616 self.did_write = False
2618 def markPre(self, fd):
2621 def markPost(self, fd):
2622 self.did_post = True
2624 def markWrite(self, fd):
2625 self.did_write = True
2627 def testWrite(self):
2629 utils.WriteFile(self.tfile.name, data=data)
2630 self.assertEqual(utils.ReadFile(self.tfile.name), data)
2632 def testErrors(self):
2633 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
2634 self.tfile.name, data="test", fn=lambda fd: None)
2635 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
2636 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
2637 self.tfile.name, data="test", atime=0)
2639 def testCalls(self):
2640 utils.WriteFile(self.tfile.name, fn=self.markWrite,
2641 prewrite=self.markPre, postwrite=self.markPost)
2642 self.assertTrue(self.did_pre)
2643 self.assertTrue(self.did_post)
2644 self.assertTrue(self.did_write)
2646 def testDryRun(self):
2648 self.tfile.write(orig)
2650 utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
2651 self.assertEqual(utils.ReadFile(self.tfile.name), orig)
2653 def testTimes(self):
2655 for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
2656 (int(time.time()), 5000)]:
2657 utils.WriteFile(f, data="hello", atime=at, mtime=mt)
2659 self.assertEqual(st.st_atime, at)
2660 self.assertEqual(st.st_mtime, mt)
2663 def testNoClose(self):
2665 self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
2666 fd = utils.WriteFile(self.tfile.name, data=data, close=False)
2669 self.assertEqual(os.read(fd, 4096), data)
2674 class TestNormalizeAndValidateMac(unittest.TestCase):
2675 def testInvalid(self):
2676 self.assertRaises(errors.OpPrereqError,
2677 utils.NormalizeAndValidateMac, "xxx")
2679 def testNormalization(self):
2680 for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
2681 self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower())
2684 class TestNiceSort(unittest.TestCase):
2686 self.assertEqual(utils.NiceSort([]), [])
2687 self.assertEqual(utils.NiceSort(["foo"]), ["foo"])
2688 self.assertEqual(utils.NiceSort(["bar", ""]), ["", "bar"])
2689 self.assertEqual(utils.NiceSort([",", "."]), [",", "."])
2690 self.assertEqual(utils.NiceSort(["0.1", "0.2"]), ["0.1", "0.2"])
2691 self.assertEqual(utils.NiceSort(["0;099", "0,099", "0.1", "0.2"]),
2692 ["0,099", "0.1", "0.2", "0;099"])
2694 data = ["a0", "a1", "a99", "a20", "a2", "b10", "b70", "b00", "0000"]
2695 self.assertEqual(utils.NiceSort(data),
2696 ["0000", "a0", "a1", "a2", "a20", "a99",
2697 "b00", "b10", "b70"])
2699 data = ["a0-0", "a1-0", "a99-10", "a20-3", "a0-4", "a99-3", "a09-2",
2700 "Z", "a9-1", "A", "b"]
2701 self.assertEqual(utils.NiceSort(data),
2702 ["A", "Z", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
2703 "a20-3", "a99-3", "a99-10", "b"])
2704 self.assertEqual(utils.NiceSort(data, key=str.lower),
2705 ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
2706 "a20-3", "a99-3", "a99-10", "b", "Z"])
2707 self.assertEqual(utils.NiceSort(data, key=str.upper),
2708 ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
2709 "a20-3", "a99-3", "a99-10", "b", "Z"])
2711 def testLargeA(self):
2713 "Eegah9ei", "xij88brTulHYAv8IEOyU", "3jTwJPtrXOY22bwL2YoW",
2714 "Z8Ljf1Pf5eBfNg171wJR", "WvNJd91OoXvLzdEiEXa6", "uHXAyYYftCSG1o7qcCqe",
2715 "xpIUJeVT1Rp", "KOt7vn1dWXi", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
2716 "cPRi0lM7HLnSuWA2G9", "KVQqLPDjcPjf8T3oyzjcOsfkb",
2717 "guKJkXnkULealVC8CyF1xefym", "pqF8dkU5B1cMnyZuREaSOADYx",
2719 self.assertEqual(utils.NiceSort(data), [
2720 "3jTwJPtrXOY22bwL2YoW", "Eegah9ei", "KOt7vn1dWXi",
2721 "KVQqLPDjcPjf8T3oyzjcOsfkb", "WvNJd91OoXvLzdEiEXa6",
2722 "Z8Ljf1Pf5eBfNg171wJR", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
2723 "cPRi0lM7HLnSuWA2G9", "guKJkXnkULealVC8CyF1xefym",
2724 "pqF8dkU5B1cMnyZuREaSOADYx", "uHXAyYYftCSG1o7qcCqe",
2725 "xij88brTulHYAv8IEOyU", "xpIUJeVT1Rp"
2728 def testLargeB(self):
2730 "inst-0.0.0.0-0.0.0.0",
2731 "inst-0.1.0.0-0.0.0.0",
2732 "inst-0.2.0.0-0.0.0.0",
2733 "inst-0.2.1.0-0.0.0.0",
2734 "inst-0.2.2.0-0.0.0.0",
2735 "inst-0.2.2.0-0.0.0.9",
2736 "inst-0.2.2.0-0.0.3.9",
2737 "inst-0.2.2.0-0.2.0.9",
2738 "inst-0.2.2.0-0.9.0.9",
2739 "inst-0.20.2.0-0.0.0.0",
2740 "inst-0.20.2.0-0.9.0.9",
2741 "inst-10.020.2.0-0.9.0.10",
2742 "inst-15.020.2.0-0.9.1.00",
2743 "inst-100.020.2.0-0.9.0.9",
2745 # Only the last group, not converted to a number anymore, differs
2746 "inst-100.020.2.0a999",
2747 "inst-100.020.2.0b000",
2748 "inst-100.020.2.0c10",
2749 "inst-100.020.2.0c101",
2750 "inst-100.020.2.0c2",
2751 "inst-100.020.2.0c20",
2752 "inst-100.020.2.0c3",
2753 "inst-100.020.2.0c39123",
2756 rnd = random.Random(16205)
2759 rnd.shuffle(testdata)
2760 assert testdata != data
2761 self.assertEqual(utils.NiceSort(testdata), data)
2764 def __init__(self, fn):
2768 def __call__(self, *args):
2770 return self.fn(*args)
2772 def testKeyfuncA(self):
2773 # Generate some random numbers
2774 rnd = random.Random(21131)
2775 numbers = [rnd.randint(0, 10000) for _ in range(999)]
2776 assert numbers != sorted(numbers)
2779 data = [hex(i) for i in numbers]
2782 keyfn = self._CallCount(lambda value: str(int(value, 16)))
2784 # Sort with key function converting hex to decimal
2785 result = utils.NiceSort(data, key=keyfn)
2787 self.assertEqual([hex(i) for i in sorted(numbers)], result)
2788 self.assertEqual(data, datacopy, msg="Input data was modified in NiceSort")
2789 self.assertEqual(keyfn.count, len(numbers),
2790 msg="Key function was not called once per value")
2793 def __init__(self, name, value):
2797 def testKeyfuncB(self):
2798 rnd = random.Random(27396)
2800 for i in range(123):
2801 v1 = rnd.randint(0, 5)
2802 v2 = rnd.randint(0, 5)
2803 data.append(self._TestData("inst-%s-%s-%s" % (v1, v2, i),
2806 assert data != sorted(data, key=operator.attrgetter("name"))
2808 keyfn = self._CallCount(operator.attrgetter("name"))
2811 result = utils.NiceSort(data, key=keyfn)
2813 self.assertEqual(result, sorted(data, key=operator.attrgetter("value")))
2814 self.assertEqual(keyfn.count, len(data),
2815 msg="Key function was not called once per value")
2818 if __name__ == '__main__':
2819 testutils.GanetiTestProgram()