Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ dcd511c8

History | View | Annotate | Download (11.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
# pylint: disable-msg=E1103
26

    
27
# # E1103: %s %r has no %r member (but some types could not be
28
# inferred), since _socketobject could be ssl or not and pylint
29
# doesn't parse that
30

    
31

    
32
import os
33
import select
34
import socket
35
import errno
36
import threading
37

    
38
from ganeti import workerpool
39
from ganeti import http
40
from ganeti import utils
41

    
42

    
43
HTTP_CLIENT_THREADS = 10
44

    
45

    
46
class HttpClientRequest(object):
47
  def __init__(self, host, port, method, path, headers=None, post_data=None,
48
               ssl_params=None, ssl_verify_peer=False):
49
    """Describes an HTTP request.
50

51
    @type host: string
52
    @param host: Hostname
53
    @type port: int
54
    @param port: Port
55
    @type method: string
56
    @param method: Method name
57
    @type path: string
58
    @param path: Request path
59
    @type headers: dict or None
60
    @param headers: Additional headers to send
61
    @type post_data: string or None
62
    @param post_data: Additional data to send
63
    @type ssl_params: HttpSslParams
64
    @param ssl_params: SSL key and certificate
65
    @type ssl_verify_peer: bool
66
    @param ssl_verify_peer: Whether to compare our certificate with
67
        server's certificate
68

69
    """
70
    if post_data is not None:
71
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
72
        "Only POST and GET requests support sending data"
73

    
74
    assert path.startswith("/"), "Path must start with slash (/)"
75

    
76
    # Request attributes
77
    self.host = host
78
    self.port = port
79
    self.ssl_params = ssl_params
80
    self.ssl_verify_peer = ssl_verify_peer
81
    self.method = method
82
    self.path = path
83
    self.headers = headers
84
    self.post_data = post_data
85

    
86
    self.success = None
87
    self.error = None
88

    
89
    # Raw response
90
    self.response = None
91

    
92
    # Response attributes
93
    self.resp_version = None
94
    self.resp_status_code = None
95
    self.resp_reason = None
96
    self.resp_headers = None
97
    self.resp_body = None
98

    
99
  def __repr__(self):
100
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
101
              "%s:%s" % (self.host, self.port),
102
              self.method,
103
              self.path]
104

    
105
    return "<%s at %#x>" % (" ".join(status), id(self))
106

    
107

    
108
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
109
  pass
110

    
111

    
112
class _HttpServerToClientMessageReader(http.HttpMessageReader):
113
  # Length limits
114
  START_LINE_LENGTH_MAX = 512
115
  HEADER_LENGTH_MAX = 4096
116

    
117
  def ParseStartLine(self, start_line):
118
    """Parses the status line sent by the server.
119

120
    """
121
    # Empty lines are skipped when reading
122
    assert start_line
123

    
124
    try:
125
      [version, status, reason] = start_line.split(None, 2)
126
    except ValueError:
127
      try:
128
        [version, status] = start_line.split(None, 1)
129
        reason = ""
130
      except ValueError:
131
        version = http.HTTP_0_9
132

    
133
    if version:
134
      version = version.upper()
135

    
136
    # The status code is a three-digit number
137
    try:
138
      status = int(status)
139
      if status < 100 or status > 999:
140
        status = -1
141
    except (TypeError, ValueError):
142
      status = -1
143

    
144
    if status == -1:
145
      raise http.HttpError("Invalid status code (%r)" % start_line)
146

    
147
    return http.HttpServerToClientStartLine(version, status, reason)
148

    
149

    
150
class HttpClientRequestExecutor(http.HttpBase):
151
  # Default headers
152
  DEFAULT_HEADERS = {
153
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
154
    # TODO: For keep-alive, don't send "Connection: close"
155
    http.HTTP_CONNECTION: "close",
156
    }
157

    
158
  # Timeouts in seconds for socket layer
159
  # TODO: Soft timeout instead of only socket timeout?
160
  # TODO: Make read timeout configurable per OpCode?
161
  CONNECT_TIMEOUT = 5
162
  WRITE_TIMEOUT = 10
163
  READ_TIMEOUT = None
164
  CLOSE_TIMEOUT = 1
165

    
166
  def __init__(self, req):
167
    """Initializes the HttpClientRequestExecutor class.
168

169
    @type req: HttpClientRequest
170
    @param req: Request object
171

172
    """
173
    http.HttpBase.__init__(self)
174
    self.request = req
175

    
176
    try:
177
      # TODO: Implement connection caching/keep-alive
178
      self.sock = self._CreateSocket(req.ssl_params,
179
                                     req.ssl_verify_peer)
180

    
181
      # Disable Python's timeout
182
      self.sock.settimeout(None)
183

    
184
      # Operate in non-blocking mode
185
      self.sock.setblocking(0)
186

    
187
      response_msg_reader = None
188
      response_msg = None
189
      force_close = True
190

    
191
      self._Connect()
192
      try:
193
        self._SendRequest()
194
        (response_msg_reader, response_msg) = self._ReadResponse()
195

    
196
        # Only wait for server to close if we didn't have any exception.
197
        force_close = False
198
      finally:
199
        # TODO: Keep-alive is not supported, always close connection
200
        force_close = True
201
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
202
                                self.WRITE_TIMEOUT, response_msg_reader,
203
                                force_close)
204

    
205
      self.sock.close()
206
      self.sock = None
207

    
208
      req.response = response_msg
209

    
210
      req.resp_version = req.response.start_line.version
211
      req.resp_status_code = req.response.start_line.code
212
      req.resp_reason = req.response.start_line.reason
213
      req.resp_headers = req.response.headers
214
      req.resp_body = req.response.body
215

    
216
      req.success = True
217
      req.error = None
218

    
219
    except http.HttpError, err:
220
      req.success = False
221
      req.error = str(err)
222

    
223
  def _Connect(self):
224
    """Non-blocking connect to host with timeout.
225

226
    """
227
    connected = False
228
    while True:
229
      try:
230
        connect_error = self.sock.connect_ex((self.request.host,
231
                                              self.request.port))
232
      except socket.gaierror, err:
233
        raise http.HttpError("Connection failed: %s" % str(err))
234

    
235
      if connect_error == errno.EINTR:
236
        # Mask signals
237
        pass
238

    
239
      elif connect_error == 0:
240
        # Connection established
241
        connected = True
242
        break
243

    
244
      elif connect_error == errno.EINPROGRESS:
245
        # Connection started
246
        break
247

    
248
      raise http.HttpError("Connection failed (%s: %s)" %
249
                             (connect_error, os.strerror(connect_error)))
250

    
251
    if not connected:
252
      # Wait for connection
253
      event = utils.WaitForSocketCondition(self.sock, select.POLLOUT,
254
                                           self.CONNECT_TIMEOUT)
255
      if event is None:
256
        raise http.HttpError("Timeout while connecting to server")
257

    
258
      # Get error code
259
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
260
      if connect_error != 0:
261
        raise http.HttpError("Connection failed (%s: %s)" %
262
                               (connect_error, os.strerror(connect_error)))
263

    
264
    # Enable TCP keep-alive
265
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
266

    
267
    # If needed, Linux specific options are available to change the TCP
268
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
269
    # TCP_KEEPINTVL.
270

    
271
    # Do the secret SSL handshake
272
    if self.using_ssl:
273
      self.sock.set_connect_state() # pylint: disable-msg=E1103
274
      try:
275
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
276
      except http.HttpSessionHandshakeUnexpectedEOF:
277
        raise http.HttpError("Server closed connection during SSL handshake")
278

    
279
  def _SendRequest(self):
280
    """Sends request to server.
281

282
    """
283
    # Headers
284
    send_headers = self.DEFAULT_HEADERS.copy()
285

    
286
    if self.request.headers:
287
      send_headers.update(self.request.headers)
288

    
289
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
290
                                              self.request.port)
291

    
292
    # Response message
293
    msg = http.HttpMessage()
294

    
295
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
296
    # no keep-alive).
297
    # TODO: For keep-alive, change to HTTP/1.1
298
    msg.start_line = \
299
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
300
                                       path=self.request.path,
301
                                       version=http.HTTP_1_0)
302
    msg.headers = send_headers
303
    msg.body = self.request.post_data
304

    
305
    try:
306
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
307
    except http.HttpSocketTimeout:
308
      raise http.HttpError("Timeout while sending request")
309
    except socket.error, err:
310
      raise http.HttpError("Error sending request: %s" % err)
311

    
312
  def _ReadResponse(self):
313
    """Read response from server.
314

315
    """
316
    response_msg = http.HttpMessage()
317

    
318
    try:
319
      response_msg_reader = \
320
        _HttpServerToClientMessageReader(self.sock, response_msg,
321
                                         self.READ_TIMEOUT)
322
    except http.HttpSocketTimeout:
323
      raise http.HttpError("Timeout while reading response")
324
    except socket.error, err:
325
      raise http.HttpError("Error reading response: %s" % err)
326

    
327
    return (response_msg_reader, response_msg)
328

    
329

    
330
class _HttpClientPendingRequest(object):
331
  """Data class for pending requests.
332

333
  """
334
  def __init__(self, request):
335
    self.request = request
336

    
337
    # Thread synchronization
338
    self.done = threading.Event()
339

    
340
  def __repr__(self):
341
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
342
              "req=%r" % self.request]
343

    
344
    return "<%s at %#x>" % (" ".join(status), id(self))
345

    
346

    
347
class HttpClientWorker(workerpool.BaseWorker):
348
  """HTTP client worker class.
349

350
  """
351
  def RunTask(self, pend_req): # pylint: disable-msg=W0221
352
    try:
353
      HttpClientRequestExecutor(pend_req.request)
354
    finally:
355
      pend_req.done.set()
356

    
357

    
358
class HttpClientWorkerPool(workerpool.WorkerPool):
359
  def __init__(self, manager):
360
    workerpool.WorkerPool.__init__(self, "HttpClient",
361
                                   HTTP_CLIENT_THREADS,
362
                                   HttpClientWorker)
363
    self.manager = manager
364

    
365

    
366
class HttpClientManager(object):
367
  """Manages HTTP requests.
368

369
  """
370
  def __init__(self):
371
    self._wpool = HttpClientWorkerPool(self)
372

    
373
  def __del__(self):
374
    self.Shutdown()
375

    
376
  def ExecRequests(self, requests):
377
    """Execute HTTP requests.
378

379
    This function can be called from multiple threads at the same time.
380

381
    @type requests: List of HttpClientRequest instances
382
    @param requests: The requests to execute
383
    @rtype: List of HttpClientRequest instances
384
    @return: The list of requests passed in
385

386
    """
387
    # _HttpClientPendingRequest is used for internal thread synchronization
388
    pending = [_HttpClientPendingRequest(req) for req in requests]
389

    
390
    try:
391
      # Add requests to queue
392
      for pend_req in pending:
393
        self._wpool.AddTask(pend_req)
394

    
395
    finally:
396
      # In case of an exception we should still wait for the rest, otherwise
397
      # another thread from the worker pool could modify the request object
398
      # after we returned.
399

    
400
      # And wait for them to finish
401
      for pend_req in pending:
402
        pend_req.done.wait()
403

    
404
    # Return original list
405
    return requests
406

    
407
  def Shutdown(self):
408
    self._wpool.Quiesce()
409
    self._wpool.TerminateWorkers()