Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 9fa2e150

History | View | Annotate | Download (11.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
# pylint: disable-msg=E1103
26

    
27
# # E1103: %s %r has no %r member (but some types could not be
28
# inferred), since _socketobject could be ssl or not and pylint
29
# doesn't parse that
30

    
31

    
32
import os
33
import select
34
import socket
35
import errno
36
import threading
37

    
38
from ganeti import workerpool
39
from ganeti import http
40

    
41

    
42
HTTP_CLIENT_THREADS = 10
43

    
44

    
45
class HttpClientRequest(object):
46
  def __init__(self, host, port, method, path, headers=None, post_data=None,
47
               ssl_params=None, ssl_verify_peer=False):
48
    """Describes an HTTP request.
49

50
    @type host: string
51
    @param host: Hostname
52
    @type port: int
53
    @param port: Port
54
    @type method: string
55
    @param method: Method name
56
    @type path: string
57
    @param path: Request path
58
    @type headers: dict or None
59
    @param headers: Additional headers to send
60
    @type post_data: string or None
61
    @param post_data: Additional data to send
62
    @type ssl_params: HttpSslParams
63
    @param ssl_params: SSL key and certificate
64
    @type ssl_verify_peer: bool
65
    @param ssl_verify_peer: Whether to compare our certificate with
66
        server's certificate
67

68
    """
69
    if post_data is not None:
70
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
71
        "Only POST and GET requests support sending data"
72

    
73
    assert path.startswith("/"), "Path must start with slash (/)"
74

    
75
    # Request attributes
76
    self.host = host
77
    self.port = port
78
    self.ssl_params = ssl_params
79
    self.ssl_verify_peer = ssl_verify_peer
80
    self.method = method
81
    self.path = path
82
    self.headers = headers
83
    self.post_data = post_data
84

    
85
    self.success = None
86
    self.error = None
87

    
88
    # Raw response
89
    self.response = None
90

    
91
    # Response attributes
92
    self.resp_version = None
93
    self.resp_status_code = None
94
    self.resp_reason = None
95
    self.resp_headers = None
96
    self.resp_body = None
97

    
98
  def __repr__(self):
99
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
100
              "%s:%s" % (self.host, self.port),
101
              self.method,
102
              self.path]
103

    
104
    return "<%s at %#x>" % (" ".join(status), id(self))
105

    
106

    
107
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
108
  pass
109

    
110

    
111
class _HttpServerToClientMessageReader(http.HttpMessageReader):
112
  # Length limits
113
  START_LINE_LENGTH_MAX = 512
114
  HEADER_LENGTH_MAX = 4096
115

    
116
  def ParseStartLine(self, start_line):
117
    """Parses the status line sent by the server.
118

119
    """
120
    # Empty lines are skipped when reading
121
    assert start_line
122

    
123
    try:
124
      [version, status, reason] = start_line.split(None, 2)
125
    except ValueError:
126
      try:
127
        [version, status] = start_line.split(None, 1)
128
        reason = ""
129
      except ValueError:
130
        version = http.HTTP_0_9
131

    
132
    if version:
133
      version = version.upper()
134

    
135
    # The status code is a three-digit number
136
    try:
137
      status = int(status)
138
      if status < 100 or status > 999:
139
        status = -1
140
    except ValueError:
141
      status = -1
142

    
143
    if status == -1:
144
      raise http.HttpError("Invalid status code (%r)" % start_line)
145

    
146
    return http.HttpServerToClientStartLine(version, status, reason)
147

    
148

    
149
class HttpClientRequestExecutor(http.HttpBase):
150
  # Default headers
151
  DEFAULT_HEADERS = {
152
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
153
    # TODO: For keep-alive, don't send "Connection: close"
154
    http.HTTP_CONNECTION: "close",
155
    }
156

    
157
  # Timeouts in seconds for socket layer
158
  # TODO: Soft timeout instead of only socket timeout?
159
  # TODO: Make read timeout configurable per OpCode?
160
  CONNECT_TIMEOUT = 5
161
  WRITE_TIMEOUT = 10
162
  READ_TIMEOUT = None
163
  CLOSE_TIMEOUT = 1
164

    
165
  def __init__(self, req):
166
    """Initializes the HttpClientRequestExecutor class.
167

168
    @type req: HttpClientRequest
169
    @param req: Request object
170

171
    """
172
    http.HttpBase.__init__(self)
173
    self.request = req
174

    
175
    try:
176
      # TODO: Implement connection caching/keep-alive
177
      self.sock = self._CreateSocket(req.ssl_params,
178
                                     req.ssl_verify_peer)
179

    
180
      # Disable Python's timeout
181
      self.sock.settimeout(None)
182

    
183
      # Operate in non-blocking mode
184
      self.sock.setblocking(0)
185

    
186
      response_msg_reader = None
187
      response_msg = None
188
      force_close = True
189

    
190
      self._Connect()
191
      try:
192
        self._SendRequest()
193
        (response_msg_reader, response_msg) = self._ReadResponse()
194

    
195
        # Only wait for server to close if we didn't have any exception.
196
        force_close = False
197
      finally:
198
        # TODO: Keep-alive is not supported, always close connection
199
        force_close = True
200
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
201
                                self.WRITE_TIMEOUT, response_msg_reader,
202
                                force_close)
203

    
204
      self.sock.close()
205
      self.sock = None
206

    
207
      req.response = response_msg
208

    
209
      req.resp_version = req.response.start_line.version
210
      req.resp_status_code = req.response.start_line.code
211
      req.resp_reason = req.response.start_line.reason
212
      req.resp_headers = req.response.headers
213
      req.resp_body = req.response.body
214

    
215
      req.success = True
216
      req.error = None
217

    
218
    except http.HttpError, err:
219
      req.success = False
220
      req.error = str(err)
221

    
222
  def _Connect(self):
223
    """Non-blocking connect to host with timeout.
224

225
    """
226
    connected = False
227
    while True:
228
      try:
229
        connect_error = self.sock.connect_ex((self.request.host,
230
                                              self.request.port))
231
      except socket.gaierror, err:
232
        raise http.HttpError("Connection failed: %s" % str(err))
233

    
234
      if connect_error == errno.EINTR:
235
        # Mask signals
236
        pass
237

    
238
      elif connect_error == 0:
239
        # Connection established
240
        connected = True
241
        break
242

    
243
      elif connect_error == errno.EINPROGRESS:
244
        # Connection started
245
        break
246

    
247
      raise http.HttpError("Connection failed (%s: %s)" %
248
                             (connect_error, os.strerror(connect_error)))
249

    
250
    if not connected:
251
      # Wait for connection
252
      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
253
                                          self.CONNECT_TIMEOUT)
254
      if event is None:
255
        raise http.HttpError("Timeout while connecting to server")
256

    
257
      # Get error code
258
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
259
      if connect_error != 0:
260
        raise http.HttpError("Connection failed (%s: %s)" %
261
                               (connect_error, os.strerror(connect_error)))
262

    
263
    # Enable TCP keep-alive
264
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
265

    
266
    # If needed, Linux specific options are available to change the TCP
267
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
268
    # TCP_KEEPINTVL.
269

    
270
    # Do the secret SSL handshake
271
    if self.using_ssl:
272
      self.sock.set_connect_state() # pylint: disable-msg=E1103
273
      try:
274
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
275
      except http.HttpSessionHandshakeUnexpectedEOF:
276
        raise http.HttpError("Server closed connection during SSL handshake")
277

    
278
  def _SendRequest(self):
279
    """Sends request to server.
280

281
    """
282
    # Headers
283
    send_headers = self.DEFAULT_HEADERS.copy()
284

    
285
    if self.request.headers:
286
      send_headers.update(self.request.headers)
287

    
288
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
289
                                              self.request.port)
290

    
291
    # Response message
292
    msg = http.HttpMessage()
293

    
294
    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
295
    # no keep-alive).
296
    # TODO: For keep-alive, change to HTTP/1.1
297
    msg.start_line = \
298
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
299
                                       path=self.request.path,
300
                                       version=http.HTTP_1_0)
301
    msg.headers = send_headers
302
    msg.body = self.request.post_data
303

    
304
    try:
305
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
306
    except http.HttpSocketTimeout:
307
      raise http.HttpError("Timeout while sending request")
308
    except socket.error, err:
309
      raise http.HttpError("Error sending request: %s" % err)
310

    
311
  def _ReadResponse(self):
312
    """Read response from server.
313

314
    """
315
    response_msg = http.HttpMessage()
316

    
317
    try:
318
      response_msg_reader = \
319
        _HttpServerToClientMessageReader(self.sock, response_msg,
320
                                         self.READ_TIMEOUT)
321
    except http.HttpSocketTimeout:
322
      raise http.HttpError("Timeout while reading response")
323
    except socket.error, err:
324
      raise http.HttpError("Error reading response: %s" % err)
325

    
326
    return (response_msg_reader, response_msg)
327

    
328

    
329
class _HttpClientPendingRequest(object):
330
  """Data class for pending requests.
331

332
  """
333
  def __init__(self, request):
334
    self.request = request
335

    
336
    # Thread synchronization
337
    self.done = threading.Event()
338

    
339
  def __repr__(self):
340
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
341
              "req=%r" % self.request]
342

    
343
    return "<%s at %#x>" % (" ".join(status), id(self))
344

    
345

    
346
class HttpClientWorker(workerpool.BaseWorker):
347
  """HTTP client worker class.
348

349
  """
350
  def RunTask(self, pend_req): # pylint: disable-msg=W0221
351
    try:
352
      HttpClientRequestExecutor(pend_req.request)
353
    finally:
354
      pend_req.done.set()
355

    
356

    
357
class HttpClientWorkerPool(workerpool.WorkerPool):
358
  def __init__(self, manager):
359
    workerpool.WorkerPool.__init__(self, "HttpClient",
360
                                   HTTP_CLIENT_THREADS,
361
                                   HttpClientWorker)
362
    self.manager = manager
363

    
364

    
365
class HttpClientManager(object):
366
  """Manages HTTP requests.
367

368
  """
369
  def __init__(self):
370
    self._wpool = HttpClientWorkerPool(self)
371

    
372
  def __del__(self):
373
    self.Shutdown()
374

    
375
  def ExecRequests(self, requests):
376
    """Execute HTTP requests.
377

378
    This function can be called from multiple threads at the same time.
379

380
    @type requests: List of HttpClientRequest instances
381
    @param requests: The requests to execute
382
    @rtype: List of HttpClientRequest instances
383
    @return: The list of requests passed in
384

385
    """
386
    # _HttpClientPendingRequest is used for internal thread synchronization
387
    pending = [_HttpClientPendingRequest(req) for req in requests]
388

    
389
    try:
390
      # Add requests to queue
391
      for pend_req in pending:
392
        self._wpool.AddTask(pend_req)
393

    
394
    finally:
395
      # In case of an exception we should still wait for the rest, otherwise
396
      # another thread from the worker pool could modify the request object
397
      # after we returned.
398

    
399
      # And wait for them to finish
400
      for pend_req in pending:
401
        pend_req.done.wait()
402

    
403
    # Return original list
404
    return requests
405

    
406
  def Shutdown(self):
407
    self._wpool.Quiesce()
408
    self._wpool.TerminateWorkers()