Remove authorization code from remote API
[ganeti-local] / test / ganeti.utils_unittest.py
index 4b081a0..d57e0c0 100755 (executable)
@@ -26,6 +26,7 @@ import os
 import time
 import tempfile
 import os.path
 import time
 import tempfile
 import os.path
+import os
 import md5
 import socket
 import shutil
 import md5
 import socket
 import shutil
@@ -89,6 +90,21 @@ class TestIsProcessAlive(unittest.TestCase):
 
 class TestLocking(unittest.TestCase):
   """Testing case for the Lock/Unlock functions"""
 
 class TestLocking(unittest.TestCase):
   """Testing case for the Lock/Unlock functions"""
+
+  def setUp(self):
+    lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
+                                suffix=".locking")
+    self.old_lock_dir = constants.LOCK_DIR
+    constants.LOCK_DIR = lock_dir
+
+  def tearDown(self):
+    try:
+      ganeti.utils.Unlock("unittest")
+    except LockError:
+      pass
+    shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
+    constants.LOCK_DIR = self.old_lock_dir
+
   def clean_lock(self, name):
     try:
       ganeti.utils.Unlock("unittest")
   def clean_lock(self, name):
     try:
       ganeti.utils.Unlock("unittest")
@@ -106,7 +122,6 @@ class TestLocking(unittest.TestCase):
     ganeti.utils.Lock("unittest")
     self.assertEqual(None, Unlock("unittest"))
 
     ganeti.utils.Lock("unittest")
     self.assertEqual(None, Unlock("unittest"))
 
-
   def testDoubleLock(self):
     self.clean_lock("unittest")
     ganeti.utils.Lock("unittest")
   def testDoubleLock(self):
     self.clean_lock("unittest")
     ganeti.utils.Lock("unittest")
@@ -335,7 +350,8 @@ class TestParseUnit(unittest.TestCase):
     for sep in ('', ' ', '   ', "\t", "\t "):
       for suffix, scale in TestParseUnit.SCALES:
         for func in (lambda x: x, str.lower, str.upper):
     for sep in ('', ' ', '   ', "\t", "\t "):
       for suffix, scale in TestParseUnit.SCALES:
         for func in (lambda x: x, str.lower, str.upper):
-          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
+          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
+                           1024 * scale)
 
   def testInvalidInput(self):
     for sep in ('-', '_', ',', 'a'):
 
   def testInvalidInput(self):
     for sep in ('-', '_', ',', 'a'):
@@ -353,171 +369,147 @@ class TestSshKeys(GanetiTestCase):
   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
 
   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
 
-  def writeTestFile(self):
-    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
-    f = os.fdopen(fd, 'w')
+  def setUp(self):
+    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
     try:
     try:
-      f.write(TestSshKeys.KEY_A)
-      f.write("\n")
-      f.write(TestSshKeys.KEY_B)
-      f.write("\n")
-    finally:
-      f.close()
+      handle = os.fdopen(fd, 'w')
+      try:
+        handle.write("%s\n" % TestSshKeys.KEY_A)
+        handle.write("%s\n" % TestSshKeys.KEY_B)
+      finally:
+        handle.close()
+    except:
+      utils.RemoveFile(self.tmpname)
+      raise
 
 
-    return tmpname
+  def tearDown(self):
+    utils.RemoveFile(self.tmpname)
+    del self.tmpname
 
   def testAddingNewKey(self):
 
   def testAddingNewKey(self):
-    tmpname = self.writeTestFile()
-    try:
-      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
+    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
 
 
-      self.assertFileContent(tmpname,
-        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
-        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
-        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
-        "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
 
 
-  def testAddingAlmostButNotCompletlyTheSameKey(self):
-    tmpname = self.writeTestFile()
-    try:
-      AddAuthorizedKey(tmpname,
-          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
-
-      self.assertFileContent(tmpname,
-        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
-        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
-        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
-        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
-    finally:
-      os.unlink(tmpname)
+  def testAddingAlmostButNotCompletelyTheSameKey(self):
+    AddAuthorizedKey(self.tmpname,
+        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
+
+    self.assertFileContent(self.tmpname,
+      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
 
   def testAddingExistingKeyWithSomeMoreSpaces(self):
 
   def testAddingExistingKeyWithSomeMoreSpaces(self):
-    tmpname = self.writeTestFile()
-    try:
-      AddAuthorizedKey(tmpname,
-          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
+    AddAuthorizedKey(self.tmpname,
+        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
 
 
-      self.assertFileContent(tmpname,
-        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
-        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
-        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
 
   def testRemovingExistingKeyWithSomeMoreSpaces(self):
 
   def testRemovingExistingKeyWithSomeMoreSpaces(self):
-    tmpname = self.writeTestFile()
-    try:
-      RemoveAuthorizedKey(tmpname,
-          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
+    RemoveAuthorizedKey(self.tmpname,
+        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
 
 
-      self.assertFileContent(tmpname,
-        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
-        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
 
   def testRemovingNonExistingKey(self):
 
   def testRemovingNonExistingKey(self):
-    tmpname = self.writeTestFile()
-    try:
-      RemoveAuthorizedKey(tmpname,
-          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
+    RemoveAuthorizedKey(self.tmpname,
+        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
 
 
-      self.assertFileContent(tmpname,
-        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
-        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
-        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
 
 
 class TestEtcHosts(GanetiTestCase):
   """Test functions modifying /etc/hosts"""
 
 
 
 class TestEtcHosts(GanetiTestCase):
   """Test functions modifying /etc/hosts"""
 
-  def writeTestFile(self):
-    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
-    f = os.fdopen(fd, 'w')
+  def setUp(self):
+    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
     try:
     try:
-      f.write('# This is a test file for /etc/hosts\n')
-      f.write('127.0.0.1\tlocalhost\n')
-      f.write('192.168.1.1 router gw\n')
-    finally:
-      f.close()
+      handle = os.fdopen(fd, 'w')
+      try:
+        handle.write('# This is a test file for /etc/hosts\n')
+        handle.write('127.0.0.1\tlocalhost\n')
+        handle.write('192.168.1.1 router gw\n')
+      finally:
+        handle.close()
+    except:
+      utils.RemoveFile(self.tmpname)
+      raise
 
 
-    return tmpname
+  def tearDown(self):
+    utils.RemoveFile(self.tmpname)
+    del self.tmpname
 
   def testSettingNewIp(self):
 
   def testSettingNewIp(self):
-    tmpname = self.writeTestFile()
-    try:
-      SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
+    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
 
 
-      self.assertFileContent(tmpname,
-        "# This is a test file for /etc/hosts\n"
-        "127.0.0.1\tlocalhost\n"
-        "192.168.1.1 router gw\n"
-        "1.2.3.4\tmyhost.domain.tld myhost\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "127.0.0.1\tlocalhost\n"
+      "192.168.1.1 router gw\n"
+      "1.2.3.4\tmyhost.domain.tld myhost\n")
 
   def testSettingExistingIp(self):
 
   def testSettingExistingIp(self):
-    tmpname = self.writeTestFile()
-    try:
-      SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost'])
+    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
+                     ['myhost'])
 
 
-      self.assertFileContent(tmpname,
-        "# This is a test file for /etc/hosts\n"
-        "127.0.0.1\tlocalhost\n"
-        "192.168.1.1\tmyhost.domain.tld myhost\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "127.0.0.1\tlocalhost\n"
+      "192.168.1.1\tmyhost.domain.tld myhost\n")
+
+  def testSettingDuplicateName(self):
+    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
+
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "127.0.0.1\tlocalhost\n"
+      "192.168.1.1 router gw\n"
+      "1.2.3.4\tmyhost\n")
 
   def testRemovingExistingHost(self):
 
   def testRemovingExistingHost(self):
-    tmpname = self.writeTestFile()
-    try:
-      RemoveEtcHostsEntry(tmpname, 'router')
+    RemoveEtcHostsEntry(self.tmpname, 'router')
 
 
-      self.assertFileContent(tmpname,
-        "# This is a test file for /etc/hosts\n"
-        "127.0.0.1\tlocalhost\n"
-        "192.168.1.1 gw\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "127.0.0.1\tlocalhost\n"
+      "192.168.1.1 gw\n")
 
   def testRemovingSingleExistingHost(self):
 
   def testRemovingSingleExistingHost(self):
-    tmpname = self.writeTestFile()
-    try:
-      RemoveEtcHostsEntry(tmpname, 'localhost')
+    RemoveEtcHostsEntry(self.tmpname, 'localhost')
 
 
-      self.assertFileContent(tmpname,
-        "# This is a test file for /etc/hosts\n"
-        "192.168.1.1 router gw\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "192.168.1.1 router gw\n")
 
   def testRemovingNonExistingHost(self):
 
   def testRemovingNonExistingHost(self):
-    tmpname = self.writeTestFile()
-    try:
-      RemoveEtcHostsEntry(tmpname, 'myhost')
+    RemoveEtcHostsEntry(self.tmpname, 'myhost')
 
 
-      self.assertFileContent(tmpname,
-        "# This is a test file for /etc/hosts\n"
-        "127.0.0.1\tlocalhost\n"
-        "192.168.1.1 router gw\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "127.0.0.1\tlocalhost\n"
+      "192.168.1.1 router gw\n")
 
   def testRemovingAlias(self):
 
   def testRemovingAlias(self):
-    tmpname = self.writeTestFile()
-    try:
-      RemoveEtcHostsEntry(tmpname, 'gw')
+    RemoveEtcHostsEntry(self.tmpname, 'gw')
 
 
-      self.assertFileContent(tmpname,
-        "# This is a test file for /etc/hosts\n"
-        "127.0.0.1\tlocalhost\n"
-        "192.168.1.1 router\n")
-    finally:
-      os.unlink(tmpname)
+    self.assertFileContent(self.tmpname,
+      "# This is a test file for /etc/hosts\n"
+      "127.0.0.1\tlocalhost\n"
+      "192.168.1.1 router\n")
 
 
 class TestShellQuoting(unittest.TestCase):
 
 
 class TestShellQuoting(unittest.TestCase):
@@ -552,12 +544,20 @@ class TestTcpPing(unittest.TestCase):
 
   def testTcpPingToLocalHostAccept(self):
     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
 
   def testTcpPingToLocalHostAccept(self):
     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
-                         constants.LOCALHOST_IP_ADDRESS,
                          self.listenerport,
                          timeout=10,
                          self.listenerport,
                          timeout=10,
-                         live_port_needed=True),
+                         live_port_needed=True,
+                         source=constants.LOCALHOST_IP_ADDRESS,
+                         ),
                  "failed to connect to test listener")
 
                  "failed to connect to test listener")
 
+    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+                         self.listenerport,
+                         timeout=10,
+                         live_port_needed=True,
+                         ),
+                 "failed to connect to test listener (no source)")
+
 
 class TestTcpPingDeaf(unittest.TestCase):
   """Testcase for TCP version of ping - against non listen(2)ing port"""
 
 class TestTcpPingDeaf(unittest.TestCase):
   """Testcase for TCP version of ping - against non listen(2)ing port"""
@@ -573,20 +573,36 @@ class TestTcpPingDeaf(unittest.TestCase):
 
   def testTcpPingToLocalHostAcceptDeaf(self):
     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
 
   def testTcpPingToLocalHostAcceptDeaf(self):
     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
-                        constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
-                        live_port_needed=True), # need successful connect(2)
+                        live_port_needed=True,
+                        source=constants.LOCALHOST_IP_ADDRESS,
+                        ), # need successful connect(2)
                 "successfully connected to deaf listener")
 
                 "successfully connected to deaf listener")
 
+    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+                        self.deaflistenerport,
+                        timeout=constants.TCP_PING_TIMEOUT,
+                        live_port_needed=True,
+                        ), # need successful connect(2)
+                "successfully connected to deaf listener (no source addr)")
+
   def testTcpPingToLocalHostNoAccept(self):
     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
   def testTcpPingToLocalHostNoAccept(self):
     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
-                         constants.LOCALHOST_IP_ADDRESS,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
-                         live_port_needed=False), # ECONNREFUSED is OK
+                         live_port_needed=False,
+                         source=constants.LOCALHOST_IP_ADDRESS,
+                         ), # ECONNREFUSED is OK
                  "failed to ping alive host on deaf port")
 
                  "failed to ping alive host on deaf port")
 
+    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+                         self.deaflistenerport,
+                         timeout=constants.TCP_PING_TIMEOUT,
+                         live_port_needed=False,
+                         ), # ECONNREFUSED is OK
+                 "failed to ping alive host on deaf port (no source addr)")
+
 
 class TestListVisibleFiles(unittest.TestCase):
   """Test case for ListVisibleFiles"""
 
 class TestListVisibleFiles(unittest.TestCase):
   """Test case for ListVisibleFiles"""
@@ -640,5 +656,28 @@ class TestNewUUID(unittest.TestCase):
     self.failUnless(self._re_uuid.match(utils.NewUUID()))
 
 
     self.failUnless(self._re_uuid.match(utils.NewUUID()))
 
 
+class TestUniqueSequence(unittest.TestCase):
+  """Test case for UniqueSequence"""
+
+  def _test(self, input, expected):
+    self.assertEqual(utils.UniqueSequence(input), expected)
+
+  def runTest(self):
+    # Ordered input
+    self._test([1, 2, 3], [1, 2, 3])
+    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
+    self._test([1, 2, 2, 3], [1, 2, 3])
+    self._test([1, 2, 3, 3], [1, 2, 3])
+
+    # Unordered input
+    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
+    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
+
+    # Strings
+    self._test(["a", "a"], ["a"])
+    self._test(["a", "b"], ["a", "b"])
+    self._test(["a", "b", "a"], ["a", "b"])
+
+
 if __name__ == '__main__':
   unittest.main()
 if __name__ == '__main__':
   unittest.main()