Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ aea5caef

History | View | Annotate | Download (15 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
import threading
28
from cStringIO import StringIO
29

    
30
from ganeti import http
31
from ganeti import compat
32
from ganeti import netutils
33
from ganeti import locking
34

    
35

    
36
class HttpClientRequest(object):
37
  def __init__(self, host, port, method, path, headers=None, post_data=None,
38
               read_timeout=None, curl_config_fn=None):
39
    """Describes an HTTP request.
40

41
    @type host: string
42
    @param host: Hostname
43
    @type port: int
44
    @param port: Port
45
    @type method: string
46
    @param method: Method name
47
    @type path: string
48
    @param path: Request path
49
    @type headers: list or None
50
    @param headers: Additional headers to send, list of strings
51
    @type post_data: string or None
52
    @param post_data: Additional data to send
53
    @type read_timeout: int
54
    @param read_timeout: if passed, it will be used as the read
55
        timeout while reading the response from the server
56
    @type curl_config_fn: callable
57
    @param curl_config_fn: Function to configure cURL object before request
58
                           (Note: if the function configures the connection in
59
                           a way where it wouldn't be efficient to reuse them,
60
                           a "identity" property should be defined, see
61
                           L{HttpClientRequest.identity})
62

63
    """
64
    assert path.startswith("/"), "Path must start with slash (/)"
65
    assert curl_config_fn is None or callable(curl_config_fn)
66

    
67
    # Request attributes
68
    self.host = host
69
    self.port = port
70
    self.method = method
71
    self.path = path
72
    self.read_timeout = read_timeout
73
    self.curl_config_fn = curl_config_fn
74

    
75
    if post_data is None:
76
      self.post_data = ""
77
    else:
78
      self.post_data = post_data
79

    
80
    if headers is None:
81
      self.headers = []
82
    elif isinstance(headers, dict):
83
      # Support for old interface
84
      self.headers = ["%s: %s" % (name, value)
85
                      for name, value in headers.items()]
86
    else:
87
      self.headers = headers
88

    
89
    # Response status
90
    self.success = None
91
    self.error = None
92

    
93
    # Response attributes
94
    self.resp_status_code = None
95
    self.resp_body = None
96

    
97
  def __repr__(self):
98
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
99
              "%s:%s" % (self.host, self.port),
100
              self.method,
101
              self.path]
102

    
103
    return "<%s at %#x>" % (" ".join(status), id(self))
104

    
105
  @property
106
  def url(self):
107
    """Returns the full URL for this requests.
108

109
    """
110
    if netutils.IPAddress.IsValid(self.host):
111
      address = netutils.FormatAddress((self.host, self.port))
112
    else:
113
      address = "%s:%s" % (self.host, self.port)
114
    # TODO: Support for non-SSL requests
115
    return "https://%s%s" % (address, self.path)
116

    
117
  @property
118
  def identity(self):
119
    """Returns identifier for retrieving a pooled connection for this request.
120

121
    This allows cURL client objects to be re-used and to cache information
122
    (e.g. SSL session IDs or connections).
123

124
    """
125
    parts = [self.host, self.port]
126

    
127
    if self.curl_config_fn:
128
      try:
129
        parts.append(self.curl_config_fn.identity)
130
      except AttributeError:
131
        pass
132

    
133
    return "/".join(str(i) for i in parts)
134

    
135

    
136
class _HttpClient(object):
137
  def __init__(self, curl_config_fn):
138
    """Initializes this class.
139

140
    @type curl_config_fn: callable
141
    @param curl_config_fn: Function to configure cURL object after
142
                           initialization
143

144
    """
145
    self._req = None
146

    
147
    curl = self._CreateCurlHandle()
148
    curl.setopt(pycurl.VERBOSE, False)
149
    curl.setopt(pycurl.NOSIGNAL, True)
150
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
151
    curl.setopt(pycurl.PROXY, "")
152

    
153
    # Disable SSL session ID caching (pycurl >= 7.16.0)
154
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
155
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
156

    
157
    # Pass cURL object to external config function
158
    if curl_config_fn:
159
      curl_config_fn(curl)
160

    
161
    self._curl = curl
162

    
163
  @staticmethod
164
  def _CreateCurlHandle():
165
    """Returns a new cURL object.
166

167
    """
168
    return pycurl.Curl()
169

    
170
  def GetCurlHandle(self):
171
    """Returns the cURL object.
172

173
    """
174
    return self._curl
175

    
176
  def GetCurrentRequest(self):
177
    """Returns the current request.
178

179
    @rtype: L{HttpClientRequest} or None
180

181
    """
182
    return self._req
183

    
184
  def StartRequest(self, req):
185
    """Starts a request on this client.
186

187
    @type req: L{HttpClientRequest}
188
    @param req: HTTP request
189

190
    """
191
    assert not self._req, "Another request is already started"
192

    
193
    logging.debug("Starting request %r", req)
194

    
195
    self._req = req
196
    self._resp_buffer = StringIO()
197

    
198
    url = req.url
199
    method = req.method
200
    post_data = req.post_data
201
    headers = req.headers
202

    
203
    # PycURL requires strings to be non-unicode
204
    assert isinstance(method, str)
205
    assert isinstance(url, str)
206
    assert isinstance(post_data, str)
207
    assert compat.all(isinstance(i, str) for i in headers)
208

    
209
    # Configure cURL object for request
210
    curl = self._curl
211
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
212
    curl.setopt(pycurl.URL, url)
213
    curl.setopt(pycurl.POSTFIELDS, post_data)
214
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
215
    curl.setopt(pycurl.HTTPHEADER, headers)
216

    
217
    if req.read_timeout is None:
218
      curl.setopt(pycurl.TIMEOUT, 0)
219
    else:
220
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
221

    
222
    # Pass cURL object to external config function
223
    if req.curl_config_fn:
224
      req.curl_config_fn(curl)
225

    
226
  def Done(self, errmsg):
227
    """Finishes a request.
228

229
    @type errmsg: string or None
230
    @param errmsg: Error message if request failed
231

232
    """
233
    req = self._req
234
    assert req, "No request"
235

    
236
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
237

    
238
    curl = self._curl
239

    
240
    req.success = not bool(errmsg)
241
    req.error = errmsg
242

    
243
    # Get HTTP response code
244
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
245
    req.resp_body = self._resp_buffer.getvalue()
246

    
247
    # Reset client object
248
    self._req = None
249
    self._resp_buffer = None
250

    
251
    # Ensure no potentially large variables are referenced
252
    curl.setopt(pycurl.POSTFIELDS, "")
253
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
254

    
255

    
256
class _PooledHttpClient:
257
  """Data structure for HTTP client pool.
258

259
  """
260
  def __init__(self, identity, client):
261
    """Initializes this class.
262

263
    @type identity: string
264
    @param identity: Client identifier for pool
265
    @type client: L{_HttpClient}
266
    @param client: HTTP client
267

268
    """
269
    self.identity = identity
270
    self.client = client
271
    self.lastused = 0
272

    
273
  def __repr__(self):
274
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
275
              "id=%s" % self.identity,
276
              "lastuse=%s" % self.lastused,
277
              repr(self.client)]
278

    
279
    return "<%s at %#x>" % (" ".join(status), id(self))
280

    
281

    
282
class HttpClientPool:
283
  """A simple HTTP client pool.
284

285
  Supports one pooled connection per identity (see
286
  L{HttpClientRequest.identity}).
287

288
  """
289
  #: After how many generations to drop unused clients
290
  _MAX_GENERATIONS_DROP = 25
291

    
292
  def __init__(self, curl_config_fn):
293
    """Initializes this class.
294

295
    @type curl_config_fn: callable
296
    @param curl_config_fn: Function to configure cURL object after
297
                           initialization
298

299
    """
300
    self._curl_config_fn = curl_config_fn
301
    self._generation = 0
302
    self._pool = {}
303

    
304
    # Create custom logger for HTTP client pool. Change logging level to
305
    # C{logging.NOTSET} to get more details.
306
    self._logger = logging.getLogger(self.__class__.__name__)
307
    self._logger.setLevel(logging.INFO)
308

    
309
  @staticmethod
310
  def _GetHttpClientCreator():
311
    """Returns callable to create HTTP client.
312

313
    """
314
    return _HttpClient
315

    
316
  def _Get(self, identity):
317
    """Gets an HTTP client from the pool.
318

319
    @type identity: string
320
    @param identity: Client identifier
321

322
    """
323
    try:
324
      pclient = self._pool.pop(identity)
325
    except KeyError:
326
      # Need to create new client
327
      client = self._GetHttpClientCreator()(self._curl_config_fn)
328
      pclient = _PooledHttpClient(identity, client)
329
      self._logger.debug("Created new client %s", pclient)
330
    else:
331
      self._logger.debug("Reusing client %s", pclient)
332

    
333
    assert pclient.identity == identity
334

    
335
    return pclient
336

    
337
  def _StartRequest(self, req):
338
    """Starts a request.
339

340
    @type req: L{HttpClientRequest}
341
    @param req: HTTP request
342

343
    """
344
    pclient = self._Get(req.identity)
345

    
346
    assert req.identity not in self._pool
347

    
348
    pclient.client.StartRequest(req)
349
    pclient.lastused = self._generation
350

    
351
    return pclient
352

    
353
  def _Return(self, pclients):
354
    """Returns HTTP clients to the pool.
355

356
    """
357
    assert not frozenset(pclients) & frozenset(self._pool.values())
358

    
359
    for pc in pclients:
360
      self._logger.debug("Returning client %s to pool", pc)
361
      assert pc.identity not in self._pool
362
      self._pool[pc.identity] = pc
363

    
364
    # Check for unused clients
365
    for pc in self._pool.values():
366
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
367
        self._logger.debug("Removing client %s which hasn't been used"
368
                           " for %s generations",
369
                           pc, self._MAX_GENERATIONS_DROP)
370
        self._pool.pop(pc.identity, None)
371

    
372
    assert compat.all(pc.lastused >= (self._generation -
373
                                      self._MAX_GENERATIONS_DROP)
374
                      for pc in self._pool.values())
375

    
376
  @staticmethod
377
  def _CreateCurlMultiHandle():
378
    """Creates new cURL multi handle.
379

380
    """
381
    return pycurl.CurlMulti()
382

    
383
  def ProcessRequests(self, requests, lock_monitor_cb=None):
384
    """Processes any number of HTTP client requests using pooled objects.
385

386
    @type requests: list of L{HttpClientRequest}
387
    @param requests: List of all requests
388
    @param lock_monitor_cb: Callable for registering with lock monitor
389

390
    """
391
    # For client cleanup
392
    self._generation += 1
393

    
394
    assert compat.all((req.error is None and
395
                       req.success is None and
396
                       req.resp_status_code is None and
397
                       req.resp_body is None)
398
                      for req in requests)
399

    
400
    curl_to_pclient = {}
401
    for req in requests:
402
      pclient = self._StartRequest(req)
403
      curl_to_pclient[pclient.client.GetCurlHandle()] = pclient
404
      assert pclient.client.GetCurrentRequest() == req
405
      assert pclient.lastused >= 0
406

    
407
    assert len(curl_to_pclient) == len(requests)
408

    
409
    if lock_monitor_cb:
410
      monitor = _PendingRequestMonitor(threading.currentThread(),
411
                                       curl_to_pclient.values)
412
      lock_monitor_cb(monitor)
413
    else:
414
      monitor = _NoOpRequestMonitor
415

    
416
    # Process all requests and act based on the returned values
417
    for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
418
                                            curl_to_pclient.keys()):
419
      pclient = curl_to_pclient[curl]
420
      req = pclient.client.GetCurrentRequest()
421

    
422
      monitor.acquire(shared=0)
423
      try:
424
        pclient.client.Done(msg)
425
      finally:
426
        monitor.release()
427

    
428
      assert ((msg is None and req.success and req.error is None) ^
429
              (msg is not None and not req.success and req.error == msg))
430

    
431
    assert compat.all(pclient.client.GetCurrentRequest() is None
432
                      for pclient in curl_to_pclient.values())
433

    
434
    monitor.acquire(shared=0)
435
    try:
436
      # Don't try to read information from returned clients
437
      monitor.Disable()
438

    
439
      # Return clients to pool
440
      self._Return(curl_to_pclient.values())
441
    finally:
442
      monitor.release()
443

    
444
    assert compat.all(req.error is not None or
445
                      (req.success and
446
                       req.resp_status_code is not None and
447
                       req.resp_body is not None)
448
                      for req in requests)
449

    
450

    
451
class _NoOpRequestMonitor: # pylint: disable=W0232
452
  """No-op request monitor.
453

454
  """
455
  @staticmethod
456
  def acquire(*args, **kwargs):
457
    pass
458

    
459
  release = acquire
460
  Disable = acquire
461

    
462

    
463
class _PendingRequestMonitor:
464
  _LOCK = "_lock"
465

    
466
  def __init__(self, owner, pending_fn):
467
    """Initializes this class.
468

469
    """
470
    self._owner = owner
471
    self._pending_fn = pending_fn
472

    
473
    # The lock monitor runs in another thread, hence locking is necessary
474
    self._lock = locking.SharedLock("PendingHttpRequests")
475
    self.acquire = self._lock.acquire
476
    self.release = self._lock.release
477

    
478
  def Disable(self):
479
    """Disable monitor.
480

481
    """
482
    self._pending_fn = None
483

    
484
  @locking.ssynchronized(_LOCK, shared=1)
485
  def GetLockInfo(self, requested): # pylint: disable=W0613
486
    """Retrieves information about pending requests.
487

488
    @type requested: set
489
    @param requested: Requested information, see C{query.LQ_*}
490

491
    """
492
    # No need to sort here, that's being done by the lock manager and query
493
    # library. There are no priorities for requests, hence all show up as
494
    # one item under "pending".
495
    result = []
496

    
497
    if self._pending_fn:
498
      owner_name = self._owner.getName()
499

    
500
      for pclient in self._pending_fn():
501
        req = pclient.client.GetCurrentRequest()
502
        if req:
503
          result.append(("rpc/%s%s" % (req.host, req.path), None, None,
504
                         [("thread", [owner_name])]))
505

    
506
    return result
507

    
508

    
509
def _ProcessCurlRequests(multi, requests):
510
  """cURL request processor.
511

512
  This generator yields a tuple once for every completed request, successful or
513
  not. The first value in the tuple is the handle, the second an error message
514
  or C{None} for successful requests.
515

516
  @type multi: C{pycurl.CurlMulti}
517
  @param multi: cURL multi object
518
  @type requests: sequence
519
  @param requests: cURL request handles
520

521
  """
522
  for curl in requests:
523
    multi.add_handle(curl)
524

    
525
  while True:
526
    (ret, active) = multi.perform()
527
    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
528

    
529
    if ret == pycurl.E_CALL_MULTI_PERFORM:
530
      # cURL wants to be called again
531
      continue
532

    
533
    while True:
534
      (remaining_messages, successful, failed) = multi.info_read()
535

    
536
      for curl in successful:
537
        multi.remove_handle(curl)
538
        yield (curl, None)
539

    
540
      for curl, errnum, errmsg in failed:
541
        multi.remove_handle(curl)
542
        yield (curl, "Error %s: %s" % (errnum, errmsg))
543

    
544
      if remaining_messages == 0:
545
        break
546

    
547
    if active == 0:
548
      # No active handles anymore
549
      break
550

    
551
    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
552
    # timeouts, which are only evaluated in multi.perform, aren't
553
    # unnecessarily delayed.
554
    multi.select(1.0)