gnt-cluster: Add hv/disk state to init
[ganeti-local] / lib / http / client.py
index dc502ab..849a195 100644 (file)
 
 import logging
 import pycurl
+import threading
 from cStringIO import StringIO
 
 from ganeti import http
 from ganeti import compat
+from ganeti import netutils
+from ganeti import locking
 
 
 class HttpClientRequest(object):
   def __init__(self, host, port, method, path, headers=None, post_data=None,
-               read_timeout=None, curl_config_fn=None):
+               read_timeout=None, curl_config_fn=None, nicename=None,
+               completion_cb=None):
     """Describes an HTTP request.
 
     @type host: string
@@ -52,14 +56,17 @@ class HttpClientRequest(object):
         timeout while reading the response from the server
     @type curl_config_fn: callable
     @param curl_config_fn: Function to configure cURL object before request
-                           (Note: if the function configures the connection in
-                           a way where it wouldn't be efficient to reuse them,
-                           a "identity" property should be defined, see
-                           L{HttpClientRequest.identity})
+    @type nicename: string
+    @param nicename: Name, presentable to a user, to describe this request (no
+                     whitespace)
+    @type completion_cb: callable accepting this request object as a single
+                         parameter
+    @param completion_cb: Callback for request completion
 
     """
     assert path.startswith("/"), "Path must start with slash (/)"
     assert curl_config_fn is None or callable(curl_config_fn)
+    assert completion_cb is None or callable(completion_cb)
 
     # Request attributes
     self.host = host
@@ -68,6 +75,8 @@ class HttpClientRequest(object):
     self.path = path
     self.read_timeout = read_timeout
     self.curl_config_fn = curl_config_fn
+    self.nicename = nicename
+    self.completion_cb = completion_cb
 
     if post_data is None:
       self.post_data = ""
@@ -104,61 +113,84 @@ class HttpClientRequest(object):
     """Returns the full URL for this request.
 
     """
+    if netutils.IPAddress.IsValid(self.host):
+      address = netutils.FormatAddress((self.host, self.port))
+    else:
+      address = "%s:%s" % (self.host, self.port)
     # TODO: Support for non-SSL requests
-    return "https://%s:%s%s" % (self.host, self.port, self.path)
+    return "https://%s%s" % (address, self.path)
 
-  @property
-  def identity(self):
-    """Returns identifier for retrieving a pooled connection for this request.
 
-    This allows cURL client objects to be re-used and to cache information
-    (e.g. SSL session IDs or connections).
+def _StartRequest(curl, req):
+  """Starts a request on a cURL object.
 
-    """
-    parts = [self.host, self.port]
+  @type curl: pycurl.Curl
+  @param curl: cURL object
+  @type req: L{HttpClientRequest}
+  @param req: HTTP request
 
-    if self.curl_config_fn:
-      try:
-        parts.append(self.curl_config_fn.identity)
-      except AttributeError:
-        pass
+  """
+  logging.debug("Starting request %r", req)
 
-    return "/".join(str(i) for i in parts)
+  url = req.url
+  method = req.method
+  post_data = req.post_data
+  headers = req.headers
 
+  # PycURL requires strings to be non-unicode
+  assert isinstance(method, str)
+  assert isinstance(url, str)
+  assert isinstance(post_data, str)
+  assert compat.all(isinstance(i, str) for i in headers)
 
-class _HttpClient(object):
-  def __init__(self, curl_config_fn):
-    """Initializes this class.
+  # Buffer for response
+  resp_buffer = StringIO()
 
-    @type curl_config_fn: callable
-    @param curl_config_fn: Function to configure cURL object after
-                           initialization
+  # Configure client for request
+  curl.setopt(pycurl.VERBOSE, False)
+  curl.setopt(pycurl.NOSIGNAL, True)
+  curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
+  curl.setopt(pycurl.PROXY, "")
+  curl.setopt(pycurl.CUSTOMREQUEST, str(method))
+  curl.setopt(pycurl.URL, url)
+  curl.setopt(pycurl.POSTFIELDS, post_data)
+  curl.setopt(pycurl.HTTPHEADER, headers)
 
-    """
-    self._req = None
+  if req.read_timeout is None:
+    curl.setopt(pycurl.TIMEOUT, 0)
+  else:
+    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
 
-    curl = self._CreateCurlHandle()
-    curl.setopt(pycurl.VERBOSE, False)
-    curl.setopt(pycurl.NOSIGNAL, True)
-    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
-    curl.setopt(pycurl.PROXY, "")
+  # Disable SSL session ID caching (pycurl >= 7.16.0)
+  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
+    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
 
-    # Disable SSL session ID caching (pycurl >= 7.16.0)
-    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
-      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
+  curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
 
-    # Pass cURL object to external config function
-    if curl_config_fn:
-      curl_config_fn(curl)
+  # Pass cURL object to external config function
+  if req.curl_config_fn:
+    req.curl_config_fn(curl)
 
-    self._curl = curl
+  return _PendingRequest(curl, req, resp_buffer.getvalue)
 
-  @staticmethod
-  def _CreateCurlHandle():
-    """Returns a new cURL object.
+
+class _PendingRequest:
+  def __init__(self, curl, req, resp_buffer_read):
+    """Initializes this class.
+
+    @type curl: pycurl.Curl
+    @param curl: cURL object
+    @type req: L{HttpClientRequest}
+    @param req: HTTP request
+    @type resp_buffer_read: callable
+    @param resp_buffer_read: Function to read response body
 
     """
-    return pycurl.Curl()
+    assert req.success is None
+
+    self._curl = curl
+    self._req = req
+    self._resp_buffer_read = resp_buffer_read
 
   def GetCurlHandle(self):
     """Returns the cURL object.
@@ -169,51 +201,9 @@ class _HttpClient(object):
   def GetCurrentRequest(self):
     """Returns the current request.
 
-    @rtype: L{HttpClientRequest} or None
-
     """
     return self._req
 
-  def StartRequest(self, req):
-    """Starts a request on this client.
-
-    @type req: L{HttpClientRequest}
-    @param req: HTTP request
-
-    """
-    assert not self._req, "Another request is already started"
-
-    self._req = req
-    self._resp_buffer = StringIO()
-
-    url = req.url
-    method = req.method
-    post_data = req.post_data
-    headers = req.headers
-
-    # PycURL requires strings to be non-unicode
-    assert isinstance(method, str)
-    assert isinstance(url, str)
-    assert isinstance(post_data, str)
-    assert compat.all(isinstance(i, str) for i in headers)
-
-    # Configure cURL object for request
-    curl = self._curl
-    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
-    curl.setopt(pycurl.URL, url)
-    curl.setopt(pycurl.POSTFIELDS, post_data)
-    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
-    curl.setopt(pycurl.HTTPHEADER, headers)
-
-    if req.read_timeout is None:
-      curl.setopt(pycurl.TIMEOUT, 0)
-    else:
-      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
-
-    # Pass cURL object to external config function
-    if req.curl_config_fn:
-      req.curl_config_fn(curl)
-
   def Done(self, errmsg):
     """Finishes a request.
 
@@ -221,232 +211,183 @@ class _HttpClient(object):
     @param errmsg: Error message if request failed
 
     """
+    curl = self._curl
     req = self._req
-    assert req, "No request"
 
-    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
+    assert req.success is None, "Request has already been finalized"
 
-    curl = self._curl
+    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
 
     req.success = not bool(errmsg)
     req.error = errmsg
 
     # Get HTTP response code
     req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
-    req.resp_body = self._resp_buffer.getvalue()
-
-    # Reset client object
-    self._req = None
-    self._resp_buffer = None
+    req.resp_body = self._resp_buffer_read()
 
     # Ensure no potentially large variables are referenced
     curl.setopt(pycurl.POSTFIELDS, "")
     curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
 
+    if req.completion_cb:
+      req.completion_cb(req)
 
-class _PooledHttpClient:
-  """Data structure for HTTP client pool.
-
-  """
-  def __init__(self, identity, client):
-    """Initializes this class.
-
-    @type identity: string
-    @param identity: Client identifier for pool
-    @type client: L{_HttpClient}
-    @param client: HTTP client
-
-    """
-    self.identity = identity
-    self.client = client
-    self.lastused = 0
-
-  def __repr__(self):
-    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
-              "id=%s" % self.identity,
-              "lastuse=%s" % self.lastused,
-              repr(self.client)]
 
-    return "<%s at %#x>" % (" ".join(status), id(self))
+class _NoOpRequestMonitor: # pylint: disable=W0232
+  """No-op request monitor.
 
+  """
+  @staticmethod
+  def acquire(*args, **kwargs):
+    pass
 
-class HttpClientPool:
-  """A simple HTTP client pool.
+  release = acquire
+  Disable = acquire
 
-  Supports one pooled connection per identity (see
-  L{HttpClientRequest.identity}).
 
-  """
-  #: After how many generations to drop unused clients
-  _MAX_GENERATIONS_DROP = 25
+class _PendingRequestMonitor:
+  _LOCK = "_lock"
 
-  def __init__(self, curl_config_fn):
+  def __init__(self, owner, pending_fn):
     """Initializes this class.
 
-    @type curl_config_fn: callable
-    @param curl_config_fn: Function to configure cURL object after
-                           initialization
-
     """
-    self._curl_config_fn = curl_config_fn
-    self._generation = 0
-    self._pool = {}
+    self._owner = owner
+    self._pending_fn = pending_fn
 
-  @staticmethod
-  def _GetHttpClientCreator():
-    """Returns callable to create HTTP client.
+    # The lock monitor runs in another thread, hence locking is necessary
+    self._lock = locking.SharedLock("PendingHttpRequests")
+    self.acquire = self._lock.acquire
+    self.release = self._lock.release
+
+  @locking.ssynchronized(_LOCK)
+  def Disable(self):
+    """Disable monitor.
 
     """
-    return _HttpClient
+    self._pending_fn = None
 
-  def _Get(self, identity):
-    """Gets an HTTP client from the pool.
+  @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about pending requests.
 
-    @type identity: string
-    @param identity: Client identifier
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
 
     """
-    try:
-      pclient  = self._pool.pop(identity)
-    except KeyError:
-      # Need to create new client
-      client = self._GetHttpClientCreator()(self._curl_config_fn)
-      pclient = _PooledHttpClient(identity, client)
-      logging.debug("Created new client %s", pclient)
-    else:
-      logging.debug("Reusing client %s", pclient)
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for requests, hence all show up as
+    # one item under "pending".
+    result = []
 
-    assert pclient.identity == identity
+    if self._pending_fn:
+      owner_name = self._owner.getName()
 
-    return pclient
+      for client in self._pending_fn():
+        req = client.GetCurrentRequest()
+        if req:
+          if req.nicename is None:
+            name = "%s%s" % (req.host, req.path)
+          else:
+            name = req.nicename
+          result.append(("rpc/%s" % name, None, [owner_name], None))
 
-  def _StartRequest(self, req):
-    """Starts a request.
+    return result
 
-    @type req: L{HttpClientRequest}
-    @param req: HTTP request
 
-    """
-    logging.debug("Starting request %r", req)
-    pclient = self._Get(req.identity)
+def _ProcessCurlRequests(multi, requests):
+  """cURL request processor.
 
-    assert req.identity not in self._pool
+  This generator yields a tuple once for every completed request, successful or
+  not. The first value in the tuple is the handle, the second an error message
+  or C{None} for successful requests.
 
-    pclient.client.StartRequest(req)
-    pclient.lastused = self._generation
+  @type multi: C{pycurl.CurlMulti}
+  @param multi: cURL multi object
+  @type requests: sequence
+  @param requests: cURL request handles
 
-    return pclient
-
-  def _Return(self, pclients):
-    """Returns HTTP clients to the pool.
+  """
+  for curl in requests:
+    multi.add_handle(curl)
 
-    """
-    for pc in pclients:
-      logging.debug("Returning client %s to pool", pc)
-      assert pc.identity not in self._pool
-      assert pc not in self._pool.values()
-      self._pool[pc.identity] = pc
-
-    # Check for unused clients
-    for pc in self._pool.values():
-      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
-        logging.debug("Removing client %s which hasn't been used"
-                      " for %s generations",
-                      pc, self._MAX_GENERATIONS_DROP)
-        self._pool.pop(pc.identity, None)
-
-    assert compat.all(pc.lastused >= (self._generation -
-                                      self._MAX_GENERATIONS_DROP)
-                      for pc in self._pool.values())
+  while True:
+    (ret, active) = multi.perform()
+    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
 
-  @staticmethod
-  def _CreateCurlMultiHandle():
-    """Creates new cURL multi handle.
+    if ret == pycurl.E_CALL_MULTI_PERFORM:
+      # cURL wants to be called again
+      continue
 
-    """
-    return pycurl.CurlMulti()
+    while True:
+      (remaining_messages, successful, failed) = multi.info_read()
 
-  def ProcessRequests(self, requests):
-    """Processes any number of HTTP client requests using pooled objects.
+      for curl in successful:
+        multi.remove_handle(curl)
+        yield (curl, None)
 
-    @type requests: list of L{HttpClientRequest}
-    @param requests: List of all requests
+      for curl, errnum, errmsg in failed:
+        multi.remove_handle(curl)
+        yield (curl, "Error %s: %s" % (errnum, errmsg))
 
-    """
-    multi = self._CreateCurlMultiHandle()
+      if remaining_messages == 0:
+        break
 
-    # For client cleanup
-    self._generation += 1
+    if active == 0:
+      # No active handles anymore
+      break
 
-    assert compat.all((req.error is None and
-                       req.success is None and
-                       req.resp_status_code is None and
-                       req.resp_body is None)
-                      for req in requests)
+    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
+    # timeouts, which are only evaluated in multi.perform, aren't
+    # unnecessarily delayed.
+    multi.select(1.0)
 
-    curl_to_pclient = {}
-    for req in requests:
-      pclient = self._StartRequest(req)
-      curl = pclient.client.GetCurlHandle()
-      curl_to_pclient[curl] = pclient
-      multi.add_handle(curl)
-      assert pclient.client.GetCurrentRequest() == req
-      assert pclient.lastused >= 0
 
-    assert len(curl_to_pclient) == len(requests)
+def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
+                    _curl_multi=pycurl.CurlMulti,
+                    _curl_process=_ProcessCurlRequests):
+  """Processes any number of HTTP client requests.
 
-    done_count = 0
-    while True:
-      (ret, _) = multi.perform()
-      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
-
-      if ret == pycurl.E_CALL_MULTI_PERFORM:
-        # cURL wants to be called again
-        continue
-
-      while True:
-        (remaining_messages, successful, failed) = multi.info_read()
-
-        for curl in successful:
-          multi.remove_handle(curl)
-          done_count += 1
-          pclient = curl_to_pclient[curl]
-          req = pclient.client.GetCurrentRequest()
-          pclient.client.Done(None)
-          assert req.success
-          assert not pclient.client.GetCurrentRequest()
-
-        for curl, errnum, errmsg in failed:
-          multi.remove_handle(curl)
-          done_count += 1
-          pclient = curl_to_pclient[curl]
-          req = pclient.client.GetCurrentRequest()
-          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
-          assert req.error
-          assert not pclient.client.GetCurrentRequest()
-
-        if remaining_messages == 0:
-          break
-
-      assert done_count <= len(requests)
-
-      if done_count == len(requests):
-        break
+  @type requests: list of L{HttpClientRequest}
+  @param requests: List of all requests
+  @param lock_monitor_cb: Callable for registering with lock monitor
 
-      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
-      # timeouts, which are only evaluated in multi.perform, aren't
-      # unnecessarily delayed.
-      multi.select(1.0)
+  """
+  assert compat.all((req.error is None and
+                     req.success is None and
+                     req.resp_status_code is None and
+                     req.resp_body is None)
+                    for req in requests)
+
+  # Prepare all requests
+  curl_to_client = \
+    dict((client.GetCurlHandle(), client)
+         for client in map(lambda req: _StartRequest(_curl(), req), requests))
+
+  assert len(curl_to_client) == len(requests)
+
+  if lock_monitor_cb:
+    monitor = _PendingRequestMonitor(threading.currentThread(),
+                                     curl_to_client.values)
+    lock_monitor_cb(monitor)
+  else:
+    monitor = _NoOpRequestMonitor
+
+  # Process all requests and act based on the returned values
+  for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
+    monitor.acquire(shared=0)
+    try:
+      curl_to_client.pop(curl).Done(msg)
+    finally:
+      monitor.release()
 
-    assert compat.all(pclient.client.GetCurrentRequest() is None
-                      for pclient in curl_to_pclient.values())
+  assert not curl_to_client, "Not all requests were processed"
 
-    # Return clients to pool
-    self._Return(curl_to_pclient.values())
+  # Don't try to read information anymore as all requests have been processed
+  monitor.Disable()
 
-    assert done_count == len(requests)
-    assert compat.all(req.error is not None or
-                      (req.success and
-                       req.resp_status_code is not None and
-                       req.resp_body is not None)
-                      for req in requests)
+  assert compat.all(req.error is not None or
+                    (req.success and
+                     req.resp_status_code is not None and
+                     req.resp_body is not None)
+                    for req in requests)