Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ abbf2cd9

History | View | Annotate | Download (10.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
import threading
28
from cStringIO import StringIO
29

    
30
from ganeti import http
31
from ganeti import compat
32
from ganeti import netutils
33
from ganeti import locking
34

    
35

    
36
class HttpClientRequest(object):
37
  def __init__(self, host, port, method, path, headers=None, post_data=None,
38
               read_timeout=None, curl_config_fn=None, nicename=None):
39
    """Describes an HTTP request.
40

41
    @type host: string
42
    @param host: Hostname
43
    @type port: int
44
    @param port: Port
45
    @type method: string
46
    @param method: Method name
47
    @type path: string
48
    @param path: Request path
49
    @type headers: list or None
50
    @param headers: Additional headers to send, list of strings
51
    @type post_data: string or None
52
    @param post_data: Additional data to send
53
    @type read_timeout: int
54
    @param read_timeout: if passed, it will be used as the read
55
        timeout while reading the response from the server
56
    @type curl_config_fn: callable
57
    @param curl_config_fn: Function to configure cURL object before request
58
    @type nicename: string
59
    @param nicename: Name, presentable to a user, to describe this request (no
60
                     whitespace)
61

62
    """
63
    assert path.startswith("/"), "Path must start with slash (/)"
64
    assert curl_config_fn is None or callable(curl_config_fn)
65

    
66
    # Request attributes
67
    self.host = host
68
    self.port = port
69
    self.method = method
70
    self.path = path
71
    self.read_timeout = read_timeout
72
    self.curl_config_fn = curl_config_fn
73
    self.nicename = nicename
74

    
75
    if post_data is None:
76
      self.post_data = ""
77
    else:
78
      self.post_data = post_data
79

    
80
    if headers is None:
81
      self.headers = []
82
    elif isinstance(headers, dict):
83
      # Support for old interface
84
      self.headers = ["%s: %s" % (name, value)
85
                      for name, value in headers.items()]
86
    else:
87
      self.headers = headers
88

    
89
    # Response status
90
    self.success = None
91
    self.error = None
92

    
93
    # Response attributes
94
    self.resp_status_code = None
95
    self.resp_body = None
96

    
97
  def __repr__(self):
98
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
99
              "%s:%s" % (self.host, self.port),
100
              self.method,
101
              self.path]
102

    
103
    return "<%s at %#x>" % (" ".join(status), id(self))
104

    
105
  @property
106
  def url(self):
107
    """Returns the full URL for this requests.
108

109
    """
110
    if netutils.IPAddress.IsValid(self.host):
111
      address = netutils.FormatAddress((self.host, self.port))
112
    else:
113
      address = "%s:%s" % (self.host, self.port)
114
    # TODO: Support for non-SSL requests
115
    return "https://%s%s" % (address, self.path)
116

    
117

    
118
def _StartRequest(curl, req):
119
  """Starts a request on a cURL object.
120

121
  @type curl: pycurl.Curl
122
  @param curl: cURL object
123
  @type req: L{HttpClientRequest}
124
  @param req: HTTP request
125

126
  """
127
  logging.debug("Starting request %r", req)
128

    
129
  url = req.url
130
  method = req.method
131
  post_data = req.post_data
132
  headers = req.headers
133

    
134
  # PycURL requires strings to be non-unicode
135
  assert isinstance(method, str)
136
  assert isinstance(url, str)
137
  assert isinstance(post_data, str)
138
  assert compat.all(isinstance(i, str) for i in headers)
139

    
140
  # Buffer for response
141
  resp_buffer = StringIO()
142

    
143
  # Configure client for request
144
  curl.setopt(pycurl.VERBOSE, False)
145
  curl.setopt(pycurl.NOSIGNAL, True)
146
  curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
147
  curl.setopt(pycurl.PROXY, "")
148
  curl.setopt(pycurl.CUSTOMREQUEST, str(method))
149
  curl.setopt(pycurl.URL, url)
150
  curl.setopt(pycurl.POSTFIELDS, post_data)
151
  curl.setopt(pycurl.HTTPHEADER, headers)
152

    
153
  if req.read_timeout is None:
154
    curl.setopt(pycurl.TIMEOUT, 0)
155
  else:
156
    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
157

    
158
  # Disable SSL session ID caching (pycurl >= 7.16.0)
159
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
160
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
161

    
162
  curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
163

    
164
  # Pass cURL object to external config function
165
  if req.curl_config_fn:
166
    req.curl_config_fn(curl)
167

    
168
  return _PendingRequest(curl, req, resp_buffer.getvalue)
169

    
170

    
171
class _PendingRequest:
172
  def __init__(self, curl, req, resp_buffer_read):
173
    """Initializes this class.
174

175
    @type curl: pycurl.Curl
176
    @param curl: cURL object
177
    @type req: L{HttpClientRequest}
178
    @param req: HTTP request
179
    @type resp_buffer_read: callable
180
    @param resp_buffer_read: Function to read response body
181

182
    """
183
    assert req.success is None
184

    
185
    self._curl = curl
186
    self._req = req
187
    self._resp_buffer_read = resp_buffer_read
188

    
189
  def GetCurlHandle(self):
190
    """Returns the cURL object.
191

192
    """
193
    return self._curl
194

    
195
  def GetCurrentRequest(self):
196
    """Returns the current request.
197

198
    """
199
    return self._req
200

    
201
  def Done(self, errmsg):
202
    """Finishes a request.
203

204
    @type errmsg: string or None
205
    @param errmsg: Error message if request failed
206

207
    """
208
    curl = self._curl
209
    req = self._req
210

    
211
    assert req.success is None, "Request has already been finalized"
212

    
213
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
214

    
215
    req.success = not bool(errmsg)
216
    req.error = errmsg
217

    
218
    # Get HTTP response code
219
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
220
    req.resp_body = self._resp_buffer_read()
221

    
222
    # Ensure no potentially large variables are referenced
223
    try:
224
      # Only available in PycURL 7.19.0 and above
225
      reset_fn = curl.reset
226
    except AttributeError:
227
      curl.setopt(pycurl.POSTFIELDS, "")
228
      curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
229
    else:
230
      reset_fn()
231

    
232

    
233
class _NoOpRequestMonitor: # pylint: disable=W0232
234
  """No-op request monitor.
235

236
  """
237
  @staticmethod
238
  def acquire(*args, **kwargs):
239
    pass
240

    
241
  release = acquire
242
  Disable = acquire
243

    
244

    
245
class _PendingRequestMonitor:
246
  _LOCK = "_lock"
247

    
248
  def __init__(self, owner, pending_fn):
249
    """Initializes this class.
250

251
    """
252
    self._owner = owner
253
    self._pending_fn = pending_fn
254

    
255
    # The lock monitor runs in another thread, hence locking is necessary
256
    self._lock = locking.SharedLock("PendingHttpRequests")
257
    self.acquire = self._lock.acquire
258
    self.release = self._lock.release
259

    
260
  @locking.ssynchronized(_LOCK)
261
  def Disable(self):
262
    """Disable monitor.
263

264
    """
265
    self._pending_fn = None
266

    
267
  @locking.ssynchronized(_LOCK, shared=1)
268
  def GetLockInfo(self, requested): # pylint: disable=W0613
269
    """Retrieves information about pending requests.
270

271
    @type requested: set
272
    @param requested: Requested information, see C{query.LQ_*}
273

274
    """
275
    # No need to sort here, that's being done by the lock manager and query
276
    # library. There are no priorities for requests, hence all show up as
277
    # one item under "pending".
278
    result = []
279

    
280
    if self._pending_fn:
281
      owner_name = self._owner.getName()
282

    
283
      for client in self._pending_fn():
284
        req = client.GetCurrentRequest()
285
        if req:
286
          if req.nicename is None:
287
            name = "%s%s" % (req.host, req.path)
288
          else:
289
            name = req.nicename
290
          result.append(("rpc/%s" % name, None, [owner_name], None))
291

    
292
    return result
293

    
294

    
295
def _ProcessCurlRequests(multi, requests):
296
  """cURL request processor.
297

298
  This generator yields a tuple once for every completed request, successful or
299
  not. The first value in the tuple is the handle, the second an error message
300
  or C{None} for successful requests.
301

302
  @type multi: C{pycurl.CurlMulti}
303
  @param multi: cURL multi object
304
  @type requests: sequence
305
  @param requests: cURL request handles
306

307
  """
308
  for curl in requests:
309
    multi.add_handle(curl)
310

    
311
  while True:
312
    (ret, active) = multi.perform()
313
    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
314

    
315
    if ret == pycurl.E_CALL_MULTI_PERFORM:
316
      # cURL wants to be called again
317
      continue
318

    
319
    while True:
320
      (remaining_messages, successful, failed) = multi.info_read()
321

    
322
      for curl in successful:
323
        multi.remove_handle(curl)
324
        yield (curl, None)
325

    
326
      for curl, errnum, errmsg in failed:
327
        multi.remove_handle(curl)
328
        yield (curl, "Error %s: %s" % (errnum, errmsg))
329

    
330
      if remaining_messages == 0:
331
        break
332

    
333
    if active == 0:
334
      # No active handles anymore
335
      break
336

    
337
    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
338
    # timeouts, which are only evaluated in multi.perform, aren't
339
    # unnecessarily delayed.
340
    multi.select(1.0)
341

    
342

    
343
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
344
                    _curl_multi=pycurl.CurlMulti,
345
                    _curl_process=_ProcessCurlRequests):
346
  """Processes any number of HTTP client requests.
347

348
  @type requests: list of L{HttpClientRequest}
349
  @param requests: List of all requests
350
  @param lock_monitor_cb: Callable for registering with lock monitor
351

352
  """
353
  assert compat.all((req.error is None and
354
                     req.success is None and
355
                     req.resp_status_code is None and
356
                     req.resp_body is None)
357
                    for req in requests)
358

    
359
  # Prepare all requests
360
  curl_to_client = \
361
    dict((client.GetCurlHandle(), client)
362
         for client in map(lambda req: _StartRequest(_curl(), req), requests))
363

    
364
  assert len(curl_to_client) == len(requests)
365

    
366
  if lock_monitor_cb:
367
    monitor = _PendingRequestMonitor(threading.currentThread(),
368
                                     curl_to_client.values)
369
    lock_monitor_cb(monitor)
370
  else:
371
    monitor = _NoOpRequestMonitor
372

    
373
  # Process all requests and act based on the returned values
374
  for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
375
    monitor.acquire(shared=0)
376
    try:
377
      curl_to_client.pop(curl).Done(msg)
378
    finally:
379
      monitor.release()
380

    
381
  assert not curl_to_client, "Not all requests were processed"
382

    
383
  # Don't try to read information anymore as all requests have been processed
384
  monitor.Disable()
385

    
386
  assert compat.all(req.error is not None or
387
                    (req.success and
388
                     req.resp_status_code is not None and
389
                     req.resp_body is not None)
390
                    for req in requests)