- results = RunParts(self.rundir, reset_env=True)
-
- (relname, status, runresult) = results[0]
- self.failUnlessEqual(relname, os.path.basename(files[0]))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.exit_code, 1)
- self.failUnless(runresult.failed)
-
- (relname, status, runresult) = results[1]
- self.failUnlessEqual(relname, os.path.basename(files[1]))
- self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
- self.failUnlessEqual(runresult, None)
-
- (relname, status, runresult) = results[2]
- self.failUnlessEqual(relname, os.path.basename(files[2]))
- self.failUnlessEqual(status, constants.RUNPARTS_ERR)
- self.failUnless(runresult)
-
- (relname, status, runresult) = results[3]
- self.failUnlessEqual(relname, os.path.basename(files[3]))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.output, "ciao")
- self.failUnlessEqual(runresult.exit_code, 0)
- self.failUnless(not runresult.failed)
-
-
-class TestRemoveFile(unittest.TestCase):
- """Test case for the RemoveFile function"""
-
- def setUp(self):
- """Create a temp dir and file for each case"""
- self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
- fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
- os.close(fd)
-
- def tearDown(self):
- if os.path.exists(self.tmpfile):
- os.unlink(self.tmpfile)
- os.rmdir(self.tmpdir)
-
-
- def testIgnoreDirs(self):
- """Test that RemoveFile() ignores directories"""
- self.assertEqual(None, RemoveFile(self.tmpdir))
-
-
- def testIgnoreNotExisting(self):
- """Test that RemoveFile() ignores non-existing files"""
- RemoveFile(self.tmpfile)
- RemoveFile(self.tmpfile)
-
-
- def testRemoveFile(self):
- """Test that RemoveFile does remove a file"""
- RemoveFile(self.tmpfile)
- if os.path.exists(self.tmpfile):
- self.fail("File '%s' not removed" % self.tmpfile)
-
-
- def testRemoveSymlink(self):
- """Test that RemoveFile does remove symlinks"""
- symlink = self.tmpdir + "/symlink"
- os.symlink("no-such-file", symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
- os.symlink(self.tmpfile, symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
-
-
-class TestRename(unittest.TestCase):
- """Test case for RenameFile"""
-
- def setUp(self):
- """Create a temporary directory"""
- self.tmpdir = tempfile.mkdtemp()
- self.tmpfile = os.path.join(self.tmpdir, "test1")
-
- # Touch the file
- open(self.tmpfile, "w").close()
-
- def tearDown(self):
- """Remove temporary directory"""
- shutil.rmtree(self.tmpdir)
-
- def testSimpleRename1(self):
- """Simple rename 1"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
-
- def testSimpleRename2(self):
- """Simple rename 2"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
- mkdir=True)
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
-
- def testRenameMkdir(self):
- """Rename with mkdir"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
-
- utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
- os.path.join(self.tmpdir, "test/foo/bar/baz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
-
-
-class TestMatchNameComponent(unittest.TestCase):
- """Test case for the MatchNameComponent function"""
-
- def testEmptyList(self):
- """Test that there is no match against an empty list"""
-
- self.failUnlessEqual(MatchNameComponent("", []), None)
- self.failUnlessEqual(MatchNameComponent("test", []), None)
-
- def testSingleMatch(self):
- """Test that a single match is performed correctly"""
- mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
- for key in "test2", "test2.example", "test2.example.com":
- self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
-
- def testMultipleMatches(self):
- """Test that a multiple match is returned as None"""
- mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
- for key in "test1", "test1.example":
- self.failUnlessEqual(MatchNameComponent(key, mlist), None)
-
- def testFullMatch(self):
- """Test that a full match is returned correctly"""
- key1 = "test1"
- key2 = "test1.example"
- mlist = [key2, key2 + ".com"]
- self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
- self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
-
- def testCaseInsensitivePartialMatch(self):
- """Test for the case_insensitive keyword"""
- mlist = ["test1.example.com", "test2.example.net"]
- self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
- "test2.example.net")
-
-
- def testCaseInsensitiveFullMatch(self):
- mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
- # Between the two ts1 a full string match non-case insensitive should work
- self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
- None)
- self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- # Between the two ts2 only case differs, so only case-match works
- self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
- "ts2.ex")
- self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
- "Ts2.ex")
- self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
- None)
-
-
-class TestFormatUnit(unittest.TestCase):
- """Test case for the FormatUnit function"""
-
- def testMiB(self):
- self.assertEqual(FormatUnit(1, 'h'), '1M')
- self.assertEqual(FormatUnit(100, 'h'), '100M')
- self.assertEqual(FormatUnit(1023, 'h'), '1023M')
-
- self.assertEqual(FormatUnit(1, 'm'), '1')
- self.assertEqual(FormatUnit(100, 'm'), '100')
- self.assertEqual(FormatUnit(1023, 'm'), '1023')
-
- self.assertEqual(FormatUnit(1024, 'm'), '1024')
- self.assertEqual(FormatUnit(1536, 'm'), '1536')
- self.assertEqual(FormatUnit(17133, 'm'), '17133')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
-
- def testGiB(self):
- self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
- self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
- self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
-
- self.assertEqual(FormatUnit(1024, 'g'), '1.0')
- self.assertEqual(FormatUnit(1536, 'g'), '1.5')
- self.assertEqual(FormatUnit(17133, 'g'), '16.7')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
-
- self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
- self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
- self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
-
- def testTiB(self):
- self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
- self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
- self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
-
- self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
- self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
- self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
-
-class TestParseUnit(unittest.TestCase):
- """Test case for the ParseUnit function"""
-
- SCALES = (('', 1),
- ('M', 1), ('G', 1024), ('T', 1024 * 1024),
- ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
- ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
-
- def testRounding(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('1'), 4)
- self.assertEqual(ParseUnit('2'), 4)
- self.assertEqual(ParseUnit('3'), 4)
-
- self.assertEqual(ParseUnit('124'), 124)
- self.assertEqual(ParseUnit('125'), 128)
- self.assertEqual(ParseUnit('126'), 128)
- self.assertEqual(ParseUnit('127'), 128)
- self.assertEqual(ParseUnit('128'), 128)
- self.assertEqual(ParseUnit('129'), 132)
- self.assertEqual(ParseUnit('130'), 132)
-
- def testFloating(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('0.5'), 4)
- self.assertEqual(ParseUnit('1.75'), 4)
- self.assertEqual(ParseUnit('1.99'), 4)
- self.assertEqual(ParseUnit('2.00'), 4)
- self.assertEqual(ParseUnit('2.01'), 4)
- self.assertEqual(ParseUnit('3.99'), 4)
- self.assertEqual(ParseUnit('4.00'), 4)
- self.assertEqual(ParseUnit('4.01'), 8)
- self.assertEqual(ParseUnit('1.5G'), 1536)
- self.assertEqual(ParseUnit('1.8G'), 1844)
- self.assertEqual(ParseUnit('8.28T'), 8682212)
-
- def testSuffixes(self):
- for sep in ('', ' ', ' ', "\t", "\t "):
- for suffix, scale in TestParseUnit.SCALES:
- for func in (lambda x: x, str.lower, str.upper):
- self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
- 1024 * scale)