Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 6c881c52

History | View | Annotate | Download (10.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
# pylint: disable-msg=E1103
26

    
27
# # E1103: %s %r has no %r member (but some types could not be
28
# inferred), since _socketobject could be ssl or not and pylint
29
# doesn't parse that
30

    
31

    
32
import os
33
import select
34
import socket
35
import errno
36
import threading
37

    
38
from ganeti import workerpool
39
from ganeti import http
40

    
41

    
42
HTTP_CLIENT_THREADS = 10
43

    
44

    
45
class HttpClientRequest(object):
46
  def __init__(self, host, port, method, path, headers=None, post_data=None,
47
               ssl_params=None, ssl_verify_peer=False):
48
    """Describes an HTTP request.
49

50
    @type host: string
51
    @param host: Hostname
52
    @type port: int
53
    @param port: Port
54
    @type method: string
55
    @param method: Method name
56
    @type path: string
57
    @param path: Request path
58
    @type headers: dict or None
59
    @param headers: Additional headers to send
60
    @type post_data: string or None
61
    @param post_data: Additional data to send
62
    @type ssl_params: HttpSslParams
63
    @param ssl_params: SSL key and certificate
64
    @type ssl_verify_peer: bool
65
    @param ssl_verify_peer: Whether to compare our certificate with
66
        server's certificate
67

68
    """
69
    if post_data is not None:
70
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
71
        "Only POST and GET requests support sending data"
72

    
73
    assert path.startswith("/"), "Path must start with slash (/)"
74

    
75
    # Request attributes
76
    self.host = host
77
    self.port = port
78
    self.ssl_params = ssl_params
79
    self.ssl_verify_peer = ssl_verify_peer
80
    self.method = method
81
    self.path = path
82
    self.headers = headers
83
    self.post_data = post_data
84

    
85
    self.success = None
86
    self.error = None
87

    
88
    # Raw response
89
    self.response = None
90

    
91
    # Response attributes
92
    self.resp_version = None
93
    self.resp_status_code = None
94
    self.resp_reason = None
95
    self.resp_headers = None
96
    self.resp_body = None
97

    
98

    
99
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
100
  pass
101

    
102

    
103
class _HttpServerToClientMessageReader(http.HttpMessageReader):
104
  # Length limits
105
  START_LINE_LENGTH_MAX = 512
106
  HEADER_LENGTH_MAX = 4096
107

    
108
  def ParseStartLine(self, start_line):
109
    """Parses the status line sent by the server.
110

111
    """
112
    # Empty lines are skipped when reading
113
    assert start_line
114

    
115
    try:
116
      [version, status, reason] = start_line.split(None, 2)
117
    except ValueError:
118
      try:
119
        [version, status] = start_line.split(None, 1)
120
        reason = ""
121
      except ValueError:
122
        version = http.HTTP_0_9
123

    
124
    if version:
125
      version = version.upper()
126

    
127
    # The status code is a three-digit number
128
    try:
129
      status = int(status)
130
      if status < 100 or status > 999:
131
        status = -1
132
    except ValueError:
133
      status = -1
134

    
135
    if status == -1:
136
      raise http.HttpError("Invalid status code (%r)" % start_line)
137

    
138
    return http.HttpServerToClientStartLine(version, status, reason)
139

    
140

    
141
class HttpClientRequestExecutor(http.HttpBase):
142
  # Default headers
143
  DEFAULT_HEADERS = {
144
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
145
    # TODO: For keep-alive, don't send "Connection: close"
146
    http.HTTP_CONNECTION: "close",
147
    }
148

    
149
  # Timeouts in seconds for socket layer
150
  # TODO: Soft timeout instead of only socket timeout?
151
  # TODO: Make read timeout configurable per OpCode?
152
  CONNECT_TIMEOUT = 5
153
  WRITE_TIMEOUT = 10
154
  READ_TIMEOUT = None
155
  CLOSE_TIMEOUT = 1
156

    
157
  def __init__(self, req):
158
    """Initializes the HttpClientRequestExecutor class.
159

160
    @type req: HttpClientRequest
161
    @param req: Request object
162

163
    """
164
    http.HttpBase.__init__(self)
165
    self.request = req
166

    
167
    try:
168
      # TODO: Implement connection caching/keep-alive
169
      self.sock = self._CreateSocket(req.ssl_params,
170
                                     req.ssl_verify_peer)
171

    
172
      # Disable Python's timeout
173
      self.sock.settimeout(None)
174

    
175
      # Operate in non-blocking mode
176
      self.sock.setblocking(0)
177

    
178
      response_msg_reader = None
179
      response_msg = None
180
      force_close = True
181

    
182
      self._Connect()
183
      try:
184
        self._SendRequest()
185
        (response_msg_reader, response_msg) = self._ReadResponse()
186

    
187
        # Only wait for server to close if we didn't have any exception.
188
        force_close = False
189
      finally:
190
        # TODO: Keep-alive is not supported, always close connection
191
        force_close = True
192
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
193
                                self.WRITE_TIMEOUT, response_msg_reader,
194
                                force_close)
195

    
196
      self.sock.close()
197
      self.sock = None
198

    
199
      req.response = response_msg
200

    
201
      req.resp_version = req.response.start_line.version
202
      req.resp_status_code = req.response.start_line.code
203
      req.resp_reason = req.response.start_line.reason
204
      req.resp_headers = req.response.headers
205
      req.resp_body = req.response.body
206

    
207
      req.success = True
208
      req.error = None
209

    
210
    except http.HttpError, err:
211
      req.success = False
212
      req.error = str(err)
213

    
214
  def _Connect(self):
215
    """Non-blocking connect to host with timeout.
216

217
    """
218
    connected = False
219
    while True:
220
      try:
221
        connect_error = self.sock.connect_ex((self.request.host,
222
                                              self.request.port))
223
      except socket.gaierror, err:
224
        raise http.HttpError("Connection failed: %s" % str(err))
225

    
226
      if connect_error == errno.EINTR:
227
        # Mask signals
228
        pass
229

    
230
      elif connect_error == 0:
231
        # Connection established
232
        connected = True
233
        break
234

    
235
      elif connect_error == errno.EINPROGRESS:
236
        # Connection started
237
        break
238

    
239
      raise http.HttpError("Connection failed (%s: %s)" %
240
                             (connect_error, os.strerror(connect_error)))
241

    
242
    if not connected:
243
      # Wait for connection
244
      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
245
                                          self.CONNECT_TIMEOUT)
246
      if event is None:
247
        raise http.HttpError("Timeout while connecting to server")
248

    
249
      # Get error code
250
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
251
      if connect_error != 0:
252
        raise http.HttpError("Connection failed (%s: %s)" %
253
                               (connect_error, os.strerror(connect_error)))
254

    
255
    # Enable TCP keep-alive
256
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
257

    
258
    # If needed, Linux specific options are available to change the TCP
259
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
260
    # TCP_KEEPINTVL.
261

    
262
    # Do the secret SSL handshake
263
    if self.using_ssl:
264
      self.sock.set_connect_state()
265
      try:
266
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
267
      except http.HttpSessionHandshakeUnexpectedEOF:
268
        raise http.HttpError("Server closed connection during SSL handshake")
269

    
270
  def _SendRequest(self):
271
    """Sends request to server.
272

273
    """
274
    # Headers
275
    send_headers = self.DEFAULT_HEADERS.copy()
276

    
277
    if self.request.headers:
278
      send_headers.update(self.request.headers)
279

    
280
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
281
                                              self.request.port)
282

    
283
    # Response message
284
    msg = http.HttpMessage()
285

    
286
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
287
    # no keep-alive).
288
    # TODO: For keep-alive, change to HTTP/1.1
289
    msg.start_line = \
290
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
291
                                       path=self.request.path,
292
                                       version=http.HTTP_1_0)
293
    msg.headers = send_headers
294
    msg.body = self.request.post_data
295

    
296
    try:
297
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
298
    except http.HttpSocketTimeout:
299
      raise http.HttpError("Timeout while sending request")
300
    except socket.error, err:
301
      raise http.HttpError("Error sending request: %s" % err)
302

    
303
  def _ReadResponse(self):
304
    """Read response from server.
305

306
    """
307
    response_msg = http.HttpMessage()
308

    
309
    try:
310
      response_msg_reader = \
311
        _HttpServerToClientMessageReader(self.sock, response_msg,
312
                                         self.READ_TIMEOUT)
313
    except http.HttpSocketTimeout:
314
      raise http.HttpError("Timeout while reading response")
315
    except socket.error, err:
316
      raise http.HttpError("Error reading response: %s" % err)
317

    
318
    return (response_msg_reader, response_msg)
319

    
320

    
321
class _HttpClientPendingRequest(object):
322
  """Data class for pending requests.
323

324
  """
325
  def __init__(self, request):
326
    self.request = request
327

    
328
    # Thread synchronization
329
    self.done = threading.Event()
330

    
331

    
332
class HttpClientWorker(workerpool.BaseWorker):
333
  """HTTP client worker class.
334

335
  """
336
  def RunTask(self, pend_req):
337
    try:
338
      HttpClientRequestExecutor(pend_req.request)
339
    finally:
340
      pend_req.done.set()
341

    
342

    
343
class HttpClientWorkerPool(workerpool.WorkerPool):
344
  def __init__(self, manager):
345
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
346
                                   HttpClientWorker)
347
    self.manager = manager
348

    
349

    
350
class HttpClientManager(object):
351
  """Manages HTTP requests.
352

353
  """
354
  def __init__(self):
355
    self._wpool = HttpClientWorkerPool(self)
356

    
357
  def __del__(self):
358
    self.Shutdown()
359

    
360
  def ExecRequests(self, requests):
361
    """Execute HTTP requests.
362

363
    This function can be called from multiple threads at the same time.
364

365
    @type requests: List of HttpClientRequest instances
366
    @param requests: The requests to execute
367
    @rtype: List of HttpClientRequest instances
368
    @return: The list of requests passed in
369

370
    """
371
    # _HttpClientPendingRequest is used for internal thread synchronization
372
    pending = [_HttpClientPendingRequest(req) for req in requests]
373

    
374
    try:
375
      # Add requests to queue
376
      for pend_req in pending:
377
        self._wpool.AddTask(pend_req)
378

    
379
    finally:
380
      # In case of an exception we should still wait for the rest, otherwise
381
      # another thread from the worker pool could modify the request object
382
      # after we returned.
383

    
384
      # And wait for them to finish
385
      for pend_req in pending:
386
        pend_req.done.wait()
387

    
388
    # Return original list
389
    return requests
390

    
391
  def Shutdown(self):
392
    self._wpool.Quiesce()
393
    self._wpool.TerminateWorkers()