Revision ff9efc03

b/Makefile.am
15 15
REPLACE_VARS_SED = autotools/replace_vars.sed
16 16

  
17 17
hypervisordir = $(pkgpythondir)/hypervisor
18
httpdir = $(pkgpythondir)/http
18 19
rapidir = $(pkgpythondir)/rapi
19 20
toolsdir = $(pkglibdir)/tools
20 21
docdir = $(datadir)/doc/$(PACKAGE)
......
26 27
	doc \
27 28
	doc/examples \
28 29
	lib \
30
	lib/http \
29 31
	lib/hypervisor \
30 32
	lib/rapi \
31 33
	man \
......
46 48
	doc/examples/ganeti.initd \
47 49
	doc/examples/ganeti.cron \
48 50
	lib/*.py[co] \
51
	lib/http/*.py[co] \
49 52
	lib/hypervisor/*.py[co] \
50 53
	lib/rapi/*.py[co] \
51 54
	man/*.[78] \
......
69 72
	lib/constants.py \
70 73
	lib/daemon.py \
71 74
	lib/errors.py \
72
	lib/http.py \
73 75
	lib/jqueue.py \
74 76
	lib/jstore.py \
75 77
	lib/locking.py \
......
98 100
	lib/rapi/rlib1.py \
99 101
	lib/rapi/rlib2.py
100 102

  
103
http_PYTHON = \
104
	lib/http/__init__.py
105

  
101 106

  
102 107
docsgml = \
103 108
	doc/hooks.sgml \
......
301 306
#.PHONY: srclinks
302 307
srclinks: stamp-directories
303 308
	set -e; \
304
	for i in man/footer.sgml $(pkgpython_PYTHON) $(hypervisor_PYTHON) $(rapi_PYTHON); do \
309
	for i in man/footer.sgml $(pkgpython_PYTHON) $(hypervisor_PYTHON) \
310
			$(rapi_PYTHON) $(http_PYTHON); do \
305 311
		if test ! -f $$i -a -f $(abs_top_srcdir)/$$i; then \
306 312
			$(LN_S) $(abs_top_srcdir)/$$i $$i; \
307 313
		fi; \
/dev/null
1
#
2
#
3

  
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21
"""HTTP server module.
22

  
23
"""
24

  
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import mimetools
29
import OpenSSL
30
import os
31
import select
32
import socket
33
import sys
34
import time
35
import signal
36
import logging
37
import errno
38
import threading
39

  
40
from cStringIO import StringIO
41

  
42
from ganeti import constants
43
from ganeti import serializer
44
from ganeti import workerpool
45
from ganeti import utils
46

  
47

  
48
HTTP_CLIENT_THREADS = 10
49

  
50
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
51

  
52
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
53
MONTHNAME = [None,
54
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
55
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
56

  
57
# Default error message
58
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
59
DEFAULT_ERROR_MESSAGE = """\
60
<head>
61
<title>Error response</title>
62
</head>
63
<body>
64
<h1>Error response</h1>
65
<p>Error code %(code)d.
66
<p>Message: %(message)s.
67
<p>Error code explanation: %(code)s = %(explain)s.
68
</body>
69
"""
70

  
71
HTTP_OK = 200
72
HTTP_NO_CONTENT = 204
73
HTTP_NOT_MODIFIED = 304
74

  
75
HTTP_0_9 = "HTTP/0.9"
76
HTTP_1_0 = "HTTP/1.0"
77
HTTP_1_1 = "HTTP/1.1"
78

  
79
HTTP_GET = "GET"
80
HTTP_HEAD = "HEAD"
81
HTTP_POST = "POST"
82
HTTP_PUT = "PUT"
83

  
84
HTTP_ETAG = "ETag"
85
HTTP_HOST = "Host"
86
HTTP_SERVER = "Server"
87
HTTP_DATE = "Date"
88
HTTP_USER_AGENT = "User-Agent"
89
HTTP_CONTENT_TYPE = "Content-Type"
90
HTTP_CONTENT_LENGTH = "Content-Length"
91
HTTP_CONNECTION = "Connection"
92
HTTP_KEEP_ALIVE = "Keep-Alive"
93

  
94
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
95

  
96
# Socket operations
97
(SOCKOP_SEND,
98
 SOCKOP_RECV,
99
 SOCKOP_SHUTDOWN) = range(3)
100

  
101

  
102
class SocketClosed(socket.error):
103
  pass
104

  
105

  
106
class _HttpClientError(Exception):
107
  """Internal exception for HTTP client errors.
108

  
109
  This should only be used for internal error reporting.
110

  
111
  """
112

  
113

  
114
class _HttpSocketTimeout(Exception):
115
  """Internal exception for socket timeouts.
116

  
117
  This should only be used for internal error reporting.
118

  
119
  """
120

  
121

  
122
class HTTPException(Exception):
123
  code = None
124
  message = None
125

  
126
  def __init__(self, message=None):
127
    Exception.__init__(self)
128
    if message is not None:
129
      self.message = message
130

  
131

  
132
class HTTPBadRequest(HTTPException):
133
  code = 400
134

  
135

  
136
class HTTPForbidden(HTTPException):
137
  code = 403
138

  
139

  
140
class HTTPNotFound(HTTPException):
141
  code = 404
142

  
143

  
144
class HTTPGone(HTTPException):
145
  code = 410
146

  
147

  
148
class HTTPLengthRequired(HTTPException):
149
  code = 411
150

  
151

  
152
class HTTPInternalError(HTTPException):
153
  code = 500
154

  
155

  
156
class HTTPNotImplemented(HTTPException):
157
  code = 501
158

  
159

  
160
class HTTPServiceUnavailable(HTTPException):
161
  code = 503
162

  
163

  
164
class HTTPVersionNotSupported(HTTPException):
165
  code = 505
166

  
167

  
168
class HTTPJsonConverter:
169
  CONTENT_TYPE = "application/json"
170

  
171
  def Encode(self, data):
172
    return serializer.DumpJson(data)
173

  
174
  def Decode(self, data):
175
    return serializer.LoadJson(data)
176

  
177

  
178
def WaitForSocketCondition(poller, sock, event, timeout):
179
  """Waits for a condition to occur on the socket.
180

  
181
  @type poller: select.Poller
182
  @param poller: Poller object as created by select.poll()
183
  @type sock: socket
184
  @param socket: Wait for events on this socket
185
  @type event: int
186
  @param event: ORed condition (see select module)
187
  @type timeout: float or None
188
  @param timeout: Timeout in seconds
189
  @rtype: int or None
190
  @return: None for timeout, otherwise occured conditions
191

  
192
  """
193
  check = (event | select.POLLPRI |
194
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
195

  
196
  if timeout is not None:
197
    # Poller object expects milliseconds
198
    timeout *= 1000
199

  
200
  poller.register(sock, event)
201
  try:
202
    while True:
203
      # TODO: If the main thread receives a signal and we have no timeout, we
204
      # could wait forever. This should check a global "quit" flag or
205
      # something every so often.
206
      io_events = poller.poll(timeout)
207
      if not io_events:
208
        # Timeout
209
        return None
210
      for (evfd, evcond) in io_events:
211
        if evcond & check:
212
          return evcond
213
  finally:
214
    poller.unregister(sock)
215

  
216

  
217
def SocketOperation(poller, sock, op, arg1, timeout):
218
  """Wrapper around socket functions.
219

  
220
  This function abstracts error handling for socket operations, especially
221
  for the complicated interaction with OpenSSL.
222

  
223
  @type poller: select.Poller
224
  @param poller: Poller object as created by select.poll()
225
  @type sock: socket
226
  @param socket: Socket for the operation
227
  @type op: int
228
  @param op: Operation to execute (SOCKOP_* constants)
229
  @type arg1: any
230
  @param arg1: Parameter for function (if needed)
231
  @type timeout: None or float
232
  @param timeout: Timeout in seconds or None
233

  
234
  """
235
  # TODO: event_poll/event_check/override
236
  if op == SOCKOP_SEND:
237
    event_poll = select.POLLOUT
238
    event_check = select.POLLOUT
239

  
240
  elif op == SOCKOP_RECV:
241
    event_poll = select.POLLIN
242
    event_check = select.POLLIN | select.POLLPRI
243

  
244
  elif op == SOCKOP_SHUTDOWN:
245
    event_poll = None
246
    event_check = None
247

  
248
    # The timeout is only used when OpenSSL requests polling for a condition.
249
    # It is not advisable to have no timeout for shutdown.
250
    assert timeout
251

  
252
  else:
253
    raise AssertionError("Invalid socket operation")
254

  
255
  # No override by default
256
  event_override = 0
257

  
258
  while True:
259
    # Poll only for certain operations and when asked for by an override
260
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
261
      if event_override:
262
        wait_for_event = event_override
263
      else:
264
        wait_for_event = event_poll
265

  
266
      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
267
      if event is None:
268
        raise _HttpSocketTimeout()
269

  
270
      if (op == SOCKOP_RECV and
271
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
272
        return ""
273

  
274
      if not event & wait_for_event:
275
        continue
276

  
277
    # Reset override
278
    event_override = 0
279

  
280
    try:
281
      try:
282
        if op == SOCKOP_SEND:
283
          return sock.send(arg1)
284

  
285
        elif op == SOCKOP_RECV:
286
          return sock.recv(arg1)
287

  
288
        elif op == SOCKOP_SHUTDOWN:
289
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
290
            # PyOpenSSL's shutdown() doesn't take arguments
291
            return sock.shutdown()
292
          else:
293
            return sock.shutdown(arg1)
294

  
295
      except OpenSSL.SSL.WantWriteError:
296
        # OpenSSL wants to write, poll for POLLOUT
297
        event_override = select.POLLOUT
298
        continue
299

  
300
      except OpenSSL.SSL.WantReadError:
301
        # OpenSSL wants to read, poll for POLLIN
302
        event_override = select.POLLIN | select.POLLPRI
303
        continue
304

  
305
      except OpenSSL.SSL.WantX509LookupError:
306
        continue
307

  
308
      except OpenSSL.SSL.SysCallError, err:
309
        if op == SOCKOP_SEND:
310
          # arg1 is the data when writing
311
          if err.args and err.args[0] == -1 and arg1 == "":
312
            # errors when writing empty strings are expected
313
            # and can be ignored
314
            return 0
315

  
316
        elif op == SOCKOP_RECV:
317
          if err.args == (-1, _SSL_UNEXPECTED_EOF):
318
            return ""
319

  
320
        raise socket.error(err.args)
321

  
322
      except OpenSSL.SSL.Error, err:
323
        raise socket.error(err.args)
324

  
325
    except socket.error, err:
326
      if err.args and err.args[0] == errno.EAGAIN:
327
        # Ignore EAGAIN
328
        continue
329

  
330
      raise
331

  
332

  
333
class HttpSslParams(object):
334
  """Data class for SSL key and certificate.
335

  
336
  """
337
  def __init__(self, ssl_key_path, ssl_cert_path):
338
    """Initializes this class.
339

  
340
    @type ssl_key_path: string
341
    @param ssl_key_path: Path to file containing SSL key in PEM format
342
    @type ssl_cert_path: string
343
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
344

  
345
    """
346
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
347
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
348

  
349
  def GetKey(self):
350
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
351
                                          self.ssl_key_pem)
352

  
353
  def GetCertificate(self):
354
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
355
                                           self.ssl_cert_pem)
356

  
357

  
358
class _HttpSocketBase(object):
359
  """Base class for HTTP server and client.
360

  
361
  """
362
  def __init__(self):
363
    self._using_ssl = None
364
    self._ssl_params = None
365
    self._ssl_key = None
366
    self._ssl_cert = None
367

  
368
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
369
    """Creates a TCP socket and initializes SSL if needed.
370

  
371
    @type ssl_params: HttpSslParams
372
    @param ssl_params: SSL key and certificate
373
    @type ssl_verify_peer: bool
374
    @param ssl_verify_peer: Whether to require client certificate and compare
375
                            it with our certificate
376

  
377
    """
378
    self._ssl_params = ssl_params
379

  
380
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
381

  
382
    # Should we enable SSL?
383
    self._using_ssl = ssl_params is not None
384

  
385
    if not self._using_ssl:
386
      return sock
387

  
388
    self._ssl_key = ssl_params.GetKey()
389
    self._ssl_cert = ssl_params.GetCertificate()
390

  
391
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
392
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
393

  
394
    ctx.use_privatekey(self._ssl_key)
395
    ctx.use_certificate(self._ssl_cert)
396
    ctx.check_privatekey()
397

  
398
    if ssl_verify_peer:
399
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
400
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
401
                     self._SSLVerifyCallback)
402

  
403
    return OpenSSL.SSL.Connection(ctx, sock)
404

  
405
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
406
    """Verify the certificate provided by the peer
407

  
408
    We only compare fingerprints. The client must use the same certificate as
409
    we do on our side.
410

  
411
    """
412
    assert self._ssl_params, "SSL not initialized"
413

  
414
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
415
            self._ssl_cert.digest("md5") == cert.digest("md5"))
416

  
417

  
418
class HttpServerRequestExecutor(object):
419
  """Implements server side of HTTP
420

  
421
  This class implements the server side of HTTP. It's based on code of Python's
422
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
423
  character encodings. Keep-alive connections are not supported.
424

  
425
  """
426
  # The default request version.  This only affects responses up until
427
  # the point where the request line is parsed, so it mainly decides what
428
  # the client gets back when sending a malformed request line.
429
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
430
  default_request_version = HTTP_0_9
431

  
432
  # Error message settings
433
  error_message_format = DEFAULT_ERROR_MESSAGE
434
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
435

  
436
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
437

  
438
  def __init__(self, server, conn, client_addr, fileio_class):
439
    """Initializes this class.
440

  
441
    Part of the initialization is reading the request and eventual POST/PUT
442
    data sent by the client.
443

  
444
    """
445
    self._server = server
446

  
447
    # We default rfile to buffered because otherwise it could be
448
    # really slow for large data (a getc() call per byte); we make
449
    # wfile unbuffered because (a) often after a write() we want to
450
    # read and we need to flush the line; (b) big writes to unbuffered
451
    # files are typically optimized by stdio even when big reads
452
    # aren't.
453
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
454
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
455

  
456
    self.client_addr = client_addr
457

  
458
    self.request_headers = None
459
    self.request_method = None
460
    self.request_path = None
461
    self.request_requestline = None
462
    self.request_version = self.default_request_version
463

  
464
    self.response_body = None
465
    self.response_code = HTTP_OK
466
    self.response_content_type = None
467
    self.response_headers = {}
468

  
469
    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
470
    try:
471
      try:
472
        try:
473
          try:
474
            # Read, parse and handle request
475
            self._ReadRequest()
476
            self._ReadPostData()
477
            self._HandleRequest()
478
          except HTTPException, err:
479
            self._SetErrorStatus(err)
480
        finally:
481
          # Try to send a response
482
          self._SendResponse()
483
          self._Close()
484
      except SocketClosed:
485
        pass
486
    finally:
487
      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
488

  
489
  def _Close(self):
490
    if not self.wfile.closed:
491
      self.wfile.flush()
492
    self.wfile.close()
493
    self.rfile.close()
494

  
495
  def _DateTimeHeader(self):
496
    """Return the current date and time formatted for a message header.
497

  
498
    """
499
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
500
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
501
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
502

  
503
  def _SetErrorStatus(self, err):
504
    """Sets the response code and body from a HTTPException.
505

  
506
    @type err: HTTPException
507
    @param err: Exception instance
508

  
509
    """
510
    try:
511
      (shortmsg, longmsg) = self.responses[err.code]
512
    except KeyError:
513
      shortmsg = longmsg = "Unknown"
514

  
515
    if err.message:
516
      message = err.message
517
    else:
518
      message = shortmsg
519

  
520
    values = {
521
      "code": err.code,
522
      "message": cgi.escape(message),
523
      "explain": longmsg,
524
      }
525

  
526
    self.response_code = err.code
527
    self.response_content_type = self.error_content_type
528
    self.response_body = self.error_message_format % values
529

  
530
  def _HandleRequest(self):
531
    """Handle the actual request.
532

  
533
    Calls the actual handler function and converts exceptions into HTTP errors.
534

  
535
    """
536
    # Don't do anything if there's already been a problem
537
    if self.response_code != HTTP_OK:
538
      return
539

  
540
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
541

  
542
    # Check whether client is still there
543
    self.rfile.read(0)
544

  
545
    try:
546
      try:
547
        result = self._server.HandleRequest(self)
548

  
549
        # TODO: Content-type
550
        encoder = HTTPJsonConverter()
551
        body = encoder.Encode(result)
552

  
553
        self.response_content_type = encoder.CONTENT_TYPE
554
        self.response_body = body
555
      except (HTTPException, KeyboardInterrupt, SystemExit):
556
        raise
557
      except Exception, err:
558
        logging.exception("Caught exception")
559
        raise HTTPInternalError(message=str(err))
560
      except:
561
        logging.exception("Unknown exception")
562
        raise HTTPInternalError(message="Unknown error")
563

  
564
    except HTTPException, err:
565
      self._SetErrorStatus(err)
566

  
567
  def _SendResponse(self):
568
    """Sends response to the client.
569

  
570
    """
571
    # Check whether client is still there
572
    self.rfile.read(0)
573

  
574
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
575
                 self.request_requestline, self.response_code)
576

  
577
    if self.response_code in self.responses:
578
      response_message = self.responses[self.response_code][0]
579
    else:
580
      response_message = ""
581

  
582
    if self.request_version != HTTP_0_9:
583
      self.wfile.write("%s %d %s\r\n" %
584
                       (self.request_version, self.response_code,
585
                        response_message))
586
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
587
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
588
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
589
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
590
      for key, val in self.response_headers.iteritems():
591
        self._SendHeader(key, val)
592

  
593
      # We don't support keep-alive at this time
594
      self._SendHeader(HTTP_CONNECTION, "close")
595
      self.wfile.write("\r\n")
596

  
597
    if (self.request_method != HTTP_HEAD and
598
        self.response_code >= HTTP_OK and
599
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
600
      self.wfile.write(self.response_body)
601

  
602
  def _SendHeader(self, name, value):
603
    if self.request_version != HTTP_0_9:
604
      self.wfile.write("%s: %s\r\n" % (name, value))
605

  
606
  def _ReadRequest(self):
607
    """Reads and parses request line
608

  
609
    """
610
    raw_requestline = self.rfile.readline()
611

  
612
    requestline = raw_requestline
613
    if requestline[-2:] == '\r\n':
614
      requestline = requestline[:-2]
615
    elif requestline[-1:] == '\n':
616
      requestline = requestline[:-1]
617

  
618
    if not requestline:
619
      raise HTTPBadRequest("Empty request line")
620

  
621
    self.request_requestline = requestline
622

  
623
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
624

  
625
    words = requestline.split()
626

  
627
    if len(words) == 3:
628
      [method, path, version] = words
629
      if version[:5] != 'HTTP/':
630
        raise HTTPBadRequest("Bad request version (%r)" % version)
631

  
632
      try:
633
        base_version_number = version.split('/', 1)[1]
634
        version_number = base_version_number.split(".")
635

  
636
        # RFC 2145 section 3.1 says there can be only one "." and
637
        #   - major and minor numbers MUST be treated as
638
        #      separate integers;
639
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
640
        #      turn is lower than HTTP/12.3;
641
        #   - Leading zeros MUST be ignored by recipients.
642
        if len(version_number) != 2:
643
          raise HTTPBadRequest("Bad request version (%r)" % version)
644

  
645
        version_number = int(version_number[0]), int(version_number[1])
646
      except (ValueError, IndexError):
647
        raise HTTPBadRequest("Bad request version (%r)" % version)
648

  
649
      if version_number >= (2, 0):
650
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
651
                                      base_version_number)
652

  
653
    elif len(words) == 2:
654
      version = HTTP_0_9
655
      [method, path] = words
656
      if method != HTTP_GET:
657
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
658

  
659
    else:
660
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
661

  
662
    # Examine the headers and look for a Connection directive
663
    headers = mimetools.Message(self.rfile, 0)
664

  
665
    self.request_method = method
666
    self.request_path = path
667
    self.request_version = version
668
    self.request_headers = headers
669

  
670
  def _ReadPostData(self):
671
    """Reads POST/PUT data
672

  
673
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
674
    a request is signaled by the inclusion of a Content-Length header field in
675
    the request message headers. HTTP/1.0 requests containing an entity body
676
    must include a valid Content-Length header field."
677

  
678
    """
679
    # While not according to specification, we only support an entity body for
680
    # POST and PUT.
681
    if (not self.request_method or
682
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
683
      self.request_post_data = None
684
      return
685

  
686
    content_length = None
687
    try:
688
      if HTTP_CONTENT_LENGTH in self.request_headers:
689
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
690
    except TypeError:
691
      pass
692
    except ValueError:
693
      pass
694

  
695
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
696
    if content_length is None:
697
      raise HTTPLengthRequired("Missing Content-Length header or"
698
                               " invalid format")
699

  
700
    data = self.rfile.read(content_length)
701

  
702
    # TODO: Content-type, error handling
703
    if data:
704
      self.request_post_data = HTTPJsonConverter().Decode(data)
705
    else:
706
      self.request_post_data = None
707

  
708
    logging.debug("HTTP POST data: %s", self.request_post_data)
709

  
710

  
711
class HttpServer(_HttpSocketBase):
712
  """Generic HTTP server class
713

  
714
  Users of this class must subclass it and override the HandleRequest function.
715

  
716
  """
717
  MAX_CHILDREN = 20
718

  
719
  def __init__(self, mainloop, local_address, port,
720
               ssl_params=None, ssl_verify_peer=False):
721
    """Initializes the HTTP server
722

  
723
    @type mainloop: ganeti.daemon.Mainloop
724
    @param mainloop: Mainloop used to poll for I/O events
725
    @type local_addess: string
726
    @param local_address: Local IP address to bind to
727
    @type port: int
728
    @param port: TCP port to listen on
729
    @type ssl_params: HttpSslParams
730
    @param ssl_params: SSL key and certificate
731
    @type ssl_verify_peer: bool
732
    @param ssl_verify_peer: Whether to require client certificate and compare
733
                            it with our certificate
734

  
735
    """
736
    _HttpSocketBase.__init__(self)
737

  
738
    self.mainloop = mainloop
739
    self.local_address = local_address
740
    self.port = port
741

  
742
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
743

  
744
    # Allow port to be reused
745
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
746

  
747
    if self._using_ssl:
748
      self._fileio_class = _SSLFileObject
749
    else:
750
      self._fileio_class = socket._fileobject
751

  
752
    self._children = []
753

  
754
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
755
    mainloop.RegisterSignal(self)
756

  
757
  def Start(self):
758
    self.socket.bind((self.local_address, self.port))
759
    self.socket.listen(5)
760

  
761
  def Stop(self):
762
    self.socket.close()
763

  
764
  def OnIO(self, fd, condition):
765
    if condition & select.POLLIN:
766
      self._IncomingConnection()
767

  
768
  def OnSignal(self, signum):
769
    if signum == signal.SIGCHLD:
770
      self._CollectChildren(True)
771

  
772
  def _CollectChildren(self, quick):
773
    """Checks whether any child processes are done
774

  
775
    @type quick: bool
776
    @param quick: Whether to only use non-blocking functions
777

  
778
    """
779
    if not quick:
780
      # Don't wait for other processes if it should be a quick check
781
      while len(self._children) > self.MAX_CHILDREN:
782
        try:
783
          # Waiting without a timeout brings us into a potential DoS situation.
784
          # As soon as too many children run, we'll not respond to new
785
          # requests. The real solution would be to add a timeout for children
786
          # and killing them after some time.
787
          pid, status = os.waitpid(0, 0)
788
        except os.error:
789
          pid = None
790
        if pid and pid in self._children:
791
          self._children.remove(pid)
792

  
793
    for child in self._children:
794
      try:
795
        pid, status = os.waitpid(child, os.WNOHANG)
796
      except os.error:
797
        pid = None
798
      if pid and pid in self._children:
799
        self._children.remove(pid)
800

  
801
  def _IncomingConnection(self):
802
    """Called for each incoming connection
803

  
804
    """
805
    (connection, client_addr) = self.socket.accept()
806

  
807
    self._CollectChildren(False)
808

  
809
    pid = os.fork()
810
    if pid == 0:
811
      # Child process
812
      try:
813
        HttpServerRequestExecutor(self, connection, client_addr,
814
                                  self._fileio_class)
815
      except:
816
        logging.exception("Error while handling request from %s:%s",
817
                          client_addr[0], client_addr[1])
818
        os._exit(1)
819
      os._exit(0)
820
    else:
821
      self._children.append(pid)
822

  
823
  def HandleRequest(self, req):
824
    raise NotImplementedError()
825

  
826

  
827
class HttpClientRequest(object):
828
  def __init__(self, host, port, method, path, headers=None, post_data=None,
829
               ssl_params=None, ssl_verify_peer=False):
830
    """Describes an HTTP request.
831

  
832
    @type host: string
833
    @param host: Hostname
834
    @type port: int
835
    @param port: Port
836
    @type method: string
837
    @param method: Method name
838
    @type path: string
839
    @param path: Request path
840
    @type headers: dict or None
841
    @param headers: Additional headers to send
842
    @type post_data: string or None
843
    @param post_data: Additional data to send
844
    @type ssl_params: HttpSslParams
845
    @param ssl_params: SSL key and certificate
846
    @type ssl_verify_peer: bool
847
    @param ssl_verify_peer: Whether to compare our certificate with server's
848
                            certificate
849

  
850
    """
851
    if post_data is not None:
852
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
853
        "Only POST and GET requests support sending data"
854

  
855
    assert path.startswith("/"), "Path must start with slash (/)"
856

  
857
    self.host = host
858
    self.port = port
859
    self.ssl_params = ssl_params
860
    self.ssl_verify_peer = ssl_verify_peer
861
    self.method = method
862
    self.path = path
863
    self.headers = headers
864
    self.post_data = post_data
865

  
866
    self.success = None
867
    self.error = None
868

  
869
    self.resp_status_line = None
870
    self.resp_version = None
871
    self.resp_status = None
872
    self.resp_reason = None
873
    self.resp_headers = None
874
    self.resp_body = None
875

  
876

  
877
class HttpClientRequestExecutor(_HttpSocketBase):
878
  # Default headers
879
  DEFAULT_HEADERS = {
880
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
881
    # TODO: For keep-alive, don't send "Connection: close"
882
    HTTP_CONNECTION: "close",
883
    }
884

  
885
  # Length limits
886
  STATUS_LINE_LENGTH_MAX = 512
887
  HEADER_LENGTH_MAX = 4 * 1024
888

  
889
  # Timeouts in seconds for socket layer
890
  # TODO: Make read timeout configurable per OpCode
891
  CONNECT_TIMEOUT = 5.0
892
  WRITE_TIMEOUT = 10
893
  READ_TIMEOUT = None
894
  CLOSE_TIMEOUT = 1
895

  
896
  # Parser state machine
897
  PS_STATUS_LINE = "status-line"
898
  PS_HEADERS = "headers"
899
  PS_BODY = "body"
900
  PS_COMPLETE = "complete"
901

  
902
  def __init__(self, req):
903
    """Initializes the HttpClientRequestExecutor class.
904

  
905
    @type req: HttpClientRequest
906
    @param req: Request object
907

  
908
    """
909
    _HttpSocketBase.__init__(self)
910

  
911
    self.request = req
912

  
913
    self.parser_status = self.PS_STATUS_LINE
914
    self.header_buffer = StringIO()
915
    self.body_buffer = StringIO()
916
    self.content_length = None
917
    self.server_will_close = None
918

  
919
    self.poller = select.poll()
920

  
921
    try:
922
      # TODO: Implement connection caching/keep-alive
923
      self.sock = self._CreateSocket(req.ssl_params,
924
                                     req.ssl_verify_peer)
925

  
926
      # Disable Python's timeout
927
      self.sock.settimeout(None)
928

  
929
      # Operate in non-blocking mode
930
      self.sock.setblocking(0)
931

  
932
      force_close = True
933
      self._Connect()
934
      try:
935
        self._SendRequest()
936
        self._ReadResponse()
937

  
938
        # Only wait for server to close if we didn't have any exception.
939
        force_close = False
940
      finally:
941
        self._CloseConnection(force_close)
942

  
943
      self.sock.close()
944
      self.sock = None
945

  
946
      req.resp_body = self.body_buffer.getvalue()
947

  
948
      req.success = True
949
      req.error = None
950

  
951
    except _HttpClientError, err:
952
      req.success = False
953
      req.error = str(err)
954

  
955
  def _BuildRequest(self):
956
    """Build HTTP request.
957

  
958
    @rtype: string
959
    @return: Complete request
960

  
961
    """
962
    # Headers
963
    send_headers = self.DEFAULT_HEADERS.copy()
964

  
965
    if self.request.headers:
966
      send_headers.update(self.request.headers)
967

  
968
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
969

  
970
    if self.request.post_data:
971
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
972

  
973
    buf = StringIO()
974

  
975
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
976
    # keep-alive).
977
    # TODO: For keep-alive, change to HTTP/1.1
978
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
979
                                self.request.path, HTTP_1_0))
980

  
981
    # Add headers
982
    for name, value in send_headers.iteritems():
983
      buf.write("%s: %s\r\n" % (name, value))
984

  
985
    buf.write("\r\n")
986

  
987
    if self.request.post_data:
988
      buf.write(self.request.post_data)
989

  
990
    return buf.getvalue()
991

  
992
  def _ParseStatusLine(self):
993
    """Parses the status line sent by the server.
994

  
995
    """
996
    line = self.request.resp_status_line
997

  
998
    if not line:
999
      raise _HttpClientError("Empty status line")
1000

  
1001
    try:
1002
      [version, status, reason] = line.split(None, 2)
1003
    except ValueError:
1004
      try:
1005
        [version, status] = line.split(None, 1)
1006
        reason = ""
1007
      except ValueError:
1008
        version = HTTP_9_0
1009

  
1010
    if version:
1011
      version = version.upper()
1012

  
1013
    if version not in (HTTP_1_0, HTTP_1_1):
1014
      # We do not support HTTP/0.9, despite the specification requiring it
1015
      # (RFC2616, section 19.6)
1016
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1017
                             line)
1018

  
1019
    # The status code is a three-digit number
1020
    try:
1021
      status = int(status)
1022
      if status < 100 or status > 999:
1023
        status = -1
1024
    except ValueError:
1025
      status = -1
1026

  
1027
    if status == -1:
1028
      raise _HttpClientError("Invalid status code (%r)" % line)
1029

  
1030
    self.request.resp_version = version
1031
    self.request.resp_status = status
1032
    self.request.resp_reason = reason
1033

  
1034
  def _WillServerCloseConnection(self):
1035
    """Evaluate whether server will close the connection.
1036

  
1037
    @rtype: bool
1038
    @return: Whether server will close the connection
1039

  
1040
    """
1041
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1042
    if hdr_connection:
1043
      hdr_connection = hdr_connection.lower()
1044

  
1045
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1046
    if self.request.resp_version == HTTP_1_1:
1047
      return (hdr_connection and "close" in hdr_connection)
1048

  
1049
    # Some HTTP/1.0 implementations have support for persistent connections,
1050
    # using rules different than HTTP/1.1.
1051

  
1052
    # For older HTTP, Keep-Alive indicates persistent connection.
1053
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1054
      return False
1055

  
1056
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1057
    # supposed to be sent by the client.
1058
    if hdr_connection and "keep-alive" in hdr_connection:
1059
      return False
1060

  
1061
    return True
1062

  
1063
  def _ParseHeaders(self):
1064
    """Parses the headers sent by the server.
1065

  
1066
    This function also adjusts internal variables based on the header values.
1067

  
1068
    """
1069
    req = self.request
1070

  
1071
    # Parse headers
1072
    self.header_buffer.seek(0, 0)
1073
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
1074

  
1075
    self.server_will_close = self._WillServerCloseConnection()
1076

  
1077
    # Do we have a Content-Length header?
1078
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1079
    if hdr_content_length:
1080
      try:
1081
        self.content_length = int(hdr_content_length)
1082
      except ValueError:
1083
        pass
1084
      if self.content_length is not None and self.content_length < 0:
1085
        self.content_length = None
1086

  
1087
    # does the body have a fixed length? (of zero)
1088
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1089
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1090
      self.content_length = 0
1091

  
1092
    # if the connection remains open and a content-length was not provided,
1093
    # then assume that the connection WILL close.
1094
    if self.content_length is None:
1095
      self.server_will_close = True
1096

  
1097
  def _CheckStatusLineLength(self, length):
1098
    if length > self.STATUS_LINE_LENGTH_MAX:
1099
      raise _HttpClientError("Status line longer than %d chars" %
1100
                             self.STATUS_LINE_LENGTH_MAX)
1101

  
1102
  def _CheckHeaderLength(self, length):
1103
    if length > self.HEADER_LENGTH_MAX:
1104
      raise _HttpClientError("Headers longer than %d chars" %
1105
                             self.HEADER_LENGTH_MAX)
1106

  
1107
  def _ParseBuffer(self, buf, eof):
1108
    """Main function for HTTP response state machine.
1109

  
1110
    @type buf: string
1111
    @param buf: Receive buffer
1112
    @type eof: bool
1113
    @param eof: Whether we've reached EOF on the socket
1114
    @rtype: string
1115
    @return: Updated receive buffer
1116

  
1117
    """
1118
    if self.parser_status == self.PS_STATUS_LINE:
1119
      # Expect status line
1120
      idx = buf.find("\r\n")
1121
      if idx >= 0:
1122
        self.request.resp_status_line = buf[:idx]
1123

  
1124
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1125

  
1126
        # Remove status line, including CRLF
1127
        buf = buf[idx + 2:]
1128

  
1129
        self._ParseStatusLine()
1130

  
1131
        self.parser_status = self.PS_HEADERS
1132
      else:
1133
        # Check whether incoming data is getting too large, otherwise we just
1134
        # fill our read buffer.
1135
        self._CheckStatusLineLength(len(buf))
1136

  
1137
    if self.parser_status == self.PS_HEADERS:
1138
      # Wait for header end
1139
      idx = buf.find("\r\n\r\n")
1140
      if idx >= 0:
1141
        self.header_buffer.write(buf[:idx + 2])
1142

  
1143
        self._CheckHeaderLength(self.header_buffer.tell())
1144

  
1145
        # Remove headers, including CRLF
1146
        buf = buf[idx + 4:]
1147

  
1148
        self._ParseHeaders()
1149

  
1150
        self.parser_status = self.PS_BODY
1151
      else:
1152
        # Check whether incoming data is getting too large, otherwise we just
1153
        # fill our read buffer.
1154
        self._CheckHeaderLength(len(buf))
1155

  
1156
    if self.parser_status == self.PS_BODY:
1157
      self.body_buffer.write(buf)
1158
      buf = ""
1159

  
1160
      # Check whether we've read everything
1161
      if (eof or
1162
          (self.content_length is not None and
1163
           self.body_buffer.tell() >= self.content_length)):
1164
        self.parser_status = self.PS_COMPLETE
1165

  
1166
    return buf
1167

  
1168
  def _Connect(self):
1169
    """Non-blocking connect to host with timeout.
1170

  
1171
    """
1172
    connected = False
1173
    while True:
1174
      try:
1175
        connect_error = self.sock.connect_ex((self.request.host,
1176
                                              self.request.port))
1177
      except socket.gaierror, err:
1178
        raise _HttpClientError("Connection failed: %s" % str(err))
1179

  
1180
      if connect_error == errno.EINTR:
1181
        # Mask signals
1182
        pass
1183

  
1184
      elif connect_error == 0:
1185
        # Connection established
1186
        connected = True
1187
        break
1188

  
1189
      elif connect_error == errno.EINPROGRESS:
1190
        # Connection started
1191
        break
1192

  
1193
      raise _HttpClientError("Connection failed (%s: %s)" %
1194
                             (connect_error, os.strerror(connect_error)))
1195

  
1196
    if not connected:
1197
      # Wait for connection
1198
      event = WaitForSocketCondition(self.poller, self.sock,
1199
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1200
      if event is None:
1201
        raise _HttpClientError("Timeout while connecting to server")
1202

  
1203
      # Get error code
1204
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1205
      if connect_error != 0:
1206
        raise _HttpClientError("Connection failed (%s: %s)" %
1207
                               (connect_error, os.strerror(connect_error)))
1208

  
1209
    # Enable TCP keep-alive
1210
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1211

  
1212
    # If needed, Linux specific options are available to change the TCP
1213
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1214
    # TCP_KEEPINTVL.
1215

  
1216
  def _SendRequest(self):
1217
    """Sends request to server.
1218

  
1219
    """
1220
    buf = self._BuildRequest()
1221

  
1222
    while buf:
1223
      # Send only 4 KB at a time
1224
      data = buf[:4096]
1225

  
1226
      try:
1227
        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1228
                               self.WRITE_TIMEOUT)
1229
      except _HttpSocketTimeout:
1230
        raise _HttpClientError("Timeout while sending request")
1231
      except socket.error, err:
1232
        raise _HttpClientError("Error sending request: %s" % err)
1233

  
1234
      # Remove sent bytes
1235
      buf = buf[sent:]
1236

  
1237
    assert not buf, "Request wasn't sent completely"
1238

  
1239
  def _ReadResponse(self):
1240
    """Read response from server.
1241

  
1242
    Calls the parser function after reading a chunk of data.
1243

  
1244
    """
1245
    buf = ""
1246
    eof = False
1247
    while self.parser_status != self.PS_COMPLETE:
1248
      try:
1249
        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1250
                               self.READ_TIMEOUT)
1251
      except _HttpSocketTimeout:
1252
        raise _HttpClientError("Timeout while reading response")
1253
      except socket.error, err:
1254
        raise _HttpClientError("Error while reading response: %s" % err)
1255

  
1256
      if data:
1257
        buf += data
1258
      else:
1259
        eof = True
1260

  
1261
      # Do some parsing and error checking while more data arrives
1262
      buf = self._ParseBuffer(buf, eof)
1263

  
1264
      # Must be done only after the buffer has been evaluated
1265
      if (eof and
1266
          self.parser_status in (self.PS_STATUS_LINE,
1267
                                 self.PS_HEADERS)):
1268
        raise _HttpClientError("Connection closed prematurely")
1269

  
1270
    # Parse rest
1271
    buf = self._ParseBuffer(buf, True)
1272

  
1273
    assert self.parser_status == self.PS_COMPLETE
1274
    assert not buf, "Parser didn't read full response"
1275

  
1276
  def _CloseConnection(self, force):
1277
    """Closes the connection.
1278

  
1279
    """
1280
    if self.server_will_close and not force:
1281
      # Wait for server to close
1282
      try:
1283
        # Check whether it's actually closed
1284
        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1285
                               self.CLOSE_TIMEOUT):
1286
          return
1287
      except (socket.error, _HttpClientError, _HttpSocketTimeout):
1288
        # Ignore errors at this stage
1289
        pass
1290

  
1291
    # Close the connection from our side
1292
    try:
1293
      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1294
                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1295
    except _HttpSocketTimeout:
1296
      raise _HttpClientError("Timeout while shutting down connection")
1297
    except socket.error, err:
1298
      raise _HttpClientError("Error while shutting down connection: %s" % err)
1299

  
1300

  
1301
class _HttpClientPendingRequest(object):
1302
  """Data class for pending requests.
1303

  
1304
  """
1305
  def __init__(self, request):
1306
    self.request = request
1307

  
1308
    # Thread synchronization
1309
    self.done = threading.Event()
1310

  
1311

  
1312
class HttpClientWorker(workerpool.BaseWorker):
1313
  """HTTP client worker class.
1314

  
1315
  """
1316
  def RunTask(self, pend_req):
1317
    try:
1318
      HttpClientRequestExecutor(pend_req.request)
1319
    finally:
1320
      pend_req.done.set()
1321

  
1322

  
1323
class HttpClientWorkerPool(workerpool.WorkerPool):
1324
  def __init__(self, manager):
1325
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1326
                                   HttpClientWorker)
1327
    self.manager = manager
1328

  
1329

  
1330
class HttpClientManager(object):
1331
  """Manages HTTP requests.
1332

  
1333
  """
1334
  def __init__(self):
1335
    self._wpool = HttpClientWorkerPool(self)
1336

  
1337
  def __del__(self):
1338
    self.Shutdown()
1339

  
1340
  def ExecRequests(self, requests):
1341
    """Execute HTTP requests.
1342

  
1343
    This function can be called from multiple threads at the same time.
1344

  
1345
    @type requests: List of HttpClientRequest instances
1346
    @param requests: The requests to execute
1347
    @rtype: List of HttpClientRequest instances
1348
    @returns: The list of requests passed in
1349

  
1350
    """
1351
    # _HttpClientPendingRequest is used for internal thread synchronization
1352
    pending = [_HttpClientPendingRequest(req) for req in requests]
1353

  
1354
    try:
1355
      # Add requests to queue
1356
      for pend_req in pending:
1357
        self._wpool.AddTask(pend_req)
1358

  
1359
    finally:
1360
      # In case of an exception we should still wait for the rest, otherwise
1361
      # another thread from the worker pool could modify the request object
1362
      # after we returned.
1363

  
1364
      # And wait for them to finish
1365
      for pend_req in pending:
1366
        pend_req.done.wait()
1367

  
1368
    # Return original list
1369
    return requests
1370

  
1371
  def Shutdown(self):
1372
    self._wpool.Quiesce()
1373
    self._wpool.TerminateWorkers()
1374

  
1375

  
1376
class _SSLFileObject(object):
1377
  """Wrapper around socket._fileobject
1378

  
1379
  This wrapper is required to handle OpenSSL exceptions.
1380

  
1381
  """
1382
  def _RequireOpenSocket(fn):
1383
    def wrapper(self, *args, **kwargs):
1384
      if self.closed:
1385
        raise SocketClosed("Socket is closed")
1386
      return fn(self, *args, **kwargs)
1387
    return wrapper
1388

  
1389
  def __init__(self, sock, mode='rb', bufsize=-1):
1390
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1391

  
1392
  def _ConnectionLost(self):
1393
    self._base = None
1394

  
1395
  def _getclosed(self):
1396
    return self._base is None or self._base.closed
1397
  closed = property(_getclosed, doc="True if the file is closed")
1398

  
1399
  @_RequireOpenSocket
1400
  def close(self):
1401
    return self._base.close()
1402

  
1403
  @_RequireOpenSocket
1404
  def flush(self):
1405
    return self._base.flush()
1406

  
1407
  @_RequireOpenSocket
1408
  def fileno(self):
1409
    return self._base.fileno()
1410

  
1411
  @_RequireOpenSocket
1412
  def read(self, size=-1):
1413
    return self._ReadWrapper(self._base.read, size=size)
1414

  
1415
  @_RequireOpenSocket
1416
  def readline(self, size=-1):
1417
    return self._ReadWrapper(self._base.readline, size=size)
1418

  
1419
  def _ReadWrapper(self, fn, *args, **kwargs):
1420
    while True:
1421
      try:
1422
        return fn(*args, **kwargs)
1423

  
1424
      except OpenSSL.SSL.ZeroReturnError, err:
1425
        self._ConnectionLost()
1426
        return ""
1427

  
1428
      except OpenSSL.SSL.WantReadError:
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff