import os
import unittest
import time
+import tempfile
+import pycurl
+import itertools
+import threading
+from cStringIO import StringIO
from ganeti import http
+from ganeti import compat
import ganeti.http.server
import ganeti.http.client
import ganeti.http.auth
+import testutils
+
class TestStartLines(unittest.TestCase):
"""Test cases for start line classes"""
def testHttpServerRequest(self):
"""Test ganeti.http.server._HttpServerRequest"""
- fake_request = http.HttpMessage()
- fake_request.start_line = \
- http.HttpClientToServerStartLine("GET", "/", "HTTP/1.1")
- server_request = http.server._HttpServerRequest(fake_request)
+ server_request = http.server._HttpServerRequest("GET", "/", None, None)
# These are expected by users of the HTTP server
self.assert_(hasattr(server_request, "request_method"))
self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0)
self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0)
- def testClientSizeLimits(self):
- """Test HTTP client size limits"""
- message_reader_class = http.client._HttpServerToClientMessageReader
- self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0)
- self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0)
+ def testFormatAuthHeader(self):
+ self.assertEqual(http.auth._FormatAuthHeader("Basic", {}),
+ "Basic")
+ self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "bar", }),
+ "Basic foo=bar")
+ self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "", }),
+ "Basic foo=\"\"")
+ self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "x,y", }),
+ "Basic foo=\"x,y\"")
+ params = {
+ "foo": "x,y",
+ "realm": "secure",
+ }
+ # It's a dict whose order isn't guaranteed, hence checking a list
+ self.assert_(http.auth._FormatAuthHeader("Digest", params) in
+ ("Digest foo=\"x,y\" realm=secure",
+ "Digest realm=secure foo=\"x,y\""))
class _FakeRequestAuth(http.auth.HttpServerRequestAuthentication):
- def __init__(self, realm):
+ def __init__(self, realm, authreq, authenticate_fn):
http.auth.HttpServerRequestAuthentication.__init__(self)
self.realm = realm
+ self.authreq = authreq
+ self.authenticate_fn = authenticate_fn
+
+ def AuthenticationRequired(self, req):
+ return self.authreq
def GetAuthRealm(self, req):
return self.realm
+ def Authenticate(self, *args):
+ if self.authenticate_fn:
+ return self.authenticate_fn(*args)
+ raise NotImplementedError()
+
class TestAuth(unittest.TestCase):
"""Authentication tests"""
hsra = http.auth.HttpServerRequestAuthentication
def testConstants(self):
- self.assertEqual(self.hsra._CLEARTEXT_SCHEME,
- self.hsra._CLEARTEXT_SCHEME.upper())
- self.assertEqual(self.hsra._HA1_SCHEME,
- self.hsra._HA1_SCHEME.upper())
+ for scheme in [self.hsra._CLEARTEXT_SCHEME, self.hsra._HA1_SCHEME]:
+ self.assertEqual(scheme, scheme.upper())
+ self.assert_(scheme.startswith("{"))
+ self.assert_(scheme.endswith("}"))
def _testVerifyBasicAuthPassword(self, realm, user, password, expected):
- ra = _FakeRequestAuth(realm)
+ ra = _FakeRequestAuth(realm, False, None)
return ra.VerifyBasicAuthPassword(None, user, password, expected)
-
def testVerifyBasicAuthPassword(self):
tvbap = self._testVerifyBasicAuthPassword
self.assert_(tvbap("This is only a test", "user", "pw",
"{HA1}92ea58ae804481498c257b2f65561a17"))
- self.failIf(tvbap(None, "user", "pw",
- "{HA1}92ea58ae804481498c257b2f65561a17"))
+ self.failUnlessRaises(AssertionError, tvbap, None, "user", "pw",
+ "{HA1}92ea58ae804481498c257b2f65561a17")
self.failIf(tvbap("Admin area", "user", "pw",
"{HA1}92ea58ae804481498c257b2f65561a17"))
self.failIf(tvbap("This is only a test", "someone", "pw",
"{HA1}92ea58ae804481498c257b2f65561a17"))
+class _SimpleAuthenticator:
+ def __init__(self, user, password):
+ self.user = user
+ self.password = password
+ self.called = False
+
+ def __call__(self, req, user, password):
+ self.called = True
+ return self.user == user and self.password == password
+
+
+class TestHttpServerRequestAuthentication(unittest.TestCase):
+ def testNoAuth(self):
+ req = http.server._HttpServerRequest("GET", "/", None, None)
+ _FakeRequestAuth("area1", False, None).PreHandleRequest(req)
+
+ def testNoRealm(self):
+ headers = { http.HTTP_AUTHORIZATION: "", }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ra = _FakeRequestAuth(None, False, None)
+ self.assertRaises(AssertionError, ra.PreHandleRequest, req)
+
+ def testNoScheme(self):
+ headers = { http.HTTP_AUTHORIZATION: "", }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ra = _FakeRequestAuth("area1", False, None)
+ self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
+
+ def testUnknownScheme(self):
+ headers = { http.HTTP_AUTHORIZATION: "NewStyleAuth abc", }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ra = _FakeRequestAuth("area1", False, None)
+ self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
+
+ def testInvalidBase64(self):
+ headers = { http.HTTP_AUTHORIZATION: "Basic x_=_", }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ra = _FakeRequestAuth("area1", False, None)
+ self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
+
+ def testAuthForPublicResource(self):
+ headers = {
+ http.HTTP_AUTHORIZATION: "Basic %s" % ("foo".encode("base64").strip(), ),
+ }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ra = _FakeRequestAuth("area1", False, None)
+ self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
+
+ def testAuthForPublicResource(self):
+ headers = {
+ http.HTTP_AUTHORIZATION:
+ "Basic %s" % ("foo:bar".encode("base64").strip(), ),
+ }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ac = _SimpleAuthenticator("foo", "bar")
+ ra = _FakeRequestAuth("area1", False, ac)
+ ra.PreHandleRequest(req)
+
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ac = _SimpleAuthenticator("something", "else")
+ ra = _FakeRequestAuth("area1", False, ac)
+ self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
+
+ def testInvalidRequestHeader(self):
+ checks = {
+ http.HttpUnauthorized: ["", "\t", "-", ".", "@", "<", ">", "Digest",
+ "basic %s" % "foobar".encode("base64").strip()],
+ http.HttpBadRequest: ["Basic"],
+ }
+
+ for exc, headers in checks.items():
+ for i in headers:
+ headers = { http.HTTP_AUTHORIZATION: i, }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+ ra = _FakeRequestAuth("area1", False, None)
+ self.assertRaises(exc, ra.PreHandleRequest, req)
+
+ def testBasicAuth(self):
+ for user in ["", "joe", "user name with spaces"]:
+ for pw in ["", "-", ":", "foobar", "Foo Bar Baz", "@@@", "###",
+ "foo:bar:baz"]:
+ for wrong_pw in [True, False]:
+ basic_auth = "%s:%s" % (user, pw)
+ if wrong_pw:
+ basic_auth += "WRONG"
+ headers = {
+ http.HTTP_AUTHORIZATION:
+ "Basic %s" % (basic_auth.encode("base64").strip(), ),
+ }
+ req = http.server._HttpServerRequest("GET", "/", headers, None)
+
+ ac = _SimpleAuthenticator(user, pw)
+ self.assertFalse(ac.called)
+ ra = _FakeRequestAuth("area1", True, ac)
+ if wrong_pw:
+ try:
+ ra.PreHandleRequest(req)
+ except http.HttpUnauthorized, err:
+ www_auth = err.headers[http.HTTP_WWW_AUTHENTICATE]
+ self.assert_(www_auth.startswith(http.auth.HTTP_BASIC_AUTH))
+ else:
+ self.fail("Didn't raise HttpUnauthorized")
+ else:
+ ra.PreHandleRequest(req)
+ self.assert_(ac.called)
+
+
+class TestReadPasswordFile(unittest.TestCase):
+ def testSimple(self):
+ users = http.auth.ParsePasswordFile("user1 password")
+ self.assertEqual(len(users), 1)
+ self.assertEqual(users["user1"].password, "password")
+ self.assertEqual(len(users["user1"].options), 0)
+
+ def testOptions(self):
+ buf = StringIO()
+ buf.write("# Passwords\n")
+ buf.write("user1 password\n")
+ buf.write("\n")
+ buf.write("# Comment\n")
+ buf.write("user2 pw write,read\n")
+ buf.write(" \t# Another comment\n")
+ buf.write("invalidline\n")
+
+ users = http.auth.ParsePasswordFile(buf.getvalue())
+ self.assertEqual(len(users), 2)
+ self.assertEqual(users["user1"].password, "password")
+ self.assertEqual(len(users["user1"].options), 0)
+
+ self.assertEqual(users["user2"].password, "pw")
+ self.assertEqual(users["user2"].options, ["write", "read"])
+
+
+class TestClientRequest(unittest.TestCase):
+ def testRepr(self):
+ cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+ headers=[], post_data="Hello World")
+ self.assert_(repr(cr).startswith("<"))
+
+ def testNoHeaders(self):
+ cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+ headers=None)
+ self.assert_(isinstance(cr.headers, list))
+ self.assertEqual(cr.headers, [])
+ self.assertEqual(cr.url, "https://localhost:1234/version")
+
+ def testPlainAddressIPv4(self):
+ cr = http.client.HttpClientRequest("192.0.2.9", 19956, "GET", "/version")
+ self.assertEqual(cr.url, "https://192.0.2.9:19956/version")
+
+ def testPlainAddressIPv6(self):
+ cr = http.client.HttpClientRequest("2001:db8::cafe", 15110, "GET", "/info")
+ self.assertEqual(cr.url, "https://[2001:db8::cafe]:15110/info")
+
+ def testOldStyleHeaders(self):
+ headers = {
+ "Content-type": "text/plain",
+ "Accept": "text/html",
+ }
+ cr = http.client.HttpClientRequest("localhost", 16481, "GET", "/vg_list",
+ headers=headers)
+ self.assert_(isinstance(cr.headers, list))
+ self.assertEqual(sorted(cr.headers), [
+ "Accept: text/html",
+ "Content-type: text/plain",
+ ])
+ self.assertEqual(cr.url, "https://localhost:16481/vg_list")
+
+ def testNewStyleHeaders(self):
+ headers = [
+ "Accept: text/html",
+ "Content-type: text/plain; charset=ascii",
+ "Server: httpd 1.0",
+ ]
+ cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+ headers=headers)
+ self.assert_(isinstance(cr.headers, list))
+ self.assertEqual(sorted(cr.headers), sorted(headers))
+ self.assertEqual(cr.url, "https://localhost:1234/version")
+
+ def testPostData(self):
+ cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
+ post_data="Hello World")
+ self.assertEqual(cr.post_data, "Hello World")
+
+ def testNoPostData(self):
+ cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version")
+ self.assertEqual(cr.post_data, "")
+
+ def testCompletionCallback(self):
+ for argname in ["completion_cb", "curl_config_fn"]:
+ kwargs = {
+ argname: NotImplementedError,
+ }
+ cr = http.client.HttpClientRequest("localhost", 14038, "GET", "/version",
+ **kwargs)
+ self.assertEqual(getattr(cr, argname), NotImplementedError)
+
+ for fn in [NotImplemented, {}, 1]:
+ kwargs = {
+ argname: fn,
+ }
+ self.assertRaises(AssertionError, http.client.HttpClientRequest,
+ "localhost", 23150, "GET", "/version", **kwargs)
+
+
+class _FakeCurl:
+ def __init__(self):
+ self.opts = {}
+ self.info = NotImplemented
+
+ def setopt(self, opt, value):
+ assert opt not in self.opts, "Option set more than once"
+ self.opts[opt] = value
+
+ def getinfo(self, info):
+ return self.info.pop(info)
+
+
+class TestClientStartRequest(unittest.TestCase):
+ @staticmethod
+ def _TestCurlConfig(curl):
+ curl.setopt(pycurl.SSLKEYTYPE, "PEM")
+
+ def test(self):
+ for method in [http.HTTP_GET, http.HTTP_PUT, "CUSTOM"]:
+ for port in [8761, 29796, 19528]:
+ for curl_config_fn in [None, self._TestCurlConfig]:
+ for read_timeout in [None, 0, 1, 123, 36000]:
+ self._TestInner(method, port, curl_config_fn, read_timeout)
+
+ def _TestInner(self, method, port, curl_config_fn, read_timeout):
+ for response_code in [http.HTTP_OK, http.HttpNotFound.code,
+ http.HTTP_NOT_MODIFIED]:
+ for response_body in [None, "Hello World",
+ "Very Long\tContent here\n" * 171]:
+ for errmsg in [None, "error"]:
+ req = http.client.HttpClientRequest("localhost", port, method,
+ "/version",
+ curl_config_fn=curl_config_fn,
+ read_timeout=read_timeout)
+ curl = _FakeCurl()
+ pending = http.client._StartRequest(curl, req)
+ self.assertEqual(pending.GetCurlHandle(), curl)
+ self.assertEqual(pending.GetCurrentRequest(), req)
+
+ # Check options
+ opts = curl.opts
+ self.assertEqual(opts.pop(pycurl.CUSTOMREQUEST), method)
+ self.assertEqual(opts.pop(pycurl.URL),
+ "https://localhost:%s/version" % port)
+ if read_timeout is None:
+ self.assertEqual(opts.pop(pycurl.TIMEOUT), 0)
+ else:
+ self.assertEqual(opts.pop(pycurl.TIMEOUT), read_timeout)
+ self.assertFalse(opts.pop(pycurl.VERBOSE))
+ self.assertTrue(opts.pop(pycurl.NOSIGNAL))
+ self.assertEqual(opts.pop(pycurl.USERAGENT),
+ http.HTTP_GANETI_VERSION)
+ self.assertEqual(opts.pop(pycurl.PROXY), "")
+ self.assertFalse(opts.pop(pycurl.POSTFIELDS))
+ self.assertFalse(opts.pop(pycurl.HTTPHEADER))
+ write_fn = opts.pop(pycurl.WRITEFUNCTION)
+ self.assertTrue(callable(write_fn))
+ if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
+ self.assertFalse(opts.pop(pycurl.SSL_SESSIONID_CACHE))
+ if curl_config_fn:
+ self.assertEqual(opts.pop(pycurl.SSLKEYTYPE), "PEM")
+ else:
+ self.assertFalse(pycurl.SSLKEYTYPE in opts)
+ self.assertFalse(opts)
+
+ if response_body is not None:
+ offset = 0
+ while offset < len(response_body):
+ piece = response_body[offset:offset + 10]
+ write_fn(piece)
+ offset += len(piece)
+
+ curl.info = {
+ pycurl.RESPONSE_CODE: response_code,
+ }
+
+ # Finalize request
+ pending.Done(errmsg)
+
+ self.assertFalse(curl.info)
+
+ # Can only finalize once
+ self.assertRaises(AssertionError, pending.Done, True)
+
+ if errmsg:
+ self.assertFalse(req.success)
+ else:
+ self.assertTrue(req.success)
+ self.assertEqual(req.error, errmsg)
+ self.assertEqual(req.resp_status_code, response_code)
+ if response_body is None:
+ self.assertEqual(req.resp_body, "")
+ else:
+ self.assertEqual(req.resp_body, response_body)
+
+ # Check if resetting worked
+ assert not hasattr(curl, "reset")
+ opts = curl.opts
+ self.assertFalse(opts.pop(pycurl.POSTFIELDS))
+ self.assertTrue(callable(opts.pop(pycurl.WRITEFUNCTION)))
+ self.assertFalse(opts)
+
+ self.assertFalse(curl.opts,
+ msg="Previous checks did not consume all options")
+ assert id(opts) == id(curl.opts)
+
+ def _TestWrongTypes(self, *args, **kwargs):
+ req = http.client.HttpClientRequest(*args, **kwargs)
+ self.assertRaises(AssertionError, http.client._StartRequest,
+ _FakeCurl(), req)
+
+ def testWrongHostType(self):
+ self._TestWrongTypes(unicode("localhost"), 8080, "GET", "/version")
+
+ def testWrongUrlType(self):
+ self._TestWrongTypes("localhost", 8080, "GET", unicode("/version"))
+
+ def testWrongMethodType(self):
+ self._TestWrongTypes("localhost", 8080, unicode("GET"), "/version")
+
+ def testWrongHeaderType(self):
+ self._TestWrongTypes("localhost", 8080, "GET", "/version",
+ headers={
+ unicode("foo"): "bar",
+ })
+
+ def testWrongPostDataType(self):
+ self._TestWrongTypes("localhost", 8080, "GET", "/version",
+ post_data=unicode("verylongdata" * 100))
+
+
+class _EmptyCurlMulti:
+ def perform(self):
+ return (pycurl.E_MULTI_OK, 0)
+
+ def info_read(self):
+ return (0, [], [])
+
+
+class TestClientProcessRequests(unittest.TestCase):
+ def testEmpty(self):
+ requests = []
+ http.client.ProcessRequests(requests, _curl=NotImplemented,
+ _curl_multi=_EmptyCurlMulti)
+ self.assertEqual(requests, [])
+
+
+class TestProcessCurlRequests(unittest.TestCase):
+ class _FakeCurlMulti:
+ def __init__(self):
+ self.handles = []
+ self.will_fail = []
+ self._expect = ["perform"]
+ self._counter = itertools.count()
+
+ def add_handle(self, curl):
+ assert curl not in self.handles
+ self.handles.append(curl)
+ if self._counter.next() % 3 == 0:
+ self.will_fail.append(curl)
+
+ def remove_handle(self, curl):
+ self.handles.remove(curl)
+
+ def perform(self):
+ assert self._expect.pop(0) == "perform"
+
+ if self._counter.next() % 2 == 0:
+ self._expect.append("perform")
+ return (pycurl.E_CALL_MULTI_PERFORM, None)
+
+ self._expect.append("info_read")
+
+ return (pycurl.E_MULTI_OK, len(self.handles))
+
+ def info_read(self):
+ assert self._expect.pop(0) == "info_read"
+ successful = []
+ failed = []
+ if self.handles:
+ if self._counter.next() % 17 == 0:
+ curl = self.handles[0]
+ if curl in self.will_fail:
+ failed.append((curl, -1, "test error"))
+ else:
+ successful.append(curl)
+ remaining_messages = len(self.handles) % 3
+ if remaining_messages > 0:
+ self._expect.append("info_read")
+ else:
+ self._expect.append("select")
+ else:
+ remaining_messages = 0
+ self._expect.append("select")
+ return (remaining_messages, successful, failed)
+
+ def select(self, timeout):
+ # Never compare floats for equality
+ assert timeout >= 0.95 and timeout <= 1.05
+ assert self._expect.pop(0) == "select"
+ self._expect.append("perform")
+
+ def test(self):
+ requests = [_FakeCurl() for _ in range(10)]
+ multi = self._FakeCurlMulti()
+ for (curl, errmsg) in http.client._ProcessCurlRequests(multi, requests):
+ self.assertTrue(curl not in multi.handles)
+ if curl in multi.will_fail:
+ self.assertTrue("test error" in errmsg)
+ else:
+ self.assertTrue(errmsg is None)
+ self.assertFalse(multi.handles)
+ self.assertEqual(multi._expect, ["select"])
+
+
+class TestProcessRequests(unittest.TestCase):
+ class _DummyCurlMulti:
+ pass
+
+ def testNoMonitor(self):
+ self._Test(False)
+
+ def testWithMonitor(self):
+ self._Test(True)
+
+ class _MonitorChecker:
+ def __init__(self):
+ self._monitor = None
+
+ def GetMonitor(self):
+ return self._monitor
+
+ def __call__(self, monitor):
+ assert callable(monitor.GetLockInfo)
+ self._monitor = monitor
+
+ def _Test(self, use_monitor):
+ def cfg_fn(port, curl):
+ curl.opts["__port__"] = port
+
+ def _LockCheckReset(monitor, req):
+ self.assertTrue(monitor._lock.is_owned(shared=0),
+ msg="Lock must be owned in exclusive mode")
+ assert not hasattr(req, "lockcheck__")
+ setattr(req, "lockcheck__", True)
+
+ def _BuildNiceName(port, default=None):
+ if port % 5 == 0:
+ return "nicename%s" % port
+ else:
+ # Use standard name
+ return default
+
+ requests = \
+ [http.client.HttpClientRequest("localhost", i, "POST", "/version%s" % i,
+ curl_config_fn=compat.partial(cfg_fn, i),
+ completion_cb=NotImplementedError,
+ nicename=_BuildNiceName(i))
+ for i in range(15176, 15501)]
+ requests_count = len(requests)
+
+ if use_monitor:
+ lock_monitor_cb = self._MonitorChecker()
+ else:
+ lock_monitor_cb = None
+
+ def _ProcessRequests(multi, handles):
+ self.assertTrue(isinstance(multi, self._DummyCurlMulti))
+ self.assertEqual(len(requests), len(handles))
+ self.assertTrue(compat.all(isinstance(curl, _FakeCurl)
+ for curl in handles))
+
+ # Prepare for lock check
+ for req in requests:
+ assert req.completion_cb is NotImplementedError
+ if use_monitor:
+ req.completion_cb = \
+ compat.partial(_LockCheckReset, lock_monitor_cb.GetMonitor())
+
+ for idx, curl in enumerate(handles):
+ try:
+ port = curl.opts["__port__"]
+ except KeyError:
+ self.fail("Per-request config function was not called")
+
+ if use_monitor:
+ # Check if lock information is correct
+ lock_info = lock_monitor_cb.GetMonitor().GetLockInfo(None)
+ expected = \
+ [("rpc/%s" % (_BuildNiceName(handle.opts["__port__"],
+ default=("localhost/version%s" %
+ handle.opts["__port__"]))),
+ None,
+ [threading.currentThread().getName()], None)
+ for handle in handles[idx:]]
+ self.assertEqual(sorted(lock_info), sorted(expected))
+
+ if port % 3 == 0:
+ response_code = http.HTTP_OK
+ msg = None
+ else:
+ response_code = http.HttpNotFound.code
+ msg = "test error"
+
+ curl.info = {
+ pycurl.RESPONSE_CODE: response_code,
+ }
+
+ # Prepare for reset
+ self.assertFalse(curl.opts.pop(pycurl.POSTFIELDS))
+ self.assertTrue(callable(curl.opts.pop(pycurl.WRITEFUNCTION)))
+
+ yield (curl, msg)
+
+ if use_monitor:
+ self.assertTrue(compat.all(req.lockcheck__ for req in requests))
+
+ if use_monitor:
+ self.assertEqual(lock_monitor_cb.GetMonitor(), None)
+
+ http.client.ProcessRequests(requests, lock_monitor_cb=lock_monitor_cb,
+ _curl=_FakeCurl,
+ _curl_multi=self._DummyCurlMulti,
+ _curl_process=_ProcessRequests)
+ for req in requests:
+ if req.port % 3 == 0:
+ self.assertTrue(req.success)
+ self.assertEqual(req.error, None)
+ else:
+ self.assertFalse(req.success)
+ self.assertTrue("test error" in req.error)
+
+ # See if monitor was disabled
+ if use_monitor:
+ monitor = lock_monitor_cb.GetMonitor()
+ self.assertEqual(monitor._pending_fn, None)
+ self.assertEqual(monitor.GetLockInfo(None), [])
+ else:
+ self.assertEqual(lock_monitor_cb, None)
+
+ self.assertEqual(len(requests), requests_count)
+
+ def testBadRequest(self):
+ bad_request = http.client.HttpClientRequest("localhost", 27784,
+ "POST", "/version")
+ bad_request.success = False
+
+ self.assertRaises(AssertionError, http.client.ProcessRequests,
+ [bad_request], _curl=NotImplemented,
+ _curl_multi=NotImplemented, _curl_process=NotImplemented)
+
+
if __name__ == '__main__':
- unittest.main()
+ testutils.GanetiTestProgram()