Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ f2e13d55

History | View | Annotate | Download (10.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import OpenSSL
29
import os
30
import select
31
import socket
32
import sys
33
import time
34
import signal
35
import errno
36
import threading
37

    
38
from ganeti import constants
39
from ganeti import serializer
40
from ganeti import workerpool
41
from ganeti import utils
42
from ganeti import http
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47

    
48
class HttpClientRequest(object):
49
  def __init__(self, host, port, method, path, headers=None, post_data=None,
50
               ssl_params=None, ssl_verify_peer=False):
51
    """Describes an HTTP request.
52

53
    @type host: string
54
    @param host: Hostname
55
    @type port: int
56
    @param port: Port
57
    @type method: string
58
    @param method: Method name
59
    @type path: string
60
    @param path: Request path
61
    @type headers: dict or None
62
    @param headers: Additional headers to send
63
    @type post_data: string or None
64
    @param post_data: Additional data to send
65
    @type ssl_params: HttpSslParams
66
    @param ssl_params: SSL key and certificate
67
    @type ssl_verify_peer: bool
68
    @param ssl_verify_peer: Whether to compare our certificate with server's
69
                            certificate
70

71
    """
72
    if post_data is not None:
73
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74
        "Only POST and GET requests support sending data"
75

    
76
    assert path.startswith("/"), "Path must start with slash (/)"
77

    
78
    # Request attributes
79
    self.host = host
80
    self.port = port
81
    self.ssl_params = ssl_params
82
    self.ssl_verify_peer = ssl_verify_peer
83
    self.method = method
84
    self.path = path
85
    self.headers = headers
86
    self.post_data = post_data
87

    
88
    self.success = None
89
    self.error = None
90

    
91
    # Raw response
92
    self.response = None
93

    
94
    # Response attributes
95
    self.resp_version = None
96
    self.resp_status_code = None
97
    self.resp_reason = None
98
    self.resp_headers = None
99
    self.resp_body = None
100

    
101

    
102
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
103
  pass
104

    
105

    
106
class _HttpServerToClientMessageReader(http.HttpMessageReader):
107
  # Length limits
108
  START_LINE_LENGTH_MAX = 512
109
  HEADER_LENGTH_MAX = 4096
110

    
111
  def ParseStartLine(self, start_line):
112
    """Parses the status line sent by the server.
113

114
    """
115
    # Empty lines are skipped when reading
116
    assert start_line
117

    
118
    try:
119
      [version, status, reason] = start_line.split(None, 2)
120
    except ValueError:
121
      try:
122
        [version, status] = start_line.split(None, 1)
123
        reason = ""
124
      except ValueError:
125
        version = http.HTTP_0_9
126

    
127
    if version:
128
      version = version.upper()
129

    
130
    # The status code is a three-digit number
131
    try:
132
      status = int(status)
133
      if status < 100 or status > 999:
134
        status = -1
135
    except ValueError:
136
      status = -1
137

    
138
    if status == -1:
139
      raise http.HttpError("Invalid status code (%r)" % start_line)
140

    
141
    return http.HttpServerToClientStartLine(version, status, reason)
142

    
143

    
144
class HttpClientRequestExecutor(http.HttpBase):
145
  # Default headers
146
  DEFAULT_HEADERS = {
147
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148
    # TODO: For keep-alive, don't send "Connection: close"
149
    http.HTTP_CONNECTION: "close",
150
    }
151

    
152
  # Timeouts in seconds for socket layer
153
  # TODO: Soft timeout instead of only socket timeout?
154
  # TODO: Make read timeout configurable per OpCode?
155
  CONNECT_TIMEOUT = 5
156
  WRITE_TIMEOUT = 10
157
  READ_TIMEOUT = None
158
  CLOSE_TIMEOUT = 1
159

    
160
  def __init__(self, req):
161
    """Initializes the HttpClientRequestExecutor class.
162

163
    @type req: HttpClientRequest
164
    @param req: Request object
165

166
    """
167
    http.HttpBase.__init__(self)
168
    self.request = req
169

    
170
    self.poller = select.poll()
171

    
172
    try:
173
      # TODO: Implement connection caching/keep-alive
174
      self.sock = self._CreateSocket(req.ssl_params,
175
                                     req.ssl_verify_peer)
176

    
177
      # Disable Python's timeout
178
      self.sock.settimeout(None)
179

    
180
      # Operate in non-blocking mode
181
      self.sock.setblocking(0)
182

    
183
      response_msg_reader = None
184
      response_msg = None
185
      force_close = True
186

    
187
      self._Connect()
188
      try:
189
        self._SendRequest()
190
        (response_msg_reader, response_msg) = self._ReadResponse()
191

    
192
        # Only wait for server to close if we didn't have any exception.
193
        force_close = False
194
      finally:
195
        # TODO: Keep-alive is not supported, always close connection
196
        force_close = True
197
        http.ShutdownConnection(self.poller, self.sock,
198
                                self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
199
                                response_msg_reader, force_close)
200

    
201
      self.sock.close()
202
      self.sock = None
203

    
204
      req.response = response_msg
205

    
206
      req.resp_version = req.response.start_line.version
207
      req.resp_status_code = req.response.start_line.code
208
      req.resp_reason = req.response.start_line.reason
209
      req.resp_headers = req.response.headers
210
      req.resp_body = req.response.body
211

    
212
      req.success = True
213
      req.error = None
214

    
215
    except http.HttpError, err:
216
      req.success = False
217
      req.error = str(err)
218

    
219
  def _Connect(self):
220
    """Non-blocking connect to host with timeout.
221

222
    """
223
    connected = False
224
    while True:
225
      try:
226
        connect_error = self.sock.connect_ex((self.request.host,
227
                                              self.request.port))
228
      except socket.gaierror, err:
229
        raise http.HttpError("Connection failed: %s" % str(err))
230

    
231
      if connect_error == errno.EINTR:
232
        # Mask signals
233
        pass
234

    
235
      elif connect_error == 0:
236
        # Connection established
237
        connected = True
238
        break
239

    
240
      elif connect_error == errno.EINPROGRESS:
241
        # Connection started
242
        break
243

    
244
      raise http.HttpError("Connection failed (%s: %s)" %
245
                             (connect_error, os.strerror(connect_error)))
246

    
247
    if not connected:
248
      # Wait for connection
249
      event = http.WaitForSocketCondition(self.poller, self.sock,
250
                                          select.POLLOUT, self.CONNECT_TIMEOUT)
251
      if event is None:
252
        raise http.HttpError("Timeout while connecting to server")
253

    
254
      # Get error code
255
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
256
      if connect_error != 0:
257
        raise http.HttpError("Connection failed (%s: %s)" %
258
                               (connect_error, os.strerror(connect_error)))
259

    
260
    # Enable TCP keep-alive
261
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
262

    
263
    # If needed, Linux specific options are available to change the TCP
264
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
265
    # TCP_KEEPINTVL.
266

    
267
    # Do the secret SSL handshake
268
    if self.using_ssl:
269
      self.sock.set_connect_state()
270
      try:
271
        http.Handshake(self.poller, self.sock, self.WRITE_TIMEOUT)
272
      except http.HttpSessionHandshakeUnexpectedEOF:
273
        raise http.HttpError("Server closed connection during SSL handshake")
274

    
275
  def _SendRequest(self):
276
    """Sends request to server.
277

278
    """
279
    # Headers
280
    send_headers = self.DEFAULT_HEADERS.copy()
281

    
282
    if self.request.headers:
283
      send_headers.update(self.request.headers)
284

    
285
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
286
                                              self.request.port)
287

    
288
    # Response message
289
    msg = http.HttpMessage()
290

    
291
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
292
    # no keep-alive).
293
    # TODO: For keep-alive, change to HTTP/1.1
294
    msg.start_line = \
295
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
296
                                       path=self.request.path,
297
                                       version=http.HTTP_1_0)
298
    msg.headers = send_headers
299
    msg.body = self.request.post_data
300

    
301
    try:
302
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
303
    except http.HttpSocketTimeout:
304
      raise http.HttpError("Timeout while sending request")
305
    except socket.error, err:
306
      raise http.HttpError("Error sending request: %s" % err)
307

    
308
  def _ReadResponse(self):
309
    """Read response from server.
310

311
    """
312
    response_msg = http.HttpMessage()
313

    
314
    try:
315
      response_msg_reader = \
316
        _HttpServerToClientMessageReader(self.sock, response_msg,
317
                                         self.READ_TIMEOUT)
318
    except http.HttpSocketTimeout:
319
      raise http.HttpError("Timeout while reading response")
320
    except socket.error, err:
321
      raise http.HttpError("Error reading response: %s" % err)
322

    
323
    return (response_msg_reader, response_msg)
324

    
325

    
326
class _HttpClientPendingRequest(object):
327
  """Data class for pending requests.
328

329
  """
330
  def __init__(self, request):
331
    self.request = request
332

    
333
    # Thread synchronization
334
    self.done = threading.Event()
335

    
336

    
337
class HttpClientWorker(workerpool.BaseWorker):
338
  """HTTP client worker class.
339

340
  """
341
  def RunTask(self, pend_req):
342
    try:
343
      HttpClientRequestExecutor(pend_req.request)
344
    finally:
345
      pend_req.done.set()
346

    
347

    
348
class HttpClientWorkerPool(workerpool.WorkerPool):
349
  def __init__(self, manager):
350
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
351
                                   HttpClientWorker)
352
    self.manager = manager
353

    
354

    
355
class HttpClientManager(object):
356
  """Manages HTTP requests.
357

358
  """
359
  def __init__(self):
360
    self._wpool = HttpClientWorkerPool(self)
361

    
362
  def __del__(self):
363
    self.Shutdown()
364

    
365
  def ExecRequests(self, requests):
366
    """Execute HTTP requests.
367

368
    This function can be called from multiple threads at the same time.
369

370
    @type requests: List of HttpClientRequest instances
371
    @param requests: The requests to execute
372
    @rtype: List of HttpClientRequest instances
373
    @returns: The list of requests passed in
374

375
    """
376
    # _HttpClientPendingRequest is used for internal thread synchronization
377
    pending = [_HttpClientPendingRequest(req) for req in requests]
378

    
379
    try:
380
      # Add requests to queue
381
      for pend_req in pending:
382
        self._wpool.AddTask(pend_req)
383

    
384
    finally:
385
      # In case of an exception we should still wait for the rest, otherwise
386
      # another thread from the worker pool could modify the request object
387
      # after we returned.
388

    
389
      # And wait for them to finish
390
      for pend_req in pending:
391
        pend_req.done.wait()
392

    
393
    # Return original list
394
    return requests
395

    
396
  def Shutdown(self):
397
    self._wpool.Quiesce()
398
    self._wpool.TerminateWorkers()