X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/aea5caefb5908bfe19a981d8d7b19c116330a213..c4929a8bcca4a43dc6434394a91a8ea67d854844:/lib/http/client.py?ds=sidebyside diff --git a/lib/http/client.py b/lib/http/client.py index 4095bb7..849a195 100644 --- a/lib/http/client.py +++ b/lib/http/client.py @@ -35,7 +35,8 @@ from ganeti import locking class HttpClientRequest(object): def __init__(self, host, port, method, path, headers=None, post_data=None, - read_timeout=None, curl_config_fn=None): + read_timeout=None, curl_config_fn=None, nicename=None, + completion_cb=None): """Describes an HTTP request. @type host: string @@ -55,14 +56,17 @@ class HttpClientRequest(object): timeout while reading the response from the server @type curl_config_fn: callable @param curl_config_fn: Function to configure cURL object before request - (Note: if the function configures the connection in - a way where it wouldn't be efficient to reuse them, - a "identity" property should be defined, see - L{HttpClientRequest.identity}) + @type nicename: string + @param nicename: Name, presentable to a user, to describe this request (no + whitespace) + @type completion_cb: callable accepting this request object as a single + parameter + @param completion_cb: Callback for request completion """ assert path.startswith("/"), "Path must start with slash (/)" assert curl_config_fn is None or callable(curl_config_fn) + assert completion_cb is None or callable(completion_cb) # Request attributes self.host = host @@ -71,6 +75,8 @@ class HttpClientRequest(object): self.path = path self.read_timeout = read_timeout self.curl_config_fn = curl_config_fn + self.nicename = nicename + self.completion_cb = completion_cb if post_data is None: self.post_data = "" @@ -114,58 +120,77 @@ class HttpClientRequest(object): # TODO: Support for non-SSL requests return "https://%s%s" % (address, self.path) - @property - def identity(self): - """Returns identifier for retrieving a pooled connection for this request. - This allows cURL client objects to be re-used and to cache information - (e.g. SSL session IDs or connections). +def _StartRequest(curl, req): + """Starts a request on a cURL object. - """ - parts = [self.host, self.port] + @type curl: pycurl.Curl + @param curl: cURL object + @type req: L{HttpClientRequest} + @param req: HTTP request - if self.curl_config_fn: - try: - parts.append(self.curl_config_fn.identity) - except AttributeError: - pass + """ + logging.debug("Starting request %r", req) - return "/".join(str(i) for i in parts) + url = req.url + method = req.method + post_data = req.post_data + headers = req.headers + # PycURL requires strings to be non-unicode + assert isinstance(method, str) + assert isinstance(url, str) + assert isinstance(post_data, str) + assert compat.all(isinstance(i, str) for i in headers) -class _HttpClient(object): - def __init__(self, curl_config_fn): - """Initializes this class. + # Buffer for response + resp_buffer = StringIO() - @type curl_config_fn: callable - @param curl_config_fn: Function to configure cURL object after - initialization + # Configure client for request + curl.setopt(pycurl.VERBOSE, False) + curl.setopt(pycurl.NOSIGNAL, True) + curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION) + curl.setopt(pycurl.PROXY, "") + curl.setopt(pycurl.CUSTOMREQUEST, str(method)) + curl.setopt(pycurl.URL, url) + curl.setopt(pycurl.POSTFIELDS, post_data) + curl.setopt(pycurl.HTTPHEADER, headers) - """ - self._req = None + if req.read_timeout is None: + curl.setopt(pycurl.TIMEOUT, 0) + else: + curl.setopt(pycurl.TIMEOUT, int(req.read_timeout)) - curl = self._CreateCurlHandle() - curl.setopt(pycurl.VERBOSE, False) - curl.setopt(pycurl.NOSIGNAL, True) - curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION) - curl.setopt(pycurl.PROXY, "") + # Disable SSL session ID caching (pycurl >= 7.16.0) + if hasattr(pycurl, "SSL_SESSIONID_CACHE"): + curl.setopt(pycurl.SSL_SESSIONID_CACHE, False) - # Disable SSL session ID caching (pycurl >= 7.16.0) - if hasattr(pycurl, "SSL_SESSIONID_CACHE"): - curl.setopt(pycurl.SSL_SESSIONID_CACHE, False) + curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write) - # Pass cURL object to external config function - if curl_config_fn: - curl_config_fn(curl) + # Pass cURL object to external config function + if req.curl_config_fn: + req.curl_config_fn(curl) - self._curl = curl + return _PendingRequest(curl, req, resp_buffer.getvalue) - @staticmethod - def _CreateCurlHandle(): - """Returns a new cURL object. + +class _PendingRequest: + def __init__(self, curl, req, resp_buffer_read): + """Initializes this class. + + @type curl: pycurl.Curl + @param curl: cURL object + @type req: L{HttpClientRequest} + @param req: HTTP request + @type resp_buffer_read: callable + @param resp_buffer_read: Function to read response body """ - return pycurl.Curl() + assert req.success is None + + self._curl = curl + self._req = req + self._resp_buffer_read = resp_buffer_read def GetCurlHandle(self): """Returns the cURL object. @@ -176,53 +201,9 @@ class _HttpClient(object): def GetCurrentRequest(self): """Returns the current request. - @rtype: L{HttpClientRequest} or None - """ return self._req - def StartRequest(self, req): - """Starts a request on this client. - - @type req: L{HttpClientRequest} - @param req: HTTP request - - """ - assert not self._req, "Another request is already started" - - logging.debug("Starting request %r", req) - - self._req = req - self._resp_buffer = StringIO() - - url = req.url - method = req.method - post_data = req.post_data - headers = req.headers - - # PycURL requires strings to be non-unicode - assert isinstance(method, str) - assert isinstance(url, str) - assert isinstance(post_data, str) - assert compat.all(isinstance(i, str) for i in headers) - - # Configure cURL object for request - curl = self._curl - curl.setopt(pycurl.CUSTOMREQUEST, str(method)) - curl.setopt(pycurl.URL, url) - curl.setopt(pycurl.POSTFIELDS, post_data) - curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write) - curl.setopt(pycurl.HTTPHEADER, headers) - - if req.read_timeout is None: - curl.setopt(pycurl.TIMEOUT, 0) - else: - curl.setopt(pycurl.TIMEOUT, int(req.read_timeout)) - - # Pass cURL object to external config function - if req.curl_config_fn: - req.curl_config_fn(curl) - def Done(self, errmsg): """Finishes a request. @@ -230,222 +211,26 @@ class _HttpClient(object): @param errmsg: Error message if request failed """ + curl = self._curl req = self._req - assert req, "No request" - logging.debug("Request %s finished, errmsg=%s", req, errmsg) + assert req.success is None, "Request has already been finalized" - curl = self._curl + logging.debug("Request %s finished, errmsg=%s", req, errmsg) req.success = not bool(errmsg) req.error = errmsg # Get HTTP response code req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE) - req.resp_body = self._resp_buffer.getvalue() - - # Reset client object - self._req = None - self._resp_buffer = None + req.resp_body = self._resp_buffer_read() # Ensure no potentially large variables are referenced curl.setopt(pycurl.POSTFIELDS, "") curl.setopt(pycurl.WRITEFUNCTION, lambda _: None) - -class _PooledHttpClient: - """Data structure for HTTP client pool. - - """ - def __init__(self, identity, client): - """Initializes this class. - - @type identity: string - @param identity: Client identifier for pool - @type client: L{_HttpClient} - @param client: HTTP client - - """ - self.identity = identity - self.client = client - self.lastused = 0 - - def __repr__(self): - status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), - "id=%s" % self.identity, - "lastuse=%s" % self.lastused, - repr(self.client)] - - return "<%s at %#x>" % (" ".join(status), id(self)) - - -class HttpClientPool: - """A simple HTTP client pool. - - Supports one pooled connection per identity (see - L{HttpClientRequest.identity}). - - """ - #: After how many generations to drop unused clients - _MAX_GENERATIONS_DROP = 25 - - def __init__(self, curl_config_fn): - """Initializes this class. - - @type curl_config_fn: callable - @param curl_config_fn: Function to configure cURL object after - initialization - - """ - self._curl_config_fn = curl_config_fn - self._generation = 0 - self._pool = {} - - # Create custom logger for HTTP client pool. Change logging level to - # C{logging.NOTSET} to get more details. - self._logger = logging.getLogger(self.__class__.__name__) - self._logger.setLevel(logging.INFO) - - @staticmethod - def _GetHttpClientCreator(): - """Returns callable to create HTTP client. - - """ - return _HttpClient - - def _Get(self, identity): - """Gets an HTTP client from the pool. - - @type identity: string - @param identity: Client identifier - - """ - try: - pclient = self._pool.pop(identity) - except KeyError: - # Need to create new client - client = self._GetHttpClientCreator()(self._curl_config_fn) - pclient = _PooledHttpClient(identity, client) - self._logger.debug("Created new client %s", pclient) - else: - self._logger.debug("Reusing client %s", pclient) - - assert pclient.identity == identity - - return pclient - - def _StartRequest(self, req): - """Starts a request. - - @type req: L{HttpClientRequest} - @param req: HTTP request - - """ - pclient = self._Get(req.identity) - - assert req.identity not in self._pool - - pclient.client.StartRequest(req) - pclient.lastused = self._generation - - return pclient - - def _Return(self, pclients): - """Returns HTTP clients to the pool. - - """ - assert not frozenset(pclients) & frozenset(self._pool.values()) - - for pc in pclients: - self._logger.debug("Returning client %s to pool", pc) - assert pc.identity not in self._pool - self._pool[pc.identity] = pc - - # Check for unused clients - for pc in self._pool.values(): - if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation: - self._logger.debug("Removing client %s which hasn't been used" - " for %s generations", - pc, self._MAX_GENERATIONS_DROP) - self._pool.pop(pc.identity, None) - - assert compat.all(pc.lastused >= (self._generation - - self._MAX_GENERATIONS_DROP) - for pc in self._pool.values()) - - @staticmethod - def _CreateCurlMultiHandle(): - """Creates new cURL multi handle. - - """ - return pycurl.CurlMulti() - - def ProcessRequests(self, requests, lock_monitor_cb=None): - """Processes any number of HTTP client requests using pooled objects. - - @type requests: list of L{HttpClientRequest} - @param requests: List of all requests - @param lock_monitor_cb: Callable for registering with lock monitor - - """ - # For client cleanup - self._generation += 1 - - assert compat.all((req.error is None and - req.success is None and - req.resp_status_code is None and - req.resp_body is None) - for req in requests) - - curl_to_pclient = {} - for req in requests: - pclient = self._StartRequest(req) - curl_to_pclient[pclient.client.GetCurlHandle()] = pclient - assert pclient.client.GetCurrentRequest() == req - assert pclient.lastused >= 0 - - assert len(curl_to_pclient) == len(requests) - - if lock_monitor_cb: - monitor = _PendingRequestMonitor(threading.currentThread(), - curl_to_pclient.values) - lock_monitor_cb(monitor) - else: - monitor = _NoOpRequestMonitor - - # Process all requests and act based on the returned values - for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(), - curl_to_pclient.keys()): - pclient = curl_to_pclient[curl] - req = pclient.client.GetCurrentRequest() - - monitor.acquire(shared=0) - try: - pclient.client.Done(msg) - finally: - monitor.release() - - assert ((msg is None and req.success and req.error is None) ^ - (msg is not None and not req.success and req.error == msg)) - - assert compat.all(pclient.client.GetCurrentRequest() is None - for pclient in curl_to_pclient.values()) - - monitor.acquire(shared=0) - try: - # Don't try to read information from returned clients - monitor.Disable() - - # Return clients to pool - self._Return(curl_to_pclient.values()) - finally: - monitor.release() - - assert compat.all(req.error is not None or - (req.success and - req.resp_status_code is not None and - req.resp_body is not None) - for req in requests) + if req.completion_cb: + req.completion_cb(req) class _NoOpRequestMonitor: # pylint: disable=W0232 @@ -475,6 +260,7 @@ class _PendingRequestMonitor: self.acquire = self._lock.acquire self.release = self._lock.release + @locking.ssynchronized(_LOCK) def Disable(self): """Disable monitor. @@ -497,11 +283,14 @@ class _PendingRequestMonitor: if self._pending_fn: owner_name = self._owner.getName() - for pclient in self._pending_fn(): - req = pclient.client.GetCurrentRequest() + for client in self._pending_fn(): + req = client.GetCurrentRequest() if req: - result.append(("rpc/%s%s" % (req.host, req.path), None, None, - [("thread", [owner_name])])) + if req.nicename is None: + name = "%s%s" % (req.host, req.path) + else: + name = req.nicename + result.append(("rpc/%s" % name, None, [owner_name], None)) return result @@ -552,3 +341,53 @@ def _ProcessCurlRequests(multi, requests): # timeouts, which are only evaluated in multi.perform, aren't # unnecessarily delayed. multi.select(1.0) + + +def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl, + _curl_multi=pycurl.CurlMulti, + _curl_process=_ProcessCurlRequests): + """Processes any number of HTTP client requests. + + @type requests: list of L{HttpClientRequest} + @param requests: List of all requests + @param lock_monitor_cb: Callable for registering with lock monitor + + """ + assert compat.all((req.error is None and + req.success is None and + req.resp_status_code is None and + req.resp_body is None) + for req in requests) + + # Prepare all requests + curl_to_client = \ + dict((client.GetCurlHandle(), client) + for client in map(lambda req: _StartRequest(_curl(), req), requests)) + + assert len(curl_to_client) == len(requests) + + if lock_monitor_cb: + monitor = _PendingRequestMonitor(threading.currentThread(), + curl_to_client.values) + lock_monitor_cb(monitor) + else: + monitor = _NoOpRequestMonitor + + # Process all requests and act based on the returned values + for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()): + monitor.acquire(shared=0) + try: + curl_to_client.pop(curl).Done(msg) + finally: + monitor.release() + + assert not curl_to_client, "Not all requests were processed" + + # Don't try to read information anymore as all requests have been processed + monitor.Disable() + + assert compat.all(req.error is not None or + (req.success and + req.resp_status_code is not None and + req.resp_body is not None) + for req in requests)