import logging
import pycurl
+import threading
from cStringIO import StringIO
from ganeti import http
from ganeti import compat
+from ganeti import netutils
+from ganeti import locking
class HttpClientRequest(object):
def __init__(self, host, port, method, path, headers=None, post_data=None,
- read_timeout=None, curl_config_fn=None):
+ read_timeout=None, curl_config_fn=None, nicename=None,
+ completion_cb=None):
"""Describes an HTTP request.
@type host: string
timeout while reading the response from the server
@type curl_config_fn: callable
@param curl_config_fn: Function to configure cURL object before request
- (Note: if the function configures the connection in
- a way where it wouldn't be efficient to reuse them,
- a "identity" property should be defined, see
- L{HttpClientRequest.identity})
+ @type nicename: string
+ @param nicename: Name, presentable to a user, to describe this request (no
+ whitespace)
+ @type completion_cb: callable accepting this request object as a single
+ parameter
+ @param completion_cb: Callback for request completion
"""
assert path.startswith("/"), "Path must start with slash (/)"
assert curl_config_fn is None or callable(curl_config_fn)
+ assert completion_cb is None or callable(completion_cb)
# Request attributes
self.host = host
self.path = path
self.read_timeout = read_timeout
self.curl_config_fn = curl_config_fn
+ self.nicename = nicename
+ self.completion_cb = completion_cb
if post_data is None:
self.post_data = ""
"""Returns the full URL for this requests.
"""
+ if netutils.IPAddress.IsValid(self.host):
+ address = netutils.FormatAddress((self.host, self.port))
+ else:
+ address = "%s:%s" % (self.host, self.port)
# TODO: Support for non-SSL requests
- return "https://%s:%s%s" % (self.host, self.port, self.path)
+ return "https://%s%s" % (address, self.path)
- @property
- def identity(self):
- """Returns identifier for retrieving a pooled connection for this request.
- This allows cURL client objects to be re-used and to cache information
- (e.g. SSL session IDs or connections).
def _StartRequest(curl, req):
  """Prepares a cURL object for a single HTTP request.

  Configures the handle, attaches a response buffer and wraps everything in
  a L{_PendingRequest}.

  @type curl: pycurl.Curl
  @param curl: cURL object
  @type req: L{HttpClientRequest}
  @param req: HTTP request
  @rtype: L{_PendingRequest}
  @return: Pending request wrapping the handle and a response body reader

  """
  logging.debug("Starting request %r", req)

  request_url = req.url
  request_method = req.method
  request_body = req.post_data
  request_headers = req.headers

  # PycURL requires strings to be non-unicode
  assert isinstance(request_method, str)
  assert isinstance(request_url, str)
  assert isinstance(request_body, str)
  assert compat.all(isinstance(i, str) for i in request_headers)

  # The response body is accumulated in this buffer
  body_buffer = StringIO()

  # Configure client for request
  curl.setopt(pycurl.VERBOSE, False)
  curl.setopt(pycurl.NOSIGNAL, True)
  curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
  curl.setopt(pycurl.PROXY, "")
  curl.setopt(pycurl.CUSTOMREQUEST, str(request_method))
  curl.setopt(pycurl.URL, request_url)
  curl.setopt(pycurl.POSTFIELDS, request_body)
  curl.setopt(pycurl.HTTPHEADER, request_headers)

  # A timeout of zero means "no timeout" to cURL
  if req.read_timeout is None:
    curl.setopt(pycurl.TIMEOUT, 0)
  else:
    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

  # Disable SSL session ID caching (pycurl >= 7.16.0)
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

  curl.setopt(pycurl.WRITEFUNCTION, body_buffer.write)

  # Pass cURL object to external config function
  if req.curl_config_fn:
    req.curl_config_fn(curl)

  return _PendingRequest(curl, req, body_buffer.getvalue)
- @staticmethod
- def _CreateCurlHandle():
- """Returns a new cURL object.
+
+class _PendingRequest:
+ def __init__(self, curl, req, resp_buffer_read):
+ """Initializes this class.
+
+ @type curl: pycurl.Curl
+ @param curl: cURL object
+ @type req: L{HttpClientRequest}
+ @param req: HTTP request
+ @type resp_buffer_read: callable
+ @param resp_buffer_read: Function to read response body
"""
- return pycurl.Curl()
+ assert req.success is None
+
+ self._curl = curl
+ self._req = req
+ self._resp_buffer_read = resp_buffer_read
def GetCurlHandle(self):
"""Returns the cURL object.
def GetCurrentRequest(self):
"""Returns the current request.
- @rtype: L{HttpClientRequest} or None
-
"""
return self._req
- def StartRequest(self, req):
- """Starts a request on this client.
-
- @type req: L{HttpClientRequest}
- @param req: HTTP request
-
- """
- assert not self._req, "Another request is already started"
-
- self._req = req
- self._resp_buffer = StringIO()
-
- url = req.url
- method = req.method
- post_data = req.post_data
- headers = req.headers
-
- # PycURL requires strings to be non-unicode
- assert isinstance(method, str)
- assert isinstance(url, str)
- assert isinstance(post_data, str)
- assert compat.all(isinstance(i, str) for i in headers)
-
- # Configure cURL object for request
- curl = self._curl
- curl.setopt(pycurl.CUSTOMREQUEST, str(method))
- curl.setopt(pycurl.URL, url)
- curl.setopt(pycurl.POSTFIELDS, post_data)
- curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
- curl.setopt(pycurl.HTTPHEADER, headers)
-
- if req.read_timeout is None:
- curl.setopt(pycurl.TIMEOUT, 0)
- else:
- curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
-
- # Pass cURL object to external config function
- if req.curl_config_fn:
- req.curl_config_fn(curl)
-
def Done(self, errmsg):
"""Finishes a request.
@param errmsg: Error message if request failed
"""
+ curl = self._curl
req = self._req
- assert req, "No request"
- logging.debug("Request %s finished, errmsg=%s", req, errmsg)
+ assert req.success is None, "Request has already been finalized"
- curl = self._curl
+ logging.debug("Request %s finished, errmsg=%s", req, errmsg)
req.success = not bool(errmsg)
req.error = errmsg
# Get HTTP response code
req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
- req.resp_body = self._resp_buffer.getvalue()
-
- # Reset client object
- self._req = None
- self._resp_buffer = None
+ req.resp_body = self._resp_buffer_read()
# Ensure no potentially large variables are referenced
curl.setopt(pycurl.POSTFIELDS, "")
curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
+ if req.completion_cb:
+ req.completion_cb(req)
-class _PooledHttpClient:
- """Data structure for HTTP client pool.
-
- """
- def __init__(self, identity, client):
- """Initializes this class.
-
- @type identity: string
- @param identity: Client identifier for pool
- @type client: L{_HttpClient}
- @param client: HTTP client
-
- """
- self.identity = identity
- self.client = client
- self.lastused = 0
-
- def __repr__(self):
- status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
- "id=%s" % self.identity,
- "lastuse=%s" % self.lastused,
- repr(self.client)]
- return "<%s at %#x>" % (" ".join(status), id(self))
+class _NoOpRequestMonitor: # pylint: disable=W0232
+ """No-op request monitor.
+ """
+ @staticmethod
+ def acquire(*args, **kwargs):
+ pass
-class HttpClientPool:
- """A simple HTTP client pool.
+ release = acquire
+ Disable = acquire
- Supports one pooled connection per identity (see
- L{HttpClientRequest.identity}).
- """
- #: After how many generations to drop unused clients
- _MAX_GENERATIONS_DROP = 25
class _PendingRequestMonitor:
  _LOCK = "_lock"

  def __init__(self, owner, pending_fn):
    """Initializes this class.

    @param owner: Thread owning the pending requests
    @type pending_fn: callable
    @param pending_fn: Function returning the clients with pending requests

    """
    self._owner = owner
    self._pending_fn = pending_fn

    # The lock monitor runs in another thread, hence locking is necessary
    self._lock = locking.SharedLock("PendingHttpRequests")
    self.acquire = self._lock.acquire
    self.release = self._lock.release

  @locking.ssynchronized(_LOCK)
  def Disable(self):
    """Disable monitor.

    """
    self._pending_fn = None

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about pending requests.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for requests, hence all show up as
    # one item under "pending".
    if not self._pending_fn:
      return []

    owner_name = self._owner.getName()
    info = []

    for client in self._pending_fn():
      req = client.GetCurrentRequest()
      if req:
        if req.nicename is None:
          name = "%s%s" % (req.host, req.path)
        else:
          name = req.nicename
        info.append(("rpc/%s" % name, None, [owner_name], None))

    return info
- @type req: L{HttpClientRequest}
- @param req: HTTP request
- """
- logging.debug("Starting request %r", req)
- pclient = self._Get(req.identity)
def _ProcessCurlRequests(multi, requests):
  """cURL request processor.

  This generator yields a tuple once for every completed request, successful
  or not. The first value in the tuple is the handle, the second an error
  message or C{None} for successful requests.

  @type multi: C{pycurl.CurlMulti}
  @param multi: cURL multi object
  @type requests: sequence
  @param requests: cURL request handles

  """
  for handle in requests:
    multi.add_handle(handle)

  while True:
    (status, num_active) = multi.perform()
    assert status in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

    if status == pycurl.E_CALL_MULTI_PERFORM:
      # cURL wants to be called again
      continue

    # Drain all completion messages; info_read must be called until it
    # reports nothing left in the queue
    num_queued = 1
    while num_queued:
      (num_queued, ok_handles, err_handles) = multi.info_read()

      for handle in ok_handles:
        multi.remove_handle(handle)
        yield (handle, None)

      for (handle, errnum, errmsg) in err_handles:
        multi.remove_handle(handle)
        yield (handle, "Error %s: %s" % (errnum, errmsg))

    if num_active == 0:
      # No active handles anymore
      break

    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
    # timeouts, which are only evaluated in multi.perform, aren't
    # unnecessarily delayed.
    multi.select(1.0)
- curl_to_pclient = {}
- for req in requests:
- pclient = self._StartRequest(req)
- curl = pclient.client.GetCurlHandle()
- curl_to_pclient[curl] = pclient
- multi.add_handle(curl)
- assert pclient.client.GetCurrentRequest() == req
- assert pclient.lastused >= 0
- assert len(curl_to_pclient) == len(requests)
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
  """Processes any number of HTTP client requests.

  @type requests: list of L{HttpClientRequest}
  @param requests: List of all requests
  @param lock_monitor_cb: Callable for registering with lock monitor

  """
  # None of the requests may have been started yet
  assert compat.all((req.error is None and
                     req.success is None and
                     req.resp_status_code is None and
                     req.resp_body is None)
                    for req in requests)

  # Prepare all requests; each gets its own cURL handle, keyed for lookup
  # once the handle completes
  curl_to_client = \
    dict((client.GetCurlHandle(), client)
         for client in (_StartRequest(_curl(), req) for req in requests))

  assert len(curl_to_client) == len(requests)

  if lock_monitor_cb:
    monitor = _PendingRequestMonitor(threading.currentThread(),
                                     curl_to_client.values)
    lock_monitor_cb(monitor)
  else:
    monitor = _NoOpRequestMonitor

  # Process all requests and act based on the returned values
  for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
    # Exclusive access while finalizing, so the monitor never sees a
    # half-finished request
    monitor.acquire(shared=0)
    try:
      curl_to_client.pop(curl).Done(msg)
    finally:
      monitor.release()

  assert not curl_to_client, "Not all requests were processed"

  # Don't try to read information anymore as all requests have been processed
  monitor.Disable()

  # Every request is now either failed with an error message or fully
  # successful with status code and body set
  assert compat.all(req.error is not None or
                    (req.success and
                     req.resp_status_code is not None and
                     req.resp_body is not None)
                    for req in requests)