Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 90b2eeb0

History | View | Annotate | Download (15.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
import threading
28
from cStringIO import StringIO
29

    
30
from ganeti import http
31
from ganeti import compat
32
from ganeti import netutils
33
from ganeti import locking
34

    
35

    
36
class HttpClientRequest(object):
37
  def __init__(self, host, port, method, path, headers=None, post_data=None,
38
               read_timeout=None, curl_config_fn=None, nicename=None):
39
    """Describes an HTTP request.
40

41
    @type host: string
42
    @param host: Hostname
43
    @type port: int
44
    @param port: Port
45
    @type method: string
46
    @param method: Method name
47
    @type path: string
48
    @param path: Request path
49
    @type headers: list or None
50
    @param headers: Additional headers to send, list of strings
51
    @type post_data: string or None
52
    @param post_data: Additional data to send
53
    @type read_timeout: int
54
    @param read_timeout: if passed, it will be used as the read
55
        timeout while reading the response from the server
56
    @type curl_config_fn: callable
57
    @param curl_config_fn: Function to configure cURL object before request
58
                           (Note: if the function configures the connection in
59
                           a way where it wouldn't be efficient to reuse them,
60
                           an "identity" property should be defined, see
61
                           L{HttpClientRequest.identity})
62
    @type nicename: string
63
    @param nicename: Name, presentable to a user, to describe this request (no
64
                     whitespace)
65

66
    """
67
    assert path.startswith("/"), "Path must start with slash (/)"
68
    assert curl_config_fn is None or callable(curl_config_fn)
69

    
70
    # Request attributes
71
    self.host = host
72
    self.port = port
73
    self.method = method
74
    self.path = path
75
    self.read_timeout = read_timeout
76
    self.curl_config_fn = curl_config_fn
77
    self.nicename = nicename
78

    
79
    if post_data is None:
80
      self.post_data = ""
81
    else:
82
      self.post_data = post_data
83

    
84
    if headers is None:
85
      self.headers = []
86
    elif isinstance(headers, dict):
87
      # Support for old interface
88
      self.headers = ["%s: %s" % (name, value)
89
                      for name, value in headers.items()]
90
    else:
91
      self.headers = headers
92

    
93
    # Response status
94
    self.success = None
95
    self.error = None
96

    
97
    # Response attributes
98
    self.resp_status_code = None
99
    self.resp_body = None
100

    
101
  def __repr__(self):
102
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
103
              "%s:%s" % (self.host, self.port),
104
              self.method,
105
              self.path]
106

    
107
    return "<%s at %#x>" % (" ".join(status), id(self))
108

    
109
  @property
110
  def url(self):
111
    """Returns the full URL for this requests.
112

113
    """
114
    if netutils.IPAddress.IsValid(self.host):
115
      address = netutils.FormatAddress((self.host, self.port))
116
    else:
117
      address = "%s:%s" % (self.host, self.port)
118
    # TODO: Support for non-SSL requests
119
    return "https://%s%s" % (address, self.path)
120

    
121
  @property
122
  def identity(self):
123
    """Returns identifier for retrieving a pooled connection for this request.
124

125
    This allows cURL client objects to be re-used and to cache information
126
    (e.g. SSL session IDs or connections).
127

128
    """
129
    parts = [self.host, self.port]
130

    
131
    if self.curl_config_fn:
132
      try:
133
        parts.append(self.curl_config_fn.identity)
134
      except AttributeError:
135
        pass
136

    
137
    return "/".join(str(i) for i in parts)
138

    
139

    
140
class _HttpClient(object):
141
  def __init__(self, curl_config_fn):
142
    """Initializes this class.
143

144
    @type curl_config_fn: callable
145
    @param curl_config_fn: Function to configure cURL object after
146
                           initialization
147

148
    """
149
    self._req = None
150

    
151
    curl = self._CreateCurlHandle()
152
    curl.setopt(pycurl.VERBOSE, False)
153
    curl.setopt(pycurl.NOSIGNAL, True)
154
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
155
    curl.setopt(pycurl.PROXY, "")
156

    
157
    # Disable SSL session ID caching (pycurl >= 7.16.0)
158
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
159
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
160

    
161
    # Pass cURL object to external config function
162
    if curl_config_fn:
163
      curl_config_fn(curl)
164

    
165
    self._curl = curl
166

    
167
  @staticmethod
168
  def _CreateCurlHandle():
169
    """Returns a new cURL object.
170

171
    """
172
    return pycurl.Curl()
173

    
174
  def GetCurlHandle(self):
175
    """Returns the cURL object.
176

177
    """
178
    return self._curl
179

    
180
  def GetCurrentRequest(self):
181
    """Returns the current request.
182

183
    @rtype: L{HttpClientRequest} or None
184

185
    """
186
    return self._req
187

    
188
  def StartRequest(self, req):
189
    """Starts a request on this client.
190

191
    @type req: L{HttpClientRequest}
192
    @param req: HTTP request
193

194
    """
195
    assert not self._req, "Another request is already started"
196

    
197
    logging.debug("Starting request %r", req)
198

    
199
    self._req = req
200
    self._resp_buffer = StringIO()
201

    
202
    url = req.url
203
    method = req.method
204
    post_data = req.post_data
205
    headers = req.headers
206

    
207
    # PycURL requires strings to be non-unicode
208
    assert isinstance(method, str)
209
    assert isinstance(url, str)
210
    assert isinstance(post_data, str)
211
    assert compat.all(isinstance(i, str) for i in headers)
212

    
213
    # Configure cURL object for request
214
    curl = self._curl
215
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
216
    curl.setopt(pycurl.URL, url)
217
    curl.setopt(pycurl.POSTFIELDS, post_data)
218
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
219
    curl.setopt(pycurl.HTTPHEADER, headers)
220

    
221
    if req.read_timeout is None:
222
      curl.setopt(pycurl.TIMEOUT, 0)
223
    else:
224
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
225

    
226
    # Pass cURL object to external config function
227
    if req.curl_config_fn:
228
      req.curl_config_fn(curl)
229

    
230
  def Done(self, errmsg):
231
    """Finishes a request.
232

233
    @type errmsg: string or None
234
    @param errmsg: Error message if request failed
235

236
    """
237
    req = self._req
238
    assert req, "No request"
239

    
240
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
241

    
242
    curl = self._curl
243

    
244
    req.success = not bool(errmsg)
245
    req.error = errmsg
246

    
247
    # Get HTTP response code
248
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
249
    req.resp_body = self._resp_buffer.getvalue()
250

    
251
    # Reset client object
252
    self._req = None
253
    self._resp_buffer = None
254

    
255
    # Ensure no potentially large variables are referenced
256
    curl.setopt(pycurl.POSTFIELDS, "")
257
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
258

    
259

    
260
class _PooledHttpClient:
261
  """Data structure for HTTP client pool.
262

263
  """
264
  def __init__(self, identity, client):
265
    """Initializes this class.
266

267
    @type identity: string
268
    @param identity: Client identifier for pool
269
    @type client: L{_HttpClient}
270
    @param client: HTTP client
271

272
    """
273
    self.identity = identity
274
    self.client = client
275
    self.lastused = 0
276

    
277
  def __repr__(self):
278
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
279
              "id=%s" % self.identity,
280
              "lastuse=%s" % self.lastused,
281
              repr(self.client)]
282

    
283
    return "<%s at %#x>" % (" ".join(status), id(self))
284

    
285

    
286
class HttpClientPool:
287
  """A simple HTTP client pool.
288

289
  Supports one pooled connection per identity (see
290
  L{HttpClientRequest.identity}).
291

292
  """
293
  #: After how many generations to drop unused clients
294
  _MAX_GENERATIONS_DROP = 25
295

    
296
  def __init__(self, curl_config_fn):
297
    """Initializes this class.
298

299
    @type curl_config_fn: callable
300
    @param curl_config_fn: Function to configure cURL object after
301
                           initialization
302

303
    """
304
    self._curl_config_fn = curl_config_fn
305
    self._generation = 0
306
    self._pool = {}
307

    
308
    # Create custom logger for HTTP client pool. Change logging level to
309
    # C{logging.NOTSET} to get more details.
310
    self._logger = logging.getLogger(self.__class__.__name__)
311
    self._logger.setLevel(logging.INFO)
312

    
313
  @staticmethod
314
  def _GetHttpClientCreator():
315
    """Returns callable to create HTTP client.
316

317
    """
318
    return _HttpClient
319

    
320
  def _Get(self, identity):
321
    """Gets an HTTP client from the pool.
322

323
    @type identity: string
324
    @param identity: Client identifier
325

326
    """
327
    try:
328
      pclient = self._pool.pop(identity)
329
    except KeyError:
330
      # Need to create new client
331
      client = self._GetHttpClientCreator()(self._curl_config_fn)
332
      pclient = _PooledHttpClient(identity, client)
333
      self._logger.debug("Created new client %s", pclient)
334
    else:
335
      self._logger.debug("Reusing client %s", pclient)
336

    
337
    assert pclient.identity == identity
338

    
339
    return pclient
340

    
341
  def _StartRequest(self, req):
342
    """Starts a request.
343

344
    @type req: L{HttpClientRequest}
345
    @param req: HTTP request
346

347
    """
348
    pclient = self._Get(req.identity)
349

    
350
    assert req.identity not in self._pool
351

    
352
    pclient.client.StartRequest(req)
353
    pclient.lastused = self._generation
354

    
355
    return pclient
356

    
357
  def _Return(self, pclients):
358
    """Returns HTTP clients to the pool.
359

360
    """
361
    assert not frozenset(pclients) & frozenset(self._pool.values())
362

    
363
    for pc in pclients:
364
      self._logger.debug("Returning client %s to pool", pc)
365
      assert pc.identity not in self._pool
366
      self._pool[pc.identity] = pc
367

    
368
    # Check for unused clients
369
    for pc in self._pool.values():
370
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
371
        self._logger.debug("Removing client %s which hasn't been used"
372
                           " for %s generations",
373
                           pc, self._MAX_GENERATIONS_DROP)
374
        self._pool.pop(pc.identity, None)
375

    
376
    assert compat.all(pc.lastused >= (self._generation -
377
                                      self._MAX_GENERATIONS_DROP)
378
                      for pc in self._pool.values())
379

    
380
  @staticmethod
381
  def _CreateCurlMultiHandle():
382
    """Creates new cURL multi handle.
383

384
    """
385
    return pycurl.CurlMulti()
386

    
387
  def ProcessRequests(self, requests, lock_monitor_cb=None):
388
    """Processes any number of HTTP client requests using pooled objects.
389

390
    @type requests: list of L{HttpClientRequest}
391
    @param requests: List of all requests
392
    @param lock_monitor_cb: Callable for registering with lock monitor
393

394
    """
395
    # For client cleanup
396
    self._generation += 1
397

    
398
    assert compat.all((req.error is None and
399
                       req.success is None and
400
                       req.resp_status_code is None and
401
                       req.resp_body is None)
402
                      for req in requests)
403

    
404
    curl_to_pclient = {}
405
    for req in requests:
406
      pclient = self._StartRequest(req)
407
      curl_to_pclient[pclient.client.GetCurlHandle()] = pclient
408
      assert pclient.client.GetCurrentRequest() == req
409
      assert pclient.lastused >= 0
410

    
411
    assert len(curl_to_pclient) == len(requests)
412

    
413
    if lock_monitor_cb:
414
      monitor = _PendingRequestMonitor(threading.currentThread(),
415
                                       curl_to_pclient.values)
416
      lock_monitor_cb(monitor)
417
    else:
418
      monitor = _NoOpRequestMonitor
419

    
420
    # Process all requests and act based on the returned values
421
    for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
422
                                            curl_to_pclient.keys()):
423
      pclient = curl_to_pclient[curl]
424
      req = pclient.client.GetCurrentRequest()
425

    
426
      monitor.acquire(shared=0)
427
      try:
428
        pclient.client.Done(msg)
429
      finally:
430
        monitor.release()
431

    
432
      assert ((msg is None and req.success and req.error is None) ^
433
              (msg is not None and not req.success and req.error == msg))
434

    
435
    assert compat.all(pclient.client.GetCurrentRequest() is None
436
                      for pclient in curl_to_pclient.values())
437

    
438
    monitor.acquire(shared=0)
439
    try:
440
      # Don't try to read information from returned clients
441
      monitor.Disable()
442

    
443
      # Return clients to pool
444
      self._Return(curl_to_pclient.values())
445
    finally:
446
      monitor.release()
447

    
448
    assert compat.all(req.error is not None or
449
                      (req.success and
450
                       req.resp_status_code is not None and
451
                       req.resp_body is not None)
452
                      for req in requests)
453

    
454

    
455
class _NoOpRequestMonitor: # pylint: disable=W0232
456
  """No-op request monitor.
457

458
  """
459
  @staticmethod
460
  def acquire(*args, **kwargs):
461
    pass
462

    
463
  release = acquire
464
  Disable = acquire
465

    
466

    
467
class _PendingRequestMonitor:
468
  _LOCK = "_lock"
469

    
470
  def __init__(self, owner, pending_fn):
471
    """Initializes this class.
472

473
    """
474
    self._owner = owner
475
    self._pending_fn = pending_fn
476

    
477
    # The lock monitor runs in another thread, hence locking is necessary
478
    self._lock = locking.SharedLock("PendingHttpRequests")
479
    self.acquire = self._lock.acquire
480
    self.release = self._lock.release
481

    
482
  def Disable(self):
483
    """Disable monitor.
484

485
    """
486
    self._pending_fn = None
487

    
488
  @locking.ssynchronized(_LOCK, shared=1)
489
  def GetLockInfo(self, requested): # pylint: disable=W0613
490
    """Retrieves information about pending requests.
491

492
    @type requested: set
493
    @param requested: Requested information, see C{query.LQ_*}
494

495
    """
496
    # No need to sort here, that's being done by the lock manager and query
497
    # library. There are no priorities for requests, hence all show up as
498
    # one item under "pending".
499
    result = []
500

    
501
    if self._pending_fn:
502
      owner_name = self._owner.getName()
503

    
504
      for pclient in self._pending_fn():
505
        req = pclient.client.GetCurrentRequest()
506
        if req:
507
          if req.nicename is None:
508
            name = "%s%s" % (req.host, req.path)
509
          else:
510
            name = req.nicename
511
          result.append(("rpc/%s" % name, None, [owner_name], None))
512

    
513
    return result
514

    
515

    
516
def _ProcessCurlRequests(multi, requests):
517
  """cURL request processor.
518

519
  This generator yields a tuple once for every completed request, successful or
520
  not. The first value in the tuple is the handle, the second an error message
521
  or C{None} for successful requests.
522

523
  @type multi: C{pycurl.CurlMulti}
524
  @param multi: cURL multi object
525
  @type requests: sequence
526
  @param requests: cURL request handles
527

528
  """
529
  for curl in requests:
530
    multi.add_handle(curl)
531

    
532
  while True:
533
    (ret, active) = multi.perform()
534
    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
535

    
536
    if ret == pycurl.E_CALL_MULTI_PERFORM:
537
      # cURL wants to be called again
538
      continue
539

    
540
    while True:
541
      (remaining_messages, successful, failed) = multi.info_read()
542

    
543
      for curl in successful:
544
        multi.remove_handle(curl)
545
        yield (curl, None)
546

    
547
      for curl, errnum, errmsg in failed:
548
        multi.remove_handle(curl)
549
        yield (curl, "Error %s: %s" % (errnum, errmsg))
550

    
551
      if remaining_messages == 0:
552
        break
553

    
554
    if active == 0:
555
      # No active handles anymore
556
      break
557

    
558
    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
559
    # timeouts, which are only evaluated in multi.perform, aren't
560
    # unnecessarily delayed.
561
    multi.select(1.0)