Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 981732fb

History | View | Annotate | Download (12.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31
from ganeti import netutils
32

    
33

    
34
class HttpClientRequest(object):
  """Single HTTP request together with the outcome of processing it.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list or None
    @param headers: Additional headers to send, list of strings (a
        dictionary is still accepted and converted to such a list)
    @type post_data: string or None
    @param post_data: Additional data to send
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
                           (Note: if the function configures the connection in
                           a way where it wouldn't be efficient to reuse them,
                           a "identity" property should be defined, see
                           L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Normalize the optional body; a missing body becomes an empty string
    if post_data is None:
      post_data = ""

    # Normalize the optional headers into a list of "Name: value" strings
    if isinstance(headers, dict):
      # Support for old interface
      header_list = ["%s: %s" % item for item in headers.items()]
    elif headers is None:
      header_list = []
    else:
      header_list = headers

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.post_data = post_data
    self.headers = header_list

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    cls = self.__class__

    fields = ["%s.%s" % (cls.__module__, cls.__name__),
              "%s:%s" % (self.host, self.port)]
    fields.extend([self.method, self.path])

    return "<%s at %#x>" % (" ".join(fields), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    """
    endpoint = (self.host, self.port)

    if netutils.IPAddress.IsValid(self.host):
      netloc = netutils.FormatAddress(endpoint)
    else:
      netloc = "%s:%s" % endpoint

    # TODO: Support for non-SSL requests
    return "https://%s%s" % (netloc, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    This allows cURL client objects to be re-used and to cache information
    (e.g. SSL session IDs or connections).

    The identifier consists of host and port and, if the request's
    configuration function has an "identity" attribute, that attribute's
    value, all joined by slashes.

    """
    components = [str(self.host), str(self.port)]

    config_fn = self.curl_config_fn
    if config_fn:
      # Unique marker to tell "no such attribute" apart from any real value
      missing = object()
      extra = getattr(config_fn, "identity", missing)
      if extra is not missing:
        components.append(str(extra))

    return "/".join(components)
132

    
133

    
134
class _HttpClient(object):
135
  def __init__(self, curl_config_fn):
136
    """Initializes this class.
137

138
    @type curl_config_fn: callable
139
    @param curl_config_fn: Function to configure cURL object after
140
                           initialization
141

142
    """
143
    self._req = None
144

    
145
    curl = self._CreateCurlHandle()
146
    curl.setopt(pycurl.VERBOSE, False)
147
    curl.setopt(pycurl.NOSIGNAL, True)
148
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149
    curl.setopt(pycurl.PROXY, "")
150

    
151
    # Pass cURL object to external config function
152
    if curl_config_fn:
153
      curl_config_fn(curl)
154

    
155
    self._curl = curl
156

    
157
  @staticmethod
158
  def _CreateCurlHandle():
159
    """Returns a new cURL object.
160

161
    """
162
    return pycurl.Curl()
163

    
164
  def GetCurlHandle(self):
165
    """Returns the cURL object.
166

167
    """
168
    return self._curl
169

    
170
  def GetCurrentRequest(self):
171
    """Returns the current request.
172

173
    @rtype: L{HttpClientRequest} or None
174

175
    """
176
    return self._req
177

    
178
  def StartRequest(self, req):
179
    """Starts a request on this client.
180

181
    @type req: L{HttpClientRequest}
182
    @param req: HTTP request
183

184
    """
185
    assert not self._req, "Another request is already started"
186

    
187
    self._req = req
188
    self._resp_buffer = StringIO()
189

    
190
    url = req.url
191
    method = req.method
192
    post_data = req.post_data
193
    headers = req.headers
194

    
195
    # PycURL requires strings to be non-unicode
196
    assert isinstance(method, str)
197
    assert isinstance(url, str)
198
    assert isinstance(post_data, str)
199
    assert compat.all(isinstance(i, str) for i in headers)
200

    
201
    # Configure cURL object for request
202
    curl = self._curl
203
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
204
    curl.setopt(pycurl.URL, url)
205
    curl.setopt(pycurl.POSTFIELDS, post_data)
206
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
207
    curl.setopt(pycurl.HTTPHEADER, headers)
208

    
209
    if req.read_timeout is None:
210
      curl.setopt(pycurl.TIMEOUT, 0)
211
    else:
212
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
213

    
214
    # Pass cURL object to external config function
215
    if req.curl_config_fn:
216
      req.curl_config_fn(curl)
217

    
218
  def Done(self, errmsg):
219
    """Finishes a request.
220

221
    @type errmsg: string or None
222
    @param errmsg: Error message if request failed
223

224
    """
225
    req = self._req
226
    assert req, "No request"
227

    
228
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
229

    
230
    curl = self._curl
231

    
232
    req.success = not bool(errmsg)
233
    req.error = errmsg
234

    
235
    # Get HTTP response code
236
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
237
    req.resp_body = self._resp_buffer.getvalue()
238

    
239
    # Reset client object
240
    self._req = None
241
    self._resp_buffer = None
242

    
243
    # Ensure no potentially large variables are referenced
244
    curl.setopt(pycurl.POSTFIELDS, "")
245
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
246

    
247

    
248
class _PooledHttpClient:
249
  """Data structure for HTTP client pool.
250

251
  """
252
  def __init__(self, identity, client):
253
    """Initializes this class.
254

255
    @type identity: string
256
    @param identity: Client identifier for pool
257
    @type client: L{_HttpClient}
258
    @param client: HTTP client
259

260
    """
261
    self.identity = identity
262
    self.client = client
263
    self.lastused = 0
264

    
265
  def __repr__(self):
266
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
267
              "id=%s" % self.identity,
268
              "lastuse=%s" % self.lastused,
269
              repr(self.client)]
270

    
271
    return "<%s at %#x>" % (" ".join(status), id(self))
272

    
273

    
274
class HttpClientPool:
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._curl_config_fn = curl_config_fn
    # Incremented once per call to ProcessRequests, used for client cleanup
    self._generation = 0
    # Maps identity to the idle L{_PooledHttpClient} for it
    self._pool = {}

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    The client is removed from the pool; if there is none for the identity
    a new one is created.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      logging.debug("Created new client %s", pclient)
    else:
      logging.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: Client on which the request was started

    """
    logging.debug("Starting request %r", req)
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Afterwards pooled clients which haven't been used for more than
    L{_MAX_GENERATIONS_DROP} generations are removed from the pool.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients which are no longer processing a request

    """
    for pc in pclients:
      if pc.identity in self._pool:
        # Several requests of one batch can share an identity, in which case
        # each of them got its own client. The pool only holds one client
        # per identity, hence surplus clients are not kept.
        logging.debug("Not returning client %s to pool, there already is"
                      " a client for its identity", pc)
        continue

      logging.debug("Returning client %s to pool", pc)
      self._pool[pc.identity] = pc

    # Check for unused clients; entries are removed while looping, therefore
    # a copy of the values is iterated
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        logging.debug("Removing client %s which hasn't been used"
                      " for %s generations",
                      pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    All requests are started and then driven through a single cURL multi
    handle until every one of them has finished. Results are stored in the
    request objects; nothing is returned.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    # Requests must not have been processed before
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    # Needed to find the client when cURL reports a finished handle
    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      # Collect all finished transfers reported so far
      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        for curl in successful:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(None)
          assert req.success
          assert not pclient.client.GetCurrentRequest()

        for curl, errnum, errmsg in failed:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
          assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)