Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 5a9c3f46

History | View | Annotate | Download (10.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import OpenSSL
29
import os
30
import select
31
import socket
32
import sys
33
import time
34
import signal
35
import errno
36
import threading
37

    
38
from ganeti import constants
39
from ganeti import serializer
40
from ganeti import workerpool
41
from ganeti import utils
42
from ganeti import http
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47

    
48
class HttpClientRequest(object):
49
  def __init__(self, host, port, method, path, headers=None, post_data=None,
50
               ssl_params=None, ssl_verify_peer=False):
51
    """Describes an HTTP request.
52

53
    @type host: string
54
    @param host: Hostname
55
    @type port: int
56
    @param port: Port
57
    @type method: string
58
    @param method: Method name
59
    @type path: string
60
    @param path: Request path
61
    @type headers: dict or None
62
    @param headers: Additional headers to send
63
    @type post_data: string or None
64
    @param post_data: Additional data to send
65
    @type ssl_params: HttpSslParams
66
    @param ssl_params: SSL key and certificate
67
    @type ssl_verify_peer: bool
68
    @param ssl_verify_peer: Whether to compare our certificate with server's
69
                            certificate
70

71
    """
72
    if post_data is not None:
73
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74
        "Only POST and GET requests support sending data"
75

    
76
    assert path.startswith("/"), "Path must start with slash (/)"
77

    
78
    # Request attributes
79
    self.host = host
80
    self.port = port
81
    self.ssl_params = ssl_params
82
    self.ssl_verify_peer = ssl_verify_peer
83
    self.method = method
84
    self.path = path
85
    self.headers = headers
86
    self.post_data = post_data
87

    
88
    self.success = None
89
    self.error = None
90

    
91
    # Raw response
92
    self.response = None
93

    
94
    # Response attributes
95
    self.resp_version = None
96
    self.resp_status_code = None
97
    self.resp_reason = None
98
    self.resp_headers = None
99
    self.resp_body = None
100

    
101

    
102
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
103
  pass
104

    
105

    
106
class _HttpServerToClientMessageReader(http.HttpMessageReader):
107
  # Length limits
108
  START_LINE_LENGTH_MAX = 512
109
  HEADER_LENGTH_MAX = 4096
110

    
111
  def ParseStartLine(self, start_line):
112
    """Parses the status line sent by the server.
113

114
    """
115
    # Empty lines are skipped when reading
116
    assert start_line
117

    
118
    try:
119
      [version, status, reason] = start_line.split(None, 2)
120
    except ValueError:
121
      try:
122
        [version, status] = start_line.split(None, 1)
123
        reason = ""
124
      except ValueError:
125
        version = http.HTTP_0_9
126

    
127
    if version:
128
      version = version.upper()
129

    
130
    # The status code is a three-digit number
131
    try:
132
      status = int(status)
133
      if status < 100 or status > 999:
134
        status = -1
135
    except ValueError:
136
      status = -1
137

    
138
    if status == -1:
139
      raise http.HttpError("Invalid status code (%r)" % start_line)
140

    
141
    return http.HttpServerToClientStartLine(version, status, reason)
142

    
143

    
144
class HttpClientRequestExecutor(http.HttpBase):
145
  # Default headers
146
  DEFAULT_HEADERS = {
147
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148
    # TODO: For keep-alive, don't send "Connection: close"
149
    http.HTTP_CONNECTION: "close",
150
    }
151

    
152
  # Timeouts in seconds for socket layer
153
  # TODO: Soft timeout instead of only socket timeout?
154
  # TODO: Make read timeout configurable per OpCode?
155
  CONNECT_TIMEOUT = 5
156
  WRITE_TIMEOUT = 10
157
  READ_TIMEOUT = None
158
  CLOSE_TIMEOUT = 1
159

    
160
  def __init__(self, req):
161
    """Initializes the HttpClientRequestExecutor class.
162

163
    @type req: HttpClientRequest
164
    @param req: Request object
165

166
    """
167
    http.HttpBase.__init__(self)
168
    self.request = req
169

    
170
    self.poller = select.poll()
171

    
172
    try:
173
      # TODO: Implement connection caching/keep-alive
174
      self.sock = self._CreateSocket(req.ssl_params,
175
                                     req.ssl_verify_peer)
176

    
177
      # Disable Python's timeout
178
      self.sock.settimeout(None)
179

    
180
      # Operate in non-blocking mode
181
      self.sock.setblocking(0)
182

    
183
      response_msg_reader = None
184
      response_msg = None
185
      force_close = True
186

    
187
      self._Connect()
188
      try:
189
        self._SendRequest()
190
        (response_msg_reader, response_msg) = self._ReadResponse()
191

    
192
        # Only wait for server to close if we didn't have any exception.
193
        force_close = False
194
      finally:
195
        # TODO: Keep-alive is not supported, always close connection
196
        force_close = True
197
        http.ShutdownConnection(self.poller, self.sock,
198
                                self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
199
                                response_msg_reader, force_close)
200

    
201
      self.sock.close()
202
      self.sock = None
203

    
204
      req.response = response_msg
205

    
206
      req.resp_version = req.response.start_line.version
207
      req.resp_status_code = req.response.start_line.code
208
      req.resp_reason = req.response.start_line.reason
209
      req.resp_headers = req.response.headers
210
      req.resp_body = req.response.body
211

    
212
      req.success = True
213
      req.error = None
214

    
215
    except http.HttpError, err:
216
      req.success = False
217
      req.error = str(err)
218

    
219
  def _Connect(self):
220
    """Non-blocking connect to host with timeout.
221

222
    """
223
    connected = False
224
    while True:
225
      try:
226
        connect_error = self.sock.connect_ex((self.request.host,
227
                                              self.request.port))
228
      except socket.gaierror, err:
229
        raise http.HttpError("Connection failed: %s" % str(err))
230

    
231
      if connect_error == errno.EINTR:
232
        # Mask signals
233
        pass
234

    
235
      elif connect_error == 0:
236
        # Connection established
237
        connected = True
238
        break
239

    
240
      elif connect_error == errno.EINPROGRESS:
241
        # Connection started
242
        break
243

    
244
      raise http.HttpError("Connection failed (%s: %s)" %
245
                             (connect_error, os.strerror(connect_error)))
246

    
247
    if not connected:
248
      # Wait for connection
249
      event = http.WaitForSocketCondition(self.poller, self.sock,
250
                                          select.POLLOUT, self.CONNECT_TIMEOUT)
251
      if event is None:
252
        raise http.HttpError("Timeout while connecting to server")
253

    
254
      # Get error code
255
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
256
      if connect_error != 0:
257
        raise http.HttpError("Connection failed (%s: %s)" %
258
                               (connect_error, os.strerror(connect_error)))
259

    
260
    # Enable TCP keep-alive
261
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
262

    
263
    # If needed, Linux specific options are available to change the TCP
264
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
265
    # TCP_KEEPINTVL.
266

    
267
  def _SendRequest(self):
268
    """Sends request to server.
269

270
    """
271
    # Headers
272
    send_headers = self.DEFAULT_HEADERS.copy()
273

    
274
    if self.request.headers:
275
      send_headers.update(self.request.headers)
276

    
277
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
278
                                              self.request.port)
279

    
280
    # Response message
281
    msg = http.HttpMessage()
282

    
283
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
284
    # no keep-alive).
285
    # TODO: For keep-alive, change to HTTP/1.1
286
    msg.start_line = \
287
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
288
                                       path=self.request.path,
289
                                       version=http.HTTP_1_0)
290
    msg.headers = send_headers
291
    msg.body = self.request.post_data
292

    
293
    try:
294
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
295
    except http.HttpSocketTimeout:
296
      raise http.HttpError("Timeout while sending request")
297
    except socket.error, err:
298
      raise http.HttpError("Error sending request: %s" % err)
299

    
300
  def _ReadResponse(self):
301
    """Read response from server.
302

303
    """
304
    response_msg = http.HttpMessage()
305

    
306
    try:
307
      response_msg_reader = \
308
        _HttpServerToClientMessageReader(self.sock, response_msg,
309
                                         self.READ_TIMEOUT)
310
    except http.HttpSocketTimeout:
311
      raise http.HttpError("Timeout while reading response")
312
    except socket.error, err:
313
      raise http.HttpError("Error reading response: %s" % err)
314

    
315
    return (response_msg_reader, response_msg)
316

    
317

    
318
class _HttpClientPendingRequest(object):
319
  """Data class for pending requests.
320

321
  """
322
  def __init__(self, request):
323
    self.request = request
324

    
325
    # Thread synchronization
326
    self.done = threading.Event()
327

    
328

    
329
class HttpClientWorker(workerpool.BaseWorker):
330
  """HTTP client worker class.
331

332
  """
333
  def RunTask(self, pend_req):
334
    try:
335
      HttpClientRequestExecutor(pend_req.request)
336
    finally:
337
      pend_req.done.set()
338

    
339

    
340
class HttpClientWorkerPool(workerpool.WorkerPool):
341
  def __init__(self, manager):
342
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
343
                                   HttpClientWorker)
344
    self.manager = manager
345

    
346

    
347
class HttpClientManager(object):
348
  """Manages HTTP requests.
349

350
  """
351
  def __init__(self):
352
    self._wpool = HttpClientWorkerPool(self)
353

    
354
  def __del__(self):
355
    self.Shutdown()
356

    
357
  def ExecRequests(self, requests):
358
    """Execute HTTP requests.
359

360
    This function can be called from multiple threads at the same time.
361

362
    @type requests: List of HttpClientRequest instances
363
    @param requests: The requests to execute
364
    @rtype: List of HttpClientRequest instances
365
    @returns: The list of requests passed in
366

367
    """
368
    # _HttpClientPendingRequest is used for internal thread synchronization
369
    pending = [_HttpClientPendingRequest(req) for req in requests]
370

    
371
    try:
372
      # Add requests to queue
373
      for pend_req in pending:
374
        self._wpool.AddTask(pend_req)
375

    
376
    finally:
377
      # In case of an exception we should still wait for the rest, otherwise
378
      # another thread from the worker pool could modify the request object
379
      # after we returned.
380

    
381
      # And wait for them to finish
382
      for pend_req in pending:
383
        pend_req.done.wait()
384

    
385
    # Return original list
386
    return requests
387

    
388
  def Shutdown(self):
389
    self._wpool.Quiesce()
390
    self._wpool.TerminateWorkers()