X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/24d70417bc9f4df2ac865fca584a8ebfe9c94519..39019f912d61d301e593dcbc8e72d5be44f75d7a:/test/ganeti.utils_unittest.py diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 836b200..511e48a 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -39,6 +39,8 @@ import OpenSSL import warnings import distutils.version import glob +import md5 +import errno import ganeti import testutils @@ -52,7 +54,7 @@ from ganeti.utils import IsProcessAlive, RunCmd, \ ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \ SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \ TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \ - UnescapeAndSplit, RunParts, PathJoin, HostInfo + UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile from ganeti.errors import LockError, UnitParseError, GenericError, \ ProgrammerError, OpPrereqError @@ -662,6 +664,107 @@ class TestMatchNameComponent(unittest.TestCase): None) +class TestReadFile(testutils.GanetiTestCase): + + def testReadAll(self): + data = utils.ReadFile(self._TestDataFilename("cert1.pem")) + self.assertEqual(len(data), 814) + + h = md5.new() + h.update(data) + self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4") + + def testReadSize(self): + data = utils.ReadFile(self._TestDataFilename("cert1.pem"), + size=100) + self.assertEqual(len(data), 100) + + h = md5.new() + h.update(data) + self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7") + + def testError(self): + self.assertRaises(EnvironmentError, utils.ReadFile, + "/dev/null/does-not-exist") + + +class TestReadOneLineFile(testutils.GanetiTestCase): + + def setUp(self): + testutils.GanetiTestCase.setUp(self) + + def testDefault(self): + data = ReadOneLineFile(self._TestDataFilename("cert1.pem")) + self.assertEqual(len(data), 27) + self.assertEqual(data, "-----BEGIN CERTIFICATE-----") + + def testNotStrict(self): + data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False) + self.assertEqual(len(data), 27) + self.assertEqual(data, "-----BEGIN CERTIFICATE-----") + + def testStrictFailure(self): + self.assertRaises(errors.GenericError, ReadOneLineFile, + self._TestDataFilename("cert1.pem"), strict=True) + + def testLongLine(self): + dummydata = (1024 * "Hello World! ") + myfile = self._CreateTempFile() + utils.WriteFile(myfile, data=dummydata) + datastrict = ReadOneLineFile(myfile, strict=True) + datalax = ReadOneLineFile(myfile, strict=False) + self.assertEqual(dummydata, datastrict) + self.assertEqual(dummydata, datalax) + + def testNewline(self): + myfile = self._CreateTempFile() + myline = "myline" + for nl in ["", "\n", "\r\n"]: + dummydata = "%s%s" % (myline, nl) + utils.WriteFile(myfile, data=dummydata) + datalax = ReadOneLineFile(myfile, strict=False) + self.assertEqual(myline, datalax) + datastrict = ReadOneLineFile(myfile, strict=True) + self.assertEqual(myline, datastrict) + + def testWhitespaceAndMultipleLines(self): + myfile = self._CreateTempFile() + for nl in ["", "\n", "\r\n"]: + for ws in [" ", "\t", "\t\t \t", "\t "]: + dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl))) + utils.WriteFile(myfile, data=dummydata) + datalax = ReadOneLineFile(myfile, strict=False) + if nl: + self.assert_(set("\r\n") & set(dummydata)) + self.assertRaises(errors.GenericError, ReadOneLineFile, + myfile, strict=True) + explen = len("Foo bar baz ") + len(ws) + self.assertEqual(len(datalax), explen) + self.assertEqual(datalax, dummydata[:explen]) + self.assertFalse(set("\r\n") & set(datalax)) + else: + datastrict = ReadOneLineFile(myfile, strict=True) + self.assertEqual(dummydata, datastrict) + self.assertEqual(dummydata, datalax) + + def testEmptylines(self): + myfile = self._CreateTempFile() + myline = "myline" + for nl in ["\n", "\r\n"]: + for ol in ["", "otherline"]: + dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl) + utils.WriteFile(myfile, data=dummydata) + self.assert_(set("\r\n") & set(dummydata)) + datalax = ReadOneLineFile(myfile, strict=False) + self.assertEqual(myline, datalax) + if ol: + self.assertRaises(errors.GenericError, ReadOneLineFile, + myfile, strict=True) + else: + datastrict = ReadOneLineFile(myfile, strict=True) + self.assertEqual(myline, datastrict) + + class TestTimestampForFilename(unittest.TestCase): def test(self): self.assert_("." not in utils.TimestampForFilename()) @@ -1050,8 +1153,8 @@ class TestOwnIpAddress(unittest.TestCase): def testNowOwnAddress(self): """check that I don't own an address""" - # network 192.0.2.0/24 is reserved for test/documentation as per - # rfc 3330, so we *should* not have an address of this range... if + # Network 192.0.2.0/24 is reserved for test/documentation as per + # RFC 5735, so we *should* not have an address of this range... if # this fails, we should extend the test to multiple addresses DST_IP = "192.0.2.1" self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP) @@ -1466,7 +1569,7 @@ class TestForceDictType(unittest.TestCase): self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) -class TestIsAbsNormPath(unittest.TestCase): +class TestIsNormAbsPath(unittest.TestCase): """Testing case for IsNormAbsPath""" def _pathTestHelper(self, path, result): @@ -1852,19 +1955,40 @@ class TestMakedirs(unittest.TestCase): class TestRetry(testutils.GanetiTestCase): + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.retries = 0 + @staticmethod def _RaiseRetryAgain(): raise utils.RetryAgain() + @staticmethod + def _RaiseRetryAgainWithArg(args): + raise utils.RetryAgain(*args) + def _WrongNestedLoop(self): return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02) + def _RetryAndSucceed(self, retries): + if self.retries < retries: + self.retries += 1 + raise utils.RetryAgain() + else: + return True + def testRaiseTimeout(self): self.failUnlessRaises(utils.RetryTimeout, utils.Retry, self._RaiseRetryAgain, 0.01, 0.02) + self.failUnlessRaises(utils.RetryTimeout, utils.Retry, + self._RetryAndSucceed, 0.01, 0, args=[1]) + self.failUnlessEqual(self.retries, 1) def testComplete(self): self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True) + self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]), + True) + self.failUnlessEqual(self.retries, 2) def testNestedLoop(self): try: @@ -1873,6 +1997,45 @@ class TestRetry(testutils.GanetiTestCase): except utils.RetryTimeout: self.fail("Didn't detect inner loop's exception") + def testTimeoutArgument(self): + retry_arg="my_important_debugging_message" + try: + utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]]) + except utils.RetryTimeout, err: + self.failUnlessEqual(err.args, (retry_arg, )) + else: + self.fail("Expected timeout didn't happen") + + def testRaiseInnerWithExc(self): + retry_arg="my_important_debugging_message" + try: + try: + utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, + args=[[errors.GenericError(retry_arg, retry_arg)]]) + except utils.RetryTimeout, err: + err.RaiseInner() + else: + self.fail("Expected timeout didn't happen") + except errors.GenericError, err: + self.failUnlessEqual(err.args, (retry_arg, retry_arg)) + else: + self.fail("Expected GenericError didn't happen") + + def testRaiseInnerWithMsg(self): + retry_arg="my_important_debugging_message" + try: + try: + utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, + args=[[retry_arg, retry_arg]]) + except utils.RetryTimeout, err: + err.RaiseInner() + else: + self.fail("Expected timeout didn't happen") + except utils.RetryTimeout, err: + self.failUnlessEqual(err.args, (retry_arg, retry_arg)) + else: + self.fail("Expected RetryTimeout didn't happen") + class TestLineSplitter(unittest.TestCase): def test(self): @@ -1999,5 +2162,90 @@ class TestVerifyCertificateInner(unittest.TestCase): self.assertEqual(errcode, utils.CERT_ERROR) +class TestHmacFunctions(unittest.TestCase): + # Digests can be checked with "openssl sha1 -hmac $key" + def testSha1Hmac(self): + self.assertEqual(utils.Sha1Hmac("", ""), + "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d") + self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"), + "ef4f3bda82212ecb2f7ce868888a19092481f1fd") + self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""), + "f904c2476527c6d3e6609ab683c66fa0652cb1dc") + + longtext = 1500 * "The quick brown fox jumps over the lazy dog\n" + self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext), + "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54") + + def testSha1HmacSalt(self): + self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"), + "4999bf342470eadb11dfcd24ca5680cf9fd7cdce") + self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"), + "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8") + self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"), + "7f264f8114c9066afc9bb7636e1786d996d3cc0d") + + def testVerifySha1Hmac(self): + self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b" + "7d64b71fb76370690e1d"))) + self.assert_(utils.VerifySha1Hmac("TguMTA2K", "", + ("f904c2476527c6d3e660" + "9ab683c66fa0652cb1dc"))) + + digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd" + self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest)) + self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", + digest.lower())) + self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", + digest.upper())) + self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", + digest.title())) + + def testVerifySha1HmacSalt(self): + self.assert_(utils.VerifySha1Hmac("TguMTA2K", "", + ("17a4adc34d69c0d367d4" + "ffbef96fd41d4df7a6e8"), + salt="abc9")) + self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", + ("7f264f8114c9066afc9b" + "b7636e1786d996d3cc0d"), + salt="xyz0")) + + +class TestIgnoreSignals(unittest.TestCase): + """Test the IgnoreSignals decorator""" + + @staticmethod + def _Raise(exception): + raise exception + + @staticmethod + def _Return(rval): + return rval + + def testIgnoreSignals(self): + sock_err_intr = socket.error(errno.EINTR, "Message") + sock_err_intr.errno = errno.EINTR + sock_err_inval = socket.error(errno.EINVAL, "Message") + sock_err_inval.errno = errno.EINVAL + + env_err_intr = EnvironmentError(errno.EINTR, "Message") + env_err_inval = EnvironmentError(errno.EINVAL, "Message") + + self.assertRaises(socket.error, self._Raise, sock_err_intr) + self.assertRaises(socket.error, self._Raise, sock_err_inval) + self.assertRaises(EnvironmentError, self._Raise, env_err_intr) + self.assertRaises(EnvironmentError, self._Raise, env_err_inval) + + self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None) + self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None) + self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise, + sock_err_inval) + self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise, + env_err_inval) + + self.assertEquals(utils.IgnoreSignals(self._Return, True), True) + self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33) + + if __name__ == '__main__': testutils.GanetiTestProgram()