Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 84ad6b78

History | View | Annotate | Download (10.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
import threading
28
from cStringIO import StringIO
29

    
30
from ganeti import http
31
from ganeti import compat
32
from ganeti import netutils
33
from ganeti import locking
34

    
35

    
36
class HttpClientRequest(object):
37
  def __init__(self, host, port, method, path, headers=None, post_data=None,
38
               read_timeout=None, curl_config_fn=None, nicename=None,
39
               completion_cb=None):
40
    """Describes an HTTP request.
41

42
    @type host: string
43
    @param host: Hostname
44
    @type port: int
45
    @param port: Port
46
    @type method: string
47
    @param method: Method name
48
    @type path: string
49
    @param path: Request path
50
    @type headers: list or None
51
    @param headers: Additional headers to send, list of strings
52
    @type post_data: string or None
53
    @param post_data: Additional data to send
54
    @type read_timeout: int
55
    @param read_timeout: if passed, it will be used as the read
56
        timeout while reading the response from the server
57
    @type curl_config_fn: callable
58
    @param curl_config_fn: Function to configure cURL object before request
59
    @type nicename: string
60
    @param nicename: Name, presentable to a user, to describe this request (no
61
                     whitespace)
62
    @type completion_cb: callable accepting this request object as a single
63
                         parameter
64
    @param completion_cb: Callback for request completion
65

66
    """
67
    assert path.startswith("/"), "Path must start with slash (/)"
68
    assert curl_config_fn is None or callable(curl_config_fn)
69
    assert completion_cb is None or callable(completion_cb)
70

    
71
    # Request attributes
72
    self.host = host
73
    self.port = port
74
    self.method = method
75
    self.path = path
76
    self.read_timeout = read_timeout
77
    self.curl_config_fn = curl_config_fn
78
    self.nicename = nicename
79
    self.completion_cb = completion_cb
80

    
81
    if post_data is None:
82
      self.post_data = ""
83
    else:
84
      self.post_data = post_data
85

    
86
    if headers is None:
87
      self.headers = []
88
    elif isinstance(headers, dict):
89
      # Support for old interface
90
      self.headers = ["%s: %s" % (name, value)
91
                      for name, value in headers.items()]
92
    else:
93
      self.headers = headers
94

    
95
    # Response status
96
    self.success = None
97
    self.error = None
98

    
99
    # Response attributes
100
    self.resp_status_code = None
101
    self.resp_body = None
102

    
103
  def __repr__(self):
104
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
105
              "%s:%s" % (self.host, self.port),
106
              self.method,
107
              self.path]
108

    
109
    return "<%s at %#x>" % (" ".join(status), id(self))
110

    
111
  @property
112
  def url(self):
113
    """Returns the full URL for this requests.
114

115
    """
116
    if netutils.IPAddress.IsValid(self.host):
117
      address = netutils.FormatAddress((self.host, self.port))
118
    else:
119
      address = "%s:%s" % (self.host, self.port)
120
    # TODO: Support for non-SSL requests
121
    return "https://%s%s" % (address, self.path)
122

    
123

    
124
def _StartRequest(curl, req):
125
  """Starts a request on a cURL object.
126

127
  @type curl: pycurl.Curl
128
  @param curl: cURL object
129
  @type req: L{HttpClientRequest}
130
  @param req: HTTP request
131

132
  """
133
  logging.debug("Starting request %r", req)
134

    
135
  url = req.url
136
  method = req.method
137
  post_data = req.post_data
138
  headers = req.headers
139

    
140
  # PycURL requires strings to be non-unicode
141
  assert isinstance(method, str)
142
  assert isinstance(url, str)
143
  assert isinstance(post_data, str)
144
  assert compat.all(isinstance(i, str) for i in headers)
145

    
146
  # Buffer for response
147
  resp_buffer = StringIO()
148

    
149
  # Configure client for request
150
  curl.setopt(pycurl.VERBOSE, False)
151
  curl.setopt(pycurl.NOSIGNAL, True)
152
  curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
153
  curl.setopt(pycurl.PROXY, "")
154
  curl.setopt(pycurl.CUSTOMREQUEST, str(method))
155
  curl.setopt(pycurl.URL, url)
156
  curl.setopt(pycurl.POSTFIELDS, post_data)
157
  curl.setopt(pycurl.HTTPHEADER, headers)
158

    
159
  if req.read_timeout is None:
160
    curl.setopt(pycurl.TIMEOUT, 0)
161
  else:
162
    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
163

    
164
  # Disable SSL session ID caching (pycurl >= 7.16.0)
165
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
166
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
167

    
168
  curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
169

    
170
  # Pass cURL object to external config function
171
  if req.curl_config_fn:
172
    req.curl_config_fn(curl)
173

    
174
  return _PendingRequest(curl, req, resp_buffer.getvalue)
175

    
176

    
177
class _PendingRequest:
178
  def __init__(self, curl, req, resp_buffer_read):
179
    """Initializes this class.
180

181
    @type curl: pycurl.Curl
182
    @param curl: cURL object
183
    @type req: L{HttpClientRequest}
184
    @param req: HTTP request
185
    @type resp_buffer_read: callable
186
    @param resp_buffer_read: Function to read response body
187

188
    """
189
    assert req.success is None
190

    
191
    self._curl = curl
192
    self._req = req
193
    self._resp_buffer_read = resp_buffer_read
194

    
195
  def GetCurlHandle(self):
196
    """Returns the cURL object.
197

198
    """
199
    return self._curl
200

    
201
  def GetCurrentRequest(self):
202
    """Returns the current request.
203

204
    """
205
    return self._req
206

    
207
  def Done(self, errmsg):
208
    """Finishes a request.
209

210
    @type errmsg: string or None
211
    @param errmsg: Error message if request failed
212

213
    """
214
    curl = self._curl
215
    req = self._req
216

    
217
    assert req.success is None, "Request has already been finalized"
218

    
219
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
220

    
221
    req.success = not bool(errmsg)
222
    req.error = errmsg
223

    
224
    # Get HTTP response code
225
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
226
    req.resp_body = self._resp_buffer_read()
227

    
228
    # Ensure no potentially large variables are referenced
229
    curl.setopt(pycurl.POSTFIELDS, "")
230
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
231

    
232
    if req.completion_cb:
233
      req.completion_cb(req)
234

    
235

    
236
class _NoOpRequestMonitor: # pylint: disable=W0232
237
  """No-op request monitor.
238

239
  """
240
  @staticmethod
241
  def acquire(*args, **kwargs):
242
    pass
243

    
244
  release = acquire
245
  Disable = acquire
246

    
247

    
248
class _PendingRequestMonitor:
249
  _LOCK = "_lock"
250

    
251
  def __init__(self, owner, pending_fn):
252
    """Initializes this class.
253

254
    """
255
    self._owner = owner
256
    self._pending_fn = pending_fn
257

    
258
    # The lock monitor runs in another thread, hence locking is necessary
259
    self._lock = locking.SharedLock("PendingHttpRequests")
260
    self.acquire = self._lock.acquire
261
    self.release = self._lock.release
262

    
263
  @locking.ssynchronized(_LOCK)
264
  def Disable(self):
265
    """Disable monitor.
266

267
    """
268
    self._pending_fn = None
269

    
270
  @locking.ssynchronized(_LOCK, shared=1)
271
  def GetLockInfo(self, requested): # pylint: disable=W0613
272
    """Retrieves information about pending requests.
273

274
    @type requested: set
275
    @param requested: Requested information, see C{query.LQ_*}
276

277
    """
278
    # No need to sort here, that's being done by the lock manager and query
279
    # library. There are no priorities for requests, hence all show up as
280
    # one item under "pending".
281
    result = []
282

    
283
    if self._pending_fn:
284
      owner_name = self._owner.getName()
285

    
286
      for client in self._pending_fn():
287
        req = client.GetCurrentRequest()
288
        if req:
289
          if req.nicename is None:
290
            name = "%s%s" % (req.host, req.path)
291
          else:
292
            name = req.nicename
293
          result.append(("rpc/%s" % name, None, [owner_name], None))
294

    
295
    return result
296

    
297

    
298
def _ProcessCurlRequests(multi, requests):
299
  """cURL request processor.
300

301
  This generator yields a tuple once for every completed request, successful or
302
  not. The first value in the tuple is the handle, the second an error message
303
  or C{None} for successful requests.
304

305
  @type multi: C{pycurl.CurlMulti}
306
  @param multi: cURL multi object
307
  @type requests: sequence
308
  @param requests: cURL request handles
309

310
  """
311
  for curl in requests:
312
    multi.add_handle(curl)
313

    
314
  while True:
315
    (ret, active) = multi.perform()
316
    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
317

    
318
    if ret == pycurl.E_CALL_MULTI_PERFORM:
319
      # cURL wants to be called again
320
      continue
321

    
322
    while True:
323
      (remaining_messages, successful, failed) = multi.info_read()
324

    
325
      for curl in successful:
326
        multi.remove_handle(curl)
327
        yield (curl, None)
328

    
329
      for curl, errnum, errmsg in failed:
330
        multi.remove_handle(curl)
331
        yield (curl, "Error %s: %s" % (errnum, errmsg))
332

    
333
      if remaining_messages == 0:
334
        break
335

    
336
    if active == 0:
337
      # No active handles anymore
338
      break
339

    
340
    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
341
    # timeouts, which are only evaluated in multi.perform, aren't
342
    # unnecessarily delayed.
343
    multi.select(1.0)
344

    
345

    
346
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
347
                    _curl_multi=pycurl.CurlMulti,
348
                    _curl_process=_ProcessCurlRequests):
349
  """Processes any number of HTTP client requests.
350

351
  @type requests: list of L{HttpClientRequest}
352
  @param requests: List of all requests
353
  @param lock_monitor_cb: Callable for registering with lock monitor
354

355
  """
356
  assert compat.all((req.error is None and
357
                     req.success is None and
358
                     req.resp_status_code is None and
359
                     req.resp_body is None)
360
                    for req in requests)
361

    
362
  # Prepare all requests
363
  curl_to_client = \
364
    dict((client.GetCurlHandle(), client)
365
         for client in map(lambda req: _StartRequest(_curl(), req), requests))
366

    
367
  assert len(curl_to_client) == len(requests)
368

    
369
  if lock_monitor_cb:
370
    monitor = _PendingRequestMonitor(threading.currentThread(),
371
                                     curl_to_client.values)
372
    lock_monitor_cb(monitor)
373
  else:
374
    monitor = _NoOpRequestMonitor
375

    
376
  # Process all requests and act based on the returned values
377
  for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
378
    monitor.acquire(shared=0)
379
    try:
380
      curl_to_client.pop(curl).Done(msg)
381
    finally:
382
      monitor.release()
383

    
384
  assert not curl_to_client, "Not all requests were processed"
385

    
386
  # Don't try to read information anymore as all requests have been processed
387
  monitor.Disable()
388

    
389
  assert compat.all(req.error is not None or
390
                    (req.success and
391
                     req.resp_status_code is not None and
392
                     req.resp_body is not None)
393
                    for req in requests)