Statistics
| Branch: | Tag: | Revision:

root / lib / http / __init__.py @ ff9efc03

History | View | Annotate | Download (40.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP server module.
22

23
"""
24

    
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import mimetools
29
import OpenSSL
30
import os
31
import select
32
import socket
33
import sys
34
import time
35
import signal
36
import logging
37
import errno
38
import threading
39

    
40
from cStringIO import StringIO
41

    
42
from ganeti import constants
43
from ganeti import serializer
44
from ganeti import workerpool
45
from ganeti import utils
46

    
47

    
48
HTTP_CLIENT_THREADS = 10
49

    
50
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
51

    
52
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
53
MONTHNAME = [None,
54
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
55
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
56

    
57
# Default error message
58
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
59
DEFAULT_ERROR_MESSAGE = """\
60
<head>
61
<title>Error response</title>
62
</head>
63
<body>
64
<h1>Error response</h1>
65
<p>Error code %(code)d.
66
<p>Message: %(message)s.
67
<p>Error code explanation: %(code)s = %(explain)s.
68
</body>
69
"""
70

    
71
HTTP_OK = 200
72
HTTP_NO_CONTENT = 204
73
HTTP_NOT_MODIFIED = 304
74

    
75
HTTP_0_9 = "HTTP/0.9"
76
HTTP_1_0 = "HTTP/1.0"
77
HTTP_1_1 = "HTTP/1.1"
78

    
79
HTTP_GET = "GET"
80
HTTP_HEAD = "HEAD"
81
HTTP_POST = "POST"
82
HTTP_PUT = "PUT"
83

    
84
HTTP_ETAG = "ETag"
85
HTTP_HOST = "Host"
86
HTTP_SERVER = "Server"
87
HTTP_DATE = "Date"
88
HTTP_USER_AGENT = "User-Agent"
89
HTTP_CONTENT_TYPE = "Content-Type"
90
HTTP_CONTENT_LENGTH = "Content-Length"
91
HTTP_CONNECTION = "Connection"
92
HTTP_KEEP_ALIVE = "Keep-Alive"
93

    
94
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
95

    
96
# Socket operations
97
(SOCKOP_SEND,
98
 SOCKOP_RECV,
99
 SOCKOP_SHUTDOWN) = range(3)
100

    
101

    
102
class SocketClosed(socket.error):
103
  pass
104

    
105

    
106
class _HttpClientError(Exception):
107
  """Internal exception for HTTP client errors.
108

109
  This should only be used for internal error reporting.
110

111
  """
112

    
113

    
114
class _HttpSocketTimeout(Exception):
115
  """Internal exception for socket timeouts.
116

117
  This should only be used for internal error reporting.
118

119
  """
120

    
121

    
122
class HTTPException(Exception):
123
  code = None
124
  message = None
125

    
126
  def __init__(self, message=None):
127
    Exception.__init__(self)
128
    if message is not None:
129
      self.message = message
130

    
131

    
132
class HTTPBadRequest(HTTPException):
133
  code = 400
134

    
135

    
136
class HTTPForbidden(HTTPException):
137
  code = 403
138

    
139

    
140
class HTTPNotFound(HTTPException):
141
  code = 404
142

    
143

    
144
class HTTPGone(HTTPException):
145
  code = 410
146

    
147

    
148
class HTTPLengthRequired(HTTPException):
149
  code = 411
150

    
151

    
152
class HTTPInternalError(HTTPException):
153
  code = 500
154

    
155

    
156
class HTTPNotImplemented(HTTPException):
157
  code = 501
158

    
159

    
160
class HTTPServiceUnavailable(HTTPException):
161
  code = 503
162

    
163

    
164
class HTTPVersionNotSupported(HTTPException):
165
  code = 505
166

    
167

    
168
class HTTPJsonConverter:
169
  CONTENT_TYPE = "application/json"
170

    
171
  def Encode(self, data):
172
    return serializer.DumpJson(data)
173

    
174
  def Decode(self, data):
175
    return serializer.LoadJson(data)
176

    
177

    
178
def WaitForSocketCondition(poller, sock, event, timeout):
179
  """Waits for a condition to occur on the socket.
180

181
  @type poller: select.Poller
182
  @param poller: Poller object as created by select.poll()
183
  @type sock: socket
184
  @param socket: Wait for events on this socket
185
  @type event: int
186
  @param event: ORed condition (see select module)
187
  @type timeout: float or None
188
  @param timeout: Timeout in seconds
189
  @rtype: int or None
190
  @return: None for timeout, otherwise occured conditions
191

192
  """
193
  check = (event | select.POLLPRI |
194
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
195

    
196
  if timeout is not None:
197
    # Poller object expects milliseconds
198
    timeout *= 1000
199

    
200
  poller.register(sock, event)
201
  try:
202
    while True:
203
      # TODO: If the main thread receives a signal and we have no timeout, we
204
      # could wait forever. This should check a global "quit" flag or
205
      # something every so often.
206
      io_events = poller.poll(timeout)
207
      if not io_events:
208
        # Timeout
209
        return None
210
      for (evfd, evcond) in io_events:
211
        if evcond & check:
212
          return evcond
213
  finally:
214
    poller.unregister(sock)
215

    
216

    
217
def SocketOperation(poller, sock, op, arg1, timeout):
218
  """Wrapper around socket functions.
219

220
  This function abstracts error handling for socket operations, especially
221
  for the complicated interaction with OpenSSL.
222

223
  @type poller: select.Poller
224
  @param poller: Poller object as created by select.poll()
225
  @type sock: socket
226
  @param socket: Socket for the operation
227
  @type op: int
228
  @param op: Operation to execute (SOCKOP_* constants)
229
  @type arg1: any
230
  @param arg1: Parameter for function (if needed)
231
  @type timeout: None or float
232
  @param timeout: Timeout in seconds or None
233

234
  """
235
  # TODO: event_poll/event_check/override
236
  if op == SOCKOP_SEND:
237
    event_poll = select.POLLOUT
238
    event_check = select.POLLOUT
239

    
240
  elif op == SOCKOP_RECV:
241
    event_poll = select.POLLIN
242
    event_check = select.POLLIN | select.POLLPRI
243

    
244
  elif op == SOCKOP_SHUTDOWN:
245
    event_poll = None
246
    event_check = None
247

    
248
    # The timeout is only used when OpenSSL requests polling for a condition.
249
    # It is not advisable to have no timeout for shutdown.
250
    assert timeout
251

    
252
  else:
253
    raise AssertionError("Invalid socket operation")
254

    
255
  # No override by default
256
  event_override = 0
257

    
258
  while True:
259
    # Poll only for certain operations and when asked for by an override
260
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
261
      if event_override:
262
        wait_for_event = event_override
263
      else:
264
        wait_for_event = event_poll
265

    
266
      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
267
      if event is None:
268
        raise _HttpSocketTimeout()
269

    
270
      if (op == SOCKOP_RECV and
271
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
272
        return ""
273

    
274
      if not event & wait_for_event:
275
        continue
276

    
277
    # Reset override
278
    event_override = 0
279

    
280
    try:
281
      try:
282
        if op == SOCKOP_SEND:
283
          return sock.send(arg1)
284

    
285
        elif op == SOCKOP_RECV:
286
          return sock.recv(arg1)
287

    
288
        elif op == SOCKOP_SHUTDOWN:
289
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
290
            # PyOpenSSL's shutdown() doesn't take arguments
291
            return sock.shutdown()
292
          else:
293
            return sock.shutdown(arg1)
294

    
295
      except OpenSSL.SSL.WantWriteError:
296
        # OpenSSL wants to write, poll for POLLOUT
297
        event_override = select.POLLOUT
298
        continue
299

    
300
      except OpenSSL.SSL.WantReadError:
301
        # OpenSSL wants to read, poll for POLLIN
302
        event_override = select.POLLIN | select.POLLPRI
303
        continue
304

    
305
      except OpenSSL.SSL.WantX509LookupError:
306
        continue
307

    
308
      except OpenSSL.SSL.SysCallError, err:
309
        if op == SOCKOP_SEND:
310
          # arg1 is the data when writing
311
          if err.args and err.args[0] == -1 and arg1 == "":
312
            # errors when writing empty strings are expected
313
            # and can be ignored
314
            return 0
315

    
316
        elif op == SOCKOP_RECV:
317
          if err.args == (-1, _SSL_UNEXPECTED_EOF):
318
            return ""
319

    
320
        raise socket.error(err.args)
321

    
322
      except OpenSSL.SSL.Error, err:
323
        raise socket.error(err.args)
324

    
325
    except socket.error, err:
326
      if err.args and err.args[0] == errno.EAGAIN:
327
        # Ignore EAGAIN
328
        continue
329

    
330
      raise
331

    
332

    
333
class HttpSslParams(object):
334
  """Data class for SSL key and certificate.
335

336
  """
337
  def __init__(self, ssl_key_path, ssl_cert_path):
338
    """Initializes this class.
339

340
    @type ssl_key_path: string
341
    @param ssl_key_path: Path to file containing SSL key in PEM format
342
    @type ssl_cert_path: string
343
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
344

345
    """
346
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
347
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
348

    
349
  def GetKey(self):
350
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
351
                                          self.ssl_key_pem)
352

    
353
  def GetCertificate(self):
354
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
355
                                           self.ssl_cert_pem)
356

    
357

    
358
class _HttpSocketBase(object):
359
  """Base class for HTTP server and client.
360

361
  """
362
  def __init__(self):
363
    self._using_ssl = None
364
    self._ssl_params = None
365
    self._ssl_key = None
366
    self._ssl_cert = None
367

    
368
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
369
    """Creates a TCP socket and initializes SSL if needed.
370

371
    @type ssl_params: HttpSslParams
372
    @param ssl_params: SSL key and certificate
373
    @type ssl_verify_peer: bool
374
    @param ssl_verify_peer: Whether to require client certificate and compare
375
                            it with our certificate
376

377
    """
378
    self._ssl_params = ssl_params
379

    
380
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
381

    
382
    # Should we enable SSL?
383
    self._using_ssl = ssl_params is not None
384

    
385
    if not self._using_ssl:
386
      return sock
387

    
388
    self._ssl_key = ssl_params.GetKey()
389
    self._ssl_cert = ssl_params.GetCertificate()
390

    
391
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
392
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
393

    
394
    ctx.use_privatekey(self._ssl_key)
395
    ctx.use_certificate(self._ssl_cert)
396
    ctx.check_privatekey()
397

    
398
    if ssl_verify_peer:
399
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
400
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
401
                     self._SSLVerifyCallback)
402

    
403
    return OpenSSL.SSL.Connection(ctx, sock)
404

    
405
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
406
    """Verify the certificate provided by the peer
407

408
    We only compare fingerprints. The client must use the same certificate as
409
    we do on our side.
410

411
    """
412
    assert self._ssl_params, "SSL not initialized"
413

    
414
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
415
            self._ssl_cert.digest("md5") == cert.digest("md5"))
416

    
417

    
418
class HttpServerRequestExecutor(object):
419
  """Implements server side of HTTP
420

421
  This class implements the server side of HTTP. It's based on code of Python's
422
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
423
  character encodings. Keep-alive connections are not supported.
424

425
  """
426
  # The default request version.  This only affects responses up until
427
  # the point where the request line is parsed, so it mainly decides what
428
  # the client gets back when sending a malformed request line.
429
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
430
  default_request_version = HTTP_0_9
431

    
432
  # Error message settings
433
  error_message_format = DEFAULT_ERROR_MESSAGE
434
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
435

    
436
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
437

    
438
  def __init__(self, server, conn, client_addr, fileio_class):
439
    """Initializes this class.
440

441
    Part of the initialization is reading the request and eventual POST/PUT
442
    data sent by the client.
443

444
    """
445
    self._server = server
446

    
447
    # We default rfile to buffered because otherwise it could be
448
    # really slow for large data (a getc() call per byte); we make
449
    # wfile unbuffered because (a) often after a write() we want to
450
    # read and we need to flush the line; (b) big writes to unbuffered
451
    # files are typically optimized by stdio even when big reads
452
    # aren't.
453
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
454
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
455

    
456
    self.client_addr = client_addr
457

    
458
    self.request_headers = None
459
    self.request_method = None
460
    self.request_path = None
461
    self.request_requestline = None
462
    self.request_version = self.default_request_version
463

    
464
    self.response_body = None
465
    self.response_code = HTTP_OK
466
    self.response_content_type = None
467
    self.response_headers = {}
468

    
469
    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
470
    try:
471
      try:
472
        try:
473
          try:
474
            # Read, parse and handle request
475
            self._ReadRequest()
476
            self._ReadPostData()
477
            self._HandleRequest()
478
          except HTTPException, err:
479
            self._SetErrorStatus(err)
480
        finally:
481
          # Try to send a response
482
          self._SendResponse()
483
          self._Close()
484
      except SocketClosed:
485
        pass
486
    finally:
487
      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
488

    
489
  def _Close(self):
490
    if not self.wfile.closed:
491
      self.wfile.flush()
492
    self.wfile.close()
493
    self.rfile.close()
494

    
495
  def _DateTimeHeader(self):
496
    """Return the current date and time formatted for a message header.
497

498
    """
499
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
500
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
501
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
502

    
503
  def _SetErrorStatus(self, err):
504
    """Sets the response code and body from a HTTPException.
505

506
    @type err: HTTPException
507
    @param err: Exception instance
508

509
    """
510
    try:
511
      (shortmsg, longmsg) = self.responses[err.code]
512
    except KeyError:
513
      shortmsg = longmsg = "Unknown"
514

    
515
    if err.message:
516
      message = err.message
517
    else:
518
      message = shortmsg
519

    
520
    values = {
521
      "code": err.code,
522
      "message": cgi.escape(message),
523
      "explain": longmsg,
524
      }
525

    
526
    self.response_code = err.code
527
    self.response_content_type = self.error_content_type
528
    self.response_body = self.error_message_format % values
529

    
530
  def _HandleRequest(self):
531
    """Handle the actual request.
532

533
    Calls the actual handler function and converts exceptions into HTTP errors.
534

535
    """
536
    # Don't do anything if there's already been a problem
537
    if self.response_code != HTTP_OK:
538
      return
539

    
540
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
541

    
542
    # Check whether client is still there
543
    self.rfile.read(0)
544

    
545
    try:
546
      try:
547
        result = self._server.HandleRequest(self)
548

    
549
        # TODO: Content-type
550
        encoder = HTTPJsonConverter()
551
        body = encoder.Encode(result)
552

    
553
        self.response_content_type = encoder.CONTENT_TYPE
554
        self.response_body = body
555
      except (HTTPException, KeyboardInterrupt, SystemExit):
556
        raise
557
      except Exception, err:
558
        logging.exception("Caught exception")
559
        raise HTTPInternalError(message=str(err))
560
      except:
561
        logging.exception("Unknown exception")
562
        raise HTTPInternalError(message="Unknown error")
563

    
564
    except HTTPException, err:
565
      self._SetErrorStatus(err)
566

    
567
  def _SendResponse(self):
568
    """Sends response to the client.
569

570
    """
571
    # Check whether client is still there
572
    self.rfile.read(0)
573

    
574
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
575
                 self.request_requestline, self.response_code)
576

    
577
    if self.response_code in self.responses:
578
      response_message = self.responses[self.response_code][0]
579
    else:
580
      response_message = ""
581

    
582
    if self.request_version != HTTP_0_9:
583
      self.wfile.write("%s %d %s\r\n" %
584
                       (self.request_version, self.response_code,
585
                        response_message))
586
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
587
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
588
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
589
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
590
      for key, val in self.response_headers.iteritems():
591
        self._SendHeader(key, val)
592

    
593
      # We don't support keep-alive at this time
594
      self._SendHeader(HTTP_CONNECTION, "close")
595
      self.wfile.write("\r\n")
596

    
597
    if (self.request_method != HTTP_HEAD and
598
        self.response_code >= HTTP_OK and
599
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
600
      self.wfile.write(self.response_body)
601

    
602
  def _SendHeader(self, name, value):
603
    if self.request_version != HTTP_0_9:
604
      self.wfile.write("%s: %s\r\n" % (name, value))
605

    
606
  def _ReadRequest(self):
607
    """Reads and parses request line
608

609
    """
610
    raw_requestline = self.rfile.readline()
611

    
612
    requestline = raw_requestline
613
    if requestline[-2:] == '\r\n':
614
      requestline = requestline[:-2]
615
    elif requestline[-1:] == '\n':
616
      requestline = requestline[:-1]
617

    
618
    if not requestline:
619
      raise HTTPBadRequest("Empty request line")
620

    
621
    self.request_requestline = requestline
622

    
623
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
624

    
625
    words = requestline.split()
626

    
627
    if len(words) == 3:
628
      [method, path, version] = words
629
      if version[:5] != 'HTTP/':
630
        raise HTTPBadRequest("Bad request version (%r)" % version)
631

    
632
      try:
633
        base_version_number = version.split('/', 1)[1]
634
        version_number = base_version_number.split(".")
635

    
636
        # RFC 2145 section 3.1 says there can be only one "." and
637
        #   - major and minor numbers MUST be treated as
638
        #      separate integers;
639
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
640
        #      turn is lower than HTTP/12.3;
641
        #   - Leading zeros MUST be ignored by recipients.
642
        if len(version_number) != 2:
643
          raise HTTPBadRequest("Bad request version (%r)" % version)
644

    
645
        version_number = int(version_number[0]), int(version_number[1])
646
      except (ValueError, IndexError):
647
        raise HTTPBadRequest("Bad request version (%r)" % version)
648

    
649
      if version_number >= (2, 0):
650
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
651
                                      base_version_number)
652

    
653
    elif len(words) == 2:
654
      version = HTTP_0_9
655
      [method, path] = words
656
      if method != HTTP_GET:
657
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
658

    
659
    else:
660
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
661

    
662
    # Examine the headers and look for a Connection directive
663
    headers = mimetools.Message(self.rfile, 0)
664

    
665
    self.request_method = method
666
    self.request_path = path
667
    self.request_version = version
668
    self.request_headers = headers
669

    
670
  def _ReadPostData(self):
671
    """Reads POST/PUT data
672

673
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
674
    a request is signaled by the inclusion of a Content-Length header field in
675
    the request message headers. HTTP/1.0 requests containing an entity body
676
    must include a valid Content-Length header field."
677

678
    """
679
    # While not according to specification, we only support an entity body for
680
    # POST and PUT.
681
    if (not self.request_method or
682
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
683
      self.request_post_data = None
684
      return
685

    
686
    content_length = None
687
    try:
688
      if HTTP_CONTENT_LENGTH in self.request_headers:
689
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
690
    except TypeError:
691
      pass
692
    except ValueError:
693
      pass
694

    
695
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
696
    if content_length is None:
697
      raise HTTPLengthRequired("Missing Content-Length header or"
698
                               " invalid format")
699

    
700
    data = self.rfile.read(content_length)
701

    
702
    # TODO: Content-type, error handling
703
    if data:
704
      self.request_post_data = HTTPJsonConverter().Decode(data)
705
    else:
706
      self.request_post_data = None
707

    
708
    logging.debug("HTTP POST data: %s", self.request_post_data)
709

    
710

    
711
class HttpServer(_HttpSocketBase):
712
  """Generic HTTP server class
713

714
  Users of this class must subclass it and override the HandleRequest function.
715

716
  """
717
  MAX_CHILDREN = 20
718

    
719
  def __init__(self, mainloop, local_address, port,
720
               ssl_params=None, ssl_verify_peer=False):
721
    """Initializes the HTTP server
722

723
    @type mainloop: ganeti.daemon.Mainloop
724
    @param mainloop: Mainloop used to poll for I/O events
725
    @type local_addess: string
726
    @param local_address: Local IP address to bind to
727
    @type port: int
728
    @param port: TCP port to listen on
729
    @type ssl_params: HttpSslParams
730
    @param ssl_params: SSL key and certificate
731
    @type ssl_verify_peer: bool
732
    @param ssl_verify_peer: Whether to require client certificate and compare
733
                            it with our certificate
734

735
    """
736
    _HttpSocketBase.__init__(self)
737

    
738
    self.mainloop = mainloop
739
    self.local_address = local_address
740
    self.port = port
741

    
742
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
743

    
744
    # Allow port to be reused
745
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
746

    
747
    if self._using_ssl:
748
      self._fileio_class = _SSLFileObject
749
    else:
750
      self._fileio_class = socket._fileobject
751

    
752
    self._children = []
753

    
754
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
755
    mainloop.RegisterSignal(self)
756

    
757
  def Start(self):
758
    self.socket.bind((self.local_address, self.port))
759
    self.socket.listen(5)
760

    
761
  def Stop(self):
762
    self.socket.close()
763

    
764
  def OnIO(self, fd, condition):
765
    if condition & select.POLLIN:
766
      self._IncomingConnection()
767

    
768
  def OnSignal(self, signum):
769
    if signum == signal.SIGCHLD:
770
      self._CollectChildren(True)
771

    
772
  def _CollectChildren(self, quick):
773
    """Checks whether any child processes are done
774

775
    @type quick: bool
776
    @param quick: Whether to only use non-blocking functions
777

778
    """
779
    if not quick:
780
      # Don't wait for other processes if it should be a quick check
781
      while len(self._children) > self.MAX_CHILDREN:
782
        try:
783
          # Waiting without a timeout brings us into a potential DoS situation.
784
          # As soon as too many children run, we'll not respond to new
785
          # requests. The real solution would be to add a timeout for children
786
          # and killing them after some time.
787
          pid, status = os.waitpid(0, 0)
788
        except os.error:
789
          pid = None
790
        if pid and pid in self._children:
791
          self._children.remove(pid)
792

    
793
    for child in self._children:
794
      try:
795
        pid, status = os.waitpid(child, os.WNOHANG)
796
      except os.error:
797
        pid = None
798
      if pid and pid in self._children:
799
        self._children.remove(pid)
800

    
801
  def _IncomingConnection(self):
802
    """Called for each incoming connection
803

804
    """
805
    (connection, client_addr) = self.socket.accept()
806

    
807
    self._CollectChildren(False)
808

    
809
    pid = os.fork()
810
    if pid == 0:
811
      # Child process
812
      try:
813
        HttpServerRequestExecutor(self, connection, client_addr,
814
                                  self._fileio_class)
815
      except:
816
        logging.exception("Error while handling request from %s:%s",
817
                          client_addr[0], client_addr[1])
818
        os._exit(1)
819
      os._exit(0)
820
    else:
821
      self._children.append(pid)
822

    
823
  def HandleRequest(self, req):
824
    raise NotImplementedError()
825

    
826

    
827
class HttpClientRequest(object):
828
  def __init__(self, host, port, method, path, headers=None, post_data=None,
829
               ssl_params=None, ssl_verify_peer=False):
830
    """Describes an HTTP request.
831

832
    @type host: string
833
    @param host: Hostname
834
    @type port: int
835
    @param port: Port
836
    @type method: string
837
    @param method: Method name
838
    @type path: string
839
    @param path: Request path
840
    @type headers: dict or None
841
    @param headers: Additional headers to send
842
    @type post_data: string or None
843
    @param post_data: Additional data to send
844
    @type ssl_params: HttpSslParams
845
    @param ssl_params: SSL key and certificate
846
    @type ssl_verify_peer: bool
847
    @param ssl_verify_peer: Whether to compare our certificate with server's
848
                            certificate
849

850
    """
851
    if post_data is not None:
852
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
853
        "Only POST and GET requests support sending data"
854

    
855
    assert path.startswith("/"), "Path must start with slash (/)"
856

    
857
    self.host = host
858
    self.port = port
859
    self.ssl_params = ssl_params
860
    self.ssl_verify_peer = ssl_verify_peer
861
    self.method = method
862
    self.path = path
863
    self.headers = headers
864
    self.post_data = post_data
865

    
866
    self.success = None
867
    self.error = None
868

    
869
    self.resp_status_line = None
870
    self.resp_version = None
871
    self.resp_status = None
872
    self.resp_reason = None
873
    self.resp_headers = None
874
    self.resp_body = None
875

    
876

    
877
class HttpClientRequestExecutor(_HttpSocketBase):
878
  # Default headers
879
  DEFAULT_HEADERS = {
880
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
881
    # TODO: For keep-alive, don't send "Connection: close"
882
    HTTP_CONNECTION: "close",
883
    }
884

    
885
  # Length limits
886
  STATUS_LINE_LENGTH_MAX = 512
887
  HEADER_LENGTH_MAX = 4 * 1024
888

    
889
  # Timeouts in seconds for socket layer
890
  # TODO: Make read timeout configurable per OpCode
891
  CONNECT_TIMEOUT = 5.0
892
  WRITE_TIMEOUT = 10
893
  READ_TIMEOUT = None
894
  CLOSE_TIMEOUT = 1
895

    
896
  # Parser state machine
897
  PS_STATUS_LINE = "status-line"
898
  PS_HEADERS = "headers"
899
  PS_BODY = "body"
900
  PS_COMPLETE = "complete"
901

    
902
  def __init__(self, req):
903
    """Initializes the HttpClientRequestExecutor class.
904

905
    @type req: HttpClientRequest
906
    @param req: Request object
907

908
    """
909
    _HttpSocketBase.__init__(self)
910

    
911
    self.request = req
912

    
913
    self.parser_status = self.PS_STATUS_LINE
914
    self.header_buffer = StringIO()
915
    self.body_buffer = StringIO()
916
    self.content_length = None
917
    self.server_will_close = None
918

    
919
    self.poller = select.poll()
920

    
921
    try:
922
      # TODO: Implement connection caching/keep-alive
923
      self.sock = self._CreateSocket(req.ssl_params,
924
                                     req.ssl_verify_peer)
925

    
926
      # Disable Python's timeout
927
      self.sock.settimeout(None)
928

    
929
      # Operate in non-blocking mode
930
      self.sock.setblocking(0)
931

    
932
      force_close = True
933
      self._Connect()
934
      try:
935
        self._SendRequest()
936
        self._ReadResponse()
937

    
938
        # Only wait for server to close if we didn't have any exception.
939
        force_close = False
940
      finally:
941
        self._CloseConnection(force_close)
942

    
943
      self.sock.close()
944
      self.sock = None
945

    
946
      req.resp_body = self.body_buffer.getvalue()
947

    
948
      req.success = True
949
      req.error = None
950

    
951
    except _HttpClientError, err:
952
      req.success = False
953
      req.error = str(err)
954

    
955
  def _BuildRequest(self):
956
    """Build HTTP request.
957

958
    @rtype: string
959
    @return: Complete request
960

961
    """
962
    # Headers
963
    send_headers = self.DEFAULT_HEADERS.copy()
964

    
965
    if self.request.headers:
966
      send_headers.update(self.request.headers)
967

    
968
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
969

    
970
    if self.request.post_data:
971
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
972

    
973
    buf = StringIO()
974

    
975
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
976
    # keep-alive).
977
    # TODO: For keep-alive, change to HTTP/1.1
978
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
979
                                self.request.path, HTTP_1_0))
980

    
981
    # Add headers
982
    for name, value in send_headers.iteritems():
983
      buf.write("%s: %s\r\n" % (name, value))
984

    
985
    buf.write("\r\n")
986

    
987
    if self.request.post_data:
988
      buf.write(self.request.post_data)
989

    
990
    return buf.getvalue()
991

    
992
  def _ParseStatusLine(self):
993
    """Parses the status line sent by the server.
994

995
    """
996
    line = self.request.resp_status_line
997

    
998
    if not line:
999
      raise _HttpClientError("Empty status line")
1000

    
1001
    try:
1002
      [version, status, reason] = line.split(None, 2)
1003
    except ValueError:
1004
      try:
1005
        [version, status] = line.split(None, 1)
1006
        reason = ""
1007
      except ValueError:
1008
        version = HTTP_9_0
1009

    
1010
    if version:
1011
      version = version.upper()
1012

    
1013
    if version not in (HTTP_1_0, HTTP_1_1):
1014
      # We do not support HTTP/0.9, despite the specification requiring it
1015
      # (RFC2616, section 19.6)
1016
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1017
                             line)
1018

    
1019
    # The status code is a three-digit number
1020
    try:
1021
      status = int(status)
1022
      if status < 100 or status > 999:
1023
        status = -1
1024
    except ValueError:
1025
      status = -1
1026

    
1027
    if status == -1:
1028
      raise _HttpClientError("Invalid status code (%r)" % line)
1029

    
1030
    self.request.resp_version = version
1031
    self.request.resp_status = status
1032
    self.request.resp_reason = reason
1033

    
1034
  def _WillServerCloseConnection(self):
1035
    """Evaluate whether server will close the connection.
1036

1037
    @rtype: bool
1038
    @return: Whether server will close the connection
1039

1040
    """
1041
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1042
    if hdr_connection:
1043
      hdr_connection = hdr_connection.lower()
1044

    
1045
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1046
    if self.request.resp_version == HTTP_1_1:
1047
      return (hdr_connection and "close" in hdr_connection)
1048

    
1049
    # Some HTTP/1.0 implementations have support for persistent connections,
1050
    # using rules different than HTTP/1.1.
1051

    
1052
    # For older HTTP, Keep-Alive indicates persistent connection.
1053
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1054
      return False
1055

    
1056
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1057
    # supposed to be sent by the client.
1058
    if hdr_connection and "keep-alive" in hdr_connection:
1059
      return False
1060

    
1061
    return True
1062

    
1063
  def _ParseHeaders(self):
1064
    """Parses the headers sent by the server.
1065

1066
    This function also adjusts internal variables based on the header values.
1067

1068
    """
1069
    req = self.request
1070

    
1071
    # Parse headers
1072
    self.header_buffer.seek(0, 0)
1073
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
1074

    
1075
    self.server_will_close = self._WillServerCloseConnection()
1076

    
1077
    # Do we have a Content-Length header?
1078
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1079
    if hdr_content_length:
1080
      try:
1081
        self.content_length = int(hdr_content_length)
1082
      except ValueError:
1083
        pass
1084
      if self.content_length is not None and self.content_length < 0:
1085
        self.content_length = None
1086

    
1087
    # does the body have a fixed length? (of zero)
1088
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1089
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1090
      self.content_length = 0
1091

    
1092
    # if the connection remains open and a content-length was not provided,
1093
    # then assume that the connection WILL close.
1094
    if self.content_length is None:
1095
      self.server_will_close = True
1096

    
1097
  def _CheckStatusLineLength(self, length):
1098
    if length > self.STATUS_LINE_LENGTH_MAX:
1099
      raise _HttpClientError("Status line longer than %d chars" %
1100
                             self.STATUS_LINE_LENGTH_MAX)
1101

    
1102
  def _CheckHeaderLength(self, length):
1103
    if length > self.HEADER_LENGTH_MAX:
1104
      raise _HttpClientError("Headers longer than %d chars" %
1105
                             self.HEADER_LENGTH_MAX)
1106

    
1107
  def _ParseBuffer(self, buf, eof):
1108
    """Main function for HTTP response state machine.
1109

1110
    @type buf: string
1111
    @param buf: Receive buffer
1112
    @type eof: bool
1113
    @param eof: Whether we've reached EOF on the socket
1114
    @rtype: string
1115
    @return: Updated receive buffer
1116

1117
    """
1118
    if self.parser_status == self.PS_STATUS_LINE:
1119
      # Expect status line
1120
      idx = buf.find("\r\n")
1121
      if idx >= 0:
1122
        self.request.resp_status_line = buf[:idx]
1123

    
1124
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1125

    
1126
        # Remove status line, including CRLF
1127
        buf = buf[idx + 2:]
1128

    
1129
        self._ParseStatusLine()
1130

    
1131
        self.parser_status = self.PS_HEADERS
1132
      else:
1133
        # Check whether incoming data is getting too large, otherwise we just
1134
        # fill our read buffer.
1135
        self._CheckStatusLineLength(len(buf))
1136

    
1137
    if self.parser_status == self.PS_HEADERS:
1138
      # Wait for header end
1139
      idx = buf.find("\r\n\r\n")
1140
      if idx >= 0:
1141
        self.header_buffer.write(buf[:idx + 2])
1142

    
1143
        self._CheckHeaderLength(self.header_buffer.tell())
1144

    
1145
        # Remove headers, including CRLF
1146
        buf = buf[idx + 4:]
1147

    
1148
        self._ParseHeaders()
1149

    
1150
        self.parser_status = self.PS_BODY
1151
      else:
1152
        # Check whether incoming data is getting too large, otherwise we just
1153
        # fill our read buffer.
1154
        self._CheckHeaderLength(len(buf))
1155

    
1156
    if self.parser_status == self.PS_BODY:
1157
      self.body_buffer.write(buf)
1158
      buf = ""
1159

    
1160
      # Check whether we've read everything
1161
      if (eof or
1162
          (self.content_length is not None and
1163
           self.body_buffer.tell() >= self.content_length)):
1164
        self.parser_status = self.PS_COMPLETE
1165

    
1166
    return buf
1167

    
1168
  def _Connect(self):
1169
    """Non-blocking connect to host with timeout.
1170

1171
    """
1172
    connected = False
1173
    while True:
1174
      try:
1175
        connect_error = self.sock.connect_ex((self.request.host,
1176
                                              self.request.port))
1177
      except socket.gaierror, err:
1178
        raise _HttpClientError("Connection failed: %s" % str(err))
1179

    
1180
      if connect_error == errno.EINTR:
1181
        # Mask signals
1182
        pass
1183

    
1184
      elif connect_error == 0:
1185
        # Connection established
1186
        connected = True
1187
        break
1188

    
1189
      elif connect_error == errno.EINPROGRESS:
1190
        # Connection started
1191
        break
1192

    
1193
      raise _HttpClientError("Connection failed (%s: %s)" %
1194
                             (connect_error, os.strerror(connect_error)))
1195

    
1196
    if not connected:
1197
      # Wait for connection
1198
      event = WaitForSocketCondition(self.poller, self.sock,
1199
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1200
      if event is None:
1201
        raise _HttpClientError("Timeout while connecting to server")
1202

    
1203
      # Get error code
1204
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1205
      if connect_error != 0:
1206
        raise _HttpClientError("Connection failed (%s: %s)" %
1207
                               (connect_error, os.strerror(connect_error)))
1208

    
1209
    # Enable TCP keep-alive
1210
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1211

    
1212
    # If needed, Linux specific options are available to change the TCP
1213
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1214
    # TCP_KEEPINTVL.
1215

    
1216
  def _SendRequest(self):
1217
    """Sends request to server.
1218

1219
    """
1220
    buf = self._BuildRequest()
1221

    
1222
    while buf:
1223
      # Send only 4 KB at a time
1224
      data = buf[:4096]
1225

    
1226
      try:
1227
        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1228
                               self.WRITE_TIMEOUT)
1229
      except _HttpSocketTimeout:
1230
        raise _HttpClientError("Timeout while sending request")
1231
      except socket.error, err:
1232
        raise _HttpClientError("Error sending request: %s" % err)
1233

    
1234
      # Remove sent bytes
1235
      buf = buf[sent:]
1236

    
1237
    assert not buf, "Request wasn't sent completely"
1238

    
1239
  def _ReadResponse(self):
1240
    """Read response from server.
1241

1242
    Calls the parser function after reading a chunk of data.
1243

1244
    """
1245
    buf = ""
1246
    eof = False
1247
    while self.parser_status != self.PS_COMPLETE:
1248
      try:
1249
        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1250
                               self.READ_TIMEOUT)
1251
      except _HttpSocketTimeout:
1252
        raise _HttpClientError("Timeout while reading response")
1253
      except socket.error, err:
1254
        raise _HttpClientError("Error while reading response: %s" % err)
1255

    
1256
      if data:
1257
        buf += data
1258
      else:
1259
        eof = True
1260

    
1261
      # Do some parsing and error checking while more data arrives
1262
      buf = self._ParseBuffer(buf, eof)
1263

    
1264
      # Must be done only after the buffer has been evaluated
1265
      if (eof and
1266
          self.parser_status in (self.PS_STATUS_LINE,
1267
                                 self.PS_HEADERS)):
1268
        raise _HttpClientError("Connection closed prematurely")
1269

    
1270
    # Parse rest
1271
    buf = self._ParseBuffer(buf, True)
1272

    
1273
    assert self.parser_status == self.PS_COMPLETE
1274
    assert not buf, "Parser didn't read full response"
1275

    
1276
  def _CloseConnection(self, force):
1277
    """Closes the connection.
1278

1279
    """
1280
    if self.server_will_close and not force:
1281
      # Wait for server to close
1282
      try:
1283
        # Check whether it's actually closed
1284
        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1285
                               self.CLOSE_TIMEOUT):
1286
          return
1287
      except (socket.error, _HttpClientError, _HttpSocketTimeout):
1288
        # Ignore errors at this stage
1289
        pass
1290

    
1291
    # Close the connection from our side
1292
    try:
1293
      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1294
                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1295
    except _HttpSocketTimeout:
1296
      raise _HttpClientError("Timeout while shutting down connection")
1297
    except socket.error, err:
1298
      raise _HttpClientError("Error while shutting down connection: %s" % err)
1299

    
1300

    
1301
class _HttpClientPendingRequest(object):
1302
  """Data class for pending requests.
1303

1304
  """
1305
  def __init__(self, request):
1306
    self.request = request
1307

    
1308
    # Thread synchronization
1309
    self.done = threading.Event()
1310

    
1311

    
1312
class HttpClientWorker(workerpool.BaseWorker):
1313
  """HTTP client worker class.
1314

1315
  """
1316
  def RunTask(self, pend_req):
1317
    try:
1318
      HttpClientRequestExecutor(pend_req.request)
1319
    finally:
1320
      pend_req.done.set()
1321

    
1322

    
1323
class HttpClientWorkerPool(workerpool.WorkerPool):
1324
  def __init__(self, manager):
1325
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1326
                                   HttpClientWorker)
1327
    self.manager = manager
1328

    
1329

    
1330
class HttpClientManager(object):
1331
  """Manages HTTP requests.
1332

1333
  """
1334
  def __init__(self):
1335
    self._wpool = HttpClientWorkerPool(self)
1336

    
1337
  def __del__(self):
1338
    self.Shutdown()
1339

    
1340
  def ExecRequests(self, requests):
1341
    """Execute HTTP requests.
1342

1343
    This function can be called from multiple threads at the same time.
1344

1345
    @type requests: List of HttpClientRequest instances
1346
    @param requests: The requests to execute
1347
    @rtype: List of HttpClientRequest instances
1348
    @returns: The list of requests passed in
1349

1350
    """
1351
    # _HttpClientPendingRequest is used for internal thread synchronization
1352
    pending = [_HttpClientPendingRequest(req) for req in requests]
1353

    
1354
    try:
1355
      # Add requests to queue
1356
      for pend_req in pending:
1357
        self._wpool.AddTask(pend_req)
1358

    
1359
    finally:
1360
      # In case of an exception we should still wait for the rest, otherwise
1361
      # another thread from the worker pool could modify the request object
1362
      # after we returned.
1363

    
1364
      # And wait for them to finish
1365
      for pend_req in pending:
1366
        pend_req.done.wait()
1367

    
1368
    # Return original list
1369
    return requests
1370

    
1371
  def Shutdown(self):
1372
    self._wpool.Quiesce()
1373
    self._wpool.TerminateWorkers()
1374

    
1375

    
1376
class _SSLFileObject(object):
1377
  """Wrapper around socket._fileobject
1378

1379
  This wrapper is required to handle OpenSSL exceptions.
1380

1381
  """
1382
  def _RequireOpenSocket(fn):
1383
    def wrapper(self, *args, **kwargs):
1384
      if self.closed:
1385
        raise SocketClosed("Socket is closed")
1386
      return fn(self, *args, **kwargs)
1387
    return wrapper
1388

    
1389
  def __init__(self, sock, mode='rb', bufsize=-1):
1390
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1391

    
1392
  def _ConnectionLost(self):
1393
    self._base = None
1394

    
1395
  def _getclosed(self):
1396
    return self._base is None or self._base.closed
1397
  closed = property(_getclosed, doc="True if the file is closed")
1398

    
1399
  @_RequireOpenSocket
1400
  def close(self):
1401
    return self._base.close()
1402

    
1403
  @_RequireOpenSocket
1404
  def flush(self):
1405
    return self._base.flush()
1406

    
1407
  @_RequireOpenSocket
1408
  def fileno(self):
1409
    return self._base.fileno()
1410

    
1411
  @_RequireOpenSocket
1412
  def read(self, size=-1):
1413
    return self._ReadWrapper(self._base.read, size=size)
1414

    
1415
  @_RequireOpenSocket
1416
  def readline(self, size=-1):
1417
    return self._ReadWrapper(self._base.readline, size=size)
1418

    
1419
  def _ReadWrapper(self, fn, *args, **kwargs):
1420
    while True:
1421
      try:
1422
        return fn(*args, **kwargs)
1423

    
1424
      except OpenSSL.SSL.ZeroReturnError, err:
1425
        self._ConnectionLost()
1426
        return ""
1427

    
1428
      except OpenSSL.SSL.WantReadError:
1429
        continue
1430

    
1431
      #except OpenSSL.SSL.WantWriteError:
1432
      # TODO
1433

    
1434
      except OpenSSL.SSL.SysCallError, (retval, desc):
1435
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1436
            or retval > 0):
1437
          self._ConnectionLost()
1438
          return ""
1439

    
1440
        logging.exception("Error in OpenSSL")
1441
        self._ConnectionLost()
1442
        raise socket.error(err.args)
1443

    
1444
      except OpenSSL.SSL.Error, err:
1445
        self._ConnectionLost()
1446
        raise socket.error(err.args)
1447

    
1448
  @_RequireOpenSocket
1449
  def write(self, data):
1450
    return self._WriteWrapper(self._base.write, data)
1451

    
1452
  def _WriteWrapper(self, fn, *args, **kwargs):
1453
    while True:
1454
      try:
1455
        return fn(*args, **kwargs)
1456
      except OpenSSL.SSL.ZeroReturnError, err:
1457
        self._ConnectionLost()
1458
        return 0
1459

    
1460
      except OpenSSL.SSL.WantWriteError:
1461
        continue
1462

    
1463
      #except OpenSSL.SSL.WantReadError:
1464
      # TODO
1465

    
1466
      except OpenSSL.SSL.SysCallError, err:
1467
        if err.args[0] == -1 and data == "":
1468
          # errors when writing empty strings are expected
1469
          # and can be ignored
1470
          return 0
1471

    
1472
        self._ConnectionLost()
1473
        raise socket.error(err.args)
1474

    
1475
      except OpenSSL.SSL.Error, err:
1476
        self._ConnectionLost()
1477
        raise socket.error(err.args)