X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7260cfbe90e7bf0a30b296a8196618c8558d08b9..091232220479c4b35c69d19d53a729c9874d8fda:/lib/http/client.py diff --git a/lib/http/client.py b/lib/http/client.py index 490e4da..849a195 100644 --- a/lib/http/client.py +++ b/lib/http/client.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2007, 2008 Google Inc. +# Copyright (C) 2007, 2008, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,29 +22,21 @@ """ -# pylint: disable-msg=E1103 - -# # E1103: %s %r has no %r member (but some types could not be -# inferred), since _socketobject could be ssl or not and pylint -# doesn't parse that - - -import os -import select -import socket -import errno +import logging +import pycurl import threading +from cStringIO import StringIO -from ganeti import workerpool from ganeti import http - - -HTTP_CLIENT_THREADS = 10 +from ganeti import compat +from ganeti import netutils +from ganeti import locking class HttpClientRequest(object): def __init__(self, host, port, method, path, headers=None, post_data=None, - ssl_params=None, ssl_verify_peer=False): + read_timeout=None, curl_config_fn=None, nicename=None, + completion_cb=None): """Describes an HTTP request. @type host: string @@ -55,339 +47,347 @@ class HttpClientRequest(object): @param method: Method name @type path: string @param path: Request path - @type headers: dict or None - @param headers: Additional headers to send + @type headers: list or None + @param headers: Additional headers to send, list of strings @type post_data: string or None @param post_data: Additional data to send - @type ssl_params: HttpSslParams - @param ssl_params: SSL key and certificate - @type ssl_verify_peer: bool - @param ssl_verify_peer: Whether to compare our certificate with - server's certificate + @type read_timeout: int + @param read_timeout: if passed, it will be used as the read + timeout while reading the response from the server + @type curl_config_fn: callable + @param curl_config_fn: Function to configure cURL object before request + @type nicename: string + @param nicename: Name, presentable to a user, to describe this request (no + whitespace) + @type completion_cb: callable accepting this request object as a single + parameter + @param completion_cb: Callback for request completion """ - if post_data is not None: - assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \ - "Only POST and GET requests support sending data" - assert path.startswith("/"), "Path must start with slash (/)" + assert curl_config_fn is None or callable(curl_config_fn) + assert completion_cb is None or callable(completion_cb) # Request attributes self.host = host self.port = port - self.ssl_params = ssl_params - self.ssl_verify_peer = ssl_verify_peer self.method = method self.path = path - self.headers = headers - self.post_data = post_data - + self.read_timeout = read_timeout + self.curl_config_fn = curl_config_fn + self.nicename = nicename + self.completion_cb = completion_cb + + if post_data is None: + self.post_data = "" + else: + self.post_data = post_data + + if headers is None: + self.headers = [] + elif isinstance(headers, dict): + # Support for old interface + self.headers = ["%s: %s" % (name, value) + for name, value in headers.items()] + else: + self.headers = headers + + # Response status self.success = None self.error = None - # Raw response - self.response = None - # Response attributes - self.resp_version = None self.resp_status_code = None - self.resp_reason = None - self.resp_headers = None self.resp_body = None + def __repr__(self): + status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), + "%s:%s" % (self.host, self.port), + self.method, + self.path] -class _HttpClientToServerMessageWriter(http.HttpMessageWriter): - pass + return "<%s at %#x>" % (" ".join(status), id(self)) + @property + def url(self): + """Returns the full URL for this requests. -class _HttpServerToClientMessageReader(http.HttpMessageReader): - # Length limits - START_LINE_LENGTH_MAX = 512 - HEADER_LENGTH_MAX = 4096 + """ + if netutils.IPAddress.IsValid(self.host): + address = netutils.FormatAddress((self.host, self.port)) + else: + address = "%s:%s" % (self.host, self.port) + # TODO: Support for non-SSL requests + return "https://%s%s" % (address, self.path) - def ParseStartLine(self, start_line): - """Parses the status line sent by the server. - """ - # Empty lines are skipped when reading - assert start_line +def _StartRequest(curl, req): + """Starts a request on a cURL object. - try: - [version, status, reason] = start_line.split(None, 2) - except ValueError: - try: - [version, status] = start_line.split(None, 1) - reason = "" - except ValueError: - version = http.HTTP_0_9 - - if version: - version = version.upper() - - # The status code is a three-digit number - try: - status = int(status) - if status < 100 or status > 999: - status = -1 - except ValueError: - status = -1 + @type curl: pycurl.Curl + @param curl: cURL object + @type req: L{HttpClientRequest} + @param req: HTTP request - if status == -1: - raise http.HttpError("Invalid status code (%r)" % start_line) + """ + logging.debug("Starting request %r", req) - return http.HttpServerToClientStartLine(version, status, reason) + url = req.url + method = req.method + post_data = req.post_data + headers = req.headers + # PycURL requires strings to be non-unicode + assert isinstance(method, str) + assert isinstance(url, str) + assert isinstance(post_data, str) + assert compat.all(isinstance(i, str) for i in headers) -class HttpClientRequestExecutor(http.HttpBase): - # Default headers - DEFAULT_HEADERS = { - http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION, - # TODO: For keep-alive, don't send "Connection: close" - http.HTTP_CONNECTION: "close", - } + # Buffer for response + resp_buffer = StringIO() - # Timeouts in seconds for socket layer - # TODO: Soft timeout instead of only socket timeout? - # TODO: Make read timeout configurable per OpCode? - CONNECT_TIMEOUT = 5 - WRITE_TIMEOUT = 10 - READ_TIMEOUT = None - CLOSE_TIMEOUT = 1 + # Configure client for request + curl.setopt(pycurl.VERBOSE, False) + curl.setopt(pycurl.NOSIGNAL, True) + curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION) + curl.setopt(pycurl.PROXY, "") + curl.setopt(pycurl.CUSTOMREQUEST, str(method)) + curl.setopt(pycurl.URL, url) + curl.setopt(pycurl.POSTFIELDS, post_data) + curl.setopt(pycurl.HTTPHEADER, headers) - def __init__(self, req): - """Initializes the HttpClientRequestExecutor class. + if req.read_timeout is None: + curl.setopt(pycurl.TIMEOUT, 0) + else: + curl.setopt(pycurl.TIMEOUT, int(req.read_timeout)) - @type req: HttpClientRequest - @param req: Request object + # Disable SSL session ID caching (pycurl >= 7.16.0) + if hasattr(pycurl, "SSL_SESSIONID_CACHE"): + curl.setopt(pycurl.SSL_SESSIONID_CACHE, False) - """ - http.HttpBase.__init__(self) - self.request = req + curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write) - try: - # TODO: Implement connection caching/keep-alive - self.sock = self._CreateSocket(req.ssl_params, - req.ssl_verify_peer) + # Pass cURL object to external config function + if req.curl_config_fn: + req.curl_config_fn(curl) + + return _PendingRequest(curl, req, resp_buffer.getvalue) - # Disable Python's timeout - self.sock.settimeout(None) - # Operate in non-blocking mode - self.sock.setblocking(0) +class _PendingRequest: + def __init__(self, curl, req, resp_buffer_read): + """Initializes this class. - response_msg_reader = None - response_msg = None - force_close = True + @type curl: pycurl.Curl + @param curl: cURL object + @type req: L{HttpClientRequest} + @param req: HTTP request + @type resp_buffer_read: callable + @param resp_buffer_read: Function to read response body - self._Connect() - try: - self._SendRequest() - (response_msg_reader, response_msg) = self._ReadResponse() + """ + assert req.success is None - # Only wait for server to close if we didn't have any exception. - force_close = False - finally: - # TODO: Keep-alive is not supported, always close connection - force_close = True - http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT, - self.WRITE_TIMEOUT, response_msg_reader, - force_close) + self._curl = curl + self._req = req + self._resp_buffer_read = resp_buffer_read - self.sock.close() - self.sock = None + def GetCurlHandle(self): + """Returns the cURL object. - req.response = response_msg + """ + return self._curl - req.resp_version = req.response.start_line.version - req.resp_status_code = req.response.start_line.code - req.resp_reason = req.response.start_line.reason - req.resp_headers = req.response.headers - req.resp_body = req.response.body + def GetCurrentRequest(self): + """Returns the current request. - req.success = True - req.error = None + """ + return self._req - except http.HttpError, err: - req.success = False - req.error = str(err) + def Done(self, errmsg): + """Finishes a request. - def _Connect(self): - """Non-blocking connect to host with timeout. + @type errmsg: string or None + @param errmsg: Error message if request failed """ - connected = False - while True: - try: - connect_error = self.sock.connect_ex((self.request.host, - self.request.port)) - except socket.gaierror, err: - raise http.HttpError("Connection failed: %s" % str(err)) - - if connect_error == errno.EINTR: - # Mask signals - pass - - elif connect_error == 0: - # Connection established - connected = True - break + curl = self._curl + req = self._req - elif connect_error == errno.EINPROGRESS: - # Connection started - break + assert req.success is None, "Request has already been finalized" - raise http.HttpError("Connection failed (%s: %s)" % - (connect_error, os.strerror(connect_error))) + logging.debug("Request %s finished, errmsg=%s", req, errmsg) - if not connected: - # Wait for connection - event = http.WaitForSocketCondition(self.sock, select.POLLOUT, - self.CONNECT_TIMEOUT) - if event is None: - raise http.HttpError("Timeout while connecting to server") + req.success = not bool(errmsg) + req.error = errmsg - # Get error code - connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if connect_error != 0: - raise http.HttpError("Connection failed (%s: %s)" % - (connect_error, os.strerror(connect_error))) + # Get HTTP response code + req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE) + req.resp_body = self._resp_buffer_read() - # Enable TCP keep-alive - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + # Ensure no potentially large variables are referenced + curl.setopt(pycurl.POSTFIELDS, "") + curl.setopt(pycurl.WRITEFUNCTION, lambda _: None) - # If needed, Linux specific options are available to change the TCP - # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and - # TCP_KEEPINTVL. + if req.completion_cb: + req.completion_cb(req) - # Do the secret SSL handshake - if self.using_ssl: - self.sock.set_connect_state() # pylint: disable-msg=E1103 - try: - http.Handshake(self.sock, self.WRITE_TIMEOUT) - except http.HttpSessionHandshakeUnexpectedEOF: - raise http.HttpError("Server closed connection during SSL handshake") - def _SendRequest(self): - """Sends request to server. +class _NoOpRequestMonitor: # pylint: disable=W0232 + """No-op request monitor. - """ - # Headers - send_headers = self.DEFAULT_HEADERS.copy() + """ + @staticmethod + def acquire(*args, **kwargs): + pass - if self.request.headers: - send_headers.update(self.request.headers) + release = acquire + Disable = acquire - send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host, - self.request.port) - # Response message - msg = http.HttpMessage() +class _PendingRequestMonitor: + _LOCK = "_lock" - # Combine request line. We only support HTTP/1.0 (no chunked transfers and - # no keep-alive). - # TODO: For keep-alive, change to HTTP/1.1 - msg.start_line = \ - http.HttpClientToServerStartLine(method=self.request.method.upper(), - path=self.request.path, - version=http.HTTP_1_0) - msg.headers = send_headers - msg.body = self.request.post_data + def __init__(self, owner, pending_fn): + """Initializes this class. - try: - _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT) - except http.HttpSocketTimeout: - raise http.HttpError("Timeout while sending request") - except socket.error, err: - raise http.HttpError("Error sending request: %s" % err) + """ + self._owner = owner + self._pending_fn = pending_fn - def _ReadResponse(self): - """Read response from server. + # The lock monitor runs in another thread, hence locking is necessary + self._lock = locking.SharedLock("PendingHttpRequests") + self.acquire = self._lock.acquire + self.release = self._lock.release + + @locking.ssynchronized(_LOCK) + def Disable(self): + """Disable monitor. """ - response_msg = http.HttpMessage() + self._pending_fn = None - try: - response_msg_reader = \ - _HttpServerToClientMessageReader(self.sock, response_msg, - self.READ_TIMEOUT) - except http.HttpSocketTimeout: - raise http.HttpError("Timeout while reading response") - except socket.error, err: - raise http.HttpError("Error reading response: %s" % err) + @locking.ssynchronized(_LOCK, shared=1) + def GetLockInfo(self, requested): # pylint: disable=W0613 + """Retrieves information about pending requests. - return (response_msg_reader, response_msg) + @type requested: set + @param requested: Requested information, see C{query.LQ_*} + """ + # No need to sort here, that's being done by the lock manager and query + # library. There are no priorities for requests, hence all show up as + # one item under "pending". + result = [] -class _HttpClientPendingRequest(object): - """Data class for pending requests. + if self._pending_fn: + owner_name = self._owner.getName() - """ - def __init__(self, request): - self.request = request + for client in self._pending_fn(): + req = client.GetCurrentRequest() + if req: + if req.nicename is None: + name = "%s%s" % (req.host, req.path) + else: + name = req.nicename + result.append(("rpc/%s" % name, None, [owner_name], None)) + + return result - # Thread synchronization - self.done = threading.Event() +def _ProcessCurlRequests(multi, requests): + """cURL request processor. -class HttpClientWorker(workerpool.BaseWorker): - """HTTP client worker class. + This generator yields a tuple once for every completed request, successful or + not. The first value in the tuple is the handle, the second an error message + or C{None} for successful requests. + + @type multi: C{pycurl.CurlMulti} + @param multi: cURL multi object + @type requests: sequence + @param requests: cURL request handles """ - def RunTask(self, pend_req): # pylint: disable-msg=W0221 - try: - HttpClientRequestExecutor(pend_req.request) - finally: - pend_req.done.set() + for curl in requests: + multi.add_handle(curl) + while True: + (ret, active) = multi.perform() + assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM) -class HttpClientWorkerPool(workerpool.WorkerPool): - def __init__(self, manager): - workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS, - HttpClientWorker) - self.manager = manager + if ret == pycurl.E_CALL_MULTI_PERFORM: + # cURL wants to be called again + continue + while True: + (remaining_messages, successful, failed) = multi.info_read() -class HttpClientManager(object): - """Manages HTTP requests. + for curl in successful: + multi.remove_handle(curl) + yield (curl, None) - """ - def __init__(self): - self._wpool = HttpClientWorkerPool(self) + for curl, errnum, errmsg in failed: + multi.remove_handle(curl) + yield (curl, "Error %s: %s" % (errnum, errmsg)) - def __del__(self): - self.Shutdown() + if remaining_messages == 0: + break - def ExecRequests(self, requests): - """Execute HTTP requests. + if active == 0: + # No active handles anymore + break - This function can be called from multiple threads at the same time. + # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP + # timeouts, which are only evaluated in multi.perform, aren't + # unnecessarily delayed. + multi.select(1.0) - @type requests: List of HttpClientRequest instances - @param requests: The requests to execute - @rtype: List of HttpClientRequest instances - @return: The list of requests passed in - """ - # _HttpClientPendingRequest is used for internal thread synchronization - pending = [_HttpClientPendingRequest(req) for req in requests] +def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl, + _curl_multi=pycurl.CurlMulti, + _curl_process=_ProcessCurlRequests): + """Processes any number of HTTP client requests. - try: - # Add requests to queue - for pend_req in pending: - self._wpool.AddTask(pend_req) + @type requests: list of L{HttpClientRequest} + @param requests: List of all requests + @param lock_monitor_cb: Callable for registering with lock monitor + """ + assert compat.all((req.error is None and + req.success is None and + req.resp_status_code is None and + req.resp_body is None) + for req in requests) + + # Prepare all requests + curl_to_client = \ + dict((client.GetCurlHandle(), client) + for client in map(lambda req: _StartRequest(_curl(), req), requests)) + + assert len(curl_to_client) == len(requests) + + if lock_monitor_cb: + monitor = _PendingRequestMonitor(threading.currentThread(), + curl_to_client.values) + lock_monitor_cb(monitor) + else: + monitor = _NoOpRequestMonitor + + # Process all requests and act based on the returned values + for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()): + monitor.acquire(shared=0) + try: + curl_to_client.pop(curl).Done(msg) finally: - # In case of an exception we should still wait for the rest, otherwise - # another thread from the worker pool could modify the request object - # after we returned. + monitor.release() - # And wait for them to finish - for pend_req in pending: - pend_req.done.wait() + assert not curl_to_client, "Not all requests were processed" - # Return original list - return requests + # Don't try to read information anymore as all requests have been processed + monitor.Disable() - def Shutdown(self): - self._wpool.Quiesce() - self._wpool.TerminateWorkers() + assert compat.all(req.error is not None or + (req.success and + req.resp_status_code is not None and + req.resp_body is not None) + for req in requests)