import unittest
import time
import tempfile
+import pycurl
+import itertools
+import threading
from cStringIO import StringIO
from ganeti import http
+from ganeti import compat
import ganeti.http.server
import ganeti.http.client
self.assertEqual(cr.headers, [])
self.assertEqual(cr.url, "https://localhost:1234/version")
+ def testPlainAddressIPv4(self):
+ cr = http.client.HttpClientRequest("192.0.2.9", 19956, "GET", "/version")
+ self.assertEqual(cr.url, "https://192.0.2.9:19956/version")
+
+ def testPlainAddressIPv6(self):
+ cr = http.client.HttpClientRequest("2001:db8::cafe", 15110, "GET", "/info")
+ self.assertEqual(cr.url, "https://[2001:db8::cafe]:15110/info")
+
def testOldStyleHeaders(self):
headers = {
"Content-type": "text/plain",
cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version")
self.assertEqual(cr.post_data, "")
- def testIdentity(self):
- # These should all use different connections, hence also have a different
- # identity
- cr1 = http.client.HttpClientRequest("localhost", 1234, "GET", "/version")
- cr2 = http.client.HttpClientRequest("localhost", 9999, "GET", "/version")
- cr3 = http.client.HttpClientRequest("node1", 1234, "GET", "/version")
- cr4 = http.client.HttpClientRequest("node1", 9999, "GET", "/version")
+ def testCompletionCallback(self):
+ for argname in ["completion_cb", "curl_config_fn"]:
+ kwargs = {
+ argname: NotImplementedError,
+ }
+ cr = http.client.HttpClientRequest("localhost", 14038, "GET", "/version",
+ **kwargs)
+ self.assertEqual(getattr(cr, argname), NotImplementedError)
+
+ for fn in [NotImplemented, {}, 1]:
+ kwargs = {
+ argname: fn,
+ }
+ self.assertRaises(AssertionError, http.client.HttpClientRequest,
+ "localhost", 23150, "GET", "/version", **kwargs)
+
+
+class _FakeCurl:
+ def __init__(self):
+ self.opts = {}
+ self.info = NotImplemented
+
+ def setopt(self, opt, value):
+ assert opt not in self.opts, "Option set more than once"
+ self.opts[opt] = value
+
+ def getinfo(self, info):
+ return self.info.pop(info)
+
+
+class TestClientStartRequest(unittest.TestCase):
+ @staticmethod
+ def _TestCurlConfig(curl):
+ curl.setopt(pycurl.SSLKEYTYPE, "PEM")
+
+ def test(self):
+ for method in [http.HTTP_GET, http.HTTP_PUT, "CUSTOM"]:
+ for port in [8761, 29796, 19528]:
+ for curl_config_fn in [None, self._TestCurlConfig]:
+ for read_timeout in [None, 0, 1, 123, 36000]:
+ self._TestInner(method, port, curl_config_fn, read_timeout)
+
+ def _TestInner(self, method, port, curl_config_fn, read_timeout):
+ for response_code in [http.HTTP_OK, http.HttpNotFound.code,
+ http.HTTP_NOT_MODIFIED]:
+ for response_body in [None, "Hello World",
+ "Very Long\tContent here\n" * 171]:
+ for errmsg in [None, "error"]:
+ req = http.client.HttpClientRequest("localhost", port, method,
+ "/version",
+ curl_config_fn=curl_config_fn,
+ read_timeout=read_timeout)
+ curl = _FakeCurl()
+ pending = http.client._StartRequest(curl, req)
+ self.assertEqual(pending.GetCurlHandle(), curl)
+ self.assertEqual(pending.GetCurrentRequest(), req)
+
+ # Check options
+ opts = curl.opts
+ self.assertEqual(opts.pop(pycurl.CUSTOMREQUEST), method)
+ self.assertEqual(opts.pop(pycurl.URL),
+ "https://localhost:%s/version" % port)
+ if read_timeout is None:
+ self.assertEqual(opts.pop(pycurl.TIMEOUT), 0)
+ else:
+ self.assertEqual(opts.pop(pycurl.TIMEOUT), read_timeout)
+ self.assertFalse(opts.pop(pycurl.VERBOSE))
+ self.assertTrue(opts.pop(pycurl.NOSIGNAL))
+ self.assertEqual(opts.pop(pycurl.USERAGENT),
+ http.HTTP_GANETI_VERSION)
+ self.assertEqual(opts.pop(pycurl.PROXY), "")
+ self.assertFalse(opts.pop(pycurl.POSTFIELDS))
+ self.assertFalse(opts.pop(pycurl.HTTPHEADER))
+ write_fn = opts.pop(pycurl.WRITEFUNCTION)
+ self.assertTrue(callable(write_fn))
+ if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
+ self.assertFalse(opts.pop(pycurl.SSL_SESSIONID_CACHE))
+ if curl_config_fn:
+ self.assertEqual(opts.pop(pycurl.SSLKEYTYPE), "PEM")
+ else:
+ self.assertFalse(pycurl.SSLKEYTYPE in opts)
+ self.assertFalse(opts)
+
+ if response_body is not None:
+ offset = 0
+ while offset < len(response_body):
+ piece = response_body[offset:offset + 10]
+ write_fn(piece)
+ offset += len(piece)
+
+ curl.info = {
+ pycurl.RESPONSE_CODE: response_code,
+ }
+
+ # Finalize request
+ pending.Done(errmsg)
+
+ self.assertFalse(curl.info)
+
+ # Can only finalize once
+ self.assertRaises(AssertionError, pending.Done, True)
+
+ if errmsg:
+ self.assertFalse(req.success)
+ else:
+ self.assertTrue(req.success)
+ self.assertEqual(req.error, errmsg)
+ self.assertEqual(req.resp_status_code, response_code)
+ if response_body is None:
+ self.assertEqual(req.resp_body, "")
+ else:
+ self.assertEqual(req.resp_body, response_body)
+
+ # Check if resetting worked
+ assert not hasattr(curl, "reset")
+ opts = curl.opts
+ self.assertFalse(opts.pop(pycurl.POSTFIELDS))
+ self.assertTrue(callable(opts.pop(pycurl.WRITEFUNCTION)))
+ self.assertFalse(opts)
+
+ self.assertFalse(curl.opts,
+ msg="Previous checks did not consume all options")
+ assert id(opts) == id(curl.opts)
+
+ def _TestWrongTypes(self, *args, **kwargs):
+ req = http.client.HttpClientRequest(*args, **kwargs)
+ self.assertRaises(AssertionError, http.client._StartRequest,
+ _FakeCurl(), req)
+
+ def testWrongHostType(self):
+ self._TestWrongTypes(unicode("localhost"), 8080, "GET", "/version")
+
+ def testWrongUrlType(self):
+ self._TestWrongTypes("localhost", 8080, "GET", unicode("/version"))
+
+ def testWrongMethodType(self):
+ self._TestWrongTypes("localhost", 8080, unicode("GET"), "/version")
+
+ def testWrongHeaderType(self):
+ self._TestWrongTypes("localhost", 8080, "GET", "/version",
+ headers={
+ unicode("foo"): "bar",
+ })
+
+ def testWrongPostDataType(self):
+ self._TestWrongTypes("localhost", 8080, "GET", "/version",
+ post_data=unicode("verylongdata" * 100))
+
+
+class _EmptyCurlMulti:
+ def perform(self):
+ return (pycurl.E_MULTI_OK, 0)
- self.assertEqual(len(set([cr1.identity, cr2.identity,
- cr3.identity, cr4.identity])), 4)
+ def info_read(self):
+ return (0, [], [])
- # But this one should have the same
- cr1vglist = http.client.HttpClientRequest("localhost", 1234,
- "GET", "/vg_list")
- self.assertEqual(cr1.identity, cr1vglist.identity)
+class TestClientProcessRequests(unittest.TestCase):
+ def testEmpty(self):
+ requests = []
+ http.client.ProcessRequests(requests, _curl=NotImplemented,
+ _curl_multi=_EmptyCurlMulti)
+ self.assertEqual(requests, [])
+
+
+class TestProcessCurlRequests(unittest.TestCase):
+ class _FakeCurlMulti:
+ def __init__(self):
+ self.handles = []
+ self.will_fail = []
+ self._expect = ["perform"]
+ self._counter = itertools.count()
+
+ def add_handle(self, curl):
+ assert curl not in self.handles
+ self.handles.append(curl)
+ if self._counter.next() % 3 == 0:
+ self.will_fail.append(curl)
+
+ def remove_handle(self, curl):
+ self.handles.remove(curl)
+
+ def perform(self):
+ assert self._expect.pop(0) == "perform"
+
+ if self._counter.next() % 2 == 0:
+ self._expect.append("perform")
+ return (pycurl.E_CALL_MULTI_PERFORM, None)
+
+ self._expect.append("info_read")
+
+ return (pycurl.E_MULTI_OK, len(self.handles))
+
+ def info_read(self):
+ assert self._expect.pop(0) == "info_read"
+ successful = []
+ failed = []
+ if self.handles:
+ if self._counter.next() % 17 == 0:
+ curl = self.handles[0]
+ if curl in self.will_fail:
+ failed.append((curl, -1, "test error"))
+ else:
+ successful.append(curl)
+ remaining_messages = len(self.handles) % 3
+ if remaining_messages > 0:
+ self._expect.append("info_read")
+ else:
+ self._expect.append("select")
+ else:
+ remaining_messages = 0
+ self._expect.append("select")
+ return (remaining_messages, successful, failed)
+
+ def select(self, timeout):
+ # Never compare floats for equality
+ assert timeout >= 0.95 and timeout <= 1.05
+ assert self._expect.pop(0) == "select"
+ self._expect.append("perform")
-class TestClient(unittest.TestCase):
def test(self):
- pool = http.client.HttpClientPool(None)
- self.assertFalse(pool._pool)
+ requests = [_FakeCurl() for _ in range(10)]
+ multi = self._FakeCurlMulti()
+ for (curl, errmsg) in http.client._ProcessCurlRequests(multi, requests):
+ self.assertTrue(curl not in multi.handles)
+ if curl in multi.will_fail:
+ self.assertTrue("test error" in errmsg)
+ else:
+ self.assertTrue(errmsg is None)
+ self.assertFalse(multi.handles)
+ self.assertEqual(multi._expect, ["select"])
+
+
+class TestProcessRequests(unittest.TestCase):
+ class _DummyCurlMulti:
+ pass
+
+ def testNoMonitor(self):
+ self._Test(False)
+
+ def testWithMonitor(self):
+ self._Test(True)
+
+ class _MonitorChecker:
+ def __init__(self):
+ self._monitor = None
+
+ def GetMonitor(self):
+ return self._monitor
+
+ def __call__(self, monitor):
+ assert callable(monitor.GetLockInfo)
+ self._monitor = monitor
+
+ def _Test(self, use_monitor):
+ def cfg_fn(port, curl):
+ curl.opts["__port__"] = port
+
+ def _LockCheckReset(monitor, req):
+ self.assertTrue(monitor._lock.is_owned(shared=0),
+ msg="Lock must be owned in exclusive mode")
+ assert not hasattr(req, "lockcheck__")
+ setattr(req, "lockcheck__", True)
+
+ def _BuildNiceName(port, default=None):
+ if port % 5 == 0:
+ return "nicename%s" % port
+ else:
+ # Use standard name
+ return default
+
+ requests = \
+ [http.client.HttpClientRequest("localhost", i, "POST", "/version%s" % i,
+ curl_config_fn=compat.partial(cfg_fn, i),
+ completion_cb=NotImplementedError,
+ nicename=_BuildNiceName(i))
+ for i in range(15176, 15501)]
+ requests_count = len(requests)
+
+ if use_monitor:
+ lock_monitor_cb = self._MonitorChecker()
+ else:
+ lock_monitor_cb = None
+
+ def _ProcessRequests(multi, handles):
+ self.assertTrue(isinstance(multi, self._DummyCurlMulti))
+ self.assertEqual(len(requests), len(handles))
+ self.assertTrue(compat.all(isinstance(curl, _FakeCurl)
+ for curl in handles))
+
+ # Prepare for lock check
+ for req in requests:
+ assert req.completion_cb is NotImplementedError
+ if use_monitor:
+ req.completion_cb = \
+ compat.partial(_LockCheckReset, lock_monitor_cb.GetMonitor())
+
+ for idx, curl in enumerate(handles):
+ try:
+ port = curl.opts["__port__"]
+ except KeyError:
+ self.fail("Per-request config function was not called")
+
+ if use_monitor:
+ # Check if lock information is correct
+ lock_info = lock_monitor_cb.GetMonitor().GetLockInfo(None)
+ expected = \
+ [("rpc/%s" % (_BuildNiceName(handle.opts["__port__"],
+ default=("localhost/version%s" %
+ handle.opts["__port__"]))),
+ None,
+ [threading.currentThread().getName()], None)
+ for handle in handles[idx:]]
+ self.assertEqual(sorted(lock_info), sorted(expected))
+
+ if port % 3 == 0:
+ response_code = http.HTTP_OK
+ msg = None
+ else:
+ response_code = http.HttpNotFound.code
+ msg = "test error"
+
+ curl.info = {
+ pycurl.RESPONSE_CODE: response_code,
+ }
+
+ # Prepare for reset
+ self.assertFalse(curl.opts.pop(pycurl.POSTFIELDS))
+ self.assertTrue(callable(curl.opts.pop(pycurl.WRITEFUNCTION)))
+
+ yield (curl, msg)
+
+ if use_monitor:
+ self.assertTrue(compat.all(req.lockcheck__ for req in requests))
+
+ if use_monitor:
+ self.assertEqual(lock_monitor_cb.GetMonitor(), None)
+
+ http.client.ProcessRequests(requests, lock_monitor_cb=lock_monitor_cb,
+ _curl=_FakeCurl,
+ _curl_multi=self._DummyCurlMulti,
+ _curl_process=_ProcessRequests)
+ for req in requests:
+ if req.port % 3 == 0:
+ self.assertTrue(req.success)
+ self.assertEqual(req.error, None)
+ else:
+ self.assertFalse(req.success)
+ self.assertTrue("test error" in req.error)
+
+ # See if monitor was disabled
+ if use_monitor:
+ monitor = lock_monitor_cb.GetMonitor()
+ self.assertEqual(monitor._pending_fn, None)
+ self.assertEqual(monitor.GetLockInfo(None), [])
+ else:
+ self.assertEqual(lock_monitor_cb, None)
+
+ self.assertEqual(len(requests), requests_count)
+
+ def testBadRequest(self):
+ bad_request = http.client.HttpClientRequest("localhost", 27784,
+ "POST", "/version")
+ bad_request.success = False
+
+ self.assertRaises(AssertionError, http.client.ProcessRequests,
+ [bad_request], _curl=NotImplemented,
+ _curl_multi=NotImplemented, _curl_process=NotImplemented)
if __name__ == '__main__':