Statistics
| Branch: | Tag: | Revision:

root / lib / http.py @ b1d979cf

History | View | Annotate | Download (40.5 kB)

1
#
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful, but
9
# WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11
# General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16
# 02110-1301, USA.
17

    
18
"""HTTP server module.
19

20
"""
21

    
22
import BaseHTTPServer
23
import cgi
24
import logging
25
import mimetools
26
import OpenSSL
27
import os
28
import select
29
import socket
30
import sys
31
import time
32
import signal
33
import logging
34
import errno
35
import threading
36

    
37
from cStringIO import StringIO
38

    
39
from ganeti import constants
40
from ganeti import serializer
41
from ganeti import workerpool
42
from ganeti import utils
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
48

    
49
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
50
MONTHNAME = [None,
51
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
53

    
54
# Default error message
55
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56
DEFAULT_ERROR_MESSAGE = """\
57
<head>
58
<title>Error response</title>
59
</head>
60
<body>
61
<h1>Error response</h1>
62
<p>Error code %(code)d.
63
<p>Message: %(message)s.
64
<p>Error code explanation: %(code)s = %(explain)s.
65
</body>
66
"""
67

    
68
HTTP_OK = 200
69
HTTP_NO_CONTENT = 204
70
HTTP_NOT_MODIFIED = 304
71

    
72
HTTP_0_9 = "HTTP/0.9"
73
HTTP_1_0 = "HTTP/1.0"
74
HTTP_1_1 = "HTTP/1.1"
75

    
76
HTTP_GET = "GET"
77
HTTP_HEAD = "HEAD"
78
HTTP_POST = "POST"
79
HTTP_PUT = "PUT"
80

    
81
HTTP_ETAG = "ETag"
82
HTTP_HOST = "Host"
83
HTTP_SERVER = "Server"
84
HTTP_DATE = "Date"
85
HTTP_USER_AGENT = "User-Agent"
86
HTTP_CONTENT_TYPE = "Content-Type"
87
HTTP_CONTENT_LENGTH = "Content-Length"
88
HTTP_CONNECTION = "Connection"
89
HTTP_KEEP_ALIVE = "Keep-Alive"
90

    
91
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
92

    
93
# Socket operations
94
(SOCKOP_SEND,
95
 SOCKOP_RECV,
96
 SOCKOP_SHUTDOWN) = range(3)
97

    
98

    
99
class SocketClosed(socket.error):
100
  pass
101

    
102

    
103
class _HttpClientError(Exception):
104
  """Internal exception for HTTP client errors.
105

106
  This should only be used for internal error reporting.
107

108
  """
109

    
110

    
111
class _HttpSocketTimeout(Exception):
112
  """Internal exception for socket timeouts.
113

114
  This should only be used for internal error reporting.
115

116
  """
117

    
118

    
119
class HTTPException(Exception):
120
  code = None
121
  message = None
122

    
123
  def __init__(self, message=None):
124
    Exception.__init__(self)
125
    if message is not None:
126
      self.message = message
127

    
128

    
129
class HTTPBadRequest(HTTPException):
130
  code = 400
131

    
132

    
133
class HTTPForbidden(HTTPException):
134
  code = 403
135

    
136

    
137
class HTTPNotFound(HTTPException):
138
  code = 404
139

    
140

    
141
class HTTPGone(HTTPException):
142
  code = 410
143

    
144

    
145
class HTTPLengthRequired(HTTPException):
146
  code = 411
147

    
148

    
149
class HTTPInternalError(HTTPException):
150
  code = 500
151

    
152

    
153
class HTTPNotImplemented(HTTPException):
154
  code = 501
155

    
156

    
157
class HTTPServiceUnavailable(HTTPException):
158
  code = 503
159

    
160

    
161
class HTTPVersionNotSupported(HTTPException):
162
  code = 505
163

    
164

    
165
class HTTPJsonConverter:
166
  CONTENT_TYPE = "application/json"
167

    
168
  def Encode(self, data):
169
    return serializer.DumpJson(data)
170

    
171
  def Decode(self, data):
172
    return serializer.LoadJson(data)
173

    
174

    
175
def WaitForSocketCondition(poller, sock, event, timeout):
176
  """Waits for a condition to occur on the socket.
177

178
  @type poller: select.Poller
179
  @param poller: Poller object as created by select.poll()
180
  @type sock: socket
181
  @param socket: Wait for events on this socket
182
  @type event: int
183
  @param event: ORed condition (see select module)
184
  @type timeout: float or None
185
  @param timeout: Timeout in seconds
186
  @rtype: int or None
187
  @return: None for timeout, otherwise occured conditions
188

189
  """
190
  check = (event | select.POLLPRI |
191
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
192

    
193
  if timeout is not None:
194
    # Poller object expects milliseconds
195
    timeout *= 1000
196

    
197
  poller.register(sock, event)
198
  try:
199
    while True:
200
      # TODO: If the main thread receives a signal and we have no timeout, we
201
      # could wait forever. This should check a global "quit" flag or
202
      # something every so often.
203
      io_events = poller.poll(timeout)
204
      if not io_events:
205
        # Timeout
206
        return None
207
      for (evfd, evcond) in io_events:
208
        if evcond & check:
209
          return evcond
210
  finally:
211
    poller.unregister(sock)
212

    
213

    
214
def SocketOperation(poller, sock, op, arg1, timeout):
215
  """Wrapper around socket functions.
216

217
  This function abstracts error handling for socket operations, especially
218
  for the complicated interaction with OpenSSL.
219

220
  @type poller: select.Poller
221
  @param poller: Poller object as created by select.poll()
222
  @type sock: socket
223
  @param socket: Socket for the operation
224
  @type op: int
225
  @param op: Operation to execute (SOCKOP_* constants)
226
  @type arg1: any
227
  @param arg1: Parameter for function (if needed)
228
  @type timeout: None or float
229
  @param timeout: Timeout in seconds or None
230

231
  """
232
  # TODO: event_poll/event_check/override
233
  if op == SOCKOP_SEND:
234
    event_poll = select.POLLOUT
235
    event_check = select.POLLOUT
236

    
237
  elif op == SOCKOP_RECV:
238
    event_poll = select.POLLIN
239
    event_check = select.POLLIN | select.POLLPRI
240

    
241
  elif op == SOCKOP_SHUTDOWN:
242
    event_poll = None
243
    event_check = None
244

    
245
    # The timeout is only used when OpenSSL requests polling for a condition.
246
    # It is not advisable to have no timeout for shutdown.
247
    assert timeout
248

    
249
  else:
250
    raise AssertionError("Invalid socket operation")
251

    
252
  # No override by default
253
  event_override = 0
254

    
255
  while True:
256
    # Poll only for certain operations and when asked for by an override
257
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
258
      if event_override:
259
        wait_for_event = event_override
260
      else:
261
        wait_for_event = event_poll
262

    
263
      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
264
      if event is None:
265
        raise _HttpSocketTimeout()
266

    
267
      if (op == SOCKOP_RECV and
268
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
269
        return ""
270

    
271
      if not event & wait_for_event:
272
        continue
273

    
274
    # Reset override
275
    event_override = 0
276

    
277
    try:
278
      try:
279
        if op == SOCKOP_SEND:
280
          return sock.send(arg1)
281

    
282
        elif op == SOCKOP_RECV:
283
          return sock.recv(arg1)
284

    
285
        elif op == SOCKOP_SHUTDOWN:
286
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
287
            # PyOpenSSL's shutdown() doesn't take arguments
288
            return sock.shutdown()
289
          else:
290
            return sock.shutdown(arg1)
291

    
292
      except OpenSSL.SSL.WantWriteError:
293
        # OpenSSL wants to write, poll for POLLOUT
294
        event_override = select.POLLOUT
295
        continue
296

    
297
      except OpenSSL.SSL.WantReadError:
298
        # OpenSSL wants to read, poll for POLLIN
299
        event_override = select.POLLIN | select.POLLPRI
300
        continue
301

    
302
      except OpenSSL.SSL.WantX509LookupError:
303
        continue
304

    
305
      except OpenSSL.SSL.SysCallError, err:
306
        if op == SOCKOP_SEND:
307
          # arg1 is the data when writing
308
          if err.args and err.args[0] == -1 and arg1 == "":
309
            # errors when writing empty strings are expected
310
            # and can be ignored
311
            return 0
312

    
313
        elif op == SOCKOP_RECV:
314
          if err.args == (-1, _SSL_UNEXPECTED_EOF):
315
            return ""
316

    
317
        raise socket.error(err.args)
318

    
319
      except OpenSSL.SSL.Error, err:
320
        raise socket.error(err.args)
321

    
322
    except socket.error, err:
323
      if err.args and err.args[0] == errno.EAGAIN:
324
        # Ignore EAGAIN
325
        continue
326

    
327
      raise
328

    
329

    
330
class HttpSslParams(object):
331
  """Data class for SSL key and certificate.
332

333
  """
334
  def __init__(self, ssl_key_path, ssl_cert_path):
335
    """Initializes this class.
336

337
    @type ssl_key_path: string
338
    @param ssl_key_path: Path to file containing SSL key in PEM format
339
    @type ssl_cert_path: string
340
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
341

342
    """
343
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
344
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
345

    
346
  def GetKey(self):
347
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
348
                                          self.ssl_key_pem)
349

    
350
  def GetCertificate(self):
351
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
352
                                           self.ssl_cert_pem)
353

    
354

    
355
class _HttpSocketBase(object):
356
  """Base class for HTTP server and client.
357

358
  """
359
  def __init__(self):
360
    self._using_ssl = None
361
    self._ssl_params = None
362
    self._ssl_key = None
363
    self._ssl_cert = None
364

    
365
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
366
    """Creates a TCP socket and initializes SSL if needed.
367

368
    @type ssl_params: HttpSslParams
369
    @param ssl_params: SSL key and certificate
370
    @type ssl_verify_peer: bool
371
    @param ssl_verify_peer: Whether to require client certificate and compare
372
                            it with our certificate
373

374
    """
375
    self._ssl_params = ssl_params
376

    
377
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
378

    
379
    # Should we enable SSL?
380
    self._using_ssl = ssl_params is not None
381

    
382
    if not self._using_ssl:
383
      return sock
384

    
385
    self._ssl_key = ssl_params.GetKey()
386
    self._ssl_cert = ssl_params.GetCertificate()
387

    
388
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
389
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
390

    
391
    ctx.use_privatekey(self._ssl_key)
392
    ctx.use_certificate(self._ssl_cert)
393
    ctx.check_privatekey()
394

    
395
    if ssl_verify_peer:
396
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
397
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
398
                     self._SSLVerifyCallback)
399

    
400
    return OpenSSL.SSL.Connection(ctx, sock)
401

    
402
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
403
    """Verify the certificate provided by the peer
404

405
    We only compare fingerprints. The client must use the same certificate as
406
    we do on our side.
407

408
    """
409
    assert self._ssl_params, "SSL not initialized"
410

    
411
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
412
            self._ssl_cert.digest("md5") == cert.digest("md5"))
413

    
414

    
415
class HttpServerRequestExecutor(object):
416
  """Implements server side of HTTP
417

418
  This class implements the server side of HTTP. It's based on code of Python's
419
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
420
  character encodings. Keep-alive connections are not supported.
421

422
  """
423
  # The default request version.  This only affects responses up until
424
  # the point where the request line is parsed, so it mainly decides what
425
  # the client gets back when sending a malformed request line.
426
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
427
  default_request_version = HTTP_0_9
428

    
429
  # Error message settings
430
  error_message_format = DEFAULT_ERROR_MESSAGE
431
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
432

    
433
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
434

    
435
  def __init__(self, server, conn, client_addr, fileio_class):
436
    """Initializes this class.
437

438
    Part of the initialization is reading the request and eventual POST/PUT
439
    data sent by the client.
440

441
    """
442
    self._server = server
443

    
444
    # We default rfile to buffered because otherwise it could be
445
    # really slow for large data (a getc() call per byte); we make
446
    # wfile unbuffered because (a) often after a write() we want to
447
    # read and we need to flush the line; (b) big writes to unbuffered
448
    # files are typically optimized by stdio even when big reads
449
    # aren't.
450
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
451
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
452

    
453
    self.client_addr = client_addr
454

    
455
    self.request_headers = None
456
    self.request_method = None
457
    self.request_path = None
458
    self.request_requestline = None
459
    self.request_version = self.default_request_version
460

    
461
    self.response_body = None
462
    self.response_code = HTTP_OK
463
    self.response_content_type = None
464
    self.response_headers = {}
465

    
466
    self.should_fork = False
467

    
468
    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
469
    try:
470
      try:
471
        try:
472
          try:
473
            # Read, parse and handle request
474
            self._ReadRequest()
475
            self._ReadPostData()
476
            self._HandleRequest()
477
          except HTTPException, err:
478
            self._SetErrorStatus(err)
479
        finally:
480
          # Try to send a response
481
          self._SendResponse()
482
          self._Close()
483
      except SocketClosed:
484
        pass
485
    finally:
486
      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
487

    
488
  def _Close(self):
489
    if not self.wfile.closed:
490
      self.wfile.flush()
491
    self.wfile.close()
492
    self.rfile.close()
493

    
494
  def _DateTimeHeader(self):
495
    """Return the current date and time formatted for a message header.
496

497
    """
498
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
499
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
500
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
501

    
502
  def _SetErrorStatus(self, err):
503
    """Sets the response code and body from a HTTPException.
504

505
    @type err: HTTPException
506
    @param err: Exception instance
507

508
    """
509
    try:
510
      (shortmsg, longmsg) = self.responses[err.code]
511
    except KeyError:
512
      shortmsg = longmsg = "Unknown"
513

    
514
    if err.message:
515
      message = err.message
516
    else:
517
      message = shortmsg
518

    
519
    values = {
520
      "code": err.code,
521
      "message": cgi.escape(message),
522
      "explain": longmsg,
523
      }
524

    
525
    self.response_code = err.code
526
    self.response_content_type = self.error_content_type
527
    self.response_body = self.error_message_format % values
528

    
529
  def _HandleRequest(self):
530
    """Handle the actual request.
531

532
    Calls the actual handler function and converts exceptions into HTTP errors.
533

534
    """
535
    # Don't do anything if there's already been a problem
536
    if self.response_code != HTTP_OK:
537
      return
538

    
539
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
540

    
541
    # Check whether client is still there
542
    self.rfile.read(0)
543

    
544
    try:
545
      try:
546
        result = self._server.HandleRequest(self)
547

    
548
        # TODO: Content-type
549
        encoder = HTTPJsonConverter()
550
        body = encoder.Encode(result)
551

    
552
        self.response_content_type = encoder.CONTENT_TYPE
553
        self.response_body = body
554
      except (HTTPException, KeyboardInterrupt, SystemExit):
555
        raise
556
      except Exception, err:
557
        logging.exception("Caught exception")
558
        raise HTTPInternalError(message=str(err))
559
      except:
560
        logging.exception("Unknown exception")
561
        raise HTTPInternalError(message="Unknown error")
562

    
563
    except HTTPException, err:
564
      self._SetErrorStatus(err)
565

    
566
  def _SendResponse(self):
567
    """Sends response to the client.
568

569
    """
570
    # Check whether client is still there
571
    self.rfile.read(0)
572

    
573
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
574
                 self.request_requestline, self.response_code)
575

    
576
    if self.response_code in self.responses:
577
      response_message = self.responses[self.response_code][0]
578
    else:
579
      response_message = ""
580

    
581
    if self.request_version != HTTP_0_9:
582
      self.wfile.write("%s %d %s\r\n" %
583
                       (self.request_version, self.response_code,
584
                        response_message))
585
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
586
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
587
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
588
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
589
      for key, val in self.response_headers.iteritems():
590
        self._SendHeader(key, val)
591

    
592
      # We don't support keep-alive at this time
593
      self._SendHeader(HTTP_CONNECTION, "close")
594
      self.wfile.write("\r\n")
595

    
596
    if (self.request_method != HTTP_HEAD and
597
        self.response_code >= HTTP_OK and
598
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
599
      self.wfile.write(self.response_body)
600

    
601
  def _SendHeader(self, name, value):
602
    if self.request_version != HTTP_0_9:
603
      self.wfile.write("%s: %s\r\n" % (name, value))
604

    
605
  def _ReadRequest(self):
606
    """Reads and parses request line
607

608
    """
609
    raw_requestline = self.rfile.readline()
610

    
611
    requestline = raw_requestline
612
    if requestline[-2:] == '\r\n':
613
      requestline = requestline[:-2]
614
    elif requestline[-1:] == '\n':
615
      requestline = requestline[:-1]
616

    
617
    if not requestline:
618
      raise HTTPBadRequest("Empty request line")
619

    
620
    self.request_requestline = requestline
621

    
622
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
623

    
624
    words = requestline.split()
625

    
626
    if len(words) == 3:
627
      [method, path, version] = words
628
      if version[:5] != 'HTTP/':
629
        raise HTTPBadRequest("Bad request version (%r)" % version)
630

    
631
      try:
632
        base_version_number = version.split('/', 1)[1]
633
        version_number = base_version_number.split(".")
634

    
635
        # RFC 2145 section 3.1 says there can be only one "." and
636
        #   - major and minor numbers MUST be treated as
637
        #      separate integers;
638
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
639
        #      turn is lower than HTTP/12.3;
640
        #   - Leading zeros MUST be ignored by recipients.
641
        if len(version_number) != 2:
642
          raise HTTPBadRequest("Bad request version (%r)" % version)
643

    
644
        version_number = int(version_number[0]), int(version_number[1])
645
      except (ValueError, IndexError):
646
        raise HTTPBadRequest("Bad request version (%r)" % version)
647

    
648
      if version_number >= (2, 0):
649
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
650
                                      base_version_number)
651

    
652
    elif len(words) == 2:
653
      version = HTTP_0_9
654
      [method, path] = words
655
      if method != HTTP_GET:
656
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
657

    
658
    else:
659
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
660

    
661
    # Examine the headers and look for a Connection directive
662
    headers = mimetools.Message(self.rfile, 0)
663

    
664
    self.request_method = method
665
    self.request_path = path
666
    self.request_version = version
667
    self.request_headers = headers
668

    
669
  def _ReadPostData(self):
670
    """Reads POST/PUT data
671

672
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
673
    a request is signaled by the inclusion of a Content-Length header field in
674
    the request message headers. HTTP/1.0 requests containing an entity body
675
    must include a valid Content-Length header field."
676

677
    """
678
    # While not according to specification, we only support an entity body for
679
    # POST and PUT.
680
    if (not self.request_method or
681
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
682
      self.request_post_data = None
683
      return
684

    
685
    content_length = None
686
    try:
687
      if HTTP_CONTENT_LENGTH in self.request_headers:
688
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
689
    except TypeError:
690
      pass
691
    except ValueError:
692
      pass
693

    
694
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
695
    if content_length is None:
696
      raise HTTPLengthRequired("Missing Content-Length header or"
697
                               " invalid format")
698

    
699
    data = self.rfile.read(content_length)
700

    
701
    # TODO: Content-type, error handling
702
    if data:
703
      self.request_post_data = HTTPJsonConverter().Decode(data)
704
    else:
705
      self.request_post_data = None
706

    
707
    logging.debug("HTTP POST data: %s", self.request_post_data)
708

    
709

    
710
class HttpServer(_HttpSocketBase):
711
  """Generic HTTP server class
712

713
  Users of this class must subclass it and override the HandleRequest function.
714

715
  """
716
  MAX_CHILDREN = 20
717

    
718
  def __init__(self, mainloop, local_address, port,
719
               ssl_params=None, ssl_verify_peer=False):
720
    """Initializes the HTTP server
721

722
    @type mainloop: ganeti.daemon.Mainloop
723
    @param mainloop: Mainloop used to poll for I/O events
724
    @type local_addess: string
725
    @param local_address: Local IP address to bind to
726
    @type port: int
727
    @param port: TCP port to listen on
728
    @type ssl_params: HttpSslParams
729
    @param ssl_params: SSL key and certificate
730
    @type ssl_verify_peer: bool
731
    @param ssl_verify_peer: Whether to require client certificate and compare
732
                            it with our certificate
733

734
    """
735
    _HttpSocketBase.__init__(self)
736

    
737
    self.mainloop = mainloop
738
    self.local_address = local_address
739
    self.port = port
740

    
741
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
742

    
743
    # Allow port to be reused
744
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
745

    
746
    if self._using_ssl:
747
      self._fileio_class = _SSLFileObject
748
    else:
749
      self._fileio_class = socket._fileobject
750

    
751
    self._children = []
752

    
753
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
754
    mainloop.RegisterSignal(self)
755

    
756
  def Start(self):
757
    self.socket.bind((self.local_address, self.port))
758
    self.socket.listen(5)
759

    
760
  def Stop(self):
761
    self.socket.close()
762

    
763
  def OnIO(self, fd, condition):
764
    if condition & select.POLLIN:
765
      self._IncomingConnection()
766

    
767
  def OnSignal(self, signum):
768
    if signum == signal.SIGCHLD:
769
      self._CollectChildren(True)
770

    
771
  def _CollectChildren(self, quick):
772
    """Checks whether any child processes are done
773

774
    @type quick: bool
775
    @param quick: Whether to only use non-blocking functions
776

777
    """
778
    if not quick:
779
      # Don't wait for other processes if it should be a quick check
780
      while len(self._children) > self.MAX_CHILDREN:
781
        try:
782
          # Waiting without a timeout brings us into a potential DoS situation.
783
          # As soon as too many children run, we'll not respond to new
784
          # requests. The real solution would be to add a timeout for children
785
          # and killing them after some time.
786
          pid, status = os.waitpid(0, 0)
787
        except os.error:
788
          pid = None
789
        if pid and pid in self._children:
790
          self._children.remove(pid)
791

    
792
    for child in self._children:
793
      try:
794
        pid, status = os.waitpid(child, os.WNOHANG)
795
      except os.error:
796
        pid = None
797
      if pid and pid in self._children:
798
        self._children.remove(pid)
799

    
800
  def _IncomingConnection(self):
801
    """Called for each incoming connection
802

803
    """
804
    (connection, client_addr) = self.socket.accept()
805

    
806
    self._CollectChildren(False)
807

    
808
    pid = os.fork()
809
    if pid == 0:
810
      # Child process
811
      try:
812
        HttpServerRequestExecutor(self, connection, client_addr,
813
                                  self._fileio_class)
814
      except:
815
        logging.exception("Error while handling request from %s:%s",
816
                          client_addr[0], client_addr[1])
817
        os._exit(1)
818
      os._exit(0)
819
    else:
820
      self._children.append(pid)
821

    
822
  def HandleRequest(self, req):
823
    raise NotImplementedError()
824

    
825

    
826
class HttpClientRequest(object):
827
  def __init__(self, host, port, method, path, headers=None, post_data=None,
828
               ssl_params=None, ssl_verify_peer=False):
829
    """Describes an HTTP request.
830

831
    @type host: string
832
    @param host: Hostname
833
    @type port: int
834
    @param port: Port
835
    @type method: string
836
    @param method: Method name
837
    @type path: string
838
    @param path: Request path
839
    @type headers: dict or None
840
    @param headers: Additional headers to send
841
    @type post_data: string or None
842
    @param post_data: Additional data to send
843
    @type ssl_params: HttpSslParams
844
    @param ssl_params: SSL key and certificate
845
    @type ssl_verify_peer: bool
846
    @param ssl_verify_peer: Whether to compare our certificate with server's
847
                            certificate
848

849
    """
850
    if post_data is not None:
851
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
852
        "Only POST and GET requests support sending data"
853

    
854
    assert path.startswith("/"), "Path must start with slash (/)"
855

    
856
    self.host = host
857
    self.port = port
858
    self.ssl_params = ssl_params
859
    self.ssl_verify_peer = ssl_verify_peer
860
    self.method = method
861
    self.path = path
862
    self.headers = headers
863
    self.post_data = post_data
864

    
865
    self.success = None
866
    self.error = None
867

    
868
    self.resp_status_line = None
869
    self.resp_version = None
870
    self.resp_status = None
871
    self.resp_reason = None
872
    self.resp_headers = None
873
    self.resp_body = None
874

    
875

    
876
class HttpClientRequestExecutor(_HttpSocketBase):
877
  # Default headers
878
  DEFAULT_HEADERS = {
879
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
880
    # TODO: For keep-alive, don't send "Connection: close"
881
    HTTP_CONNECTION: "close",
882
    }
883

    
884
  # Length limits
885
  STATUS_LINE_LENGTH_MAX = 512
886
  HEADER_LENGTH_MAX = 4 * 1024
887

    
888
  # Timeouts in seconds for socket layer
889
  # TODO: Make read timeout configurable per OpCode
890
  CONNECT_TIMEOUT = 5.0
891
  WRITE_TIMEOUT = 10
892
  READ_TIMEOUT = None
893
  CLOSE_TIMEOUT = 1
894

    
895
  # Parser state machine
896
  PS_STATUS_LINE = "status-line"
897
  PS_HEADERS = "headers"
898
  PS_BODY = "body"
899
  PS_COMPLETE = "complete"
900

    
901
  def __init__(self, req):
902
    """Initializes the HttpClientRequestExecutor class.
903

904
    @type req: HttpClientRequest
905
    @param req: Request object
906

907
    """
908
    _HttpSocketBase.__init__(self)
909

    
910
    self.request = req
911

    
912
    self.parser_status = self.PS_STATUS_LINE
913
    self.header_buffer = StringIO()
914
    self.body_buffer = StringIO()
915
    self.content_length = None
916
    self.server_will_close = None
917

    
918
    self.poller = select.poll()
919

    
920
    try:
921
      # TODO: Implement connection caching/keep-alive
922
      self.sock = self._CreateSocket(req.ssl_params,
923
                                     req.ssl_verify_peer)
924

    
925
      # Disable Python's timeout
926
      self.sock.settimeout(None)
927

    
928
      # Operate in non-blocking mode
929
      self.sock.setblocking(0)
930

    
931
      force_close = True
932
      self._Connect()
933
      try:
934
        self._SendRequest()
935
        self._ReadResponse()
936

    
937
        # Only wait for server to close if we didn't have any exception.
938
        force_close = False
939
      finally:
940
        self._CloseConnection(force_close)
941

    
942
      self.sock.close()
943
      self.sock = None
944

    
945
      req.resp_body = self.body_buffer.getvalue()
946

    
947
      req.success = True
948
      req.error = None
949

    
950
    except _HttpClientError, err:
951
      req.success = False
952
      req.error = str(err)
953

    
954
  def _BuildRequest(self):
955
    """Build HTTP request.
956

957
    @rtype: string
958
    @return: Complete request
959

960
    """
961
    # Headers
962
    send_headers = self.DEFAULT_HEADERS.copy()
963

    
964
    if self.request.headers:
965
      send_headers.update(self.request.headers)
966

    
967
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
968

    
969
    if self.request.post_data:
970
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
971

    
972
    buf = StringIO()
973

    
974
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
975
    # keep-alive).
976
    # TODO: For keep-alive, change to HTTP/1.1
977
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
978
                                self.request.path, HTTP_1_0))
979

    
980
    # Add headers
981
    for name, value in send_headers.iteritems():
982
      buf.write("%s: %s\r\n" % (name, value))
983

    
984
    buf.write("\r\n")
985

    
986
    if self.request.post_data:
987
      buf.write(self.request.post_data)
988

    
989
    return buf.getvalue()
990

    
991
  def _ParseStatusLine(self):
992
    """Parses the status line sent by the server.
993

994
    """
995
    line = self.request.resp_status_line
996

    
997
    if not line:
998
      raise _HttpClientError("Empty status line")
999

    
1000
    try:
1001
      [version, status, reason] = line.split(None, 2)
1002
    except ValueError:
1003
      try:
1004
        [version, status] = line.split(None, 1)
1005
        reason = ""
1006
      except ValueError:
1007
        version = HTTP_9_0
1008

    
1009
    if version:
1010
      version = version.upper()
1011

    
1012
    if version not in (HTTP_1_0, HTTP_1_1):
1013
      # We do not support HTTP/0.9, despite the specification requiring it
1014
      # (RFC2616, section 19.6)
1015
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1016
                             line)
1017

    
1018
    # The status code is a three-digit number
1019
    try:
1020
      status = int(status)
1021
      if status < 100 or status > 999:
1022
        status = -1
1023
    except ValueError:
1024
      status = -1
1025

    
1026
    if status == -1:
1027
      raise _HttpClientError("Invalid status code (%r)" % line)
1028

    
1029
    self.request.resp_version = version
1030
    self.request.resp_status = status
1031
    self.request.resp_reason = reason
1032

    
1033
  def _WillServerCloseConnection(self):
1034
    """Evaluate whether server will close the connection.
1035

1036
    @rtype: bool
1037
    @return: Whether server will close the connection
1038

1039
    """
1040
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1041
    if hdr_connection:
1042
      hdr_connection = hdr_connection.lower()
1043

    
1044
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1045
    if self.request.resp_version == HTTP_1_1:
1046
      return (hdr_connection and "close" in hdr_connection)
1047

    
1048
    # Some HTTP/1.0 implementations have support for persistent connections,
1049
    # using rules different than HTTP/1.1.
1050

    
1051
    # For older HTTP, Keep-Alive indicates persistent connection.
1052
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1053
      return False
1054

    
1055
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1056
    # supposed to be sent by the client.
1057
    if hdr_connection and "keep-alive" in hdr_connection:
1058
      return False
1059

    
1060
    return True
1061

    
1062
  def _ParseHeaders(self):
1063
    """Parses the headers sent by the server.
1064

1065
    This function also adjusts internal variables based on the header values.
1066

1067
    """
1068
    req = self.request
1069

    
1070
    # Parse headers
1071
    self.header_buffer.seek(0, 0)
1072
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
1073

    
1074
    self.server_will_close = self._WillServerCloseConnection()
1075

    
1076
    # Do we have a Content-Length header?
1077
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1078
    if hdr_content_length:
1079
      try:
1080
        self.content_length = int(hdr_content_length)
1081
      except ValueError:
1082
        pass
1083
      if self.content_length is not None and self.content_length < 0:
1084
        self.content_length = None
1085

    
1086
    # does the body have a fixed length? (of zero)
1087
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1088
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1089
      self.content_length = 0
1090

    
1091
    # if the connection remains open and a content-length was not provided,
1092
    # then assume that the connection WILL close.
1093
    if self.content_length is None:
1094
      self.server_will_close = True
1095

    
1096
  def _CheckStatusLineLength(self, length):
1097
    if length > self.STATUS_LINE_LENGTH_MAX:
1098
      raise _HttpClientError("Status line longer than %d chars" %
1099
                             self.STATUS_LINE_LENGTH_MAX)
1100

    
1101
  def _CheckHeaderLength(self, length):
1102
    if length > self.HEADER_LENGTH_MAX:
1103
      raise _HttpClientError("Headers longer than %d chars" %
1104
                             self.HEADER_LENGTH_MAX)
1105

    
1106
  def _ParseBuffer(self, buf, eof):
1107
    """Main function for HTTP response state machine.
1108

1109
    @type buf: string
1110
    @param buf: Receive buffer
1111
    @type eof: bool
1112
    @param eof: Whether we've reached EOF on the socket
1113
    @rtype: string
1114
    @return: Updated receive buffer
1115

1116
    """
1117
    if self.parser_status == self.PS_STATUS_LINE:
1118
      # Expect status line
1119
      idx = buf.find("\r\n")
1120
      if idx >= 0:
1121
        self.request.resp_status_line = buf[:idx]
1122

    
1123
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1124

    
1125
        # Remove status line, including CRLF
1126
        buf = buf[idx + 2:]
1127

    
1128
        self._ParseStatusLine()
1129

    
1130
        self.parser_status = self.PS_HEADERS
1131
      else:
1132
        # Check whether incoming data is getting too large, otherwise we just
1133
        # fill our read buffer.
1134
        self._CheckStatusLineLength(len(buf))
1135

    
1136
    if self.parser_status == self.PS_HEADERS:
1137
      # Wait for header end
1138
      idx = buf.find("\r\n\r\n")
1139
      if idx >= 0:
1140
        self.header_buffer.write(buf[:idx + 2])
1141

    
1142
        self._CheckHeaderLength(self.header_buffer.tell())
1143

    
1144
        # Remove headers, including CRLF
1145
        buf = buf[idx + 4:]
1146

    
1147
        self._ParseHeaders()
1148

    
1149
        self.parser_status = self.PS_BODY
1150
      else:
1151
        # Check whether incoming data is getting too large, otherwise we just
1152
        # fill our read buffer.
1153
        self._CheckHeaderLength(len(buf))
1154

    
1155
    if self.parser_status == self.PS_BODY:
1156
      self.body_buffer.write(buf)
1157
      buf = ""
1158

    
1159
      # Check whether we've read everything
1160
      if (eof or
1161
          (self.content_length is not None and
1162
           self.body_buffer.tell() >= self.content_length)):
1163
        self.parser_status = self.PS_COMPLETE
1164

    
1165
    return buf
1166

    
1167
  def _Connect(self):
1168
    """Non-blocking connect to host with timeout.
1169

1170
    """
1171
    connected = False
1172
    while True:
1173
      try:
1174
        connect_error = self.sock.connect_ex((self.request.host,
1175
                                              self.request.port))
1176
      except socket.gaierror, err:
1177
        raise _HttpClientError("Connection failed: %s" % str(err))
1178

    
1179
      if connect_error == errno.EINTR:
1180
        # Mask signals
1181
        pass
1182

    
1183
      elif connect_error == 0:
1184
        # Connection established
1185
        connected = True
1186
        break
1187

    
1188
      elif connect_error == errno.EINPROGRESS:
1189
        # Connection started
1190
        break
1191

    
1192
      raise _HttpClientError("Connection failed (%s: %s)" %
1193
                             (connect_error, os.strerror(connect_error)))
1194

    
1195
    if not connected:
1196
      # Wait for connection
1197
      event = WaitForSocketCondition(self.poller, self.sock,
1198
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1199
      if event is None:
1200
        raise _HttpClientError("Timeout while connecting to server")
1201

    
1202
      # Get error code
1203
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1204
      if connect_error != 0:
1205
        raise _HttpClientError("Connection failed (%s: %s)" %
1206
                               (connect_error, os.strerror(connect_error)))
1207

    
1208
    # Enable TCP keep-alive
1209
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1210

    
1211
    # If needed, Linux specific options are available to change the TCP
1212
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1213
    # TCP_KEEPINTVL.
1214

    
1215
  def _SendRequest(self):
1216
    """Sends request to server.
1217

1218
    """
1219
    buf = self._BuildRequest()
1220

    
1221
    while buf:
1222
      # Send only 4 KB at a time
1223
      data = buf[:4096]
1224

    
1225
      try:
1226
        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1227
                               self.WRITE_TIMEOUT)
1228
      except _HttpSocketTimeout:
1229
        raise _HttpClientError("Timeout while sending request")
1230
      except socket.error, err:
1231
        raise _HttpClientError("Error sending request: %s" % err)
1232

    
1233
      # Remove sent bytes
1234
      buf = buf[sent:]
1235

    
1236
    assert not buf, "Request wasn't sent completely"
1237

    
1238
  def _ReadResponse(self):
1239
    """Read response from server.
1240

1241
    Calls the parser function after reading a chunk of data.
1242

1243
    """
1244
    buf = ""
1245
    eof = False
1246
    while self.parser_status != self.PS_COMPLETE:
1247
      try:
1248
        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1249
                               self.READ_TIMEOUT)
1250
      except _HttpSocketTimeout:
1251
        raise _HttpClientError("Timeout while reading response")
1252
      except socket.error, err:
1253
        raise _HttpClientError("Error while reading response: %s" % err)
1254

    
1255
      if data:
1256
        buf += data
1257
      else:
1258
        eof = True
1259

    
1260
      # Do some parsing and error checking while more data arrives
1261
      buf = self._ParseBuffer(buf, eof)
1262

    
1263
      # Must be done only after the buffer has been evaluated
1264
      if (eof and
1265
          self.parser_status in (self.PS_STATUS_LINE,
1266
                                 self.PS_HEADERS)):
1267
        raise _HttpClientError("Connection closed prematurely")
1268

    
1269
    # Parse rest
1270
    buf = self._ParseBuffer(buf, True)
1271

    
1272
    assert self.parser_status == self.PS_COMPLETE
1273
    assert not buf, "Parser didn't read full response"
1274

    
1275
  def _CloseConnection(self, force):
1276
    """Closes the connection.
1277

1278
    """
1279
    if self.server_will_close and not force:
1280
      # Wait for server to close
1281
      try:
1282
        # Check whether it's actually closed
1283
        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1284
                               self.CLOSE_TIMEOUT):
1285
          return
1286
      except (socket.error, _HttpClientError, _HttpSocketTimeout):
1287
        # Ignore errors at this stage
1288
        pass
1289

    
1290
    # Close the connection from our side
1291
    try:
1292
      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1293
                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1294
    except _HttpSocketTimeout:
1295
      raise _HttpClientError("Timeout while shutting down connection")
1296
    except socket.error, err:
1297
      raise _HttpClientError("Error while shutting down connection: %s" % err)
1298

    
1299

    
1300
class _HttpClientPendingRequest(object):
1301
  """Data class for pending requests.
1302

1303
  """
1304
  def __init__(self, request):
1305
    self.request = request
1306

    
1307
    # Thread synchronization
1308
    self.done = threading.Event()
1309

    
1310

    
1311
class HttpClientWorker(workerpool.BaseWorker):
1312
  """HTTP client worker class.
1313

1314
  """
1315
  def RunTask(self, pend_req):
1316
    try:
1317
      HttpClientRequestExecutor(pend_req.request)
1318
    finally:
1319
      pend_req.done.set()
1320

    
1321

    
1322
class HttpClientWorkerPool(workerpool.WorkerPool):
1323
  def __init__(self, manager):
1324
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1325
                                   HttpClientWorker)
1326
    self.manager = manager
1327

    
1328

    
1329
class HttpClientManager(object):
1330
  """Manages HTTP requests.
1331

1332
  """
1333
  def __init__(self):
1334
    self._wpool = HttpClientWorkerPool(self)
1335

    
1336
  def __del__(self):
1337
    self.Shutdown()
1338

    
1339
  def ExecRequests(self, requests):
1340
    """Execute HTTP requests.
1341

1342
    This function can be called from multiple threads at the same time.
1343

1344
    @type requests: List of HttpClientRequest instances
1345
    @param requests: The requests to execute
1346
    @rtype: List of HttpClientRequest instances
1347
    @returns: The list of requests passed in
1348

1349
    """
1350
    # _HttpClientPendingRequest is used for internal thread synchronization
1351
    pending = [_HttpClientPendingRequest(req) for req in requests]
1352

    
1353
    try:
1354
      # Add requests to queue
1355
      for pend_req in pending:
1356
        self._wpool.AddTask(pend_req)
1357

    
1358
    finally:
1359
      # In case of an exception we should still wait for the rest, otherwise
1360
      # another thread from the worker pool could modify the request object
1361
      # after we returned.
1362

    
1363
      # And wait for them to finish
1364
      for pend_req in pending:
1365
        pend_req.done.wait()
1366

    
1367
    # Return original list
1368
    return requests
1369

    
1370
  def Shutdown(self):
1371
    self._wpool.Quiesce()
1372
    self._wpool.TerminateWorkers()
1373

    
1374

    
1375
class _SSLFileObject(object):
1376
  """Wrapper around socket._fileobject
1377

1378
  This wrapper is required to handle OpenSSL exceptions.
1379

1380
  """
1381
  def _RequireOpenSocket(fn):
1382
    def wrapper(self, *args, **kwargs):
1383
      if self.closed:
1384
        raise SocketClosed("Socket is closed")
1385
      return fn(self, *args, **kwargs)
1386
    return wrapper
1387

    
1388
  def __init__(self, sock, mode='rb', bufsize=-1):
1389
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1390

    
1391
  def _ConnectionLost(self):
1392
    self._base = None
1393

    
1394
  def _getclosed(self):
1395
    return self._base is None or self._base.closed
1396
  closed = property(_getclosed, doc="True if the file is closed")
1397

    
1398
  @_RequireOpenSocket
1399
  def close(self):
1400
    return self._base.close()
1401

    
1402
  @_RequireOpenSocket
1403
  def flush(self):
1404
    return self._base.flush()
1405

    
1406
  @_RequireOpenSocket
1407
  def fileno(self):
1408
    return self._base.fileno()
1409

    
1410
  @_RequireOpenSocket
1411
  def read(self, size=-1):
1412
    return self._ReadWrapper(self._base.read, size=size)
1413

    
1414
  @_RequireOpenSocket
1415
  def readline(self, size=-1):
1416
    return self._ReadWrapper(self._base.readline, size=size)
1417

    
1418
  def _ReadWrapper(self, fn, *args, **kwargs):
1419
    while True:
1420
      try:
1421
        return fn(*args, **kwargs)
1422

    
1423
      except OpenSSL.SSL.ZeroReturnError, err:
1424
        self._ConnectionLost()
1425
        return ""
1426

    
1427
      except OpenSSL.SSL.WantReadError:
1428
        continue
1429

    
1430
      #except OpenSSL.SSL.WantWriteError:
1431
      # TODO
1432

    
1433
      except OpenSSL.SSL.SysCallError, (retval, desc):
1434
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1435
            or retval > 0):
1436
          self._ConnectionLost()
1437
          return ""
1438

    
1439
        logging.exception("Error in OpenSSL")
1440
        self._ConnectionLost()
1441
        raise socket.error(err.args)
1442

    
1443
      except OpenSSL.SSL.Error, err:
1444
        self._ConnectionLost()
1445
        raise socket.error(err.args)
1446

    
1447
  @_RequireOpenSocket
1448
  def write(self, data):
1449
    return self._WriteWrapper(self._base.write, data)
1450

    
1451
  def _WriteWrapper(self, fn, *args, **kwargs):
1452
    while True:
1453
      try:
1454
        return fn(*args, **kwargs)
1455
      except OpenSSL.SSL.ZeroReturnError, err:
1456
        self._ConnectionLost()
1457
        return 0
1458

    
1459
      except OpenSSL.SSL.WantWriteError:
1460
        continue
1461

    
1462
      #except OpenSSL.SSL.WantReadError:
1463
      # TODO
1464

    
1465
      except OpenSSL.SSL.SysCallError, err:
1466
        if err.args[0] == -1 and data == "":
1467
          # errors when writing empty strings are expected
1468
          # and can be ignored
1469
          return 0
1470

    
1471
        self._ConnectionLost()
1472
        raise socket.error(err.args)
1473

    
1474
      except OpenSSL.SSL.Error, err:
1475
        self._ConnectionLost()
1476
        raise socket.error(err.args)