4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
24 import distutils.version
43 from cStringIO import StringIO
46 from ganeti import constants
47 from ganeti import compat
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
51 ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
52 TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
53 ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
56 class TestIsProcessAlive(unittest.TestCase):
57 """Testing case for IsProcessAlive"""
61 self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
63 def testNotExisting(self):
64 pid_non_existing = os.fork()
65 if pid_non_existing == 0:
67 elif pid_non_existing < 0:
68 raise SystemError("can't fork")
69 os.waitpid(pid_non_existing, 0)
70 self.assertFalse(utils.IsProcessAlive(pid_non_existing),
71 "nonexisting process detected")
74 class TestGetProcStatusPath(unittest.TestCase):
76 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
77 self.assertNotEqual(utils._GetProcStatusPath(1),
78 utils._GetProcStatusPath(2))
81 class TestIsProcessHandlingSignal(unittest.TestCase):
83 self.tmpdir = tempfile.mkdtemp()
86 shutil.rmtree(self.tmpdir)
88 def testParseSigsetT(self):
89 self.assertEqual(len(utils._ParseSigsetT("0")), 0)
90 self.assertEqual(utils._ParseSigsetT("1"), set([1]))
91 self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
92 self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
93 self.assertEqual(utils._ParseSigsetT("0000000180000202"),
95 self.assertEqual(utils._ParseSigsetT("0000000180000002"),
97 self.assertEqual(utils._ParseSigsetT("0000000188000002"),
99 self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
100 set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
101 24, 25, 26, 28, 31]))
102 self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
104 def testGetProcStatusField(self):
105 for field in ["SigCgt", "Name", "FDSize"]:
106 for value in ["", "0", "cat", " 1234 KB"]:
107 pstatus = "\n".join([
109 "%s: %s" % (field, value),
112 result = utils._GetProcStatusField(pstatus, field)
113 self.assertEqual(result, value.strip())
116 sp = PathJoin(self.tmpdir, "status")
118 utils.WriteFile(sp, data="\n".join([
120 "State: S (sleeping)",
125 "SigBlk: 0000000000010000",
126 "SigIgn: 0000000000384004",
127 "SigCgt: 000000004b813efb",
128 "CapEff: 0000000000000000",
131 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
133 def testNoSigCgt(self):
134 sp = PathJoin(self.tmpdir, "status")
136 utils.WriteFile(sp, data="\n".join([
140 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
141 1234, 10, status_path=sp)
def testNoSuchFile(self):
    """A status path that does not exist must yield a false result."""
    missing_status = PathJoin(self.tmpdir, "notexist")
    handled = utils.IsProcessHandlingSignal(1234, 10,
                                            status_path=missing_status)
    self.assertFalse(handled)
def _TestRealProcess():
    """Exercise utils.IsProcessHandlingSignal on the running process.

    SIGUSR1 is switched through the default disposition, a Python-level
    handler, the ignore disposition and back to the default; after each
    change the value reported for the current PID is checked.

    Raises:
      Exception: when the reported state contradicts the disposition that
        was just installed.
    """
    # Default disposition: the signal must not be reported as handled
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
        raise Exception("SIGUSR1 is handled when it should not be")

    # A real handler must be reported as handling the signal
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
        raise Exception("SIGUSR1 is not handled when it should be")

    # An ignored signal must not be reported as handled; the message now
    # matches the condition that triggers it
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
        raise Exception("SIGUSR1 is handled when it should not be")

    # Back to the default disposition
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
        raise Exception("SIGUSR1 is handled when it should not be")
def testRealProcess(self):
    """Run _TestRealProcess through utils.RunInSeparateProcess.

    The check changes the SIGUSR1 disposition, so it is handed to
    RunInSeparateProcess instead of being called directly.
    """
    # assertTrue replaces the deprecated assert_ alias
    self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
172 class TestPidFileFunctions(unittest.TestCase):
173 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
176 self.dir = tempfile.mkdtemp()
177 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
178 utils.DaemonPidFileName = self.f_dpn
180 def testPidFileFunctions(self):
181 pid_file = self.f_dpn('test')
182 fd = utils.WritePidFile(self.f_dpn('test'))
183 self.failUnless(os.path.exists(pid_file),
184 "PID file should have been created")
185 read_pid = utils.ReadPidFile(pid_file)
186 self.failUnlessEqual(read_pid, os.getpid())
187 self.failUnless(utils.IsProcessAlive(read_pid))
188 self.failUnlessRaises(errors.LockError, utils.WritePidFile,
191 utils.RemovePidFile('test')
192 self.failIf(os.path.exists(pid_file),
193 "PID file should not exist anymore")
194 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
195 "ReadPidFile should return 0 for missing pid file")
196 fh = open(pid_file, "w")
199 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
200 "ReadPidFile should return 0 for invalid pid file")
201 # but now, even with the file existing, we should be able to lock it
202 fd = utils.WritePidFile(self.f_dpn('test'))
204 utils.RemovePidFile('test')
205 self.failIf(os.path.exists(pid_file),
206 "PID file should not exist anymore")
209 pid_file = self.f_dpn('child')
210 r_fd, w_fd = os.pipe()
212 if new_pid == 0: #child
213 utils.WritePidFile(self.f_dpn('child'))
218 # else we are in the parent
219 # wait until the child has written the pid file
221 read_pid = utils.ReadPidFile(pid_file)
222 self.failUnlessEqual(read_pid, new_pid)
223 self.failUnless(utils.IsProcessAlive(new_pid))
224 utils.KillProcess(new_pid, waitpid=True)
225 self.failIf(utils.IsProcessAlive(new_pid))
226 utils.RemovePidFile('child')
227 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
230 for name in os.listdir(self.dir):
231 os.unlink(os.path.join(self.dir, name))
235 class TestRunCmd(testutils.GanetiTestCase):
236 """Testing case for the RunCmd function"""
239 testutils.GanetiTestCase.setUp(self)
240 self.magic = time.ctime() + " ganeti test"
241 self.fname = self._CreateTempFile()
242 self.fifo_tmpdir = tempfile.mkdtemp()
243 self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
244 os.mkfifo(self.fifo_file)
247 shutil.rmtree(self.fifo_tmpdir)
248 testutils.GanetiTestCase.tearDown(self)
251 """Test successful exit code"""
252 result = RunCmd("/bin/sh -c 'exit 0'")
253 self.assertEqual(result.exit_code, 0)
254 self.assertEqual(result.output, "")
257 """Test fail exit code"""
258 result = RunCmd("/bin/sh -c 'exit 1'")
259 self.assertEqual(result.exit_code, 1)
260 self.assertEqual(result.output, "")
def testStdout(self):
    """Test standard output"""
    echo_magic = 'echo -n "%s"' % self.magic
    shell_line = "/bin/sh -c '%s'" % echo_magic

    # Without an output file the text is available on the result object
    captured = RunCmd(shell_line)
    self.assertEqual(captured.stdout, self.magic)

    # With an output file the result stays empty and the file gets the text
    redirected = RunCmd(shell_line, output=self.fname)
    self.assertEqual(redirected.output, "")
    self.assertFileContent(self.fname, self.magic)
def testStderr(self):
    """Test standard error"""
    echo_magic = 'echo -n "%s"' % self.magic
    shell_line = "/bin/sh -c '%s' 1>&2" % echo_magic

    # Without an output file the text is available on the result object
    captured = RunCmd(shell_line)
    self.assertEqual(captured.stderr, self.magic)

    # With an output file the result stays empty and the file gets the text
    redirected = RunCmd(shell_line, output=self.fname)
    self.assertEqual(redirected.output, "")
    self.assertFileContent(self.fname, self.magic)
def testCombined(self):
    """Test combined output"""
    both_streams = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic,
                                                          self.magic)
    shell_line = "/bin/sh -c '%s'" % both_streams
    wanted = "A" + self.magic + "B" + self.magic

    # Captured run: both streams show up in the combined output
    captured = RunCmd(shell_line)
    self.assertEqual(captured.output, wanted)

    # Redirected run: the result is empty, the file holds the text
    redirected = RunCmd(shell_line, output=self.fname)
    self.assertEqual(redirected.output, "")
    self.assertFileContent(self.fname, wanted)
def testSignal(self):
    """A child that sends itself signal 15 reports that signal, no output."""
    kill_self = ["python", "-c", "import os; os.kill(os.getpid(), 15)"]
    outcome = RunCmd(kill_self)
    self.assertEqual(outcome.signal, 15)
    self.assertEqual(outcome.output, "")
def testTimeoutClean(self):
    """A child that exits 0 on TERM yields exit code 0 under a timeout."""
    # The shell blocks reading from the fifo and traps TERM to exit cleanly
    script = "trap 'exit 0' TERM; read < %s" % self.fifo_file
    argv = ["/bin/sh", "-c", script]
    outcome = RunCmd(argv, timeout=0.2)
    self.assertEqual(outcome.exit_code, 0)
301 def testTimeoutKill(self):
302 cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
304 out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
305 timeout, _linger_timeout=0.2)
306 self.assert_(status < 0)
307 self.assertEqual(-status, signal.SIGKILL)
def testTimeoutOutputAfterTerm(self):
    """Output written from the TERM trap is still captured.

    The shell blocks reading from the fifo; its TERM trap prints a marker
    and exits with status 1. The run must be flagged as failed and the
    marker must appear on stdout.
    """
    cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
    # assertTrue replaces the deprecated assert_ alias
    self.assertTrue(result.failed)
    self.assertEqual(result.stdout, "sigtermed\n")
def testListRun(self):
    """Commands given as argument lists report no signal and the exit code."""
    cases = [
        (["true"], 0),
        (["/bin/sh", "-c", "exit 1"], 1),
        (["echo", "-n", self.magic], 0),
    ]
    for argv, wanted_code in cases:
        outcome = RunCmd(argv)
        self.assertEqual(outcome.signal, None)
        self.assertEqual(outcome.exit_code, wanted_code)

    # "outcome" still refers to the echo run, the last entry above
    self.assertEqual(outcome.stdout, self.magic)
def testFileEmptyOutput(self):
    """Test file output"""
    # "true" prints nothing, so the output file must end up empty
    outcome = RunCmd(["true"], output=self.fname)
    self.assertEqual(outcome.signal, None)
    self.assertEqual(outcome.exit_code, 0)
    self.assertFileContent(self.fname, "")
336 """Test locale environment"""
337 old_env = os.environ.copy()
339 os.environ["LANG"] = "en_US.UTF-8"
340 os.environ["LC_ALL"] = "en_US.UTF-8"
341 result = RunCmd(["locale"])
342 for line in result.output.splitlines():
343 key, value = line.split("=", 1)
344 # Ignore these variables, they're overridden by LC_ALL
345 if key == "LANG" or key == "LANGUAGE":
347 self.failIf(value and value != "C" and value != '"C"',
348 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
    """Test default working directory"""
    # Without a cwd argument "pwd" is expected to print "/".
    # assertEqual replaces the deprecated failUnlessEqual alias.
    self.assertEqual(RunCmd(["pwd"]).stdout.strip(), "/")
357 """Test default working directory"""
358 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
359 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
361 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
def testResetEnv(self):
    """Test environment reset functionality"""
    # With reset_env and no env argument, "env" must print nothing.
    # assertEqual replaces the deprecated failUnlessEqual alias.
    self.assertEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")

    # With reset_env plus an explicit env, only that variable is printed
    self.assertEqual(RunCmd(["env"], reset_env=True,
                            env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
369 def testNoFork(self):
370 """Test that nofork raise an error"""
371 assert not utils.no_fork
374 self.assertRaises(errors.ProgrammerError, RunCmd, ["true"])
376 utils.no_fork = False
def testWrongParams(self):
    """Test wrong parameters"""
    # output= together with interactive=True must be rejected
    conflicting = {"output": "/dev/null", "interactive": True}
    self.assertRaises(errors.ProgrammerError, RunCmd, ["true"],
                      **conflicting)
384 class TestRunParts(testutils.GanetiTestCase):
385 """Testing case for the RunParts function"""
388 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
391 shutil.rmtree(self.rundir)
394 """Test on an empty dir"""
395 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
def testSkipWrongName(self):
    """Test that wrong files are skipped"""
    # An executable file named "00test.dot" is expected to be skipped
    fname = os.path.join(self.rundir, "00test.dot")
    utils.WriteFile(fname, data="")
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
    relname = os.path.basename(fname)
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(RunParts(self.rundir, reset_env=True),
                     [(relname, constants.RUNPARTS_SKIP, None)])
def testSkipNonExec(self):
    """Test that non executable files are skipped"""
    # The file is written but never chmod'ed to be executable
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="")
    relname = os.path.basename(fname)
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(RunParts(self.rundir, reset_env=True),
                     [(relname, constants.RUNPARTS_SKIP, None)])
415 """Test error on a broken executable"""
416 fname = os.path.join(self.rundir, "00test")
417 utils.WriteFile(fname, data="")
418 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
419 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
420 self.failUnlessEqual(relname, os.path.basename(fname))
421 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
422 self.failUnless(error)
424 def testSorted(self):
425 """Test executions are sorted"""
427 files.append(os.path.join(self.rundir, "64test"))
428 files.append(os.path.join(self.rundir, "00test"))
429 files.append(os.path.join(self.rundir, "42test"))
432 utils.WriteFile(fname, data="")
434 results = RunParts(self.rundir, reset_env=True)
436 for fname in sorted(files):
437 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
440 """Test correct execution"""
441 fname = os.path.join(self.rundir, "00test")
442 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
443 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
444 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
445 self.failUnlessEqual(relname, os.path.basename(fname))
446 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
447 self.failUnlessEqual(runresult.stdout, "ciao")
def testRunFail(self):
    """Test correct execution, with run failure"""
    # A valid script that exits with status 1: it counts as run, and the
    # failure is visible on the run result
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
    # assertEqual/assertTrue replace the deprecated failUnless* aliases
    self.assertEqual(relname, os.path.basename(fname))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.exit_code, 1)
    self.assertTrue(runresult.failed)
460 def testRunMix(self):
462 files.append(os.path.join(self.rundir, "00test"))
463 files.append(os.path.join(self.rundir, "42test"))
464 files.append(os.path.join(self.rundir, "64test"))
465 files.append(os.path.join(self.rundir, "99test"))
469 # 1st has errors in execution
470 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
471 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
474 utils.WriteFile(files[1], data="")
476 # 3rd cannot execute properly
477 utils.WriteFile(files[2], data="")
478 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
481 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
482 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
484 results = RunParts(self.rundir, reset_env=True)
486 (relname, status, runresult) = results[0]
487 self.failUnlessEqual(relname, os.path.basename(files[0]))
488 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
489 self.failUnlessEqual(runresult.exit_code, 1)
490 self.failUnless(runresult.failed)
492 (relname, status, runresult) = results[1]
493 self.failUnlessEqual(relname, os.path.basename(files[1]))
494 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
495 self.failUnlessEqual(runresult, None)
497 (relname, status, runresult) = results[2]
498 self.failUnlessEqual(relname, os.path.basename(files[2]))
499 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
500 self.failUnless(runresult)
502 (relname, status, runresult) = results[3]
503 self.failUnlessEqual(relname, os.path.basename(files[3]))
504 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
505 self.failUnlessEqual(runresult.output, "ciao")
506 self.failUnlessEqual(runresult.exit_code, 0)
507 self.failUnless(not runresult.failed)
def testMissingDirectory(self):
    """A directory that does not exist yields an empty result list."""
    absent_dir = utils.PathJoin(self.rundir, "no/such/directory")
    outcome = RunParts(absent_dir)
    self.assertEqual(outcome, [])
514 class TestStartDaemon(testutils.GanetiTestCase):
516 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
517 self.tmpfile = os.path.join(self.tmpdir, "test")
520 shutil.rmtree(self.tmpdir)
523 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
524 self._wait(self.tmpfile, 60.0, "Hello World")
def testShellOutput(self):
    """Output of a command string started with output= lands in the file."""
    target = self.tmpfile
    utils.StartDaemon("echo Hello World", output=target)
    self._wait(target, 60.0, "Hello World")
def testNoShellNoOutput(self):
    """Start an argument-list command with no output target."""
    argv = ["pwd"]
    utils.StartDaemon(argv)
def testNoShellNoOutputTouch(self):
    """A daemonized "touch" creates its (empty) target file."""
    testfile = os.path.join(self.tmpdir, "check")
    # The file must not exist yet; assertFalse replaces the deprecated
    # failIf alias
    self.assertFalse(os.path.exists(testfile))
    utils.StartDaemon(["touch", testfile])
    self._wait(testfile, 60.0, "")
def testNoShellOutput(self):
    """With no cwd given, a daemonized "pwd" is expected to report "/"."""
    target = self.tmpfile
    utils.StartDaemon(["pwd"], output=target)
    self._wait(target, 60.0, "/")
def testNoShellOutputCwd(self):
    """A daemonized "pwd" started with cwd= reports that directory."""
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
    expected_dir = os.getcwd()
    self._wait(self.tmpfile, 60.0, expected_dir)
def testShellEnv(self):
    """A variable passed via env= is visible to a command string."""
    extra_env = {"GNT_TEST_VAR": "Hello World"}
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
                      env=extra_env)
    self._wait(self.tmpfile, 60.0, "Hello World")
def testNoShellEnv(self):
    """A variable passed via env= is visible to an argument-list command."""
    extra_env = {"GNT_TEST_VAR": "Hello World"}
    argv = ["printenv", "GNT_TEST_VAR"]
    utils.StartDaemon(argv, output=self.tmpfile, env=extra_env)
    self._wait(self.tmpfile, 60.0, "Hello World")
557 def testOutputFd(self):
558 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
560 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
563 self._wait(self.tmpfile, 60.0, os.getcwd())
566 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
567 self._wait(self.tmpfile, 60.0, str(pid))
569 def testPidFile(self):
570 pidfile = os.path.join(self.tmpdir, "pid")
571 checkfile = os.path.join(self.tmpdir, "abort")
573 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
576 fd = os.open(pidfile, os.O_RDONLY)
578 # Check file is locked
579 self.assertRaises(errors.LockError, utils.LockFile, fd)
581 pidtext = os.read(fd, 100)
585 self.assertEqual(int(pidtext.strip()), pid)
587 self.assert_(utils.IsProcessAlive(pid))
589 # No matter what happens, kill daemon
590 utils.KillProcess(pid, timeout=5.0, waitpid=False)
591 self.failIf(utils.IsProcessAlive(pid))
593 self.assertEqual(utils.ReadFile(self.tmpfile), "")
595 def _wait(self, path, timeout, expected):
596 # Due to the asynchronous nature of daemon processes, polling is necessary.
597 # A timeout makes sure the test doesn't hang forever.
599 if not (os.path.isfile(path) and
600 utils.ReadFile(path).strip() == expected):
601 raise utils.RetryAgain()
604 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
605 except utils.RetryTimeout:
606 self.fail("Apparently the daemon didn't run in %s seconds and/or"
607 " didn't write the correct output" % timeout)
610 self.assertRaises(errors.OpExecError, utils.StartDaemon,
611 ["./does-NOT-EXIST/here/0123456789"])
612 self.assertRaises(errors.OpExecError, utils.StartDaemon,
613 ["./does-NOT-EXIST/here/0123456789"],
614 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
615 self.assertRaises(errors.OpExecError, utils.StartDaemon,
616 ["./does-NOT-EXIST/here/0123456789"],
617 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
618 self.assertRaises(errors.OpExecError, utils.StartDaemon,
619 ["./does-NOT-EXIST/here/0123456789"],
620 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
622 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
624 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
625 ["./does-NOT-EXIST/here/0123456789"],
626 output=self.tmpfile, output_fd=fd)
631 class TestSetCloseOnExecFlag(unittest.TestCase):
632 """Tests for SetCloseOnExecFlag"""
635 self.tmpfile = tempfile.TemporaryFile()
637 def testEnable(self):
638 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
639 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
642 def testDisable(self):
643 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
644 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
648 class TestSetNonblockFlag(unittest.TestCase):
650 self.tmpfile = tempfile.TemporaryFile()
652 def testEnable(self):
653 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
654 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
657 def testDisable(self):
658 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
659 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
663 class TestRemoveFile(unittest.TestCase):
664 """Test case for the RemoveFile function"""
667 """Create a temp dir and file for each case"""
668 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
669 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
673 if os.path.exists(self.tmpfile):
674 os.unlink(self.tmpfile)
675 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
    """Test that RemoveFile() ignores directories"""
    outcome = RemoveFile(self.tmpdir)
    self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files"""
    # Second pass runs after the first call, on the same path
    for _ in range(2):
        RemoveFile(self.tmpfile)
def testRemoveFile(self):
    """Test that RemoveFile does remove a file"""
    RemoveFile(self.tmpfile)
    still_present = os.path.exists(self.tmpfile)
    if still_present:
        self.fail("File '%s' not removed" % self.tmpfile)
692 def testRemoveSymlink(self):
693 """Test that RemoveFile does remove symlinks"""
694 symlink = self.tmpdir + "/symlink"
695 os.symlink("no-such-file", symlink)
697 if os.path.exists(symlink):
698 self.fail("File '%s' not removed" % symlink)
699 os.symlink(self.tmpfile, symlink)
701 if os.path.exists(symlink):
702 self.fail("File '%s' not removed" % symlink)
705 class TestRemoveDir(unittest.TestCase):
707 self.tmpdir = tempfile.mkdtemp()
711 shutil.rmtree(self.tmpdir)
712 except EnvironmentError:
def testEmptyDir(self):
    """After utils.RemoveDir the temporary directory is no longer there."""
    utils.RemoveDir(self.tmpdir)
    remains = os.path.isdir(self.tmpdir)
    self.assertFalse(remains)
def testNonEmptyDir(self):
    """utils.RemoveDir raises EnvironmentError on a directory with a file."""
    self.tmpfile = os.path.join(self.tmpdir, "test1")
    # Create an empty file so the directory has one entry
    handle = open(self.tmpfile, "w")
    handle.close()
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
725 class TestRename(unittest.TestCase):
726 """Test case for RenameFile"""
729 """Create a temporary directory"""
730 self.tmpdir = tempfile.mkdtemp()
731 self.tmpfile = os.path.join(self.tmpdir, "test1")
734 open(self.tmpfile, "w").close()
737 """Remove temporary directory"""
738 shutil.rmtree(self.tmpdir)
def testSimpleRename1(self):
    """Simple rename 1"""
    target = os.path.join(self.tmpdir, "xyz")
    utils.RenameFile(self.tmpfile, target)
    # assertTrue replaces the deprecated assert_ alias
    self.assertTrue(os.path.isfile(target))
745 def testSimpleRename2(self):
746 """Simple rename 2"""
747 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
749 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
751 def testRenameMkdir(self):
752 """Rename with mkdir"""
753 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
755 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
756 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
758 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
759 os.path.join(self.tmpdir, "test/foo/bar/baz"),
761 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
762 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
763 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
766 class TestMatchNameComponent(unittest.TestCase):
767 """Test case for the MatchNameComponent function"""
def testEmptyList(self):
    """Test that there is no match against an empty list"""
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(MatchNameComponent("", []), None)
    self.assertEqual(MatchNameComponent("test", []), None)
def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    # Every prefix of the second entry must resolve to that entry
    for key in "test2", "test2.example", "test2.example.com":
        # assertEqual replaces the deprecated failUnlessEqual alias
        self.assertEqual(MatchNameComponent(key, mlist), mlist[1])
def testMultipleMatches(self):
    """Test that a multiple match is returned as None"""
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    # Both keys are prefixes of all three entries
    for key in "test1", "test1.example":
        # assertEqual replaces the deprecated failUnlessEqual alias
        self.assertEqual(MatchNameComponent(key, mlist), None)
787 def testFullMatch(self):
788 """Test that a full match is returned correctly"""
790 key2 = "test1.example"
791 mlist = [key2, key2 + ".com"]
792 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
793 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
795 def testCaseInsensitivePartialMatch(self):
796 """Test for the case_insensitive keyword"""
797 mlist = ["test1.example.com", "test2.example.net"]
798 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
800 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
802 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
804 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
808 def testCaseInsensitiveFullMatch(self):
809 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
810 # Between the two ts1 a full string match non-case insensitive should work
811 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
813 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
815 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
817 # Between the two ts2 only case differs, so only case-match works
818 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
820 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
822 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
826 class TestReadFile(testutils.GanetiTestCase):
828 def testReadAll(self):
829 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
830 self.assertEqual(len(data), 814)
832 h = compat.md5_hash()
834 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
836 def testReadSize(self):
837 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
839 self.assertEqual(len(data), 100)
841 h = compat.md5_hash()
843 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
846 self.assertRaises(EnvironmentError, utils.ReadFile,
847 "/dev/null/does-not-exist")
850 class TestReadOneLineFile(testutils.GanetiTestCase):
853 testutils.GanetiTestCase.setUp(self)
def testDefault(self):
    """With default arguments the certificate's header line is returned."""
    cert_path = self._TestDataFilename("cert1.pem")
    first_line = ReadOneLineFile(cert_path)
    self.assertEqual(len(first_line), 27)
    self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testNotStrict(self):
    """With strict=False the certificate's header line is returned."""
    cert_path = self._TestDataFilename("cert1.pem")
    first_line = ReadOneLineFile(cert_path, strict=False)
    self.assertEqual(len(first_line), 27)
    self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testStrictFailure(self):
    """With strict=True the certificate file raises GenericError."""
    cert_path = self._TestDataFilename("cert1.pem")
    self.assertRaises(errors.GenericError, ReadOneLineFile, cert_path,
                      strict=True)
def testLongLine(self):
    """A long line without a newline is returned whole in both modes."""
    payload = "Hello World! " * 1024
    path = self._CreateTempFile()
    utils.WriteFile(path, data=payload)

    # Read in both modes first, then compare, as two separate steps
    strict_read = ReadOneLineFile(path, strict=True)
    lax_read = ReadOneLineFile(path, strict=False)
    self.assertEqual(payload, strict_read)
    self.assertEqual(payload, lax_read)
878 def testNewline(self):
879 myfile = self._CreateTempFile()
881 for nl in ["", "\n", "\r\n"]:
882 dummydata = "%s%s" % (myline, nl)
883 utils.WriteFile(myfile, data=dummydata)
884 datalax = ReadOneLineFile(myfile, strict=False)
885 self.assertEqual(myline, datalax)
886 datastrict = ReadOneLineFile(myfile, strict=True)
887 self.assertEqual(myline, datastrict)
889 def testWhitespaceAndMultipleLines(self):
890 myfile = self._CreateTempFile()
891 for nl in ["", "\n", "\r\n"]:
892 for ws in [" ", "\t", "\t\t \t", "\t "]:
893 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
894 utils.WriteFile(myfile, data=dummydata)
895 datalax = ReadOneLineFile(myfile, strict=False)
897 self.assert_(set("\r\n") & set(dummydata))
898 self.assertRaises(errors.GenericError, ReadOneLineFile,
900 explen = len("Foo bar baz ") + len(ws)
901 self.assertEqual(len(datalax), explen)
902 self.assertEqual(datalax, dummydata[:explen])
903 self.assertFalse(set("\r\n") & set(datalax))
905 datastrict = ReadOneLineFile(myfile, strict=True)
906 self.assertEqual(dummydata, datastrict)
907 self.assertEqual(dummydata, datalax)
909 def testEmptylines(self):
910 myfile = self._CreateTempFile()
912 for nl in ["\n", "\r\n"]:
913 for ol in ["", "otherline"]:
914 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
915 utils.WriteFile(myfile, data=dummydata)
916 self.assert_(set("\r\n") & set(dummydata))
917 datalax = ReadOneLineFile(myfile, strict=False)
918 self.assertEqual(myline, datalax)
920 self.assertRaises(errors.GenericError, ReadOneLineFile,
923 datastrict = ReadOneLineFile(myfile, strict=True)
924 self.assertEqual(myline, datastrict)
def testEmptyfile(self):
    """A freshly created temporary file makes ReadOneLineFile raise."""
    blank_path = self._CreateTempFile()
    self.assertRaises(errors.GenericError, ReadOneLineFile, blank_path)
931 class TestTimestampForFilename(unittest.TestCase):
933 self.assert_("." not in utils.TimestampForFilename())
934 self.assert_(":" not in utils.TimestampForFilename())
937 class TestCreateBackup(testutils.GanetiTestCase):
939 testutils.GanetiTestCase.setUp(self)
941 self.tmpdir = tempfile.mkdtemp()
944 testutils.GanetiTestCase.tearDown(self)
946 shutil.rmtree(self.tmpdir)
949 filename = PathJoin(self.tmpdir, "config.data")
950 utils.WriteFile(filename, data="")
951 bname = utils.CreateBackup(filename)
952 self.assertFileContent(bname, "")
953 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
954 utils.CreateBackup(filename)
955 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
956 utils.CreateBackup(filename)
957 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
959 fifoname = PathJoin(self.tmpdir, "fifo")
961 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
963 def testContent(self):
965 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
966 for rep in [1, 2, 10, 127]:
967 testdata = data * rep
969 filename = PathJoin(self.tmpdir, "test.data_")
970 utils.WriteFile(filename, data=testdata)
971 self.assertFileContent(filename, testdata)
974 bname = utils.CreateBackup(filename)
976 self.assertFileContent(bname, testdata)
977 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
980 class TestFormatUnit(unittest.TestCase):
981 """Test case for the FormatUnit function"""
984 self.assertEqual(FormatUnit(1, 'h'), '1M')
985 self.assertEqual(FormatUnit(100, 'h'), '100M')
986 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
988 self.assertEqual(FormatUnit(1, 'm'), '1')
989 self.assertEqual(FormatUnit(100, 'm'), '100')
990 self.assertEqual(FormatUnit(1023, 'm'), '1023')
992 self.assertEqual(FormatUnit(1024, 'm'), '1024')
993 self.assertEqual(FormatUnit(1536, 'm'), '1536')
994 self.assertEqual(FormatUnit(17133, 'm'), '17133')
995 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
998 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
999 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
1000 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
1001 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
1003 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
1004 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
1005 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
1006 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
1008 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
1009 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
1010 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
1013 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
1014 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
1015 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
1017 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
1018 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
1019 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
def testErrors(self):
  """FormatUnit must reject "a" as its second argument."""
  bad_units = "a"
  self.assertRaises(errors.ProgrammerError, FormatUnit, 1, bad_units)
1025 class TestParseUnit(unittest.TestCase):
1026 """Test case for the ParseUnit function"""
1029 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
1030 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
1031 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
  """Plain integers are rounded up to the next multiple of four."""
  expectations = [
    ("0", 0),
    ("1", 4),
    ("2", 4),
    ("3", 4),
    # Around a value that already is a multiple of four
    ("124", 124),
    ("125", 128),
    ("126", 128),
    ("127", 128),
    ("128", 128),
    ("129", 132),
    ("130", 132),
    ]

  for (text, value) in expectations:
    self.assertEqual(ParseUnit(text), value)
def testFloating(self):
  """Fractional input is accepted and rounded up like integer input."""
  expectations = [
    ("0", 0),
    ("0.5", 4),
    ("1.75", 4),
    ("1.99", 4),
    ("2.00", 4),
    ("2.01", 4),
    ("3.99", 4),
    ("4.00", 4),
    ("4.01", 8),
    # Fractions combined with a unit suffix
    ("1.5G", 1536),
    ("1.8G", 1844),
    ("8.28T", 8682212),
    ]

  for (text, value) in expectations:
    self.assertEqual(ParseUnit(text), value)
1061 def testSuffixes(self):
1062 for sep in ('', ' ', ' ', "\t", "\t "):
1063 for suffix, scale in TestParseUnit.SCALES:
1064 for func in (lambda x: x, str.lower, str.upper):
1065 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
  """Malformed size strings must raise UnitParseError."""
  suffixes = [suffix for (suffix, _) in TestParseUnit.SCALES]

  # A stray character between the number and the suffix
  for separator in ("-", "_", ",", "a"):
    for suffix in suffixes:
      text = "1" + separator + suffix
      self.assertRaises(errors.UnitParseError, ParseUnit, text)

  # A comma inside the number itself
  for suffix in suffixes:
    self.assertRaises(errors.UnitParseError, ParseUnit, "1,3" + suffix)
class TestParseCpuMask(unittest.TestCase):
  """Test case for the ParseCpuMask function."""

  def testWellFormed(self):
    """Valid masks expand to the list of CPU numbers they name."""
    cases = [
      ("", []),
      ("1", [1]),
      ("0-2,4,5-5", [0, 1, 2, 4, 5]),
      ]
    for (mask, cpus) in cases:
      self.assertEqual(utils.ParseCpuMask(mask), cpus)

  def testInvalidInput(self):
    """Malformed masks must raise ParseError."""
    bad_masks = ("garbage", "0,", "0-1-2", "2-1", "1-a")
    for mask in bad_masks:
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, mask)
1090 class TestSshKeys(testutils.GanetiTestCase):
1091 """Test case for the AddAuthorizedKey function"""
1093 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1094 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
1095 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1098 testutils.GanetiTestCase.setUp(self)
1099 self.tmpname = self._CreateTempFile()
1100 handle = open(self.tmpname, 'w')
1102 handle.write("%s\n" % TestSshKeys.KEY_A)
1103 handle.write("%s\n" % TestSshKeys.KEY_B)
def testAddingNewKey(self):
  """A key that is not in the file yet ends up as its last line."""
  new_key = "ssh-dss AAAAB3NzaC1kc3MAAACB root@test"
  utils.AddAuthorizedKey(self.tmpname, new_key)

  expected_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n",
    ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"),
    "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n",
    ]
  self.assertFileContent(self.tmpname, "".join(expected_lines))
def testAddingAlmostButNotCompletelyTheSameKey(self):
  """A key differing from a stored one only in its comment is added."""
  # Same type and key data as the first stored line, other trailing field
  similar_key = "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test"
  utils.AddAuthorizedKey(self.tmpname, similar_key)

  expected_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n",
    ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"),
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n",
    ]
  self.assertFileContent(self.tmpname, "".join(expected_lines))
def testAddingExistingKeyWithSomeMoreSpaces(self):
  """Re-adding a stored key with wider spacing leaves the file unchanged.

  The key passed in has the same fields as the first line of the file, but
  separated by more blanks; it must be recognized as already present.

  """
  # Assembled from its fields so the additional blanks are explicit; passing
  # a key identical to the stored line would not exercise the case this
  # test is named after.
  spaced_key = ("ssh-dss" + 2 * " " + "AAAAB3NzaC1w5256closdj32mZaQU" +
                3 * " " + "root@key-a")
  utils.AddAuthorizedKey(self.tmpname, spaced_key)

  self.assertFileContent(self.tmpname,
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingExistingKeyWithSomeMoreSpaces(self):
  """Removing a stored key given with wider spacing deletes its line.

  The key passed in has the same fields as the first line of the file, but
  separated by more blanks; only the second line must remain afterwards.

  """
  # Assembled from its fields so the additional blanks are explicit; passing
  # a key identical to the stored line would not exercise the case this
  # test is named after.
  spaced_key = ("ssh-dss" + 2 * " " + "AAAAB3NzaC1w5256closdj32mZaQU" +
                3 * " " + "root@key-a")
  utils.RemoveAuthorizedKey(self.tmpname, spaced_key)

  self.assertFileContent(self.tmpname,
    'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
    " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingNonExistingKey(self):
  """Removing a key that is not stored leaves both lines in place."""
  unknown_key = "ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test"
  utils.RemoveAuthorizedKey(self.tmpname, unknown_key)

  expected_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n",
    ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"),
    ]
  self.assertFileContent(self.tmpname, "".join(expected_lines))
1154 class TestEtcHosts(testutils.GanetiTestCase):
1155 """Test functions modifying /etc/hosts"""
1158 testutils.GanetiTestCase.setUp(self)
1159 self.tmpname = self._CreateTempFile()
1160 handle = open(self.tmpname, 'w')
1162 handle.write('# This is a test file for /etc/hosts\n')
1163 handle.write('127.0.0.1\tlocalhost\n')
1164 handle.write('192.0.2.1 router gw\n')
1168 def testSettingNewIp(self):
1169 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1172 self.assertFileContent(self.tmpname,
1173 "# This is a test file for /etc/hosts\n"
1174 "127.0.0.1\tlocalhost\n"
1175 "192.0.2.1 router gw\n"
1176 "198.51.100.4\tmyhost.example.com myhost\n")
1177 self.assertFileMode(self.tmpname, 0644)
1179 def testSettingExistingIp(self):
1180 SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1183 self.assertFileContent(self.tmpname,
1184 "# This is a test file for /etc/hosts\n"
1185 "127.0.0.1\tlocalhost\n"
1186 "192.0.2.1\tmyhost.example.com myhost\n")
1187 self.assertFileMode(self.tmpname, 0644)
1189 def testSettingDuplicateName(self):
1190 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1192 self.assertFileContent(self.tmpname,
1193 "# This is a test file for /etc/hosts\n"
1194 "127.0.0.1\tlocalhost\n"
1195 "192.0.2.1 router gw\n"
1196 "198.51.100.4\tmyhost\n")
1197 self.assertFileMode(self.tmpname, 0644)
1199 def testRemovingExistingHost(self):
1200 RemoveEtcHostsEntry(self.tmpname, 'router')
1202 self.assertFileContent(self.tmpname,
1203 "# This is a test file for /etc/hosts\n"
1204 "127.0.0.1\tlocalhost\n"
1206 self.assertFileMode(self.tmpname, 0644)
1208 def testRemovingSingleExistingHost(self):
1209 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1211 self.assertFileContent(self.tmpname,
1212 "# This is a test file for /etc/hosts\n"
1213 "192.0.2.1 router gw\n")
1214 self.assertFileMode(self.tmpname, 0644)
1216 def testRemovingNonExistingHost(self):
1217 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1219 self.assertFileContent(self.tmpname,
1220 "# This is a test file for /etc/hosts\n"
1221 "127.0.0.1\tlocalhost\n"
1222 "192.0.2.1 router gw\n")
1223 self.assertFileMode(self.tmpname, 0644)
1225 def testRemovingAlias(self):
1226 RemoveEtcHostsEntry(self.tmpname, 'gw')
1228 self.assertFileContent(self.tmpname,
1229 "# This is a test file for /etc/hosts\n"
1230 "127.0.0.1\tlocalhost\n"
1231 "192.0.2.1 router\n")
1232 self.assertFileMode(self.tmpname, 0644)
1235 class TestGetMounts(unittest.TestCase):
1236 """Test case for GetMounts()."""
1239 "rootfs / rootfs rw 0 0\n"
1240 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
1241 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
1244 self.tmpfile = tempfile.NamedTemporaryFile()
1245 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1247 def testGetMounts(self):
1248 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1250 ("rootfs", "/", "rootfs", "rw"),
1251 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1252 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Check the quoting applied to single values."""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for (value, quoted) in cases:
      self.assertEqual(ShellQuote(value), quoted)

  def testShellQuoteArgs(self):
    """Check the quoting and joining applied to argument lists."""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for (args, quoted) in cases:
      self.assertEqual(ShellQuoteArgs(args), quoted)
1272 class TestListVisibleFiles(unittest.TestCase):
1273 """Test case for ListVisibleFiles"""
1276 self.path = tempfile.mkdtemp()
1279 shutil.rmtree(self.path)
1281 def _CreateFiles(self, files):
1283 utils.WriteFile(os.path.join(self.path, name), data="test")
def _test(self, files, expected):
  """Create files, then compare the listing with expected as sets."""
  self._CreateFiles(files)
  visible = ListVisibleFiles(self.path)
  # Compared as sets, so the order of the listing does not matter
  self.assertEqual(set(visible), set(expected))
1290 def testAllVisible(self):
1291 files = ["a", "b", "c"]
1293 self._test(files, expected)
1295 def testNoneVisible(self):
1296 files = [".a", ".b", ".c"]
1298 self._test(files, expected)
def testSomeVisible(self):
  """Of "a", "b" and ".c" only the names without leading dot are listed."""
  created = ["a", "b", ".c"]
  visible = ["a", "b"]
  self._test(created, visible)
def testNonAbsolutePath(self):
  """A relative path ("abc") must be rejected with ProgrammerError."""
  # assertRaises replaces the deprecated failUnlessRaises alias
  self.assertRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1308 def testNonNormalizedPath(self):
1309 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1313 class TestNewUUID(unittest.TestCase):
1314 """Test case for NewUUID"""
1317 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
1320 class TestUniqueSequence(unittest.TestCase):
1321 """Test case for UniqueSequence"""
def _test(self, input, expected):
  """Assert that UniqueSequence applied to input yields expected."""
  actual = utils.UniqueSequence(input)
  self.assertEqual(actual, expected)
1328 self._test([1, 2, 3], [1, 2, 3])
1329 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1330 self._test([1, 2, 2, 3], [1, 2, 3])
1331 self._test([1, 2, 3, 3], [1, 2, 3])
1334 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1335 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1338 self._test(["a", "a"], ["a"])
1339 self._test(["a", "b"], ["a", "b"])
1340 self._test(["a", "b", "a"], ["a", "b"])
1343 class TestFindDuplicates(unittest.TestCase):
1344 """Test case for FindDuplicates"""
def _Test(self, seq, expected):
  """Run FindDuplicates on seq and compare with expected, ignoring order."""
  duplicates = utils.FindDuplicates(seq)

  # The result itself must not list any item more than once
  deduplicated = utils.UniqueSequence(duplicates)
  self.assertEqual(duplicates, deduplicated)

  self.assertEqual(set(duplicates), set(expected))
1353 self._Test([1, 2, 3], [])
1354 self._Test([9, 8, 8, 0, 5, 1, 7, 0, 6, 7], [8, 0, 7])
1355 for exp in [[1, 2, 3], [3, 2, 1]]:
1356 self._Test([1, 1, 2, 2, 3, 3], exp)
1358 self._Test(["A", "a", "B"], [])
1359 self._Test(["a", "A", "a", "B"], ["a"])
1360 self._Test("Hello World out there!", ["e", " ", "o", "r", "t", "l"])
1362 self._Test(self._Gen(False), [])
1363 self._Test(self._Gen(True), range(1, 10))
1374 class TestFirstFree(unittest.TestCase):
1375 """Test case for the FirstFree function"""
1378 """Test FirstFree"""
1379 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1380 self.failUnlessEqual(FirstFree([]), None)
1381 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1382 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1383 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1386 class TestTailFile(testutils.GanetiTestCase):
1387 """Test case for the TailFile function"""
def testEmpty(self):
  """An empty file yields no lines, with and without a line count."""
  fname = self._CreateTempFile()
  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(TailFile(fname), [])
  self.assertEqual(TailFile(fname, lines=25), [])
1394 def testAllLines(self):
1395 data = ["test %d" % i for i in range(30)]
1397 fname = self._CreateTempFile()
1398 fd = open(fname, "w")
1399 fd.write("\n".join(data[:i]))
1403 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1405 def testPartialLines(self):
1406 data = ["test %d" % i for i in range(30)]
1407 fname = self._CreateTempFile()
1408 fd = open(fname, "w")
1409 fd.write("\n".join(data))
1412 for i in range(1, 30):
1413 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1415 def testBigFile(self):
1416 data = ["test %d" % i for i in range(30)]
1417 fname = self._CreateTempFile()
1418 fd = open(fname, "w")
1419 fd.write("X" * 1048576)
1421 fd.write("\n".join(data))
1424 for i in range(1, 30):
1425 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1428 class _BaseFileLockTest:
1429 """Test case for the FileLock class"""
1431 def testSharedNonblocking(self):
1432 self.lock.Shared(blocking=False)
1435 def testExclusiveNonblocking(self):
1436 self.lock.Exclusive(blocking=False)
1439 def testUnlockNonblocking(self):
1440 self.lock.Unlock(blocking=False)
1443 def testSharedBlocking(self):
1444 self.lock.Shared(blocking=True)
1447 def testExclusiveBlocking(self):
1448 self.lock.Exclusive(blocking=True)
1451 def testUnlockBlocking(self):
1452 self.lock.Unlock(blocking=True)
1455 def testSharedExclusiveUnlock(self):
1456 self.lock.Shared(blocking=False)
1457 self.lock.Exclusive(blocking=False)
1458 self.lock.Unlock(blocking=False)
1461 def testExclusiveSharedUnlock(self):
1462 self.lock.Exclusive(blocking=False)
1463 self.lock.Shared(blocking=False)
1464 self.lock.Unlock(blocking=False)
1467 def testSimpleTimeout(self):
1468 # These will succeed on the first attempt, hence a short timeout
1469 self.lock.Shared(blocking=True, timeout=10.0)
1470 self.lock.Exclusive(blocking=False, timeout=10.0)
1471 self.lock.Unlock(blocking=True, timeout=10.0)
1475 def _TryLockInner(filename, shared, blocking):
1476 lock = utils.FileLock.Open(filename)
1484 # The timeout doesn't really matter as the parent process waits for us to
1486 fn(blocking=blocking, timeout=0.01)
1487 except errors.LockError, err:
1492 def _TryLock(self, *args):
1493 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
def testTimeout(self):
  """Check what a second process can lock while this one holds the lock.

  Each attempt is made through L{_TryLock}, whose first argument selects a
  shared lock; the whole sequence runs once for blocking and once for
  non-blocking attempts.

  """
  for blocking in [True, False]:
    # With the exclusive lock held, neither kind of attempt may succeed
    self.lock.Exclusive(blocking=True)
    self.assertFalse(self._TryLock(False, blocking))
    self.assertFalse(self._TryLock(True, blocking))

    # With the shared lock held, only another shared attempt may succeed
    self.lock.Shared(blocking=True)
    self.assertTrue(self._TryLock(True, blocking))
    self.assertFalse(self._TryLock(False, blocking))
1506 def testCloseShared(self):
1508 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1510 def testCloseExclusive(self):
1512 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1514 def testCloseUnlock(self):
1516 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1519 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1520 TESTDATA = "Hello World\n" * 10
1523 testutils.GanetiTestCase.setUp(self)
1525 self.tmpfile = tempfile.NamedTemporaryFile()
1526 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1527 self.lock = utils.FileLock.Open(self.tmpfile.name)
1529 # Ensure "Open" didn't truncate file
1530 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1533 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1535 testutils.GanetiTestCase.tearDown(self)
1538 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1540 self.tmpfile = tempfile.NamedTemporaryFile()
1541 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1544 class TestTimeFunctions(unittest.TestCase):
1545 """Test case for time functions"""
1548 self.assertEqual(utils.SplitTime(1), (1, 0))
1549 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1550 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1551 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1552 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1553 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1554 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1555 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1557 self.assertRaises(AssertionError, utils.SplitTime, -1)
1559 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1560 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1561 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1563 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1565 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1567 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1568 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1569 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1570 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1571 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    """Literal names match as whole strings only."""
    fields = utils.FieldSet("a", "b", "c", "def")
    self.assertTrue(fields.Matches("a"))
    self.assertFalse(fields.Matches("d"), "Substring matched")
    self.assertFalse(fields.Matches("defghi"), "Prefix string matched")

    # NonMatching is falsy when every given name is matched
    self.assertFalse(fields.NonMatching(["b", "c"]))
    self.assertFalse(fields.NonMatching(["a", "b", "c", "def"]))
    self.assertTrue(fields.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Names given as regular expressions match accordingly."""
    fields = utils.FieldSet("a", "b([0-9]+)", "c")
    self.assertTrue(fields.Matches("b1"))
    self.assertTrue(fields.Matches("b99"))
    self.assertFalse(fields.Matches("b/1"))
    self.assertFalse(fields.NonMatching(["b12", "c"]))
    self.assertTrue(fields.NonMatching(["a", "1"]))
1594 class TestForceDictType(unittest.TestCase):
1595 """Test case for ForceDictType"""
1597 "a": constants.VTYPE_INT,
1598 "b": constants.VTYPE_BOOL,
1599 "c": constants.VTYPE_STRING,
1600 "d": constants.VTYPE_SIZE,
1601 "e": constants.VTYPE_MAYBE_STRING,
1604 def _fdt(self, dict, allowed_values=None):
1605 if allowed_values is None:
1606 utils.ForceDictType(dict, self.KEY_TYPES)
1608 utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
def testSimpleDict(self):
  """Values are converted to the type declared for their key."""
  conversions = [
    ({}, {}),
    ({'a': 1}, {'a': 1}),
    ({'a': '1'}, {'a': 1}),
    ({'a': 1, 'b': 1}, {'a': 1, 'b': True}),
    ({'b': 1, 'c': 'foo'}, {'b': True, 'c': 'foo'}),
    ({'b': 1, 'c': False}, {'b': True, 'c': ''}),
    ({'b': 'false'}, {'b': False}),
    ({'b': 'False'}, {'b': False}),
    ({'b': False}, {'b': False}),
    ({'b': 'true'}, {'b': True}),
    ({'b': 'True'}, {'b': True}),
    ({'d': '4'}, {'d': 4}),
    ({'d': '4M'}, {'d': 4}),
    ({"e": None}, {"e": None}),
    ({"e": "Hello World"}, {"e": "Hello World"}),
    ({"e": False}, {"e": ''}),
    ]
  for (given, converted) in conversions:
    self.assertEqual(self._fdt(given), converted)

  # A value listed in allowed_values is kept as it is
  self.assertEqual(self._fdt({"b": "hello"}, ["hello"]), {"b": "hello"})
def testErrors(self):
  """Unconvertible values, unknown keys and unknown types are rejected."""
  rejected = [
    {'a': 'astring'},
    {"b": "hello"},
    {'c': True},
    {'d': 'astring'},
    {'d': '4 L'},
    {"e": object()},
    {"e": []},
    # Key without a declared type
    {"x": None},
    # Not a dictionary at all
    [],
    ]
  for data in rejected:
    self.assertRaises(errors.TypeEnforcementError, self._fdt, data)

  # A type name that is not known at all is reported differently
  unknown_types = {"b": "no-such-type"}
  self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
                    {"b": "hello"}, unknown_types)
1645 class TestIsNormAbsPath(unittest.TestCase):
1646 """Testing case for IsNormAbsPath"""
1648 def _pathTestHelper(self, path, result):
1650 self.assert_(utils.IsNormAbsPath(path),
1651 "Path %s should result absolute and normalized" % path)
1653 self.assertFalse(utils.IsNormAbsPath(path),
1654 "Path %s should not result absolute and normalized" % path)
1657 self._pathTestHelper('/etc', True)
1658 self._pathTestHelper('/srv', True)
1659 self._pathTestHelper('etc', False)
1660 self._pathTestHelper('/etc/../root', False)
1661 self._pathTestHelper('/etc/', False)
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    """Digits, letters and punctuation are returned unchanged."""
    for text in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(text, SafeEncode(text))

  def testDoubleEncode(self):
    """Encoding an already encoded single byte changes nothing."""
    # NOTE(review): range(255) stops at chr(254); confirm whether chr(255)
    # was meant to be covered as well
    for code in range(255):
      encoded = SafeEncode(chr(code))
      self.assertEqual(encoded, SafeEncode(encoded))

  def testUnicode(self):
    """Encoding an already encoded unicode character changes nothing."""
    # 1024 is high enough to catch non-direct ASCII mappings
    for code in range(1024):
      encoded = SafeEncode(unichr(code))
      self.assertEqual(encoded, SafeEncode(encoded))
1683 class TestFormatTime(unittest.TestCase):
1684 """Testing case for FormatTime"""
1687 def _TestInProcess(tz, timestamp, expected):
1688 os.environ["TZ"] = tz
1690 return utils.FormatTime(timestamp) == expected
1692 def _Test(self, *args):
1693 # Need to use separate process as we want to change TZ
1694 self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
1697 self._Test("UTC", 0, "1970-01-01 00:00:00")
1698 self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
1699 self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
1700 self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
1701 self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
1704 self.failUnlessEqual(FormatTime(None), "N/A")
def testInvalid(self):
  """A value that is no timestamp (an empty tuple) is shown as "N/A"."""
  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(FormatTime(()), "N/A")
1710 # tests that we accept time.time input
1711 FormatTime(time.time())
1712 # tests that we accept int input
1713 FormatTime(int(time.time()))
1716 class TestFormatTimestampWithTZ(unittest.TestCase):
1718 def _TestInProcess(tz, timestamp, expected):
1719 os.environ["TZ"] = tz
1721 return utils.FormatTimestampWithTZ(timestamp) == expected
1723 def _Test(self, *args):
1724 # Need to use separate process as we want to change TZ
1725 self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
1728 self._Test("UTC", 0, "1970-01-01 00:00:00 UTC")
1729 self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46 BRST")
1730 self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46 GMT")
1731 self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46 CET")
1732 self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46 EST")
1735 class RunInSeparateProcess(unittest.TestCase):
1737 for exp in [True, False]:
1741 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1744 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1745 def _child(carg1, carg2):
1746 return carg1 == "Foo" and carg2 == arg
1748 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1751 parent_pid = os.getpid()
1754 return os.getpid() == parent_pid
1756 self.failIf(utils.RunInSeparateProcess(_check))
1758 def testSignal(self):
1760 os.kill(os.getpid(), signal.SIGTERM)
1762 self.assertRaises(errors.GenericError,
1763 utils.RunInSeparateProcess, _kill)
1765 def testException(self):
1767 raise errors.GenericError("This is a test")
1769 self.assertRaises(errors.GenericError,
1770 utils.RunInSeparateProcess, _exc)
1773 class TestFingerprintFiles(unittest.TestCase):
1775 self.tmpfile = tempfile.NamedTemporaryFile()
1776 self.tmpfile2 = tempfile.NamedTemporaryFile()
1777 utils.WriteFile(self.tmpfile2.name, data="Hello World\n")
1779 self.tmpfile.name: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
1780 self.tmpfile2.name: "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a",
def testSingleFile(self):
  """Fingerprint one existing file and one path that does not exist."""
  existing = self.tmpfile.name
  self.assertEqual(utils._FingerprintFile(existing), self.results[existing])

  # A missing file has no fingerprint
  missing = "/no/such/file"
  self.assertEqual(utils._FingerprintFile(missing), None)
def testBigFile(self):
  """Fingerprint the first temporary file after writing 8192 bytes."""
  payload = "A" * 8192
  self.tmpfile.write(payload)
  # Flushed so the data is in the file before it is read by name
  self.tmpfile.flush()

  fingerprint = utils._FingerprintFile(self.tmpfile.name)
  self.assertEqual(fingerprint, "35b6795ca20d6dc0aff8c7c110c96cd1070b8c38")
def testMultiple(self):
  """A path that does not exist must be left out of the result.

  The list passed to FingerprintFiles holds both temporary files plus a
  missing path, while the expected dictionary only has the two files.

  """
  # Copied into a list so that appending does not depend on what keys()
  # returns
  all_files = list(self.results.keys())
  all_files.append("/no/such/file")

  # The extended list has to be passed here; passing only the known files
  # would leave the missing path untested
  self.assertEqual(utils.FingerprintFiles(all_files), self.results)
1801 class TestUnescapeAndSplit(unittest.TestCase):
1802 """Testing case for UnescapeAndSplit"""
1805 # testing more that one separator for regexp safety
1806 self._seps = [",", "+", "."]
def testSimple(self):
  """Joined plain items are split back into the same list."""
  items = ["a", "b", "c", "d"]
  for sep in self._seps:
    # assertEqual replaces the deprecated failUnlessEqual alias
    self.assertEqual(UnescapeAndSplit(sep.join(items), sep=sep), items)
def testEscape(self):
  """A separator preceded by one backslash stays inside its item."""
  for sep in self._seps:
    escaped = ["a", "b\\" + sep + "c", "d"]
    # The backslash is gone and the separator did not split the item
    expected = ["a", "b" + sep + "c", "d"]
    self.assertEqual(UnescapeAndSplit(sep.join(escaped), sep=sep), expected)
def testDoubleEscape(self):
  """Two backslashes before a separator give one backslash and a split."""
  for sep in self._seps:
    escaped = ["a", "b\\\\", "c", "d"]
    expected = ["a", "b\\", "c", "d"]
    self.assertEqual(UnescapeAndSplit(sep.join(escaped), sep=sep), expected)
def testThreeEscape(self):
  """Three backslashes give one backslash and a separator kept in place."""
  for sep in self._seps:
    escaped = ["a", "b\\\\\\" + sep + "c", "d"]
    expected = ["a", "b\\" + sep + "c", "d"]
    self.assertEqual(UnescapeAndSplit(sep.join(escaped), sep=sep), expected)
1832 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1834 self.tmpdir = tempfile.mkdtemp()
1837 shutil.rmtree(self.tmpdir)
1839 def _checkRsaPrivateKey(self, key):
1840 lines = key.splitlines()
1841 return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1842 "-----END RSA PRIVATE KEY-----" in lines)
1844 def _checkCertificate(self, cert):
1845 lines = cert.splitlines()
1846 return ("-----BEGIN CERTIFICATE-----" in lines and
1847 "-----END CERTIFICATE-----" in lines)
1850 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1851 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1852 self._checkRsaPrivateKey(key_pem)
1853 self._checkCertificate(cert_pem)
1855 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1857 self.assert_(key.bits() >= 1024)
1858 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1859 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1861 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1863 self.failIf(x509.has_expired())
1864 self.assertEqual(x509.get_issuer().CN, common_name)
1865 self.assertEqual(x509.get_subject().CN, common_name)
1866 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
def testLegacy(self):
  """The file written by GenerateSelfSignedSslCert holds key and cert."""
  cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

  utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

  cert1 = utils.ReadFile(cert1_filename)

  # Both parts are looked for in the same file; assertTrue replaces the
  # deprecated assert_ alias
  self.assertTrue(self._checkRsaPrivateKey(cert1))
  self.assertTrue(self._checkCertificate(cert1))
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    """An absolute prefix and plain components are joined with slashes."""
    mlist = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*mlist), "/".join(mlist))

  def testNonAbsPrefix(self):
    """A relative first component is rejected."""
    self.assertRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    """A component containing ".." is rejected."""
    self.assertRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    """An absolute component after the first one is rejected."""
    self.assertRaises(ValueError, PathJoin, "/a", "/b")
1896 class TestValidateServiceName(unittest.TestCase):
1897 def testValid(self):
1899 0, 1, 2, 3, 1024, 65000, 65534, 65535,
1904 "0", "80", "1111", "65535",
1907 for name in testnames:
1908 self.assertEqual(utils.ValidateServiceName(name), name)
1910 def testInvalid(self):
1912 -15756, -1, 65536, 133428083,
1913 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1914 "-8546", "-1", "65536",
1918 for name in testnames:
1919 self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1922 class TestParseAsn1Generalizedtime(unittest.TestCase):
1925 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1926 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1928 self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1932 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1934 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1936 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1938 self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1940 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1943 # Leap seconds are not supported by datetime.datetime
1944 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1945 "19841231235960+0000")
1946 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1947 "19920630235960+0000")
1950 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1951 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1952 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1954 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1955 "Mon Feb 22 17:47:02 UTC 2010")
1956 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1957 "2010-02-22 17:42:02")
1960 class TestGetX509CertValidity(testutils.GanetiTestCase):
1962 testutils.GanetiTestCase.setUp(self)
1964 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1966 # Test whether we have pyOpenSSL 0.7 or above
1967 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1969 if not self.pyopenssl0_7:
1970 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1971 " function correctly")
def _LoadCert(self, name):
  """Load the test data file called name as a PEM certificate.

  @param name: name handed to _ReadTestData
  @return: what OpenSSL.crypto.load_certificate returns for that data

  """
  pem = self._ReadTestData(name)
  return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
1978 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1979 if self.pyopenssl0_7:
1980 self.assertEqual(validity, (1266919967, 1267524767))
1982 self.assertEqual(validity, (None, None))
1985 class TestSignX509Certificate(unittest.TestCase):
1986 KEY = "My private key!"
1987 KEY_OTHER = "Another key"
1990 # Generate certificate valid for 5 minutes
1991 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1993 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1996 # No signature at all
1997 self.assertRaises(errors.GenericError,
1998 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
2001 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2003 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2004 "X-Ganeti-Signature: \n", self.KEY)
2005 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2006 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
2007 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2008 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
2009 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2010 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
2013 for salt in list("-_@$,:;/\\ \t\n"):
2014 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
2015 cert_pem, self.KEY, "foo%sbar" % salt)
2017 for salt in ["HelloWorld", "salt", string.letters, string.digits,
2018 utils.GenerateSecret(numbytes=4),
2019 utils.GenerateSecret(numbytes=16),
2020 "{123:456}".encode("hex")]:
2021 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
2023 self._Check(cert, salt, signed_pem)
2025 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
2026 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
2027 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
2028 "lines----\n------ at\nthe end!"))
2030 def _Check(self, cert, salt, pem):
2031 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
2032 self.assertEqual(salt, salt2)
2033 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
2036 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2037 pem, self.KEY_OTHER)
class TestMakedirs(unittest.TestCase):
  """Tests for utils.Makedirs"""

  def setUp(self):
    # Every test works below its own scratch directory
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testNonExisting(self):
    path = PathJoin(self.tmpdir, "foo")
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))

  def testExisting(self):
    path = PathJoin(self.tmpdir, "foo")
    # The target already exists before Makedirs is called
    os.mkdir(path)
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))

  def testRecursiveNonExisting(self):
    path = PathJoin(self.tmpdir, "foo/bar/baz")
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))

  def testRecursiveExisting(self):
    path = PathJoin(self.tmpdir, "B/moo/xyz")
    self.assertFalse(os.path.exists(path))
    # Only the first path component exists beforehand
    os.mkdir(PathJoin(self.tmpdir, "B"))
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))
2071 class TestRetry(testutils.GanetiTestCase):
2073 testutils.GanetiTestCase.setUp(self)
2077 def _RaiseRetryAgain():
2078 raise utils.RetryAgain()
2081 def _RaiseRetryAgainWithArg(args):
2082 raise utils.RetryAgain(*args)
2084 def _WrongNestedLoop(self):
2085 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
2087 def _RetryAndSucceed(self, retries):
2088 if self.retries < retries:
2090 raise utils.RetryAgain()
2094 def testRaiseTimeout(self):
2095 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2096 self._RaiseRetryAgain, 0.01, 0.02)
2097 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2098 self._RetryAndSucceed, 0.01, 0, args=[1])
2099 self.failUnlessEqual(self.retries, 1)
2101 def testComplete(self):
2102 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
2103 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
2105 self.failUnlessEqual(self.retries, 2)
2107 def testNestedLoop(self):
2109 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
2110 self._WrongNestedLoop, 0, 1)
2111 except utils.RetryTimeout:
2112 self.fail("Didn't detect inner loop's exception")
2114 def testTimeoutArgument(self):
2115 retry_arg="my_important_debugging_message"
2117 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
2118 except utils.RetryTimeout, err:
2119 self.failUnlessEqual(err.args, (retry_arg, ))
2121 self.fail("Expected timeout didn't happen")
2123 def testRaiseInnerWithExc(self):
2124 retry_arg="my_important_debugging_message"
2127 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2128 args=[[errors.GenericError(retry_arg, retry_arg)]])
2129 except utils.RetryTimeout, err:
2132 self.fail("Expected timeout didn't happen")
2133 except errors.GenericError, err:
2134 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2136 self.fail("Expected GenericError didn't happen")
2138 def testRaiseInnerWithMsg(self):
2139 retry_arg="my_important_debugging_message"
2142 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2143 args=[[retry_arg, retry_arg]])
2144 except utils.RetryTimeout, err:
2147 self.fail("Expected timeout didn't happen")
2148 except utils.RetryTimeout, err:
2149 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2151 self.fail("Expected RetryTimeout didn't happen")
class TestLineSplitter(unittest.TestCase):
  """Tests for utils.LineSplitter"""

  def test(self):
    received = []
    splitter = utils.LineSplitter(received.append)
    splitter.write("Hello World\n")
    self.assertEqual(received, [])
    splitter.write("Foo\n Bar\r\n ")
    splitter.write("Baz")
    splitter.write("Moo")
    # Nothing reaches the callback before flush() or close()
    self.assertEqual(received, [])
    splitter.flush()
    self.assertEqual(received, ["Hello World", "Foo", " Bar"])
    # The unterminated remainder is only delivered by close()
    splitter.close()
    self.assertEqual(received, ["Hello World", "Foo", " Bar", " BazMoo"])

  def _testExtra(self, line, all_lines, p1, p2):
    # Callback verifying the extra arguments given to LineSplitter
    self.assertEqual(p1, 999)
    self.assertEqual(p2, "extra")
    all_lines.append(line)

  def testExtraArgsNoFlush(self):
    received = []
    splitter = utils.LineSplitter(self._testExtra, received, 999, "extra")
    splitter.write("\n\nHello World\n")
    splitter.write("Foo\n Bar\r\n ")
    splitter.write("")
    splitter.write("Baz")
    splitter.write("Moo\n\nx\n")
    self.assertEqual(received, [])
    splitter.close()
    self.assertEqual(received, ["", "", "Hello World", "Foo", " Bar",
                                " BazMoo", "", "x"])
class TestReadLockedPidFile(unittest.TestCase):
  """Tests for utils.ReadLockedPidFile"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testNonExistent(self):
    path = PathJoin(self.tmpdir, "nonexist")
    self.assertTrue(utils.ReadLockedPidFile(path) is None)

  def testUnlocked(self):
    path = PathJoin(self.tmpdir, "pid")
    utils.WriteFile(path, data="123")
    self.assertTrue(utils.ReadLockedPidFile(path) is None)

  def testLocked(self):
    path = PathJoin(self.tmpdir, "pid")
    utils.WriteFile(path, data="123")

    fl = utils.FileLock.Open(path)
    try:
      fl.Exclusive(blocking=True)

      # While the lock is held the PID stored in the file is returned
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
    finally:
      fl.Close()

    # Once the lock is gone the file no longer counts
    self.assertTrue(utils.ReadLockedPidFile(path) is None)

  def testError(self):
    path = PathJoin(self.tmpdir, "foobar", "pid")
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
    # open(2) should return ENOTDIR
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
class TestCertVerification(testutils.GanetiTestCase):
  """Tests for utils.VerifyX509Certificate"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    try:
      shutil.rmtree(self.tmpdir)
    finally:
      # setUp chains to the base class, so tearDown has to do the same
      testutils.GanetiTestCase.tearDown(self)

  def testVerifyCertificate(self):
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # Not checking return value as this certificate is expired
    utils.VerifyX509Certificate(cert, 30, 7)
class TestVerifyCertificateInner(unittest.TestCase):
  """Tests for utils._VerifyCertificateInner"""

  def test(self):
    vci = utils._VerifyCertificateInner

    # NOTE(review): expected value restored after extraction loss; confirm
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
                     (None, None))

    # The remaining cases only pin the returned error code; the result is
    # still unpacked as a pair, but the message is not inspected
    cases = [
      ((False, 1266507600, 1267544400, 1266075600, 30, 7),
       utils.CERT_WARNING),
      ((False, 1266507600, 1267544400, 1266939600, 30, 7),
       utils.CERT_ERROR),
      ((False, 1266507600, 1267544400, 1266939600, 30, 1),
       utils.CERT_WARNING),
      ((False, 1266507600, None, 1266939600, 30, 7), None),
      ((True, 1266507600, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, None, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, 1266507600, None, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, None, None, 1266939600, 30, 7), utils.CERT_ERROR),
      ]

    for (args, expected) in cases:
      (errcode, _) = vci(*args)
      self.assertEqual(errcode, expected)
class TestHmacFunctions(unittest.TestCase):
  """Tests for utils.Sha1Hmac and utils.VerifySha1Hmac"""

  # Digests can be checked with "openssl sha1 -hmac $key"
  def testSha1Hmac(self):
    self.assertEqual(utils.Sha1Hmac("", ""),
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")

    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")

  def testSha1HmacSalt(self):
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")

  def testVerifySha1Hmac(self):
    self.assertTrue(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
                                                  "7d64b71fb76370690e1d")))
    self.assertTrue(utils.VerifySha1Hmac("TguMTA2K", "",
                                         ("f904c2476527c6d3e660"
                                          "9ab683c66fa0652cb1dc")))

    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
    # NOTE(review): the three case variants below were restored after
    # extraction loss; confirm against the repository copy
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         digest.lower()))
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         digest.upper()))
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         digest.title()))

  def testVerifySha1HmacSalt(self):
    # Salts match the digests pinned in testSha1HmacSalt
    self.assertTrue(utils.VerifySha1Hmac("TguMTA2K", "",
                                         ("17a4adc34d69c0d367d4"
                                          "ffbef96fd41d4df7a6e8"),
                                         salt="abc9"))
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         ("7f264f8114c9066afc9b"
                                          "b7636e1786d996d3cc0d"),
                                         salt="xyz0"))
class TestIgnoreSignals(unittest.TestCase):
  """Test the IgnoreSignals decorator"""

  @staticmethod
  def _Raise(exception):
    raise exception

  @staticmethod
  def _Return(arg):
    return arg

  def testIgnoreSignals(self):
    sock_err_intr = socket.error(errno.EINTR, "Message")
    sock_err_inval = socket.error(errno.EINVAL, "Message")

    env_err_intr = EnvironmentError(errno.EINTR, "Message")
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")

    # Sanity check: without the wrapper every error propagates
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)

    # EINTR is swallowed (None is returned), anything else still propagates
    self.assertEqual(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
    self.assertEqual(utils.IgnoreSignals(self._Raise, env_err_intr), None)
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
                      sock_err_inval)
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
                      env_err_inval)

    # Return values of the wrapped function are passed through
    self.assertEqual(utils.IgnoreSignals(self._Return, True), True)
    self.assertEqual(utils.IgnoreSignals(self._Return, 33), 33)
2362 class TestEnsureDirs(unittest.TestCase):
2363 """Tests for EnsureDirs"""
2366 self.dir = tempfile.mkdtemp()
2367 self.old_umask = os.umask(0777)
2369 def testEnsureDirs(self):
2371 (PathJoin(self.dir, "foo"), 0777),
2372 (PathJoin(self.dir, "bar"), 0000),
2374 self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2375 self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2378 os.rmdir(PathJoin(self.dir, "foo"))
2379 os.rmdir(PathJoin(self.dir, "bar"))
2381 os.umask(self.old_umask)
class TestFormatSeconds(unittest.TestCase):
  """Tests for utils.FormatSeconds"""

  def _CheckAll(self, cases):
    # Each entry is (seconds, expected text), checked in the given order
    for (secs, expected) in cases:
      self.assertEqual(utils.FormatSeconds(secs), expected)

  def test(self):
    self._CheckAll([
      (1, "1s"),
      (3600, "1h 0m 0s"),
      (3599, "59m 59s"),
      (7200, "2h 0m 0s"),
      (7201, "2h 0m 1s"),
      (7281, "2h 1m 21s"),
      (29119, "8h 5m 19s"),
      (19431228, "224d 21h 33m 48s"),
      (-1, "-1s"),
      (-282, "-282s"),
      (-29119, "-29119s"),
      ])

  def testFloat(self):
    self._CheckAll([
      (1.3, "1s"),
      (1.9, "2s"),
      (3912.12311, "1h 5m 12s"),
      (3912.8, "1h 5m 13s"),
      ])
class TestIgnoreProcessNotFound(unittest.TestCase):
  """Tests for utils.IgnoreProcessNotFound"""

  @staticmethod
  def _WritePid(fd):
    # Runs in the separate process: report our PID through the pipe
    os.write(fd, str(os.getpid()))
    os.close(fd)
    return True

  def test(self):
    (pid_read_fd, pid_write_fd) = os.pipe()

    # Start short-lived process which writes its PID to pipe
    self.assertTrue(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
    os.close(pid_write_fd)

    # Read PID from pipe
    pid = int(os.read(pid_read_fd, 1024))
    os.close(pid_read_fd)

    # Try to send signal to process which exited recently
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
class TestShellWriter(unittest.TestCase):
  """Tests for utils.ShellWriter"""

  def test(self):
    buf = StringIO()
    sw = utils.ShellWriter(buf)
    sw.Write("#!/bin/bash")
    sw.Write("if true; then")
    sw.IncIndent()
    try:
      sw.Write("echo true")

      sw.Write("for i in 1 2 3")
      sw.Write("do")
      sw.IncIndent()
      try:
        self.assertEqual(sw._indent, 2)
        sw.Write("date")
      finally:
        sw.DecIndent()
      sw.Write("done")
    finally:
      sw.DecIndent()
    sw.Write("echo %s", utils.ShellQuote("Hello World"))
    sw.Write("exit 0")

    # Every IncIndent must have been balanced by a DecIndent
    self.assertEqual(sw._indent, 0)

    output = buf.getvalue()

    self.assertTrue(output.endswith("\n"))

    lines = output.splitlines()
    self.assertEqual(len(lines), 9)
    self.assertEqual(lines[0], "#!/bin/bash")
    # Written at indentation level two, hence leading whitespace
    self.assertTrue(re.match(r"^\s+date$", lines[5]))
    self.assertEqual(lines[7], "echo 'Hello World'")

  def testEmpty(self):
    buf = StringIO()
    sw = utils.ShellWriter(buf)
    sw = None
    # Creating and dropping a writer must not produce any output
    self.assertEqual(buf.getvalue(), "")
class TestCommaJoin(unittest.TestCase):
  """Tests for utils.CommaJoin"""

  def test(self):
    # (input sequence, expected text)
    cases = [
      ([], ""),
      ([1, 2, 3], "1, 2, 3"),
      (["Hello"], "Hello"),
      (["Hello", "World"], "Hello, World"),
      (["Hello", "World", 99], "Hello, World, 99"),
      ]

    for (items, expected) in cases:
      self.assertEqual(utils.CommaJoin(items), expected)
class TestFindMatch(unittest.TestCase):
  """Tests for utils.FindMatch"""

  def test(self):
    data = {
      "aaaa": "Four A",
      "bb": {"Two B": True},
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
      }

    # Plain string keys come back with an empty list
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))

    # Regular expression keys come back with the matched groups
    for i in ["foo", "bar", "bazX"]:
      for j in range(1, 100, 7):
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
                         ((1, 2, 3), [i, str(j)]))

  def testNoMatch(self):
    self.assertTrue(utils.FindMatch({}, "") is None)
    self.assertTrue(utils.FindMatch({}, "foo") is None)
    self.assertTrue(utils.FindMatch({}, 1234) is None)

    # NOTE(review): the "X" entry was restored after extraction loss;
    # confirm against the repository copy
    data = {
      "X": "Hello World",
      re.compile("^(something)$"): "Hello World",
      }

    self.assertTrue(utils.FindMatch(data, "") is None)
    # Values are never used for matching, only keys
    self.assertTrue(utils.FindMatch(data, "Hello World") is None)
class TestFileID(testutils.GanetiTestCase):
  """Tests for GetFileID, VerifyFileID and SafeWriteFile in utils"""

  def testEquality(self):
    name = self._CreateTempFile()
    oldi = utils.GetFileID(path=name)
    self.assertTrue(utils.VerifyFileID(oldi, oldi))

  def testUpdate(self):
    name = self._CreateTempFile()
    oldi = utils.GetFileID(path=name)
    os.utime(name, None)
    fd = os.open(name, os.O_RDWR)
    try:
      newi = utils.GetFileID(fd=fd)
      self.assertTrue(utils.VerifyFileID(oldi, newi))
      self.assertTrue(utils.VerifyFileID(newi, oldi))
    finally:
      os.close(fd)

  def testWriteFile(self):
    name = self._CreateTempFile()
    oldi = utils.GetFileID(path=name)
    # NOTE(review): assumes the third element of a file ID is the mtime;
    # line restored after extraction loss, confirm against utils.GetFileID
    mtime = oldi[2]
    os.utime(name, (mtime + 10, mtime + 10))
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
                      oldi, data="")
    os.utime(name, (mtime - 10, mtime - 10))
    utils.SafeWriteFile(name, oldi, data="")
    oldi = utils.GetFileID(path=name)
    mtime = oldi[2]
    os.utime(name, (mtime + 10, mtime + 10))
    # this doesn't raise, since we passed None
    utils.SafeWriteFile(name, None, data="")

  def testError(self):
    t = tempfile.NamedTemporaryFile()
    try:
      # Passing both a path and a file descriptor is a programming error
      self.assertRaises(errors.ProgrammerError, utils.GetFileID,
                        path=t.name, fd=t.fileno())
    finally:
      # Don't rely on garbage collection to release the temporary file
      t.close()
class TimeMock:
  """Replacement time function returning a scripted series of values.

  Every call hands out the next entry of C{values}. The list is consumed
  in place, and calling with no values left raises C{IndexError}.

  """
  def __init__(self, values):
    self.values = values

  def __call__(self):
    next_value = self.values.pop(0)
    return next_value
class TestRunningTimeout(unittest.TestCase):
  """Tests for utils.RunningTimeout, driven by a scripted clock"""

  def setUp(self):
    self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])

  def testRemainingFloat(self):
    timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
    for expected in [4.7, 0.4, -1.5]:
      self.assertAlmostEqual(timeout.Remaining(), expected)

  def testRemaining(self):
    self.time_fn = TimeMock([0, 2, 4, 5, 6])
    timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
    for expected in [3, 1, 0, -1]:
      self.assertEqual(timeout.Remaining(), expected)

  def testRemainingNonNegative(self):
    timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
    for expected in [4.7, 0.4]:
      self.assertAlmostEqual(timeout.Remaining(), expected)
    # With the second argument False the result stops at zero
    self.assertEqual(timeout.Remaining(), 0.0)

  def testNegativeTimeout(self):
    self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True)
2585 class TestTryConvert(unittest.TestCase):
2587 for src, fn, result in [
2593 self.assertEqual(utils.TryConvert(fn, src), result)
2596 class TestIsValidShellParam(unittest.TestCase):
2598 for val, result in [
2602 self.assertEqual(utils.IsValidShellParam(val), result)
2605 class TestBuildShellCmd(unittest.TestCase):
2607 self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
2609 self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
class TestWriteFile(unittest.TestCase):
  """Tests for utils.WriteFile"""

  def setUp(self):
    self.tfile = tempfile.NamedTemporaryFile()
    # Flags set by the mark* callbacks below
    self.did_pre = False
    self.did_post = False
    self.did_write = False

  def tearDown(self):
    # Release the temporary file now instead of waiting for garbage
    # collection
    self.tfile.close()

  def markPre(self, fd):
    self.did_pre = True

  def markPost(self, fd):
    self.did_post = True

  def markWrite(self, fd):
    self.did_write = True

  def testWrite(self):
    data = "abc"
    utils.WriteFile(self.tfile.name, data=data)
    self.assertEqual(utils.ReadFile(self.tfile.name), data)

  def testErrors(self):
    # "data" together with "fn"
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
                      self.tfile.name, data="test", fn=lambda fd: None)
    # Neither "data" nor "fn"
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
    # "atime" without "mtime"
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
                      self.tfile.name, data="test", atime=0)

  def testCalls(self):
    utils.WriteFile(self.tfile.name, fn=self.markWrite,
                    prewrite=self.markPre, postwrite=self.markPost)
    self.assertTrue(self.did_pre)
    self.assertTrue(self.did_post)
    self.assertTrue(self.did_write)

  def testDryRun(self):
    orig = "abc"
    self.tfile.write(orig)
    self.tfile.flush()
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
    # A dry run must leave the existing content alone
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)

  def testTimes(self):
    f = self.tfile.name
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
                   (int(time.time()), 5000)]:
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
      st = os.stat(f)
      self.assertEqual(st.st_atime, at)
      self.assertEqual(st.st_mtime, mt)

  def testNoClose(self):
    data = "hello"
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
    try:
      # Rewind before reading back what was just written
      os.lseek(fd, 0, 0)
      self.assertEqual(os.read(fd, 4096), data)
    finally:
      os.close(fd)
class TestNormalizeAndValidateMac(unittest.TestCase):
  """Tests for utils.NormalizeAndValidateMac"""

  def testInvalid(self):
    self.assertRaises(errors.OpPrereqError,
                      utils.NormalizeAndValidateMac, "xxx")

  def testNormalization(self):
    # Valid addresses are returned in lower case
    for address in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
      normalized = utils.NormalizeAndValidateMac(address)
      self.assertEqual(normalized, address.lower())
class TestNiceSort(unittest.TestCase):
  """Tests for utils.NiceSort"""

  def test(self):
    # (input, expected order) pairs using the default key
    simple_cases = [
      ([], []),
      (["foo"], ["foo"]),
      (["bar", ""], ["", "bar"]),
      ([",", "."], [",", "."]),
      (["0.1", "0.2"], ["0.1", "0.2"]),
      (["0;099", "0,099", "0.1", "0.2"], ["0,099", "0.1", "0.2", "0;099"]),
      (["a0", "a1", "a99", "a20", "a2", "b10", "b70", "b00", "0000"],
       ["0000", "a0", "a1", "a2", "a20", "a99", "b00", "b10", "b70"]),
      ]
    for (unsorted, expected) in simple_cases:
      self.assertEqual(utils.NiceSort(unsorted), expected)

    names = ["a0-0", "a1-0", "a99-10", "a20-3", "a0-4", "a99-3", "a09-2",
             "Z", "a9-1", "A", "b"]
    self.assertEqual(utils.NiceSort(names),
                     ["A", "Z", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
                      "a20-3", "a99-3", "a99-10", "b"])

    # With a case-folding key "Z" moves behind the lower-case names
    folded = ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
              "a20-3", "a99-3", "a99-10", "b", "Z"]
    self.assertEqual(utils.NiceSort(names, key=str.lower), folded)
    self.assertEqual(utils.NiceSort(names, key=str.upper), folded)

  def testLargeA(self):
    unsorted = [
      "Eegah9ei", "xij88brTulHYAv8IEOyU", "3jTwJPtrXOY22bwL2YoW",
      "Z8Ljf1Pf5eBfNg171wJR", "WvNJd91OoXvLzdEiEXa6", "uHXAyYYftCSG1o7qcCqe",
      "xpIUJeVT1Rp", "KOt7vn1dWXi", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
      "cPRi0lM7HLnSuWA2G9", "KVQqLPDjcPjf8T3oyzjcOsfkb",
      "guKJkXnkULealVC8CyF1xefym", "pqF8dkU5B1cMnyZuREaSOADYx",
      ]
    self.assertEqual(utils.NiceSort(unsorted), [
      "3jTwJPtrXOY22bwL2YoW", "Eegah9ei", "KOt7vn1dWXi",
      "KVQqLPDjcPjf8T3oyzjcOsfkb", "WvNJd91OoXvLzdEiEXa6",
      "Z8Ljf1Pf5eBfNg171wJR", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
      "cPRi0lM7HLnSuWA2G9", "guKJkXnkULealVC8CyF1xefym",
      "pqF8dkU5B1cMnyZuREaSOADYx", "uHXAyYYftCSG1o7qcCqe",
      "xij88brTulHYAv8IEOyU", "xpIUJeVT1Rp"
      ])

  def testLargeB(self):
    # Already in the order NiceSort is expected to produce
    expected = [
      "inst-0.0.0.0-0.0.0.0",
      "inst-0.1.0.0-0.0.0.0",
      "inst-0.2.0.0-0.0.0.0",
      "inst-0.2.1.0-0.0.0.0",
      "inst-0.2.2.0-0.0.0.0",
      "inst-0.2.2.0-0.0.0.9",
      "inst-0.2.2.0-0.0.3.9",
      "inst-0.2.2.0-0.2.0.9",
      "inst-0.2.2.0-0.9.0.9",
      "inst-0.20.2.0-0.0.0.0",
      "inst-0.20.2.0-0.9.0.9",
      "inst-10.020.2.0-0.9.0.10",
      "inst-15.020.2.0-0.9.1.00",
      "inst-100.020.2.0-0.9.0.9",

      # Only the last group, not converted to a number anymore, differs
      "inst-100.020.2.0a999",
      "inst-100.020.2.0b000",
      "inst-100.020.2.0c10",
      "inst-100.020.2.0c101",
      "inst-100.020.2.0c2",
      "inst-100.020.2.0c20",
      "inst-100.020.2.0c3",
      "inst-100.020.2.0c39123",
      ]

    rnd = random.Random(16205)
    # NOTE(review): repetition count restored after extraction loss; confirm
    for _ in range(10):
      shuffled = expected[:]
      rnd.shuffle(shuffled)
      assert shuffled != expected
      self.assertEqual(utils.NiceSort(shuffled), expected)

  class _CallCount:
    # Wraps a function and counts how often it gets called
    def __init__(self, fn):
      self.count = 0
      self.fn = fn

    def __call__(self, *args):
      self.count += 1
      return self.fn(*args)

  def testKeyfuncA(self):
    # Generate some random numbers
    rnd = random.Random(21131)
    numbers = [rnd.randint(0, 10000) for _ in range(999)]
    assert numbers != sorted(numbers)

    # Hexadecimal text form; a copy is kept to detect in-place changes
    hexvalues = [hex(i) for i in numbers]
    snapshot = hexvalues[:]

    keyfn = self._CallCount(lambda value: str(int(value, 16)))

    # Sort with key function converting hex to decimal
    result = utils.NiceSort(hexvalues, key=keyfn)

    self.assertEqual([hex(i) for i in sorted(numbers)], result)
    self.assertEqual(hexvalues, snapshot,
                     msg="Input data was modified in NiceSort")
    self.assertEqual(keyfn.count, len(numbers),
                     msg="Key function was not called once per value")

  class _TestData:
    # Plain record: "name" is what gets sorted, "value" gives the reference
    # order
    def __init__(self, name, value):
      self.name = name
      self.value = value

  def testKeyfuncB(self):
    rnd = random.Random(27396)
    records = []
    for i in range(123):
      v1 = rnd.randint(0, 5)
      v2 = rnd.randint(0, 5)
      records.append(self._TestData("inst-%s-%s-%s" % (v1, v2, i),
                                    (v1, v2, i)))
    rnd.shuffle(records)
    assert records != sorted(records, key=operator.attrgetter("name"))

    keyfn = self._CallCount(operator.attrgetter("name"))

    # Sort by name
    result = utils.NiceSort(records, key=keyfn)

    self.assertEqual(result,
                     sorted(records, key=operator.attrgetter("value")))
    self.assertEqual(keyfn.count, len(records),
                     msg="Key function was not called once per value")
# Run the test program when this file is executed as a script
if "__main__" == __name__:
  testutils.GanetiTestProgram()