Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 81b59aaf

History | View | Annotate | Download (10.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import OpenSSL
29
import os
30
import select
31
import socket
32
import sys
33
import time
34
import signal
35
import errno
36
import threading
37

    
38
from ganeti import constants
39
from ganeti import serializer
40
from ganeti import workerpool
41
from ganeti import utils
42
from ganeti import http
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47

    
48
class HttpClientRequest(object):
49
  def __init__(self, host, port, method, path, headers=None, post_data=None,
50
               ssl_params=None, ssl_verify_peer=False):
51
    """Describes an HTTP request.
52

53
    @type host: string
54
    @param host: Hostname
55
    @type port: int
56
    @param port: Port
57
    @type method: string
58
    @param method: Method name
59
    @type path: string
60
    @param path: Request path
61
    @type headers: dict or None
62
    @param headers: Additional headers to send
63
    @type post_data: string or None
64
    @param post_data: Additional data to send
65
    @type ssl_params: HttpSslParams
66
    @param ssl_params: SSL key and certificate
67
    @type ssl_verify_peer: bool
68
    @param ssl_verify_peer: Whether to compare our certificate with
69
        server's certificate
70

71
    """
72
    if post_data is not None:
73
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74
        "Only POST and GET requests support sending data"
75

    
76
    assert path.startswith("/"), "Path must start with slash (/)"
77

    
78
    # Request attributes
79
    self.host = host
80
    self.port = port
81
    self.ssl_params = ssl_params
82
    self.ssl_verify_peer = ssl_verify_peer
83
    self.method = method
84
    self.path = path
85
    self.headers = headers
86
    self.post_data = post_data
87

    
88
    self.success = None
89
    self.error = None
90

    
91
    # Raw response
92
    self.response = None
93

    
94
    # Response attributes
95
    self.resp_version = None
96
    self.resp_status_code = None
97
    self.resp_reason = None
98
    self.resp_headers = None
99
    self.resp_body = None
100

    
101

    
102
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
103
  pass
104

    
105

    
106
class _HttpServerToClientMessageReader(http.HttpMessageReader):
107
  # Length limits
108
  START_LINE_LENGTH_MAX = 512
109
  HEADER_LENGTH_MAX = 4096
110

    
111
  def ParseStartLine(self, start_line):
112
    """Parses the status line sent by the server.
113

114
    """
115
    # Empty lines are skipped when reading
116
    assert start_line
117

    
118
    try:
119
      [version, status, reason] = start_line.split(None, 2)
120
    except ValueError:
121
      try:
122
        [version, status] = start_line.split(None, 1)
123
        reason = ""
124
      except ValueError:
125
        version = http.HTTP_0_9
126

    
127
    if version:
128
      version = version.upper()
129

    
130
    # The status code is a three-digit number
131
    try:
132
      status = int(status)
133
      if status < 100 or status > 999:
134
        status = -1
135
    except ValueError:
136
      status = -1
137

    
138
    if status == -1:
139
      raise http.HttpError("Invalid status code (%r)" % start_line)
140

    
141
    return http.HttpServerToClientStartLine(version, status, reason)
142

    
143

    
144
class HttpClientRequestExecutor(http.HttpBase):
145
  # Default headers
146
  DEFAULT_HEADERS = {
147
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148
    # TODO: For keep-alive, don't send "Connection: close"
149
    http.HTTP_CONNECTION: "close",
150
    }
151

    
152
  # Timeouts in seconds for socket layer
153
  # TODO: Soft timeout instead of only socket timeout?
154
  # TODO: Make read timeout configurable per OpCode?
155
  CONNECT_TIMEOUT = 5
156
  WRITE_TIMEOUT = 10
157
  READ_TIMEOUT = None
158
  CLOSE_TIMEOUT = 1
159

    
160
  def __init__(self, req):
161
    """Initializes the HttpClientRequestExecutor class.
162

163
    @type req: HttpClientRequest
164
    @param req: Request object
165

166
    """
167
    http.HttpBase.__init__(self)
168
    self.request = req
169

    
170
    try:
171
      # TODO: Implement connection caching/keep-alive
172
      self.sock = self._CreateSocket(req.ssl_params,
173
                                     req.ssl_verify_peer)
174

    
175
      # Disable Python's timeout
176
      self.sock.settimeout(None)
177

    
178
      # Operate in non-blocking mode
179
      self.sock.setblocking(0)
180

    
181
      response_msg_reader = None
182
      response_msg = None
183
      force_close = True
184

    
185
      self._Connect()
186
      try:
187
        self._SendRequest()
188
        (response_msg_reader, response_msg) = self._ReadResponse()
189

    
190
        # Only wait for server to close if we didn't have any exception.
191
        force_close = False
192
      finally:
193
        # TODO: Keep-alive is not supported, always close connection
194
        force_close = True
195
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
196
                                self.WRITE_TIMEOUT, response_msg_reader,
197
                                force_close)
198

    
199
      self.sock.close()
200
      self.sock = None
201

    
202
      req.response = response_msg
203

    
204
      req.resp_version = req.response.start_line.version
205
      req.resp_status_code = req.response.start_line.code
206
      req.resp_reason = req.response.start_line.reason
207
      req.resp_headers = req.response.headers
208
      req.resp_body = req.response.body
209

    
210
      req.success = True
211
      req.error = None
212

    
213
    except http.HttpError, err:
214
      req.success = False
215
      req.error = str(err)
216

    
217
  def _Connect(self):
218
    """Non-blocking connect to host with timeout.
219

220
    """
221
    connected = False
222
    while True:
223
      try:
224
        connect_error = self.sock.connect_ex((self.request.host,
225
                                              self.request.port))
226
      except socket.gaierror, err:
227
        raise http.HttpError("Connection failed: %s" % str(err))
228

    
229
      if connect_error == errno.EINTR:
230
        # Mask signals
231
        pass
232

    
233
      elif connect_error == 0:
234
        # Connection established
235
        connected = True
236
        break
237

    
238
      elif connect_error == errno.EINPROGRESS:
239
        # Connection started
240
        break
241

    
242
      raise http.HttpError("Connection failed (%s: %s)" %
243
                             (connect_error, os.strerror(connect_error)))
244

    
245
    if not connected:
246
      # Wait for connection
247
      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
248
                                          self.CONNECT_TIMEOUT)
249
      if event is None:
250
        raise http.HttpError("Timeout while connecting to server")
251

    
252
      # Get error code
253
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
254
      if connect_error != 0:
255
        raise http.HttpError("Connection failed (%s: %s)" %
256
                               (connect_error, os.strerror(connect_error)))
257

    
258
    # Enable TCP keep-alive
259
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
260

    
261
    # If needed, Linux specific options are available to change the TCP
262
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
263
    # TCP_KEEPINTVL.
264

    
265
    # Do the secret SSL handshake
266
    if self.using_ssl:
267
      self.sock.set_connect_state()
268
      try:
269
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
270
      except http.HttpSessionHandshakeUnexpectedEOF:
271
        raise http.HttpError("Server closed connection during SSL handshake")
272

    
273
  def _SendRequest(self):
274
    """Sends request to server.
275

276
    """
277
    # Headers
278
    send_headers = self.DEFAULT_HEADERS.copy()
279

    
280
    if self.request.headers:
281
      send_headers.update(self.request.headers)
282

    
283
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
284
                                              self.request.port)
285

    
286
    # Response message
287
    msg = http.HttpMessage()
288

    
289
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
290
    # no keep-alive).
291
    # TODO: For keep-alive, change to HTTP/1.1
292
    msg.start_line = \
293
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
294
                                       path=self.request.path,
295
                                       version=http.HTTP_1_0)
296
    msg.headers = send_headers
297
    msg.body = self.request.post_data
298

    
299
    try:
300
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
301
    except http.HttpSocketTimeout:
302
      raise http.HttpError("Timeout while sending request")
303
    except socket.error, err:
304
      raise http.HttpError("Error sending request: %s" % err)
305

    
306
  def _ReadResponse(self):
307
    """Read response from server.
308

309
    """
310
    response_msg = http.HttpMessage()
311

    
312
    try:
313
      response_msg_reader = \
314
        _HttpServerToClientMessageReader(self.sock, response_msg,
315
                                         self.READ_TIMEOUT)
316
    except http.HttpSocketTimeout:
317
      raise http.HttpError("Timeout while reading response")
318
    except socket.error, err:
319
      raise http.HttpError("Error reading response: %s" % err)
320

    
321
    return (response_msg_reader, response_msg)
322

    
323

    
324
class _HttpClientPendingRequest(object):
325
  """Data class for pending requests.
326

327
  """
328
  def __init__(self, request):
329
    self.request = request
330

    
331
    # Thread synchronization
332
    self.done = threading.Event()
333

    
334

    
335
class HttpClientWorker(workerpool.BaseWorker):
336
  """HTTP client worker class.
337

338
  """
339
  def RunTask(self, pend_req):
340
    try:
341
      HttpClientRequestExecutor(pend_req.request)
342
    finally:
343
      pend_req.done.set()
344

    
345

    
346
class HttpClientWorkerPool(workerpool.WorkerPool):
347
  def __init__(self, manager):
348
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
349
                                   HttpClientWorker)
350
    self.manager = manager
351

    
352

    
353
class HttpClientManager(object):
354
  """Manages HTTP requests.
355

356
  """
357
  def __init__(self):
358
    self._wpool = HttpClientWorkerPool(self)
359

    
360
  def __del__(self):
361
    self.Shutdown()
362

    
363
  def ExecRequests(self, requests):
364
    """Execute HTTP requests.
365

366
    This function can be called from multiple threads at the same time.
367

368
    @type requests: List of HttpClientRequest instances
369
    @param requests: The requests to execute
370
    @rtype: List of HttpClientRequest instances
371
    @return: The list of requests passed in
372

373
    """
374
    # _HttpClientPendingRequest is used for internal thread synchronization
375
    pending = [_HttpClientPendingRequest(req) for req in requests]
376

    
377
    try:
378
      # Add requests to queue
379
      for pend_req in pending:
380
        self._wpool.AddTask(pend_req)
381

    
382
    finally:
383
      # In case of an exception we should still wait for the rest, otherwise
384
      # another thread from the worker pool could modify the request object
385
      # after we returned.
386

    
387
      # And wait for them to finish
388
      for pend_req in pending:
389
        pend_req.done.wait()
390

    
391
    # Return original list
392
    return requests
393

    
394
  def Shutdown(self):
395
    self._wpool.Quiesce()
396
    self._wpool.TerminateWorkers()