-
-class _PooledHttpClient:
- """Data structure for HTTP client pool.
-
- """
- def __init__(self, identity, client):
- """Initializes this class.
-
- @type identity: string
- @param identity: Client identifier for pool
- @type client: L{_HttpClient}
- @param client: HTTP client
-
- """
- self.identity = identity
- self.client = client
- self.lastused = 0
-
- def __repr__(self):
- status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
- "id=%s" % self.identity,
- "lastuse=%s" % self.lastused,
- repr(self.client)]
-
- return "<%s at %#x>" % (" ".join(status), id(self))
-
-
class HttpClientPool:
    """A simple HTTP client pool.

    Supports one pooled connection per identity (see
    L{HttpClientRequest.identity}).

    Every call to L{ProcessRequests} starts a new "generation". Clients which
    have not been used for more than L{_MAX_GENERATIONS_DROP} generations are
    removed from the pool when clients are returned to it.

    """
    #: After how many generations to drop unused clients
    _MAX_GENERATIONS_DROP = 25

    def __init__(self, curl_config_fn):
        """Initializes this class.

        @type curl_config_fn: callable
        @param curl_config_fn: Function to configure cURL object after
          initialization

        """
        self._curl_config_fn = curl_config_fn
        # Incremented once per ProcessRequests call; used to age out clients
        self._generation = 0
        # Maps identity to the idle pooled client for that identity
        self._pool = {}

        # Create custom logger for HTTP client pool. Change logging level to
        # C{logging.NOTSET} to get more details.
        self._logger = logging.getLogger(self.__class__.__name__)
        self._logger.setLevel(logging.INFO)

    @staticmethod
    def _GetHttpClientCreator():
        """Returns callable to create HTTP client.

        The returned callable is invoked with the cURL configuration function
        whenever the pool has no client for a requested identity.

        """
        return _HttpClient

    def _Get(self, identity):
        """Gets an HTTP client from the pool.

        An existing client is removed from the pool while it is handed out;
        if there is none for C{identity}, a new one is created.

        @type identity: string
        @param identity: Client identifier
        @rtype: L{_PooledHttpClient}
        @return: Pooled client whose identity equals C{identity}

        """
        try:
            pclient = self._pool.pop(identity)
        except KeyError:
            # Need to create new client
            client = self._GetHttpClientCreator()(self._curl_config_fn)
            pclient = _PooledHttpClient(identity, client)
            self._logger.debug("Created new client %s", pclient)
        else:
            self._logger.debug("Reusing client %s", pclient)

        assert pclient.identity == identity

        return pclient

    def _StartRequest(self, req):
        """Starts a request.

        Fetches a client for the request's identity, starts the request on it
        and stamps the client with the current generation.

        @type req: L{HttpClientRequest}
        @param req: HTTP request
        @rtype: L{_PooledHttpClient}
        @return: The pooled client now handling C{req}

        """
        pclient = self._Get(req.identity)

        # The client was taken out of the pool (or never was in it)
        assert req.identity not in self._pool

        pclient.client.StartRequest(req)
        pclient.lastused = self._generation

        return pclient

    def _Return(self, pclients):
        """Returns HTTP clients to the pool.

        After the clients have been added back, every pooled client which has
        not been used for more than L{_MAX_GENERATIONS_DROP} generations is
        removed.

        @type pclients: iterable of L{_PooledHttpClient}
        @param pclients: Clients to put back; must not be in the pool already

        """
        assert not frozenset(pclients) & frozenset(self._pool.values())

        for pc in pclients:
            self._logger.debug("Returning client %s to pool", pc)
            assert pc.identity not in self._pool
            self._pool[pc.identity] = pc

        # Check for unused clients. Iterate over a snapshot of the values:
        # entries are removed from the pool inside the loop, and on Python 3
        # C{dict.values()} is a live view which raises RuntimeError if the
        # dictionary changes size while it is being iterated.
        for pc in list(self._pool.values()):
            if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
                self._logger.debug("Removing client %s which hasn't been used"
                                   " for %s generations",
                                   pc, self._MAX_GENERATIONS_DROP)
                self._pool.pop(pc.identity, None)

        assert compat.all(pc.lastused >= (self._generation -
                                          self._MAX_GENERATIONS_DROP)
                          for pc in self._pool.values())

    @staticmethod
    def _CreateCurlMultiHandle():
        """Creates new cURL multi handle.

        """
        return pycurl.CurlMulti()

    def ProcessRequests(self, requests, lock_monitor_cb=None):
        """Processes any number of HTTP client requests using pooled objects.

        Each request is started on a pooled client, all of them are driven
        through a single cURL multi handle and, once finished, the clients
        are handed back to the pool. Results are stored on the request
        objects; nothing is returned.

        @type requests: list of L{HttpClientRequest}
        @param requests: List of all requests; none of them may have been
          processed before (error, success, status code and body all unset)
        @param lock_monitor_cb: Callable for registering with lock monitor

        """
        # For client cleanup
        self._generation += 1

        assert compat.all((req.error is None and
                           req.success is None and
                           req.resp_status_code is None and
                           req.resp_body is None)
                          for req in requests)

        # Maps each cURL handle to the pooled client owning it, so results
        # reported per handle can be routed back to the right client
        curl_to_pclient = {}
        for req in requests:
            pclient = self._StartRequest(req)
            curl_to_pclient[pclient.client.GetCurlHandle()] = pclient
            assert pclient.client.GetCurrentRequest() == req
            assert pclient.lastused >= 0

        assert len(curl_to_pclient) == len(requests)

        if lock_monitor_cb:
            # The bound "values" method is passed (not its result) so the
            # monitor can query the clients whenever it needs to
            monitor = _PendingRequestMonitor(threading.currentThread(),
                                             curl_to_pclient.values)
            lock_monitor_cb(monitor)
        else:
            monitor = _NoOpRequestMonitor

        # Process all requests and act based on the returned values
        for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
                                                curl_to_pclient.keys()):
            pclient = curl_to_pclient[curl]
            req = pclient.client.GetCurrentRequest()

            # Finish the request while holding the monitor exclusively
            monitor.acquire(shared=0)
            try:
                pclient.client.Done(msg)
            finally:
                monitor.release()

            # Exactly one of "succeeded without message" and "failed with
            # this message" must hold
            assert ((msg is None and req.success and req.error is None) ^
                    (msg is not None and not req.success and
                     req.error == msg))

        assert compat.all(pclient.client.GetCurrentRequest() is None
                          for pclient in curl_to_pclient.values())

        monitor.acquire(shared=0)
        try:
            # Don't try to read information from returned clients
            monitor.Disable()

            # Return clients to pool
            self._Return(curl_to_pclient.values())
        finally:
            monitor.release()

        assert compat.all(req.error is not None or
                          (req.success and
                           req.resp_status_code is not None and
                           req.resp_body is not None)
                          for req in requests)