Revision abbf2cd9 lib/http/client.py

b/lib/http/client.py
55 55
        timeout while reading the response from the server
56 56
    @type curl_config_fn: callable
57 57
    @param curl_config_fn: Function to configure cURL object before request
58
                           (Note: if the function configures the connection in
59
                           a way where it wouldn't be efficient to reuse them,
60
                           an "identity" property should be defined, see
61
                           L{HttpClientRequest.identity})
62 58
    @type nicename: string
63 59
    @param nicename: Name, presentable to a user, to describe this request (no
64 60
                     whitespace)
......
118 114
    # TODO: Support for non-SSL requests
119 115
    return "https://%s%s" % (address, self.path)
120 116

  
121
  @property
122
  def identity(self):
123
    """Returns identifier for retrieving a pooled connection for this request.
124 117

  
125
    This allows cURL client objects to be re-used and to cache information
126
    (e.g. SSL session IDs or connections).
118
def _StartRequest(curl, req):
119
  """Starts a request on a cURL object.
127 120

  
128
    """
129
    parts = [self.host, self.port]
121
  @type curl: pycurl.Curl
122
  @param curl: cURL object
123
  @type req: L{HttpClientRequest}
124
  @param req: HTTP request
130 125

  
131
    if self.curl_config_fn:
132
      try:
133
        parts.append(self.curl_config_fn.identity)
134
      except AttributeError:
135
        pass
126
  """
127
  logging.debug("Starting request %r", req)
136 128

  
137
    return "/".join(str(i) for i in parts)
129
  url = req.url
130
  method = req.method
131
  post_data = req.post_data
132
  headers = req.headers
138 133

  
134
  # PycURL requires strings to be non-unicode
135
  assert isinstance(method, str)
136
  assert isinstance(url, str)
137
  assert isinstance(post_data, str)
138
  assert compat.all(isinstance(i, str) for i in headers)
139 139

  
140
class _HttpClient(object):
141
  def __init__(self, curl_config_fn):
142
    """Initializes this class.
140
  # Buffer for response
141
  resp_buffer = StringIO()
143 142

  
144
    @type curl_config_fn: callable
145
    @param curl_config_fn: Function to configure cURL object after
146
                           initialization
143
  # Configure client for request
144
  curl.setopt(pycurl.VERBOSE, False)
145
  curl.setopt(pycurl.NOSIGNAL, True)
146
  curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
147
  curl.setopt(pycurl.PROXY, "")
148
  curl.setopt(pycurl.CUSTOMREQUEST, str(method))
149
  curl.setopt(pycurl.URL, url)
150
  curl.setopt(pycurl.POSTFIELDS, post_data)
151
  curl.setopt(pycurl.HTTPHEADER, headers)
147 152

  
148
    """
149
    self._req = None
153
  if req.read_timeout is None:
154
    curl.setopt(pycurl.TIMEOUT, 0)
155
  else:
156
    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
150 157

  
151
    curl = self._CreateCurlHandle()
152
    curl.setopt(pycurl.VERBOSE, False)
153
    curl.setopt(pycurl.NOSIGNAL, True)
154
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
155
    curl.setopt(pycurl.PROXY, "")
158
  # Disable SSL session ID caching (pycurl >= 7.16.0)
159
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
160
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
156 161

  
157
    # Disable SSL session ID caching (pycurl >= 7.16.0)
158
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
159
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
162
  curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
160 163

  
161
    # Pass cURL object to external config function
162
    if curl_config_fn:
163
      curl_config_fn(curl)
164
  # Pass cURL object to external config function
165
  if req.curl_config_fn:
166
    req.curl_config_fn(curl)
164 167

  
165
    self._curl = curl
168
  return _PendingRequest(curl, req, resp_buffer.getvalue)
166 169

  
167
  @staticmethod
168
  def _CreateCurlHandle():
169
    """Returns a new cURL object.
170

  
171
class _PendingRequest:
172
  def __init__(self, curl, req, resp_buffer_read):
173
    """Initializes this class.
174

  
175
    @type curl: pycurl.Curl
176
    @param curl: cURL object
177
    @type req: L{HttpClientRequest}
178
    @param req: HTTP request
179
    @type resp_buffer_read: callable
180
    @param resp_buffer_read: Function to read response body
170 181

  
171 182
    """
172
    return pycurl.Curl()
183
    assert req.success is None
184

  
185
    self._curl = curl
186
    self._req = req
187
    self._resp_buffer_read = resp_buffer_read
173 188

  
174 189
  def GetCurlHandle(self):
175 190
    """Returns the cURL object.
......
180 195
  def GetCurrentRequest(self):
    """Gives access to the request associated with this object.

    @return: The value currently stored in C{self._req}

    """
    return self._req
187 200

  
188
  def StartRequest(self, req):
    """Prepares this client's cURL object for the given request.

    The request is remembered as the current one, a fresh buffer for the
    response body is created and the cURL object is configured with method,
    URL, body, headers and timeout before the request's own configuration
    function (if any) is invoked.

    @type req: L{HttpClientRequest}
    @param req: HTTP request

    """
    assert not self._req, "Another request is already started"

    logging.debug("Starting request %r", req)

    self._req = req
    self._resp_buffer = StringIO()

    (url, method, post_data, headers) = \
      (req.url, req.method, req.post_data, req.headers)

    # PycURL requires strings to be non-unicode
    assert isinstance(method, str)
    assert isinstance(url, str)
    assert isinstance(post_data, str)
    assert compat.all(isinstance(hdr, str) for hdr in headers)

    # Configure cURL object for request
    setopt = self._curl.setopt
    setopt(pycurl.CUSTOMREQUEST, str(method))
    setopt(pycurl.URL, url)
    setopt(pycurl.POSTFIELDS, post_data)
    setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
    setopt(pycurl.HTTPHEADER, headers)

    # A timeout of zero is passed when the request doesn't specify one
    timeout = req.read_timeout
    if timeout is not None:
      timeout = int(timeout)
    else:
      timeout = 0
    setopt(pycurl.TIMEOUT, timeout)

    # Pass cURL object to external config function
    configure = req.curl_config_fn
    if configure:
      configure(self._curl)
229

  
230 201
  def Done(self, errmsg):
231 202
    """Finishes a request.
232 203

  
......
234 205
    @param errmsg: Error message if request failed
235 206

  
236 207
    """
208
    curl = self._curl
237 209
    req = self._req
238
    assert req, "No request"
239 210

  
240
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
211
    assert req.success is None, "Request has already been finalized"
241 212

  
242
    curl = self._curl
213
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
243 214

  
244 215
    req.success = not bool(errmsg)
245 216
    req.error = errmsg
246 217

  
247 218
    # Get HTTP response code
248 219
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
249
    req.resp_body = self._resp_buffer.getvalue()
250

  
251
    # Reset client object
252
    self._req = None
253
    self._resp_buffer = None
220
    req.resp_body = self._resp_buffer_read()
254 221

  
255 222
    # Ensure no potentially large variables are referenced
256
    curl.setopt(pycurl.POSTFIELDS, "")
257
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
258

  
259

  
260
class _PooledHttpClient(object):
  """Data structure for HTTP client pool.

  Ties an HTTP client to the identity under which it is pooled and records
  when the client was last used.

  """
  def __init__(self, identity, client):
    """Initializes this class.

    @type identity: string
    @param identity: Client identifier for pool
    @type client: L{_HttpClient}
    @param client: HTTP client

    """
    self.identity = identity
    self.client = client
    # Starts at zero; the pool overwrites it when a request is started
    self.lastused = 0

  def __repr__(self):
    """Returns a description of this object, e.g. for log messages.

    @rtype: string

    """
    cls = self.__class__

    # Values are wrapped in one-element tuples so that a tuple-valued
    # attribute is formatted as a whole instead of being unpacked by "%"
    status = ["%s.%s" % (cls.__module__, cls.__name__),
              "id=%s" % (self.identity, ),
              "lastuse=%s" % (self.lastused, ),
              repr(self.client)]

    return "<%s at %#x>" % (" ".join(status), id(self))
284

  
285

  
286
class HttpClientPool:
287
  """A simple HTTP client pool.
288

  
289
  Supports one pooled connection per identity (see
290
  L{HttpClientRequest.identity}).
291

  
292
  """
293
  #: After how many generations to drop unused clients
294
  _MAX_GENERATIONS_DROP = 25
295

  
296
  def __init__(self, curl_config_fn):
297
    """Initializes this class.
298

  
299
    @type curl_config_fn: callable
300
    @param curl_config_fn: Function to configure cURL object after
301
                           initialization
302

  
303
    """
304
    self._curl_config_fn = curl_config_fn
305
    self._generation = 0
306
    self._pool = {}
307

  
308
    # Create custom logger for HTTP client pool. Change logging level to
309
    # C{logging.NOTSET} to get more details.
310
    self._logger = logging.getLogger(self.__class__.__name__)
311
    self._logger.setLevel(logging.INFO)
312

  
313
  @staticmethod
  def _GetHttpClientCreator():
    """Tells which callable is used for creating new HTTP clients.

    @return: L{_HttpClient}

    """
    return _HttpClient
319

  
320
  def _Get(self, identity):
321
    """Gets an HTTP client from the pool.
322

  
323
    @type identity: string
324
    @param identity: Client identifier
325

  
326
    """
327 223
    try:
328
      pclient = self._pool.pop(identity)
329
    except KeyError:
330
      # Need to create new client
331
      client = self._GetHttpClientCreator()(self._curl_config_fn)
332
      pclient = _PooledHttpClient(identity, client)
333
      self._logger.debug("Created new client %s", pclient)
224
      # Only available in PycURL 7.19.0 and above
225
      reset_fn = curl.reset
226
    except AttributeError:
227
      curl.setopt(pycurl.POSTFIELDS, "")
228
      curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
334 229
    else:
335
      self._logger.debug("Reusing client %s", pclient)
336

  
337
    assert pclient.identity == identity
338

  
339
    return pclient
340

  
341
  def _StartRequest(self, req):
342
    """Starts a request.
343

  
344
    @type req: L{HttpClientRequest}
345
    @param req: HTTP request
346

  
347
    """
348
    pclient = self._Get(req.identity)
349

  
350
    assert req.identity not in self._pool
351

  
352
    pclient.client.StartRequest(req)
353
    pclient.lastused = self._generation
354

  
355
    return pclient
356

  
357
  def _Return(self, pclients):
    """Returns HTTP clients to the pool and drops clients not used recently.

    Clients whose last use is more than L{_MAX_GENERATIONS_DROP} generations
    ago are removed from the pool.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients to put back; the iterable is walked more than
                     once and none of the clients may be in the pool already

    """
    assert not frozenset(pclients) & frozenset(self._pool.values())

    for pc in pclients:
      self._logger.debug("Returning client %s to pool", pc)
      assert pc.identity not in self._pool
      self._pool[pc.identity] = pc

    # Check for unused clients. Entries are removed from the pool inside the
    # loop, hence a snapshot of the values is iterated; changing a dictionary
    # while iterating over one of its views is an error.
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        self._logger.debug("Removing client %s which hasn't been used"
                           " for %s generations",
                           pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())
379

  
380
  @staticmethod
  def _CreateCurlMultiHandle():
    """Instantiates the cURL multi object used for processing requests.

    @return: A newly created C{pycurl.CurlMulti} object

    """
    return pycurl.CurlMulti()
386

  
387
  def ProcessRequests(self, requests, lock_monitor_cb=None):
    """Runs a batch of HTTP client requests on clients taken from the pool.

    Each request is started on a pooled client, all transfers are processed
    and the clients are put back into the pool afterwards.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests; none of them may carry a result yet
    @param lock_monitor_cb: Callable for registering with lock monitor; it is
                            called with the monitor object for this batch

    """
    # Every batch is one generation (used for dropping unused clients)
    self._generation += 1

    assert compat.all((r.error is None and
                       r.success is None and
                       r.resp_status_code is None and
                       r.resp_body is None)
                      for r in requests)

    # Start all requests, remembering which cURL object serves which client
    by_handle = {}
    for req in requests:
      pooled = self._StartRequest(req)
      by_handle[pooled.client.GetCurlHandle()] = pooled
      assert pooled.client.GetCurrentRequest() == req
      assert pooled.lastused >= 0

    assert len(by_handle) == len(requests)

    if not lock_monitor_cb:
      monitor = _NoOpRequestMonitor
    else:
      monitor = _PendingRequestMonitor(threading.currentThread(),
                                       by_handle.values)
      lock_monitor_cb(monitor)

    # Process all requests and act based on the returned values
    multi = self._CreateCurlMultiHandle()
    for (handle, errmsg) in _ProcessCurlRequests(multi, by_handle.keys()):
      pooled = by_handle[handle]

      # Must be fetched before finishing, the client forgets it afterwards
      finished = pooled.client.GetCurrentRequest()

      monitor.acquire(shared=0)
      try:
        pooled.client.Done(errmsg)
      finally:
        monitor.release()

      assert ((errmsg is None and finished.success and
               finished.error is None) ^
              (errmsg is not None and not finished.success and
               finished.error == errmsg))

    assert compat.all(pooled.client.GetCurrentRequest() is None
                      for pooled in by_handle.values())

    monitor.acquire(shared=0)
    try:
      # Don't try to read information from returned clients
      monitor.Disable()

      # Return clients to pool
      self._Return(by_handle.values())
    finally:
      monitor.release()

    assert compat.all(r.error is not None or
                      (r.success and
                       r.resp_status_code is not None and
                       r.resp_body is not None)
                      for r in requests)
230
      reset_fn()
453 231

  
454 232

  
455 233
class _NoOpRequestMonitor: # pylint: disable=W0232
......
479 257
    self.acquire = self._lock.acquire
480 258
    self.release = self._lock.release
481 259

  
260
  @locking.ssynchronized(_LOCK)
482 261
  def Disable(self):
483 262
    """Disable monitor.
484 263

  
......
501 280
    if self._pending_fn:
502 281
      owner_name = self._owner.getName()
503 282

  
504
      for pclient in self._pending_fn():
505
        req = pclient.client.GetCurrentRequest()
283
      for client in self._pending_fn():
284
        req = client.GetCurrentRequest()
506 285
        if req:
507 286
          if req.nicename is None:
508 287
            name = "%s%s" % (req.host, req.path)
......
559 338
    # timeouts, which are only evaluated in multi.perform, aren't
560 339
    # unnecessarily delayed.
561 340
    multi.select(1.0)
341

  
342

  
343
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
  """Processes any number of HTTP client requests.

  A new cURL object is created for every request. The results are stored on
  the request objects themselves; nothing is returned.

  @type requests: list of L{HttpClientRequest}
  @param requests: List of all requests
  @param lock_monitor_cb: Callable for registering with lock monitor
  @type _curl: callable
  @param _curl: Called without arguments once per request to create the cURL
                object for it
  @type _curl_multi: callable
  @param _curl_multi: Called without arguments once to create the multi
                      object handed to C{_curl_process}
  @type _curl_process: callable
  @param _curl_process: Called with the multi object and the list of cURL
                        objects; must yield C{(curl, errmsg)} for every
                        finished request

  """
  assert compat.all((req.error is None and
                     req.success is None and
                     req.resp_status_code is None and
                     req.resp_body is None)
                    for req in requests)

  # Prepare all requests
  clients = [_StartRequest(_curl(), req) for req in requests]
  curl_to_client = dict((client.GetCurlHandle(), client)
                        for client in clients)

  assert len(curl_to_client) == len(requests)

  if lock_monitor_cb:
    monitor = _PendingRequestMonitor(threading.currentThread(),
                                     curl_to_client.values)
    lock_monitor_cb(monitor)
  else:
    monitor = _NoOpRequestMonitor

  # Entries are removed from the dictionary while results are consumed, so
  # the processing function gets its own list of cURL objects instead of
  # something backed by the dictionary
  pending = list(curl_to_client)

  # Process all requests and act based on the returned values
  for (curl, msg) in _curl_process(_curl_multi(), pending):
    monitor.acquire(shared=0)
    try:
      curl_to_client.pop(curl).Done(msg)
    finally:
      monitor.release()

  assert not curl_to_client, "Not all requests were processed"

  # Don't try to read information anymore as all requests have been processed
  monitor.Disable()

  assert compat.all(req.error is not None or
                    (req.success and
                     req.resp_status_code is not None and
                     req.resp_body is not None)
                    for req in requests)

Also available in: Unified diff