Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 02cab3e7

History | View | Annotate | Download (10.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import OpenSSL
29
import os
30
import select
31
import socket
32
import sys
33
import time
34
import signal
35
import errno
36
import threading
37

    
38
from ganeti import constants
39
from ganeti import serializer
40
from ganeti import workerpool
41
from ganeti import utils
42
from ganeti import http
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47

    
48
class HttpClientRequest(object):
49
  def __init__(self, host, port, method, path, headers=None, post_data=None,
50
               ssl_params=None, ssl_verify_peer=False):
51
    """Describes an HTTP request.
52

53
    @type host: string
54
    @param host: Hostname
55
    @type port: int
56
    @param port: Port
57
    @type method: string
58
    @param method: Method name
59
    @type path: string
60
    @param path: Request path
61
    @type headers: dict or None
62
    @param headers: Additional headers to send
63
    @type post_data: string or None
64
    @param post_data: Additional data to send
65
    @type ssl_params: HttpSslParams
66
    @param ssl_params: SSL key and certificate
67
    @type ssl_verify_peer: bool
68
    @param ssl_verify_peer: Whether to compare our certificate with server's
69
                            certificate
70

71
    """
72
    if post_data is not None:
73
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74
        "Only POST and GET requests support sending data"
75

    
76
    assert path.startswith("/"), "Path must start with slash (/)"
77

    
78
    # Request attributes
79
    self.host = host
80
    self.port = port
81
    self.ssl_params = ssl_params
82
    self.ssl_verify_peer = ssl_verify_peer
83
    self.method = method
84
    self.path = path
85
    self.headers = headers
86
    self.post_data = post_data
87

    
88
    self.success = None
89
    self.error = None
90

    
91
    # Raw response
92
    self.response = None
93

    
94
    # Response attributes
95
    self.resp_version = None
96
    self.resp_status_code = None
97
    self.resp_reason = None
98
    self.resp_headers = None
99
    self.resp_body = None
100

    
101

    
102
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
103
  pass
104

    
105

    
106
class _HttpServerToClientMessageReader(http.HttpMessageReader):
107
  # Length limits
108
  START_LINE_LENGTH_MAX = 512
109
  HEADER_LENGTH_MAX = 4096
110

    
111
  def ParseStartLine(self, start_line):
112
    """Parses the status line sent by the server.
113

114
    """
115
    # Empty lines are skipped when reading
116
    assert start_line
117

    
118
    try:
119
      [version, status, reason] = start_line.split(None, 2)
120
    except ValueError:
121
      try:
122
        [version, status] = start_line.split(None, 1)
123
        reason = ""
124
      except ValueError:
125
        version = http.HTTP_0_9
126

    
127
    if version:
128
      version = version.upper()
129

    
130
    # The status code is a three-digit number
131
    try:
132
      status = int(status)
133
      if status < 100 or status > 999:
134
        status = -1
135
    except ValueError:
136
      status = -1
137

    
138
    if status == -1:
139
      raise http.HttpError("Invalid status code (%r)" % start_line)
140

    
141
    return http.HttpServerToClientStartLine(version, status, reason)
142

    
143

    
144
class HttpClientRequestExecutor(http.HttpSocketBase):
145
  # Default headers
146
  DEFAULT_HEADERS = {
147
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148
    # TODO: For keep-alive, don't send "Connection: close"
149
    http.HTTP_CONNECTION: "close",
150
    }
151

    
152
  # Timeouts in seconds for socket layer
153
  # TODO: Soft timeout instead of only socket timeout?
154
  # TODO: Make read timeout configurable per OpCode?
155
  CONNECT_TIMEOUT = 5
156
  WRITE_TIMEOUT = 10
157
  READ_TIMEOUT = None
158
  CLOSE_TIMEOUT = 1
159

    
160
  def __init__(self, req):
161
    """Initializes the HttpClientRequestExecutor class.
162

163
    @type req: HttpClientRequest
164
    @param req: Request object
165

166
    """
167
    http.HttpSocketBase.__init__(self)
168
    self.request = req
169

    
170
    self.poller = select.poll()
171

    
172
    try:
173
      # TODO: Implement connection caching/keep-alive
174
      self.sock = self._CreateSocket(req.ssl_params,
175
                                     req.ssl_verify_peer)
176

    
177
      # Disable Python's timeout
178
      self.sock.settimeout(None)
179

    
180
      # Operate in non-blocking mode
181
      self.sock.setblocking(0)
182

    
183
      response_msg_reader = None
184
      response_msg = None
185
      force_close = True
186

    
187
      self._Connect()
188
      try:
189
        self._SendRequest()
190
        (response_msg_reader, response_msg) = self._ReadResponse()
191

    
192
        # Only wait for server to close if we didn't have any exception.
193
        force_close = False
194
      finally:
195
        # TODO: Keep-alive is not supported, always close connection
196
        force_close = True
197
        http.ShutdownConnection(self.poller, self.sock,
198
                                self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
199
                                response_msg_reader, force_close)
200

    
201
      self.sock.close()
202
      self.sock = None
203

    
204
      req.response = response_msg
205

    
206
      req.resp_version = req.response.start_line.version
207
      req.resp_status_code = req.response.start_line.code
208
      req.resp_reason = req.response.start_line.reason
209
      req.resp_headers = req.response.headers
210
      req.resp_body = req.response.body
211

    
212
      req.success = True
213
      req.error = None
214

    
215
    except http.HttpError, err:
216
      req.success = False
217
      req.error = str(err)
218

    
219
  def _Connect(self):
220
    """Non-blocking connect to host with timeout.
221

222
    """
223
    connected = False
224
    while True:
225
      try:
226
        connect_error = self.sock.connect_ex((self.request.host,
227
                                              self.request.port))
228
      except socket.gaierror, err:
229
        raise http.HttpError("Connection failed: %s" % str(err))
230

    
231
      if connect_error == errno.EINTR:
232
        # Mask signals
233
        pass
234

    
235
      elif connect_error == 0:
236
        # Connection established
237
        connected = True
238
        break
239

    
240
      elif connect_error == errno.EINPROGRESS:
241
        # Connection started
242
        break
243

    
244
      raise http.HttpError("Connection failed (%s: %s)" %
245
                             (connect_error, os.strerror(connect_error)))
246

    
247
    if not connected:
248
      # Wait for connection
249
      event = http.WaitForSocketCondition(self.poller, self.sock,
250
                                          select.POLLOUT, self.CONNECT_TIMEOUT)
251
      if event is None:
252
        raise http.HttpError("Timeout while connecting to server")
253

    
254
      # Get error code
255
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
256
      if connect_error != 0:
257
        raise http.HttpError("Connection failed (%s: %s)" %
258
                               (connect_error, os.strerror(connect_error)))
259

    
260
    # Enable TCP keep-alive
261
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
262

    
263
    # If needed, Linux specific options are available to change the TCP
264
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
265
    # TCP_KEEPINTVL.
266

    
267
  def _SendRequest(self):
268
    """Sends request to server.
269

270
    """
271
    # Headers
272
    send_headers = self.DEFAULT_HEADERS.copy()
273

    
274
    if self.request.headers:
275
      send_headers.update(self.request.headers)
276

    
277
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
278

    
279
    # Response message
280
    msg = http.HttpMessage()
281

    
282
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
283
    # no keep-alive).
284
    # TODO: For keep-alive, change to HTTP/1.1
285
    msg.start_line = \
286
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
287
                                       path=self.request.path, version=http.HTTP_1_0)
288
    msg.headers = send_headers
289
    msg.body = self.request.post_data
290

    
291
    try:
292
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
293
    except http.HttpSocketTimeout:
294
      raise http.HttpError("Timeout while sending request")
295
    except socket.error, err:
296
      raise http.HttpError("Error sending request: %s" % err)
297

    
298
  def _ReadResponse(self):
299
    """Read response from server.
300

301
    """
302
    response_msg = http.HttpMessage()
303

    
304
    try:
305
      response_msg_reader = \
306
        _HttpServerToClientMessageReader(self.sock, response_msg,
307
                                         self.READ_TIMEOUT)
308
    except http.HttpSocketTimeout:
309
      raise http.HttpError("Timeout while reading response")
310
    except socket.error, err:
311
      raise http.HttpError("Error reading response: %s" % err)
312

    
313
    return (response_msg_reader, response_msg)
314

    
315

    
316
class _HttpClientPendingRequest(object):
317
  """Data class for pending requests.
318

319
  """
320
  def __init__(self, request):
321
    self.request = request
322

    
323
    # Thread synchronization
324
    self.done = threading.Event()
325

    
326

    
327
class HttpClientWorker(workerpool.BaseWorker):
328
  """HTTP client worker class.
329

330
  """
331
  def RunTask(self, pend_req):
332
    try:
333
      HttpClientRequestExecutor(pend_req.request)
334
    finally:
335
      pend_req.done.set()
336

    
337

    
338
class HttpClientWorkerPool(workerpool.WorkerPool):
339
  def __init__(self, manager):
340
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
341
                                   HttpClientWorker)
342
    self.manager = manager
343

    
344

    
345
class HttpClientManager(object):
346
  """Manages HTTP requests.
347

348
  """
349
  def __init__(self):
350
    self._wpool = HttpClientWorkerPool(self)
351

    
352
  def __del__(self):
353
    self.Shutdown()
354

    
355
  def ExecRequests(self, requests):
356
    """Execute HTTP requests.
357

358
    This function can be called from multiple threads at the same time.
359

360
    @type requests: List of HttpClientRequest instances
361
    @param requests: The requests to execute
362
    @rtype: List of HttpClientRequest instances
363
    @returns: The list of requests passed in
364

365
    """
366
    # _HttpClientPendingRequest is used for internal thread synchronization
367
    pending = [_HttpClientPendingRequest(req) for req in requests]
368

    
369
    try:
370
      # Add requests to queue
371
      for pend_req in pending:
372
        self._wpool.AddTask(pend_req)
373

    
374
    finally:
375
      # In case of an exception we should still wait for the rest, otherwise
376
      # another thread from the worker pool could modify the request object
377
      # after we returned.
378

    
379
      # And wait for them to finish
380
      for pend_req in pending:
381
        pend_req.done.wait()
382

    
383
    # Return original list
384
    return requests
385

    
386
  def Shutdown(self):
387
    self._wpool.Quiesce()
388
    self._wpool.TerminateWorkers()