Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ e0036155

History | View | Annotate | Download (11.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
# pylint: disable-msg=E1103
26

    
27
# # E1103: %s %r has no %r member (but some types could not be
28
# inferred), since _socketobject could be ssl or not and pylint
29
# doesn't parse that
30

    
31

    
32
import os
33
import select
34
import socket
35
import errno
36
import threading
37

    
38
from ganeti import workerpool
39
from ganeti import http
40
from ganeti import utils
41

    
42

    
43
HTTP_CLIENT_THREADS = 10
44

    
45

    
46
class HttpClientRequest(object):
47
  def __init__(self, host, port, method, path, headers=None, post_data=None,
48
               ssl_params=None, ssl_verify_peer=False, read_timeout=None):
49
    """Describes an HTTP request.
50

51
    @type host: string
52
    @param host: Hostname
53
    @type port: int
54
    @param port: Port
55
    @type method: string
56
    @param method: Method name
57
    @type path: string
58
    @param path: Request path
59
    @type headers: dict or None
60
    @param headers: Additional headers to send
61
    @type post_data: string or None
62
    @param post_data: Additional data to send
63
    @type ssl_params: HttpSslParams
64
    @param ssl_params: SSL key and certificate
65
    @type ssl_verify_peer: bool
66
    @param ssl_verify_peer: Whether to compare our certificate with
67
        server's certificate
68
    @type read_timeout: int
69
    @param read_timeout: if passed, it will be used as the read
70
        timeout while reading the response from the server
71

72
    """
73
    if post_data is not None:
74
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
75
        "Only POST and GET requests support sending data"
76

    
77
    assert path.startswith("/"), "Path must start with slash (/)"
78

    
79
    # Request attributes
80
    self.host = host
81
    self.port = port
82
    self.ssl_params = ssl_params
83
    self.ssl_verify_peer = ssl_verify_peer
84
    self.method = method
85
    self.path = path
86
    self.headers = headers
87
    self.post_data = post_data
88
    self.read_timeout = read_timeout
89

    
90
    self.success = None
91
    self.error = None
92

    
93
    # Raw response
94
    self.response = None
95

    
96
    # Response attributes
97
    self.resp_version = None
98
    self.resp_status_code = None
99
    self.resp_reason = None
100
    self.resp_headers = None
101
    self.resp_body = None
102

    
103
  def __repr__(self):
104
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
105
              "%s:%s" % (self.host, self.port),
106
              self.method,
107
              self.path]
108

    
109
    return "<%s at %#x>" % (" ".join(status), id(self))
110

    
111

    
112
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
113
  pass
114

    
115

    
116
class _HttpServerToClientMessageReader(http.HttpMessageReader):
117
  # Length limits
118
  START_LINE_LENGTH_MAX = 512
119
  HEADER_LENGTH_MAX = 4096
120

    
121
  def ParseStartLine(self, start_line):
122
    """Parses the status line sent by the server.
123

124
    """
125
    # Empty lines are skipped when reading
126
    assert start_line
127

    
128
    try:
129
      [version, status, reason] = start_line.split(None, 2)
130
    except ValueError:
131
      try:
132
        [version, status] = start_line.split(None, 1)
133
        reason = ""
134
      except ValueError:
135
        version = http.HTTP_0_9
136

    
137
    if version:
138
      version = version.upper()
139

    
140
    # The status code is a three-digit number
141
    try:
142
      status = int(status)
143
      if status < 100 or status > 999:
144
        status = -1
145
    except (TypeError, ValueError):
146
      status = -1
147

    
148
    if status == -1:
149
      raise http.HttpError("Invalid status code (%r)" % start_line)
150

    
151
    return http.HttpServerToClientStartLine(version, status, reason)
152

    
153

    
154
class HttpClientRequestExecutor(http.HttpBase):
155
  # Default headers
156
  DEFAULT_HEADERS = {
157
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
158
    # TODO: For keep-alive, don't send "Connection: close"
159
    http.HTTP_CONNECTION: "close",
160
    }
161

    
162
  # Timeouts in seconds for socket layer
163
  # TODO: Soft timeout instead of only socket timeout?
164
  # TODO: Make read timeout configurable per OpCode?
165
  CONNECT_TIMEOUT = 5
166
  WRITE_TIMEOUT = 10
167
  READ_TIMEOUT = None
168
  CLOSE_TIMEOUT = 1
169

    
170
  def __init__(self, req):
171
    """Initializes the HttpClientRequestExecutor class.
172

173
    @type req: HttpClientRequest
174
    @param req: Request object
175

176
    """
177
    http.HttpBase.__init__(self)
178
    self.request = req
179

    
180
    try:
181
      # TODO: Implement connection caching/keep-alive
182
      self.sock = self._CreateSocket(req.ssl_params,
183
                                     req.ssl_verify_peer)
184

    
185
      # Disable Python's timeout
186
      self.sock.settimeout(None)
187

    
188
      # Operate in non-blocking mode
189
      self.sock.setblocking(0)
190

    
191
      response_msg_reader = None
192
      response_msg = None
193
      force_close = True
194

    
195
      self._Connect()
196
      try:
197
        self._SendRequest()
198
        (response_msg_reader, response_msg) = self._ReadResponse()
199

    
200
        # Only wait for server to close if we didn't have any exception.
201
        force_close = False
202
      finally:
203
        # TODO: Keep-alive is not supported, always close connection
204
        force_close = True
205
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
206
                                self.WRITE_TIMEOUT, response_msg_reader,
207
                                force_close)
208

    
209
      self.sock.close()
210
      self.sock = None
211

    
212
      req.response = response_msg
213

    
214
      req.resp_version = req.response.start_line.version
215
      req.resp_status_code = req.response.start_line.code
216
      req.resp_reason = req.response.start_line.reason
217
      req.resp_headers = req.response.headers
218
      req.resp_body = req.response.body
219

    
220
      req.success = True
221
      req.error = None
222

    
223
    except http.HttpError, err:
224
      req.success = False
225
      req.error = str(err)
226

    
227
  def _Connect(self):
228
    """Non-blocking connect to host with timeout.
229

230
    """
231
    connected = False
232
    while True:
233
      try:
234
        connect_error = self.sock.connect_ex((self.request.host,
235
                                              self.request.port))
236
      except socket.gaierror, err:
237
        raise http.HttpError("Connection failed: %s" % str(err))
238

    
239
      if connect_error == errno.EINTR:
240
        # Mask signals
241
        pass
242

    
243
      elif connect_error == 0:
244
        # Connection established
245
        connected = True
246
        break
247

    
248
      elif connect_error == errno.EINPROGRESS:
249
        # Connection started
250
        break
251

    
252
      raise http.HttpError("Connection failed (%s: %s)" %
253
                             (connect_error, os.strerror(connect_error)))
254

    
255
    if not connected:
256
      # Wait for connection
257
      event = utils.WaitForFdCondition(self.sock, select.POLLOUT,
258
                                       self.CONNECT_TIMEOUT)
259
      if event is None:
260
        raise http.HttpError("Timeout while connecting to server")
261

    
262
      # Get error code
263
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
264
      if connect_error != 0:
265
        raise http.HttpError("Connection failed (%s: %s)" %
266
                               (connect_error, os.strerror(connect_error)))
267

    
268
    # Enable TCP keep-alive
269
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
270

    
271
    # If needed, Linux specific options are available to change the TCP
272
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
273
    # TCP_KEEPINTVL.
274

    
275
    # Do the secret SSL handshake
276
    if self.using_ssl:
277
      self.sock.set_connect_state() # pylint: disable-msg=E1103
278
      try:
279
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
280
      except http.HttpSessionHandshakeUnexpectedEOF:
281
        raise http.HttpError("Server closed connection during SSL handshake")
282

    
283
  def _SendRequest(self):
284
    """Sends request to server.
285

286
    """
287
    # Headers
288
    send_headers = self.DEFAULT_HEADERS.copy()
289

    
290
    if self.request.headers:
291
      send_headers.update(self.request.headers)
292

    
293
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
294
                                              self.request.port)
295

    
296
    # Response message
297
    msg = http.HttpMessage()
298

    
299
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
300
    # no keep-alive).
301
    # TODO: For keep-alive, change to HTTP/1.1
302
    msg.start_line = \
303
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
304
                                       path=self.request.path,
305
                                       version=http.HTTP_1_0)
306
    msg.headers = send_headers
307
    msg.body = self.request.post_data
308

    
309
    try:
310
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
311
    except http.HttpSocketTimeout:
312
      raise http.HttpError("Timeout while sending request")
313
    except socket.error, err:
314
      raise http.HttpError("Error sending request: %s" % err)
315

    
316
  def _ReadResponse(self):
317
    """Read response from server.
318

319
    """
320
    response_msg = http.HttpMessage()
321

    
322
    if self.request.read_timeout is None:
323
      timeout = self.READ_TIMEOUT
324
    else:
325
      timeout = self.request.read_timeout
326

    
327
    try:
328
      response_msg_reader = \
329
        _HttpServerToClientMessageReader(self.sock, response_msg, timeout)
330
    except http.HttpSocketTimeout:
331
      raise http.HttpError("Timeout while reading response")
332
    except socket.error, err:
333
      raise http.HttpError("Error reading response: %s" % err)
334

    
335
    return (response_msg_reader, response_msg)
336

    
337

    
338
class _HttpClientPendingRequest(object):
339
  """Data class for pending requests.
340

341
  """
342
  def __init__(self, request):
343
    self.request = request
344

    
345
    # Thread synchronization
346
    self.done = threading.Event()
347

    
348
  def __repr__(self):
349
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
350
              "req=%r" % self.request]
351

    
352
    return "<%s at %#x>" % (" ".join(status), id(self))
353

    
354

    
355
class HttpClientWorker(workerpool.BaseWorker):
356
  """HTTP client worker class.
357

358
  """
359
  def RunTask(self, pend_req): # pylint: disable-msg=W0221
360
    try:
361
      HttpClientRequestExecutor(pend_req.request)
362
    finally:
363
      pend_req.done.set()
364

    
365

    
366
class HttpClientWorkerPool(workerpool.WorkerPool):
367
  def __init__(self, manager):
368
    workerpool.WorkerPool.__init__(self, "HttpClient",
369
                                   HTTP_CLIENT_THREADS,
370
                                   HttpClientWorker)
371
    self.manager = manager
372

    
373

    
374
class HttpClientManager(object):
375
  """Manages HTTP requests.
376

377
  """
378
  def __init__(self):
379
    self._wpool = HttpClientWorkerPool(self)
380

    
381
  def __del__(self):
382
    self.Shutdown()
383

    
384
  def ExecRequests(self, requests):
385
    """Execute HTTP requests.
386

387
    This function can be called from multiple threads at the same time.
388

389
    @type requests: List of HttpClientRequest instances
390
    @param requests: The requests to execute
391
    @rtype: List of HttpClientRequest instances
392
    @return: The list of requests passed in
393

394
    """
395
    # _HttpClientPendingRequest is used for internal thread synchronization
396
    pending = [_HttpClientPendingRequest(req) for req in requests]
397

    
398
    try:
399
      # Add requests to queue
400
      for pend_req in pending:
401
        self._wpool.AddTask(pend_req)
402

    
403
    finally:
404
      # In case of an exception we should still wait for the rest, otherwise
405
      # another thread from the worker pool could modify the request object
406
      # after we returned.
407

    
408
      # And wait for them to finish
409
      for pend_req in pending:
410
        pend_req.done.wait()
411

    
412
    # Return original list
413
    return requests
414

    
415
  def Shutdown(self):
416
    self._wpool.Quiesce()
417
    self._wpool.TerminateWorkers()