Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ fe267188

History | View | Annotate | Download (10.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import os
26
import select
27
import socket
28
import errno
29
import threading
30

    
31
from ganeti import workerpool
32
from ganeti import http
33

    
34

    
35
HTTP_CLIENT_THREADS = 10
36

    
37

    
38
class HttpClientRequest(object):
39
  def __init__(self, host, port, method, path, headers=None, post_data=None,
40
               ssl_params=None, ssl_verify_peer=False):
41
    """Describes an HTTP request.
42

43
    @type host: string
44
    @param host: Hostname
45
    @type port: int
46
    @param port: Port
47
    @type method: string
48
    @param method: Method name
49
    @type path: string
50
    @param path: Request path
51
    @type headers: dict or None
52
    @param headers: Additional headers to send
53
    @type post_data: string or None
54
    @param post_data: Additional data to send
55
    @type ssl_params: HttpSslParams
56
    @param ssl_params: SSL key and certificate
57
    @type ssl_verify_peer: bool
58
    @param ssl_verify_peer: Whether to compare our certificate with
59
        server's certificate
60

61
    """
62
    if post_data is not None:
63
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
64
        "Only POST and GET requests support sending data"
65

    
66
    assert path.startswith("/"), "Path must start with slash (/)"
67

    
68
    # Request attributes
69
    self.host = host
70
    self.port = port
71
    self.ssl_params = ssl_params
72
    self.ssl_verify_peer = ssl_verify_peer
73
    self.method = method
74
    self.path = path
75
    self.headers = headers
76
    self.post_data = post_data
77

    
78
    self.success = None
79
    self.error = None
80

    
81
    # Raw response
82
    self.response = None
83

    
84
    # Response attributes
85
    self.resp_version = None
86
    self.resp_status_code = None
87
    self.resp_reason = None
88
    self.resp_headers = None
89
    self.resp_body = None
90

    
91

    
92
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
93
  pass
94

    
95

    
96
class _HttpServerToClientMessageReader(http.HttpMessageReader):
97
  # Length limits
98
  START_LINE_LENGTH_MAX = 512
99
  HEADER_LENGTH_MAX = 4096
100

    
101
  def ParseStartLine(self, start_line):
102
    """Parses the status line sent by the server.
103

104
    """
105
    # Empty lines are skipped when reading
106
    assert start_line
107

    
108
    try:
109
      [version, status, reason] = start_line.split(None, 2)
110
    except ValueError:
111
      try:
112
        [version, status] = start_line.split(None, 1)
113
        reason = ""
114
      except ValueError:
115
        version = http.HTTP_0_9
116

    
117
    if version:
118
      version = version.upper()
119

    
120
    # The status code is a three-digit number
121
    try:
122
      status = int(status)
123
      if status < 100 or status > 999:
124
        status = -1
125
    except ValueError:
126
      status = -1
127

    
128
    if status == -1:
129
      raise http.HttpError("Invalid status code (%r)" % start_line)
130

    
131
    return http.HttpServerToClientStartLine(version, status, reason)
132

    
133

    
134
class HttpClientRequestExecutor(http.HttpBase):
135
  # Default headers
136
  DEFAULT_HEADERS = {
137
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
138
    # TODO: For keep-alive, don't send "Connection: close"
139
    http.HTTP_CONNECTION: "close",
140
    }
141

    
142
  # Timeouts in seconds for socket layer
143
  # TODO: Soft timeout instead of only socket timeout?
144
  # TODO: Make read timeout configurable per OpCode?
145
  CONNECT_TIMEOUT = 5
146
  WRITE_TIMEOUT = 10
147
  READ_TIMEOUT = None
148
  CLOSE_TIMEOUT = 1
149

    
150
  def __init__(self, req):
151
    """Initializes the HttpClientRequestExecutor class.
152

153
    @type req: HttpClientRequest
154
    @param req: Request object
155

156
    """
157
    http.HttpBase.__init__(self)
158
    self.request = req
159

    
160
    try:
161
      # TODO: Implement connection caching/keep-alive
162
      self.sock = self._CreateSocket(req.ssl_params,
163
                                     req.ssl_verify_peer)
164

    
165
      # Disable Python's timeout
166
      self.sock.settimeout(None)
167

    
168
      # Operate in non-blocking mode
169
      self.sock.setblocking(0)
170

    
171
      response_msg_reader = None
172
      response_msg = None
173
      force_close = True
174

    
175
      self._Connect()
176
      try:
177
        self._SendRequest()
178
        (response_msg_reader, response_msg) = self._ReadResponse()
179

    
180
        # Only wait for server to close if we didn't have any exception.
181
        force_close = False
182
      finally:
183
        # TODO: Keep-alive is not supported, always close connection
184
        force_close = True
185
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
186
                                self.WRITE_TIMEOUT, response_msg_reader,
187
                                force_close)
188

    
189
      self.sock.close()
190
      self.sock = None
191

    
192
      req.response = response_msg
193

    
194
      req.resp_version = req.response.start_line.version
195
      req.resp_status_code = req.response.start_line.code
196
      req.resp_reason = req.response.start_line.reason
197
      req.resp_headers = req.response.headers
198
      req.resp_body = req.response.body
199

    
200
      req.success = True
201
      req.error = None
202

    
203
    except http.HttpError, err:
204
      req.success = False
205
      req.error = str(err)
206

    
207
  def _Connect(self):
208
    """Non-blocking connect to host with timeout.
209

210
    """
211
    connected = False
212
    while True:
213
      try:
214
        connect_error = self.sock.connect_ex((self.request.host,
215
                                              self.request.port))
216
      except socket.gaierror, err:
217
        raise http.HttpError("Connection failed: %s" % str(err))
218

    
219
      if connect_error == errno.EINTR:
220
        # Mask signals
221
        pass
222

    
223
      elif connect_error == 0:
224
        # Connection established
225
        connected = True
226
        break
227

    
228
      elif connect_error == errno.EINPROGRESS:
229
        # Connection started
230
        break
231

    
232
      raise http.HttpError("Connection failed (%s: %s)" %
233
                             (connect_error, os.strerror(connect_error)))
234

    
235
    if not connected:
236
      # Wait for connection
237
      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
238
                                          self.CONNECT_TIMEOUT)
239
      if event is None:
240
        raise http.HttpError("Timeout while connecting to server")
241

    
242
      # Get error code
243
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
244
      if connect_error != 0:
245
        raise http.HttpError("Connection failed (%s: %s)" %
246
                               (connect_error, os.strerror(connect_error)))
247

    
248
    # Enable TCP keep-alive
249
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
250

    
251
    # If needed, Linux specific options are available to change the TCP
252
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
253
    # TCP_KEEPINTVL.
254

    
255
    # Do the secret SSL handshake
256
    if self.using_ssl:
257
      self.sock.set_connect_state() # pylint: disable-msg=E1103
258
      try:
259
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
260
      except http.HttpSessionHandshakeUnexpectedEOF:
261
        raise http.HttpError("Server closed connection during SSL handshake")
262

    
263
  def _SendRequest(self):
264
    """Sends request to server.
265

266
    """
267
    # Headers
268
    send_headers = self.DEFAULT_HEADERS.copy()
269

    
270
    if self.request.headers:
271
      send_headers.update(self.request.headers)
272

    
273
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
274
                                              self.request.port)
275

    
276
    # Response message
277
    msg = http.HttpMessage()
278

    
279
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
280
    # no keep-alive).
281
    # TODO: For keep-alive, change to HTTP/1.1
282
    msg.start_line = \
283
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
284
                                       path=self.request.path,
285
                                       version=http.HTTP_1_0)
286
    msg.headers = send_headers
287
    msg.body = self.request.post_data
288

    
289
    try:
290
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
291
    except http.HttpSocketTimeout:
292
      raise http.HttpError("Timeout while sending request")
293
    except socket.error, err:
294
      raise http.HttpError("Error sending request: %s" % err)
295

    
296
  def _ReadResponse(self):
297
    """Read response from server.
298

299
    """
300
    response_msg = http.HttpMessage()
301

    
302
    try:
303
      response_msg_reader = \
304
        _HttpServerToClientMessageReader(self.sock, response_msg,
305
                                         self.READ_TIMEOUT)
306
    except http.HttpSocketTimeout:
307
      raise http.HttpError("Timeout while reading response")
308
    except socket.error, err:
309
      raise http.HttpError("Error reading response: %s" % err)
310

    
311
    return (response_msg_reader, response_msg)
312

    
313

    
314
class _HttpClientPendingRequest(object):
315
  """Data class for pending requests.
316

317
  """
318
  def __init__(self, request):
319
    self.request = request
320

    
321
    # Thread synchronization
322
    self.done = threading.Event()
323

    
324

    
325
class HttpClientWorker(workerpool.BaseWorker):
326
  """HTTP client worker class.
327

328
  """
329
  def RunTask(self, pend_req):
330
    try:
331
      HttpClientRequestExecutor(pend_req.request)
332
    finally:
333
      pend_req.done.set()
334

    
335

    
336
class HttpClientWorkerPool(workerpool.WorkerPool):
337
  def __init__(self, manager):
338
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
339
                                   HttpClientWorker)
340
    self.manager = manager
341

    
342

    
343
class HttpClientManager(object):
344
  """Manages HTTP requests.
345

346
  """
347
  def __init__(self):
348
    self._wpool = HttpClientWorkerPool(self)
349

    
350
  def __del__(self):
351
    self.Shutdown()
352

    
353
  def ExecRequests(self, requests):
354
    """Execute HTTP requests.
355

356
    This function can be called from multiple threads at the same time.
357

358
    @type requests: List of HttpClientRequest instances
359
    @param requests: The requests to execute
360
    @rtype: List of HttpClientRequest instances
361
    @return: The list of requests passed in
362

363
    """
364
    # _HttpClientPendingRequest is used for internal thread synchronization
365
    pending = [_HttpClientPendingRequest(req) for req in requests]
366

    
367
    try:
368
      # Add requests to queue
369
      for pend_req in pending:
370
        self._wpool.AddTask(pend_req)
371

    
372
    finally:
373
      # In case of an exception we should still wait for the rest, otherwise
374
      # another thread from the worker pool could modify the request object
375
      # after we returned.
376

    
377
      # And wait for them to finish
378
      for pend_req in pending:
379
        pend_req.done.wait()
380

    
381
    # Return original list
382
    return requests
383

    
384
  def Shutdown(self):
385
    self._wpool.Quiesce()
386
    self._wpool.TerminateWorkers()