http.client: Remove use of PycURL's “reset” function
[ganeti-local] / test / ganeti.http_unittest.py
index 7d0d477..9cbbf13 100755 (executable)
@@ -26,8 +26,13 @@ import os
 import unittest
 import time
 import tempfile
+import pycurl
+import itertools
+import threading
+from cStringIO import StringIO
 
 from ganeti import http
+from ganeti import compat
 
 import ganeti.http.server
 import ganeti.http.client
@@ -87,12 +92,6 @@ class TestMisc(unittest.TestCase):
     self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0)
     self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0)
 
-  def testClientSizeLimits(self):
-    """Test HTTP client size limits"""
-    message_reader_class = http.client._HttpServerToClientMessageReader
-    self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0)
-    self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0)
-
   def testFormatAuthHeader(self):
     self.assertEqual(http.auth._FormatAuthHeader("Basic", {}),
                      "Basic")
@@ -296,32 +295,24 @@ class TestHttpServerRequestAuthentication(unittest.TestCase):
           self.assert_(ac.called)
 
 
-class TestReadPasswordFile(testutils.GanetiTestCase):
-  def setUp(self):
-    testutils.GanetiTestCase.setUp(self)
-
-    self.tmpfile = tempfile.NamedTemporaryFile()
-
+class TestReadPasswordFile(unittest.TestCase):
   def testSimple(self):
-    self.tmpfile.write("user1 password")
-    self.tmpfile.flush()
-
-    users = http.auth.ReadPasswordFile(self.tmpfile.name)
+    users = http.auth.ParsePasswordFile("user1 password")
     self.assertEqual(len(users), 1)
     self.assertEqual(users["user1"].password, "password")
     self.assertEqual(len(users["user1"].options), 0)
 
   def testOptions(self):
-    self.tmpfile.write("# Passwords\n")
-    self.tmpfile.write("user1 password\n")
-    self.tmpfile.write("\n")
-    self.tmpfile.write("# Comment\n")
-    self.tmpfile.write("user2 pw write,read\n")
-    self.tmpfile.write("   \t# Another comment\n")
-    self.tmpfile.write("invalidline\n")
-    self.tmpfile.flush()
-
-    users = http.auth.ReadPasswordFile(self.tmpfile.name)
+    buf = StringIO()
+    buf.write("# Passwords\n")
+    buf.write("user1 password\n")
+    buf.write("\n")
+    buf.write("# Comment\n")
+    buf.write("user2 pw write,read\n")
+    buf.write("   \t# Another comment\n")
+    buf.write("invalidline\n")
+
+    users = http.auth.ParsePasswordFile(buf.getvalue())
     self.assertEqual(len(users), 2)
     self.assertEqual(users["user1"].password, "password")
     self.assertEqual(len(users["user1"].options), 0)
@@ -330,5 +321,431 @@ class TestReadPasswordFile(testutils.GanetiTestCase):
     self.assertEqual(users["user2"].options, ["write", "read"])
 
 
+class TestClientRequest(unittest.TestCase):
+  def testRepr(self):
+    cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+                                       headers=[], post_data="Hello World")
+    self.assert_(repr(cr).startswith("<"))
+
+  def testNoHeaders(self):
+    cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+                                       headers=None)
+    self.assert_(isinstance(cr.headers, list))
+    self.assertEqual(cr.headers, [])
+    self.assertEqual(cr.url, "https://localhost:1234/version")
+
+  def testPlainAddressIPv4(self):
+    cr = http.client.HttpClientRequest("192.0.2.9", 19956, "GET", "/version")
+    self.assertEqual(cr.url, "https://192.0.2.9:19956/version")
+
+  def testPlainAddressIPv6(self):
+    cr = http.client.HttpClientRequest("2001:db8::cafe", 15110, "GET", "/info")
+    self.assertEqual(cr.url, "https://[2001:db8::cafe]:15110/info")
+
+  def testOldStyleHeaders(self):
+    headers = {
+      "Content-type": "text/plain",
+      "Accept": "text/html",
+      }
+    cr = http.client.HttpClientRequest("localhost", 16481, "GET", "/vg_list",
+                                       headers=headers)
+    self.assert_(isinstance(cr.headers, list))
+    self.assertEqual(sorted(cr.headers), [
+      "Accept: text/html",
+      "Content-type: text/plain",
+      ])
+    self.assertEqual(cr.url, "https://localhost:16481/vg_list")
+
+  def testNewStyleHeaders(self):
+    headers = [
+      "Accept: text/html",
+      "Content-type: text/plain; charset=ascii",
+      "Server: httpd 1.0",
+      ]
+    cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+                                       headers=headers)
+    self.assert_(isinstance(cr.headers, list))
+    self.assertEqual(sorted(cr.headers), sorted(headers))
+    self.assertEqual(cr.url, "https://localhost:1234/version")
+
+  def testPostData(self):
+    cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+                                       post_data="Hello World")
+    self.assertEqual(cr.post_data, "Hello World")
+
+  def testNoPostData(self):
+    cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version")
+    self.assertEqual(cr.post_data, "")
+
+  def testCompletionCallback(self):
+    for argname in ["completion_cb", "curl_config_fn"]:
+      kwargs = {
+        argname: NotImplementedError,
+        }
+      cr = http.client.HttpClientRequest("localhost", 14038, "GET", "/version",
+                                         **kwargs)
+      self.assertEqual(getattr(cr, argname), NotImplementedError)
+
+      for fn in [NotImplemented, {}, 1]:
+        kwargs = {
+          argname: fn,
+          }
+        self.assertRaises(AssertionError, http.client.HttpClientRequest,
+                          "localhost", 23150, "GET", "/version", **kwargs)
+
+
+class _FakeCurl:
+  def __init__(self):
+    self.opts = {}
+    self.info = NotImplemented
+
+  def setopt(self, opt, value):
+    assert opt not in self.opts, "Option set more than once"
+    self.opts[opt] = value
+
+  def getinfo(self, info):
+    return self.info.pop(info)
+
+
+class TestClientStartRequest(unittest.TestCase):
+  @staticmethod
+  def _TestCurlConfig(curl):
+    curl.setopt(pycurl.SSLKEYTYPE, "PEM")
+
+  def test(self):
+    for method in [http.HTTP_GET, http.HTTP_PUT, "CUSTOM"]:
+      for port in [8761, 29796, 19528]:
+        for curl_config_fn in [None, self._TestCurlConfig]:
+          for read_timeout in [None, 0, 1, 123, 36000]:
+            self._TestInner(method, port, curl_config_fn, read_timeout)
+
+  def _TestInner(self, method, port, curl_config_fn, read_timeout):
+    for response_code in [http.HTTP_OK, http.HttpNotFound.code,
+                          http.HTTP_NOT_MODIFIED]:
+      for response_body in [None, "Hello World",
+                            "Very Long\tContent here\n" * 171]:
+        for errmsg in [None, "error"]:
+          req = http.client.HttpClientRequest("localhost", port, method,
+                                              "/version",
+                                              curl_config_fn=curl_config_fn,
+                                              read_timeout=read_timeout)
+          curl = _FakeCurl()
+          pending = http.client._StartRequest(curl, req)
+          self.assertEqual(pending.GetCurlHandle(), curl)
+          self.assertEqual(pending.GetCurrentRequest(), req)
+
+          # Check options
+          opts = curl.opts
+          self.assertEqual(opts.pop(pycurl.CUSTOMREQUEST), method)
+          self.assertEqual(opts.pop(pycurl.URL),
+                           "https://localhost:%s/version" % port)
+          if read_timeout is None:
+            self.assertEqual(opts.pop(pycurl.TIMEOUT), 0)
+          else:
+            self.assertEqual(opts.pop(pycurl.TIMEOUT), read_timeout)
+          self.assertFalse(opts.pop(pycurl.VERBOSE))
+          self.assertTrue(opts.pop(pycurl.NOSIGNAL))
+          self.assertEqual(opts.pop(pycurl.USERAGENT),
+                           http.HTTP_GANETI_VERSION)
+          self.assertEqual(opts.pop(pycurl.PROXY), "")
+          self.assertFalse(opts.pop(pycurl.POSTFIELDS))
+          self.assertFalse(opts.pop(pycurl.HTTPHEADER))
+          write_fn = opts.pop(pycurl.WRITEFUNCTION)
+          self.assertTrue(callable(write_fn))
+          if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
+            self.assertFalse(opts.pop(pycurl.SSL_SESSIONID_CACHE))
+          if curl_config_fn:
+            self.assertEqual(opts.pop(pycurl.SSLKEYTYPE), "PEM")
+          else:
+            self.assertFalse(pycurl.SSLKEYTYPE in opts)
+          self.assertFalse(opts)
+
+          if response_body is not None:
+            offset = 0
+            while offset < len(response_body):
+              piece = response_body[offset:offset + 10]
+              write_fn(piece)
+              offset += len(piece)
+
+          curl.info = {
+            pycurl.RESPONSE_CODE: response_code,
+            }
+
+          # Finalize request
+          pending.Done(errmsg)
+
+          self.assertFalse(curl.info)
+
+          # Can only finalize once
+          self.assertRaises(AssertionError, pending.Done, True)
+
+          if errmsg:
+            self.assertFalse(req.success)
+          else:
+            self.assertTrue(req.success)
+          self.assertEqual(req.error, errmsg)
+          self.assertEqual(req.resp_status_code, response_code)
+          if response_body is None:
+            self.assertEqual(req.resp_body, "")
+          else:
+            self.assertEqual(req.resp_body, response_body)
+
+          # Check if resetting worked
+          assert not hasattr(curl, "reset")
+          opts = curl.opts
+          self.assertFalse(opts.pop(pycurl.POSTFIELDS))
+          self.assertTrue(callable(opts.pop(pycurl.WRITEFUNCTION)))
+          self.assertFalse(opts)
+
+          self.assertFalse(curl.opts,
+                           msg="Previous checks did not consume all options")
+          assert id(opts) == id(curl.opts)
+
+  def _TestWrongTypes(self, *args, **kwargs):
+    req = http.client.HttpClientRequest(*args, **kwargs)
+    self.assertRaises(AssertionError, http.client._StartRequest,
+                      _FakeCurl(), req)
+
+  def testWrongHostType(self):
+    self._TestWrongTypes(unicode("localhost"), 8080, "GET", "/version")
+
+  def testWrongUrlType(self):
+    self._TestWrongTypes("localhost", 8080, "GET", unicode("/version"))
+
+  def testWrongMethodType(self):
+    self._TestWrongTypes("localhost", 8080, unicode("GET"), "/version")
+
+  def testWrongHeaderType(self):
+    self._TestWrongTypes("localhost", 8080, "GET", "/version",
+                         headers={
+                           unicode("foo"): "bar",
+                           })
+
+  def testWrongPostDataType(self):
+    self._TestWrongTypes("localhost", 8080, "GET", "/version",
+                         post_data=unicode("verylongdata" * 100))
+
+
+class _EmptyCurlMulti:
+  def perform(self):
+    return (pycurl.E_MULTI_OK, 0)
+
+  def info_read(self):
+    return (0, [], [])
+
+
+class TestClientProcessRequests(unittest.TestCase):
+  def testEmpty(self):
+    requests = []
+    http.client.ProcessRequests(requests, _curl=NotImplemented,
+                                _curl_multi=_EmptyCurlMulti)
+    self.assertEqual(requests, [])
+
+
+class TestProcessCurlRequests(unittest.TestCase):
+  class _FakeCurlMulti:
+    def __init__(self):
+      self.handles = []
+      self.will_fail = []
+      self._expect = ["perform"]
+      self._counter = itertools.count()
+
+    def add_handle(self, curl):
+      assert curl not in self.handles
+      self.handles.append(curl)
+      if self._counter.next() % 3 == 0:
+        self.will_fail.append(curl)
+
+    def remove_handle(self, curl):
+      self.handles.remove(curl)
+
+    def perform(self):
+      assert self._expect.pop(0) == "perform"
+
+      if self._counter.next() % 2 == 0:
+        self._expect.append("perform")
+        return (pycurl.E_CALL_MULTI_PERFORM, None)
+
+      self._expect.append("info_read")
+
+      return (pycurl.E_MULTI_OK, len(self.handles))
+
+    def info_read(self):
+      assert self._expect.pop(0) == "info_read"
+      successful = []
+      failed = []
+      if self.handles:
+        if self._counter.next() % 17 == 0:
+          curl = self.handles[0]
+          if curl in self.will_fail:
+            failed.append((curl, -1, "test error"))
+          else:
+            successful.append(curl)
+        remaining_messages = len(self.handles) % 3
+        if remaining_messages > 0:
+          self._expect.append("info_read")
+        else:
+          self._expect.append("select")
+      else:
+        remaining_messages = 0
+        self._expect.append("select")
+      return (remaining_messages, successful, failed)
+
+    def select(self, timeout):
+      # Never compare floats for equality
+      assert timeout >= 0.95 and timeout <= 1.05
+      assert self._expect.pop(0) == "select"
+      self._expect.append("perform")
+
+  def test(self):
+    requests = [_FakeCurl() for _ in range(10)]
+    multi = self._FakeCurlMulti()
+    for (curl, errmsg) in http.client._ProcessCurlRequests(multi, requests):
+      self.assertTrue(curl not in multi.handles)
+      if curl in multi.will_fail:
+        self.assertTrue("test error" in errmsg)
+      else:
+        self.assertTrue(errmsg is None)
+    self.assertFalse(multi.handles)
+    self.assertEqual(multi._expect, ["select"])
+
+
+class TestProcessRequests(unittest.TestCase):
+  class _DummyCurlMulti:
+    pass
+
+  def testNoMonitor(self):
+    self._Test(False)
+
+  def testWithMonitor(self):
+    self._Test(True)
+
+  class _MonitorChecker:
+    def __init__(self):
+      self._monitor = None
+
+    def GetMonitor(self):
+      return self._monitor
+
+    def __call__(self, monitor):
+      assert callable(monitor.GetLockInfo)
+      self._monitor = monitor
+
+  def _Test(self, use_monitor):
+    def cfg_fn(port, curl):
+      curl.opts["__port__"] = port
+
+    def _LockCheckReset(monitor, req):
+      self.assertTrue(monitor._lock.is_owned(shared=0),
+                      msg="Lock must be owned in exclusive mode")
+      assert not hasattr(req, "lockcheck__")
+      setattr(req, "lockcheck__", True)
+
+    def _BuildNiceName(port, default=None):
+      if port % 5 == 0:
+        return "nicename%s" % port
+      else:
+        # Use standard name
+        return default
+
+    requests = \
+      [http.client.HttpClientRequest("localhost", i, "POST", "/version%s" % i,
+                                     curl_config_fn=compat.partial(cfg_fn, i),
+                                     completion_cb=NotImplementedError,
+                                     nicename=_BuildNiceName(i))
+       for i in range(15176, 15501)]
+    requests_count = len(requests)
+
+    if use_monitor:
+      lock_monitor_cb = self._MonitorChecker()
+    else:
+      lock_monitor_cb = None
+
+    def _ProcessRequests(multi, handles):
+      self.assertTrue(isinstance(multi, self._DummyCurlMulti))
+      self.assertEqual(len(requests), len(handles))
+      self.assertTrue(compat.all(isinstance(curl, _FakeCurl)
+                                 for curl in handles))
+
+      # Prepare for lock check
+      for req in requests:
+        assert req.completion_cb is NotImplementedError
+        if use_monitor:
+          req.completion_cb = \
+            compat.partial(_LockCheckReset, lock_monitor_cb.GetMonitor())
+
+      for idx, curl in enumerate(handles):
+        try:
+          port = curl.opts["__port__"]
+        except KeyError:
+          self.fail("Per-request config function was not called")
+
+        if use_monitor:
+          # Check if lock information is correct
+          lock_info = lock_monitor_cb.GetMonitor().GetLockInfo(None)
+          expected = \
+            [("rpc/%s" % (_BuildNiceName(handle.opts["__port__"],
+                                         default=("localhost/version%s" %
+                                                  handle.opts["__port__"]))),
+              None,
+              [threading.currentThread().getName()], None)
+             for handle in handles[idx:]]
+          self.assertEqual(sorted(lock_info), sorted(expected))
+
+        if port % 3 == 0:
+          response_code = http.HTTP_OK
+          msg = None
+        else:
+          response_code = http.HttpNotFound.code
+          msg = "test error"
+
+        curl.info = {
+          pycurl.RESPONSE_CODE: response_code,
+          }
+
+        # Prepare for reset
+        self.assertFalse(curl.opts.pop(pycurl.POSTFIELDS))
+        self.assertTrue(callable(curl.opts.pop(pycurl.WRITEFUNCTION)))
+
+        yield (curl, msg)
+
+      if use_monitor:
+        self.assertTrue(compat.all(req.lockcheck__ for req in requests))
+
+    if use_monitor:
+      self.assertEqual(lock_monitor_cb.GetMonitor(), None)
+
+    http.client.ProcessRequests(requests, lock_monitor_cb=lock_monitor_cb,
+                                _curl=_FakeCurl,
+                                _curl_multi=self._DummyCurlMulti,
+                                _curl_process=_ProcessRequests)
+    for req in requests:
+      if req.port % 3 == 0:
+        self.assertTrue(req.success)
+        self.assertEqual(req.error, None)
+      else:
+        self.assertFalse(req.success)
+        self.assertTrue("test error" in req.error)
+
+    # See if monitor was disabled
+    if use_monitor:
+      monitor = lock_monitor_cb.GetMonitor()
+      self.assertEqual(monitor._pending_fn, None)
+      self.assertEqual(monitor.GetLockInfo(None), [])
+    else:
+      self.assertEqual(lock_monitor_cb, None)
+
+    self.assertEqual(len(requests), requests_count)
+
+  def testBadRequest(self):
+    bad_request = http.client.HttpClientRequest("localhost", 27784,
+                                                "POST", "/version")
+    bad_request.success = False
+
+    self.assertRaises(AssertionError, http.client.ProcessRequests,
+                      [bad_request], _curl=NotImplemented,
+                      _curl_multi=NotImplemented, _curl_process=NotImplemented)
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()