Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 33231500

History | View | Annotate | Download (12.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31

    
32

    
33
class HttpClientRequest(object):
34
  def __init__(self, host, port, method, path, headers=None, post_data=None,
35
               read_timeout=None, curl_config_fn=None):
36
    """Describes an HTTP request.
37

38
    @type host: string
39
    @param host: Hostname
40
    @type port: int
41
    @param port: Port
42
    @type method: string
43
    @param method: Method name
44
    @type path: string
45
    @param path: Request path
46
    @type headers: list or None
47
    @param headers: Additional headers to send, list of strings
48
    @type post_data: string or None
49
    @param post_data: Additional data to send
50
    @type read_timeout: int
51
    @param read_timeout: if passed, it will be used as the read
52
        timeout while reading the response from the server
53
    @type curl_config_fn: callable
54
    @param curl_config_fn: Function to configure cURL object before request
55
                           (Note: if the function configures the connection in
56
                           a way where it wouldn't be efficient to reuse them,
57
                           a "identity" property should be defined, see
58
                           L{HttpClientRequest.identity})
59

60
    """
61
    assert path.startswith("/"), "Path must start with slash (/)"
62
    assert curl_config_fn is None or callable(curl_config_fn)
63

    
64
    # Request attributes
65
    self.host = host
66
    self.port = port
67
    self.method = method
68
    self.path = path
69
    self.read_timeout = read_timeout
70
    self.curl_config_fn = curl_config_fn
71

    
72
    if post_data is None:
73
      self.post_data = ""
74
    else:
75
      self.post_data = post_data
76

    
77
    if headers is None:
78
      self.headers = []
79
    elif isinstance(headers, dict):
80
      # Support for old interface
81
      self.headers = ["%s: %s" % (name, value)
82
                      for name, value in headers.items()]
83
    else:
84
      self.headers = headers
85

    
86
    # Response status
87
    self.success = None
88
    self.error = None
89

    
90
    # Response attributes
91
    self.resp_status_code = None
92
    self.resp_body = None
93

    
94
  def __repr__(self):
95
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
96
              "%s:%s" % (self.host, self.port),
97
              self.method,
98
              self.path]
99

    
100
    return "<%s at %#x>" % (" ".join(status), id(self))
101

    
102
  @property
103
  def url(self):
104
    """Returns the full URL for this requests.
105

106
    """
107
    # TODO: Support for non-SSL requests
108
    return "https://%s:%s%s" % (self.host, self.port, self.path)
109

    
110
  @property
111
  def identity(self):
112
    """Returns identifier for retrieving a pooled connection for this request.
113

114
    This allows cURL client objects to be re-used and to cache information
115
    (e.g. SSL session IDs or connections).
116

117
    """
118
    parts = [self.host, self.port]
119

    
120
    if self.curl_config_fn:
121
      try:
122
        parts.append(self.curl_config_fn.identity)
123
      except AttributeError:
124
        pass
125

    
126
    return "/".join(str(i) for i in parts)
127

    
128

    
129
class _HttpClient(object):
130
  def __init__(self, curl_config_fn):
131
    """Initializes this class.
132

133
    @type curl_config_fn: callable
134
    @param curl_config_fn: Function to configure cURL object after
135
                           initialization
136

137
    """
138
    self._req = None
139

    
140
    curl = self._CreateCurlHandle()
141
    curl.setopt(pycurl.VERBOSE, False)
142
    curl.setopt(pycurl.NOSIGNAL, True)
143
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
144
    curl.setopt(pycurl.PROXY, "")
145

    
146
    # Pass cURL object to external config function
147
    if curl_config_fn:
148
      curl_config_fn(curl)
149

    
150
    self._curl = curl
151

    
152
  @staticmethod
153
  def _CreateCurlHandle():
154
    """Returns a new cURL object.
155

156
    """
157
    return pycurl.Curl()
158

    
159
  def GetCurlHandle(self):
160
    """Returns the cURL object.
161

162
    """
163
    return self._curl
164

    
165
  def GetCurrentRequest(self):
166
    """Returns the current request.
167

168
    @rtype: L{HttpClientRequest} or None
169

170
    """
171
    return self._req
172

    
173
  def StartRequest(self, req):
174
    """Starts a request on this client.
175

176
    @type req: L{HttpClientRequest}
177
    @param req: HTTP request
178

179
    """
180
    assert not self._req, "Another request is already started"
181

    
182
    self._req = req
183
    self._resp_buffer = StringIO()
184

    
185
    url = req.url
186
    method = req.method
187
    post_data = req.post_data
188
    headers = req.headers
189

    
190
    # PycURL requires strings to be non-unicode
191
    assert isinstance(method, str)
192
    assert isinstance(url, str)
193
    assert isinstance(post_data, str)
194
    assert compat.all(isinstance(i, str) for i in headers)
195

    
196
    # Configure cURL object for request
197
    curl = self._curl
198
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
199
    curl.setopt(pycurl.URL, url)
200
    curl.setopt(pycurl.POSTFIELDS, post_data)
201
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
202
    curl.setopt(pycurl.HTTPHEADER, headers)
203

    
204
    if req.read_timeout is None:
205
      curl.setopt(pycurl.TIMEOUT, 0)
206
    else:
207
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
208

    
209
    # Pass cURL object to external config function
210
    if req.curl_config_fn:
211
      req.curl_config_fn(curl)
212

    
213
  def Done(self, errmsg):
214
    """Finishes a request.
215

216
    @type errmsg: string or None
217
    @param errmsg: Error message if request failed
218

219
    """
220
    req = self._req
221
    assert req, "No request"
222

    
223
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
224

    
225
    curl = self._curl
226

    
227
    req.success = not bool(errmsg)
228
    req.error = errmsg
229

    
230
    # Get HTTP response code
231
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
232
    req.resp_body = self._resp_buffer.getvalue()
233

    
234
    # Reset client object
235
    self._req = None
236
    self._resp_buffer = None
237

    
238
    # Ensure no potentially large variables are referenced
239
    curl.setopt(pycurl.POSTFIELDS, "")
240
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
241

    
242

    
243
class _PooledHttpClient:
244
  """Data structure for HTTP client pool.
245

246
  """
247
  def __init__(self, identity, client):
248
    """Initializes this class.
249

250
    @type identity: string
251
    @param identity: Client identifier for pool
252
    @type client: L{_HttpClient}
253
    @param client: HTTP client
254

255
    """
256
    self.identity = identity
257
    self.client = client
258
    self.lastused = 0
259

    
260
  def __repr__(self):
261
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
262
              "id=%s" % self.identity,
263
              "lastuse=%s" % self.lastused,
264
              repr(self.client)]
265

    
266
    return "<%s at %#x>" % (" ".join(status), id(self))
267

    
268

    
269
class HttpClientPool:
270
  """A simple HTTP client pool.
271

272
  Supports one pooled connection per identity (see
273
  L{HttpClientRequest.identity}).
274

275
  """
276
  #: After how many generations to drop unused clients
277
  _MAX_GENERATIONS_DROP = 25
278

    
279
  def __init__(self, curl_config_fn):
280
    """Initializes this class.
281

282
    @type curl_config_fn: callable
283
    @param curl_config_fn: Function to configure cURL object after
284
                           initialization
285

286
    """
287
    self._curl_config_fn = curl_config_fn
288
    self._generation = 0
289
    self._pool = {}
290

    
291
  @staticmethod
292
  def _GetHttpClientCreator():
293
    """Returns callable to create HTTP client.
294

295
    """
296
    return _HttpClient
297

    
298
  def _Get(self, identity):
299
    """Gets an HTTP client from the pool.
300

301
    @type identity: string
302
    @param identity: Client identifier
303

304
    """
305
    try:
306
      pclient  = self._pool.pop(identity)
307
    except KeyError:
308
      # Need to create new client
309
      client = self._GetHttpClientCreator()(self._curl_config_fn)
310
      pclient = _PooledHttpClient(identity, client)
311
      logging.debug("Created new client %s", pclient)
312
    else:
313
      logging.debug("Reusing client %s", pclient)
314

    
315
    assert pclient.identity == identity
316

    
317
    return pclient
318

    
319
  def _StartRequest(self, req):
320
    """Starts a request.
321

322
    @type req: L{HttpClientRequest}
323
    @param req: HTTP request
324

325
    """
326
    logging.debug("Starting request %r", req)
327
    pclient = self._Get(req.identity)
328

    
329
    assert req.identity not in self._pool
330

    
331
    pclient.client.StartRequest(req)
332
    pclient.lastused = self._generation
333

    
334
    return pclient
335

    
336
  def _Return(self, pclients):
337
    """Returns HTTP clients to the pool.
338

339
    """
340
    for pc in pclients:
341
      logging.debug("Returning client %s to pool", pc)
342
      assert pc.identity not in self._pool
343
      assert pc not in self._pool.values()
344
      self._pool[pc.identity] = pc
345

    
346
    # Check for unused clients
347
    for pc in self._pool.values():
348
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
349
        logging.debug("Removing client %s which hasn't been used"
350
                      " for %s generations",
351
                      pc, self._MAX_GENERATIONS_DROP)
352
        self._pool.pop(pc.identity, None)
353

    
354
    assert compat.all(pc.lastused >= (self._generation -
355
                                      self._MAX_GENERATIONS_DROP)
356
                      for pc in self._pool.values())
357

    
358
  @staticmethod
359
  def _CreateCurlMultiHandle():
360
    """Creates new cURL multi handle.
361

362
    """
363
    return pycurl.CurlMulti()
364

    
365
  def ProcessRequests(self, requests):
366
    """Processes any number of HTTP client requests using pooled objects.
367

368
    @type requests: list of L{HttpClientRequest}
369
    @param requests: List of all requests
370

371
    """
372
    multi = self._CreateCurlMultiHandle()
373

    
374
    # For client cleanup
375
    self._generation += 1
376

    
377
    assert compat.all((req.error is None and
378
                       req.success is None and
379
                       req.resp_status_code is None and
380
                       req.resp_body is None)
381
                      for req in requests)
382

    
383
    curl_to_pclient = {}
384
    for req in requests:
385
      pclient = self._StartRequest(req)
386
      curl = pclient.client.GetCurlHandle()
387
      curl_to_pclient[curl] = pclient
388
      multi.add_handle(curl)
389
      assert pclient.client.GetCurrentRequest() == req
390
      assert pclient.lastused >= 0
391

    
392
    assert len(curl_to_pclient) == len(requests)
393

    
394
    done_count = 0
395
    while True:
396
      (ret, _) = multi.perform()
397
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
398

    
399
      if ret == pycurl.E_CALL_MULTI_PERFORM:
400
        # cURL wants to be called again
401
        continue
402

    
403
      while True:
404
        (remaining_messages, successful, failed) = multi.info_read()
405

    
406
        for curl in successful:
407
          multi.remove_handle(curl)
408
          done_count += 1
409
          pclient = curl_to_pclient[curl]
410
          req = pclient.client.GetCurrentRequest()
411
          pclient.client.Done(None)
412
          assert req.success
413
          assert not pclient.client.GetCurrentRequest()
414

    
415
        for curl, errnum, errmsg in failed:
416
          multi.remove_handle(curl)
417
          done_count += 1
418
          pclient = curl_to_pclient[curl]
419
          req = pclient.client.GetCurrentRequest()
420
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
421
          assert req.error
422
          assert not pclient.client.GetCurrentRequest()
423

    
424
        if remaining_messages == 0:
425
          break
426

    
427
      assert done_count <= len(requests)
428

    
429
      if done_count == len(requests):
430
        break
431

    
432
      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
433
      # timeouts, which are only evaluated in multi.perform, aren't
434
      # unnecessarily delayed.
435
      multi.select(1.0)
436

    
437
    assert compat.all(pclient.client.GetCurrentRequest() is None
438
                      for pclient in curl_to_pclient.values())
439

    
440
    # Return clients to pool
441
    self._Return(curl_to_pclient.values())
442

    
443
    assert done_count == len(requests)
444
    assert compat.all(req.error is not None or
445
                      (req.success and
446
                       req.resp_status_code is not None and
447
                       req.resp_body is not None)
448
                      for req in requests)