Convert snapshot_export rpc to new style
[ganeti-local] / test / ganeti.utils_unittest.py
index 29fddfd..846b4b4 100755 (executable)
@@ -38,11 +38,14 @@ import ganeti
 import testutils
 from ganeti import constants
 from ganeti import utils
+from ganeti import errors
 from ganeti.utils import IsProcessAlive, RunCmd, \
      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
-     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
+     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
+     TailFile, ForceDictType, IsNormAbsPath
+
 from ganeti.errors import LockError, UnitParseError, GenericError, \
      ProgrammerError
 
@@ -128,13 +131,9 @@ class TestRunCmd(testutils.GanetiTestCase):
   """Testing case for the RunCmd function"""
 
   def setUp(self):
+    testutils.GanetiTestCase.setUp(self)
     self.magic = time.ctime() + " ganeti test"
-    fh, self.fname = tempfile.mkstemp()
-    os.close(fh)
-
-  def tearDown(self):
-    if self.fname:
-      utils.RemoveFile(self.fname)
+    self.fname = self._CreateTempFile()
 
   def testOk(self):
     """Test successful exit code"""
@@ -451,21 +450,14 @@ class TestSshKeys(testutils.GanetiTestCase):
            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
 
   def setUp(self):
-    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
+    testutils.GanetiTestCase.setUp(self)
+    self.tmpname = self._CreateTempFile()
+    handle = open(self.tmpname, 'w')
     try:
-      handle = os.fdopen(fd, 'w')
-      try:
-        handle.write("%s\n" % TestSshKeys.KEY_A)
-        handle.write("%s\n" % TestSshKeys.KEY_B)
-      finally:
-        handle.close()
-    except:
-      utils.RemoveFile(self.tmpname)
-      raise
-
-  def tearDown(self):
-    utils.RemoveFile(self.tmpname)
-    del self.tmpname
+      handle.write("%s\n" % TestSshKeys.KEY_A)
+      handle.write("%s\n" % TestSshKeys.KEY_B)
+    finally:
+      handle.close()
 
   def testAddingNewKey(self):
     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
@@ -517,22 +509,15 @@ class TestEtcHosts(testutils.GanetiTestCase):
   """Test functions modifying /etc/hosts"""
 
   def setUp(self):
-    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
+    testutils.GanetiTestCase.setUp(self)
+    self.tmpname = self._CreateTempFile()
+    handle = open(self.tmpname, 'w')
     try:
-      handle = os.fdopen(fd, 'w')
-      try:
-        handle.write('# This is a test file for /etc/hosts\n')
-        handle.write('127.0.0.1\tlocalhost\n')
-        handle.write('192.168.1.1 router gw\n')
-      finally:
-        handle.close()
-    except:
-      utils.RemoveFile(self.tmpname)
-      raise
-
-  def tearDown(self):
-    utils.RemoveFile(self.tmpname)
-    del self.tmpname
+      handle.write('# This is a test file for /etc/hosts\n')
+      handle.write('127.0.0.1\tlocalhost\n')
+      handle.write('192.168.1.1 router gw\n')
+    finally:
+      handle.close()
 
   def testSettingNewIp(self):
     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
@@ -542,6 +527,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
       "127.0.0.1\tlocalhost\n"
       "192.168.1.1 router gw\n"
       "1.2.3.4\tmyhost.domain.tld myhost\n")
+    self.assertFileMode(self.tmpname, 0644)
 
   def testSettingExistingIp(self):
     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
@@ -551,6 +537,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
       "# This is a test file for /etc/hosts\n"
       "127.0.0.1\tlocalhost\n"
       "192.168.1.1\tmyhost.domain.tld myhost\n")
+    self.assertFileMode(self.tmpname, 0644)
 
   def testSettingDuplicateName(self):
     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
@@ -560,6 +547,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
       "127.0.0.1\tlocalhost\n"
       "192.168.1.1 router gw\n"
       "1.2.3.4\tmyhost\n")
+    self.assertFileMode(self.tmpname, 0644)
 
   def testRemovingExistingHost(self):
     RemoveEtcHostsEntry(self.tmpname, 'router')
@@ -568,6 +556,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
       "# This is a test file for /etc/hosts\n"
       "127.0.0.1\tlocalhost\n"
       "192.168.1.1 gw\n")
+    self.assertFileMode(self.tmpname, 0644)
 
   def testRemovingSingleExistingHost(self):
     RemoveEtcHostsEntry(self.tmpname, 'localhost')
@@ -575,6 +564,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
     self.assertFileContent(self.tmpname,
       "# This is a test file for /etc/hosts\n"
       "192.168.1.1 router gw\n")
+    self.assertFileMode(self.tmpname, 0644)
 
   def testRemovingNonExistingHost(self):
     RemoveEtcHostsEntry(self.tmpname, 'myhost')
@@ -583,6 +573,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
       "# This is a test file for /etc/hosts\n"
       "127.0.0.1\tlocalhost\n"
       "192.168.1.1 router gw\n")
+    self.assertFileMode(self.tmpname, 0644)
 
   def testRemovingAlias(self):
     RemoveEtcHostsEntry(self.tmpname, 'gw')
@@ -591,6 +582,7 @@ class TestEtcHosts(testutils.GanetiTestCase):
       "# This is a test file for /etc/hosts\n"
       "127.0.0.1\tlocalhost\n"
       "192.168.1.1 router\n")
+    self.assertFileMode(self.tmpname, 0644)
 
 
 class TestShellQuoting(unittest.TestCase):
@@ -790,6 +782,48 @@ class TestFirstFree(unittest.TestCase):
     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
 
 
+class TestTailFile(testutils.GanetiTestCase):
+  """Test case for the TailFile function"""
+
+  def testEmpty(self):
+    fname = self._CreateTempFile()
+    self.failUnlessEqual(TailFile(fname), [])
+    self.failUnlessEqual(TailFile(fname, lines=25), [])
+
+  def testAllLines(self):
+    data = ["test %d" % i for i in range(30)]
+    for i in range(30):
+      fname = self._CreateTempFile()
+      fd = open(fname, "w")
+      fd.write("\n".join(data[:i]))
+      if i > 0:
+        fd.write("\n")
+      fd.close()
+      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
+
+  def testPartialLines(self):
+    data = ["test %d" % i for i in range(30)]
+    fname = self._CreateTempFile()
+    fd = open(fname, "w")
+    fd.write("\n".join(data))
+    fd.write("\n")
+    fd.close()
+    for i in range(1, 30):
+      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+  def testBigFile(self):
+    data = ["test %d" % i for i in range(30)]
+    fname = self._CreateTempFile()
+    fd = open(fname, "w")
+    fd.write("X" * 1048576)
+    fd.write("\n")
+    fd.write("\n".join(data))
+    fd.write("\n")
+    fd.close()
+    for i in range(1, 30):
+      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+
 class TestFileLock(unittest.TestCase):
   """Test case for the FileLock class"""
 
@@ -895,6 +929,63 @@ class FieldSetTestCase(unittest.TestCase):
     self.failIf(f.NonMatching(["b12", "c"]))
     self.failUnless(f.NonMatching(["a", "1"]))
 
+class TestForceDictType(unittest.TestCase):
+  """Test case for ForceDictType"""
+
+  def setUp(self):
+    self.key_types = {
+      'a': constants.VTYPE_INT,
+      'b': constants.VTYPE_BOOL,
+      'c': constants.VTYPE_STRING,
+      'd': constants.VTYPE_SIZE,
+      }
+
+  def _fdt(self, dict, allowed_values=None):
+    if allowed_values is None:
+      ForceDictType(dict, self.key_types)
+    else:
+      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
+
+    return dict
+
+  def testSimpleDict(self):
+    self.assertEqual(self._fdt({}), {})
+    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
+    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
+    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
+    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
+    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
+    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
+    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
+    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
+    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
+    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
+    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+
+  def testErrors(self):
+    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
+    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
+    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
+    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+
+
+class TestIsAbsNormPath(unittest.TestCase):
+  """Testing case for IsNormAbsPath"""
+
+  def _pathTestHelper(self, path, result):
+    if result:
+      self.assert_(IsNormAbsPath(path),
+          "Path %s should result absolute and normalized" % path)
+    else:
+      self.assert_(not IsNormAbsPath(path),
+          "Path %s should not result absolute and normalized" % path)
+
+  def testBase(self):
+    self._pathTestHelper('/etc', True)
+    self._pathTestHelper('/srv', True)
+    self._pathTestHelper('etc', False)
+    self._pathTestHelper('/etc/../root', False)
+    self._pathTestHelper('/etc/', False)
 
 if __name__ == '__main__':
   unittest.main()