Statistics
| Branch: | Tag: | Revision:

root / lib / http.py @ 6192c9b7

History | View | Annotate | Download (40.5 kB)

1
#
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful, but
9
# WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11
# General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16
# 02110-1301, USA.
17

    
18
"""HTTP server module.
19

20
"""
21

    
22
import BaseHTTPServer
23
import cgi
24
import logging
25
import mimetools
26
import OpenSSL
27
import os
28
import select
29
import socket
30
import sys
31
import time
32
import signal
33
import logging
34
import errno
35
import threading
36

    
37
from cStringIO import StringIO
38

    
39
from ganeti import constants
40
from ganeti import serializer
41
from ganeti import workerpool
42
from ganeti import utils
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
48

    
49
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
50
MONTHNAME = [None,
51
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
53

    
54
# Default error message
55
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56
DEFAULT_ERROR_MESSAGE = """\
57
<head>
58
<title>Error response</title>
59
</head>
60
<body>
61
<h1>Error response</h1>
62
<p>Error code %(code)d.
63
<p>Message: %(message)s.
64
<p>Error code explanation: %(code)s = %(explain)s.
65
</body>
66
"""
67

    
68
HTTP_OK = 200
69
HTTP_NO_CONTENT = 204
70
HTTP_NOT_MODIFIED = 304
71

    
72
HTTP_0_9 = "HTTP/0.9"
73
HTTP_1_0 = "HTTP/1.0"
74
HTTP_1_1 = "HTTP/1.1"
75

    
76
HTTP_GET = "GET"
77
HTTP_HEAD = "HEAD"
78
HTTP_POST = "POST"
79
HTTP_PUT = "PUT"
80

    
81
HTTP_ETAG = "ETag"
82
HTTP_HOST = "Host"
83
HTTP_SERVER = "Server"
84
HTTP_DATE = "Date"
85
HTTP_USER_AGENT = "User-Agent"
86
HTTP_CONTENT_TYPE = "Content-Type"
87
HTTP_CONTENT_LENGTH = "Content-Length"
88
HTTP_CONNECTION = "Connection"
89
HTTP_KEEP_ALIVE = "Keep-Alive"
90

    
91
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
92

    
93
# Socket operations
94
(SOCKOP_SEND,
95
 SOCKOP_RECV,
96
 SOCKOP_SHUTDOWN) = range(3)
97

    
98

    
99
class SocketClosed(socket.error):
100
  pass
101

    
102

    
103
class _HttpClientError(Exception):
104
  """Internal exception for HTTP client errors.
105

106
  This should only be used for internal error reporting.
107

108
  """
109

    
110

    
111
class _HttpSocketTimeout(Exception):
112
  """Internal exception for socket timeouts.
113

114
  This should only be used for internal error reporting.
115

116
  """
117

    
118

    
119
class HTTPException(Exception):
120
  code = None
121
  message = None
122

    
123
  def __init__(self, message=None):
124
    Exception.__init__(self)
125
    if message is not None:
126
      self.message = message
127

    
128

    
129
class HTTPBadRequest(HTTPException):
130
  code = 400
131

    
132

    
133
class HTTPForbidden(HTTPException):
134
  code = 403
135

    
136

    
137
class HTTPNotFound(HTTPException):
138
  code = 404
139

    
140

    
141
class HTTPGone(HTTPException):
142
  code = 410
143

    
144

    
145
class HTTPLengthRequired(HTTPException):
146
  code = 411
147

    
148

    
149
class HTTPInternalError(HTTPException):
150
  code = 500
151

    
152

    
153
class HTTPNotImplemented(HTTPException):
154
  code = 501
155

    
156

    
157
class HTTPServiceUnavailable(HTTPException):
158
  code = 503
159

    
160

    
161
class HTTPVersionNotSupported(HTTPException):
162
  code = 505
163

    
164

    
165
class HTTPJsonConverter:
166
  CONTENT_TYPE = "application/json"
167

    
168
  def Encode(self, data):
169
    return serializer.DumpJson(data)
170

    
171
  def Decode(self, data):
172
    return serializer.LoadJson(data)
173

    
174

    
175
def WaitForSocketCondition(poller, sock, event, timeout):
176
  """Waits for a condition to occur on the socket.
177

178
  @type poller: select.Poller
179
  @param poller: Poller object as created by select.poll()
180
  @type sock: socket
181
  @param socket: Wait for events on this socket
182
  @type event: int
183
  @param event: ORed condition (see select module)
184
  @type timeout: float or None
185
  @param timeout: Timeout in seconds
186
  @rtype: int or None
187
  @return: None for timeout, otherwise occured conditions
188

189
  """
190
  check = (event | select.POLLPRI |
191
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
192

    
193
  if timeout is not None:
194
    # Poller object expects milliseconds
195
    timeout *= 1000
196

    
197
  poller.register(sock, event)
198
  try:
199
    while True:
200
      # TODO: If the main thread receives a signal and we have no timeout, we
201
      # could wait forever. This should check a global "quit" flag or
202
      # something every so often.
203
      io_events = poller.poll(timeout)
204
      if not io_events:
205
        # Timeout
206
        return None
207
      for (evfd, evcond) in io_events:
208
        if evcond & check:
209
          return evcond
210
  finally:
211
    poller.unregister(sock)
212

    
213

    
214
def SocketOperation(poller, sock, op, arg1, timeout):
215
  """Wrapper around socket functions.
216

217
  This function abstracts error handling for socket operations, especially
218
  for the complicated interaction with OpenSSL.
219

220
  @type poller: select.Poller
221
  @param poller: Poller object as created by select.poll()
222
  @type sock: socket
223
  @param socket: Socket for the operation
224
  @type op: int
225
  @param op: Operation to execute (SOCKOP_* constants)
226
  @type arg1: any
227
  @param arg1: Parameter for function (if needed)
228
  @type timeout: None or float
229
  @param timeout: Timeout in seconds or None
230

231
  """
232
  # TODO: event_poll/event_check/override
233
  if op == SOCKOP_SEND:
234
    event_poll = select.POLLOUT
235
    event_check = select.POLLOUT
236

    
237
  elif op == SOCKOP_RECV:
238
    event_poll = select.POLLIN
239
    event_check = select.POLLIN | select.POLLPRI
240

    
241
  elif op == SOCKOP_SHUTDOWN:
242
    event_poll = None
243
    event_check = None
244

    
245
    # The timeout is only used when OpenSSL requests polling for a condition.
246
    # It is not advisable to have no timeout for shutdown.
247
    assert timeout
248

    
249
  else:
250
    raise AssertionError("Invalid socket operation")
251

    
252
  # No override by default
253
  event_override = 0
254

    
255
  while True:
256
    # Poll only for certain operations and when asked for by an override
257
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
258
      if event_override:
259
        wait_for_event = event_override
260
      else:
261
        wait_for_event = event_poll
262

    
263
      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
264
      if event is None:
265
        raise _HttpSocketTimeout()
266

    
267
      if (op == SOCKOP_RECV and
268
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
269
        return ""
270

    
271
      if not event & wait_for_event:
272
        continue
273

    
274
    # Reset override
275
    event_override = 0
276

    
277
    try:
278
      try:
279
        if op == SOCKOP_SEND:
280
          return sock.send(arg1)
281

    
282
        elif op == SOCKOP_RECV:
283
          return sock.recv(arg1)
284

    
285
        elif op == SOCKOP_SHUTDOWN:
286
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
287
            # PyOpenSSL's shutdown() doesn't take arguments
288
            return sock.shutdown()
289
          else:
290
            return sock.shutdown(arg1)
291

    
292
      except OpenSSL.SSL.WantWriteError:
293
        # OpenSSL wants to write, poll for POLLOUT
294
        event_override = select.POLLOUT
295
        continue
296

    
297
      except OpenSSL.SSL.WantReadError:
298
        # OpenSSL wants to read, poll for POLLIN
299
        event_override = select.POLLIN | select.POLLPRI
300
        continue
301

    
302
      except OpenSSL.SSL.WantX509LookupError:
303
        continue
304

    
305
      except OpenSSL.SSL.SysCallError, err:
306
        if op == SOCKOP_SEND:
307
          # arg1 is the data when writing
308
          if err.args and err.args[0] == -1 and arg1 == "":
309
            # errors when writing empty strings are expected
310
            # and can be ignored
311
            return 0
312

    
313
        elif op == SOCKOP_RECV:
314
          if err.args == (-1, _SSL_UNEXPECTED_EOF):
315
            return ""
316

    
317
        raise socket.error(err.args)
318

    
319
      except OpenSSL.SSL.Error, err:
320
        raise socket.error(err.args)
321

    
322
    except socket.error, err:
323
      if err.args and err.args[0] == errno.EAGAIN:
324
        # Ignore EAGAIN
325
        continue
326

    
327
      raise
328

    
329

    
330
class HttpSslParams(object):
331
  """Data class for SSL key and certificate.
332

333
  """
334
  def __init__(self, ssl_key_path, ssl_cert_path):
335
    """Initializes this class.
336

337
    @type ssl_key_path: string
338
    @param ssl_key_path: Path to file containing SSL key in PEM format
339
    @type ssl_cert_path: string
340
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
341

342
    """
343
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
344
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
345

    
346
  def GetKey(self):
347
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
348
                                          self.ssl_key_pem)
349

    
350
  def GetCertificate(self):
351
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
352
                                           self.ssl_cert_pem)
353

    
354

    
355
class _HttpSocketBase(object):
356
  """Base class for HTTP server and client.
357

358
  """
359
  def __init__(self):
360
    self._using_ssl = None
361
    self._ssl_params = None
362
    self._ssl_key = None
363
    self._ssl_cert = None
364

    
365
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
366
    """Creates a TCP socket and initializes SSL if needed.
367

368
    @type ssl_params: HttpSslParams
369
    @param ssl_params: SSL key and certificate
370
    @type ssl_verify_peer: bool
371
    @param ssl_verify_peer: Whether to require client certificate and compare
372
                            it with our certificate
373

374
    """
375
    self._ssl_params = ssl_params
376

    
377
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
378

    
379
    # Should we enable SSL?
380
    self._using_ssl = ssl_params is not None
381

    
382
    if not self._using_ssl:
383
      return sock
384

    
385
    self._ssl_key = ssl_params.GetKey()
386
    self._ssl_cert = ssl_params.GetCertificate()
387

    
388
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
389
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
390

    
391
    ctx.use_privatekey(self._ssl_key)
392
    ctx.use_certificate(self._ssl_cert)
393
    ctx.check_privatekey()
394

    
395
    if ssl_verify_peer:
396
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
397
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
398
                     self._SSLVerifyCallback)
399

    
400
    return OpenSSL.SSL.Connection(ctx, sock)
401

    
402
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
403
    """Verify the certificate provided by the peer
404

405
    We only compare fingerprints. The client must use the same certificate as
406
    we do on our side.
407

408
    """
409
    assert self._ssl_params, "SSL not initialized"
410

    
411
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
412
            self._ssl_cert.digest("md5") == cert.digest("md5"))
413

    
414

    
415
class HttpServerRequestExecutor(object):
416
  """Implements server side of HTTP
417

418
  This class implements the server side of HTTP. It's based on code of Python's
419
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
420
  character encodings. Keep-alive connections are not supported.
421

422
  """
423
  # The default request version.  This only affects responses up until
424
  # the point where the request line is parsed, so it mainly decides what
425
  # the client gets back when sending a malformed request line.
426
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
427
  default_request_version = HTTP_0_9
428

    
429
  # Error message settings
430
  error_message_format = DEFAULT_ERROR_MESSAGE
431
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
432

    
433
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
434

    
435
  def __init__(self, server, conn, client_addr, fileio_class):
436
    """Initializes this class.
437

438
    Part of the initialization is reading the request and eventual POST/PUT
439
    data sent by the client.
440

441
    """
442
    self._server = server
443

    
444
    # We default rfile to buffered because otherwise it could be
445
    # really slow for large data (a getc() call per byte); we make
446
    # wfile unbuffered because (a) often after a write() we want to
447
    # read and we need to flush the line; (b) big writes to unbuffered
448
    # files are typically optimized by stdio even when big reads
449
    # aren't.
450
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
451
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
452

    
453
    self.client_addr = client_addr
454

    
455
    self.request_headers = None
456
    self.request_method = None
457
    self.request_path = None
458
    self.request_requestline = None
459
    self.request_version = self.default_request_version
460

    
461
    self.response_body = None
462
    self.response_code = HTTP_OK
463
    self.response_content_type = None
464
    self.response_headers = {}
465

    
466
    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
467
    try:
468
      try:
469
        try:
470
          try:
471
            # Read, parse and handle request
472
            self._ReadRequest()
473
            self._ReadPostData()
474
            self._HandleRequest()
475
          except HTTPException, err:
476
            self._SetErrorStatus(err)
477
        finally:
478
          # Try to send a response
479
          self._SendResponse()
480
          self._Close()
481
      except SocketClosed:
482
        pass
483
    finally:
484
      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
485

    
486
  def _Close(self):
487
    if not self.wfile.closed:
488
      self.wfile.flush()
489
    self.wfile.close()
490
    self.rfile.close()
491

    
492
  def _DateTimeHeader(self):
493
    """Return the current date and time formatted for a message header.
494

495
    """
496
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
497
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
498
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
499

    
500
  def _SetErrorStatus(self, err):
501
    """Sets the response code and body from a HTTPException.
502

503
    @type err: HTTPException
504
    @param err: Exception instance
505

506
    """
507
    try:
508
      (shortmsg, longmsg) = self.responses[err.code]
509
    except KeyError:
510
      shortmsg = longmsg = "Unknown"
511

    
512
    if err.message:
513
      message = err.message
514
    else:
515
      message = shortmsg
516

    
517
    values = {
518
      "code": err.code,
519
      "message": cgi.escape(message),
520
      "explain": longmsg,
521
      }
522

    
523
    self.response_code = err.code
524
    self.response_content_type = self.error_content_type
525
    self.response_body = self.error_message_format % values
526

    
527
  def _HandleRequest(self):
528
    """Handle the actual request.
529

530
    Calls the actual handler function and converts exceptions into HTTP errors.
531

532
    """
533
    # Don't do anything if there's already been a problem
534
    if self.response_code != HTTP_OK:
535
      return
536

    
537
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
538

    
539
    # Check whether client is still there
540
    self.rfile.read(0)
541

    
542
    try:
543
      try:
544
        result = self._server.HandleRequest(self)
545

    
546
        # TODO: Content-type
547
        encoder = HTTPJsonConverter()
548
        body = encoder.Encode(result)
549

    
550
        self.response_content_type = encoder.CONTENT_TYPE
551
        self.response_body = body
552
      except (HTTPException, KeyboardInterrupt, SystemExit):
553
        raise
554
      except Exception, err:
555
        logging.exception("Caught exception")
556
        raise HTTPInternalError(message=str(err))
557
      except:
558
        logging.exception("Unknown exception")
559
        raise HTTPInternalError(message="Unknown error")
560

    
561
    except HTTPException, err:
562
      self._SetErrorStatus(err)
563

    
564
  def _SendResponse(self):
565
    """Sends response to the client.
566

567
    """
568
    # Check whether client is still there
569
    self.rfile.read(0)
570

    
571
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
572
                 self.request_requestline, self.response_code)
573

    
574
    if self.response_code in self.responses:
575
      response_message = self.responses[self.response_code][0]
576
    else:
577
      response_message = ""
578

    
579
    if self.request_version != HTTP_0_9:
580
      self.wfile.write("%s %d %s\r\n" %
581
                       (self.request_version, self.response_code,
582
                        response_message))
583
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
584
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
585
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
586
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
587
      for key, val in self.response_headers.iteritems():
588
        self._SendHeader(key, val)
589

    
590
      # We don't support keep-alive at this time
591
      self._SendHeader(HTTP_CONNECTION, "close")
592
      self.wfile.write("\r\n")
593

    
594
    if (self.request_method != HTTP_HEAD and
595
        self.response_code >= HTTP_OK and
596
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
597
      self.wfile.write(self.response_body)
598

    
599
  def _SendHeader(self, name, value):
600
    if self.request_version != HTTP_0_9:
601
      self.wfile.write("%s: %s\r\n" % (name, value))
602

    
603
  def _ReadRequest(self):
604
    """Reads and parses request line
605

606
    """
607
    raw_requestline = self.rfile.readline()
608

    
609
    requestline = raw_requestline
610
    if requestline[-2:] == '\r\n':
611
      requestline = requestline[:-2]
612
    elif requestline[-1:] == '\n':
613
      requestline = requestline[:-1]
614

    
615
    if not requestline:
616
      raise HTTPBadRequest("Empty request line")
617

    
618
    self.request_requestline = requestline
619

    
620
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
621

    
622
    words = requestline.split()
623

    
624
    if len(words) == 3:
625
      [method, path, version] = words
626
      if version[:5] != 'HTTP/':
627
        raise HTTPBadRequest("Bad request version (%r)" % version)
628

    
629
      try:
630
        base_version_number = version.split('/', 1)[1]
631
        version_number = base_version_number.split(".")
632

    
633
        # RFC 2145 section 3.1 says there can be only one "." and
634
        #   - major and minor numbers MUST be treated as
635
        #      separate integers;
636
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
637
        #      turn is lower than HTTP/12.3;
638
        #   - Leading zeros MUST be ignored by recipients.
639
        if len(version_number) != 2:
640
          raise HTTPBadRequest("Bad request version (%r)" % version)
641

    
642
        version_number = int(version_number[0]), int(version_number[1])
643
      except (ValueError, IndexError):
644
        raise HTTPBadRequest("Bad request version (%r)" % version)
645

    
646
      if version_number >= (2, 0):
647
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
648
                                      base_version_number)
649

    
650
    elif len(words) == 2:
651
      version = HTTP_0_9
652
      [method, path] = words
653
      if method != HTTP_GET:
654
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
655

    
656
    else:
657
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
658

    
659
    # Examine the headers and look for a Connection directive
660
    headers = mimetools.Message(self.rfile, 0)
661

    
662
    self.request_method = method
663
    self.request_path = path
664
    self.request_version = version
665
    self.request_headers = headers
666

    
667
  def _ReadPostData(self):
668
    """Reads POST/PUT data
669

670
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
671
    a request is signaled by the inclusion of a Content-Length header field in
672
    the request message headers. HTTP/1.0 requests containing an entity body
673
    must include a valid Content-Length header field."
674

675
    """
676
    # While not according to specification, we only support an entity body for
677
    # POST and PUT.
678
    if (not self.request_method or
679
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
680
      self.request_post_data = None
681
      return
682

    
683
    content_length = None
684
    try:
685
      if HTTP_CONTENT_LENGTH in self.request_headers:
686
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
687
    except TypeError:
688
      pass
689
    except ValueError:
690
      pass
691

    
692
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
693
    if content_length is None:
694
      raise HTTPLengthRequired("Missing Content-Length header or"
695
                               " invalid format")
696

    
697
    data = self.rfile.read(content_length)
698

    
699
    # TODO: Content-type, error handling
700
    if data:
701
      self.request_post_data = HTTPJsonConverter().Decode(data)
702
    else:
703
      self.request_post_data = None
704

    
705
    logging.debug("HTTP POST data: %s", self.request_post_data)
706

    
707

    
708
class HttpServer(_HttpSocketBase):
709
  """Generic HTTP server class
710

711
  Users of this class must subclass it and override the HandleRequest function.
712

713
  """
714
  MAX_CHILDREN = 20
715

    
716
  def __init__(self, mainloop, local_address, port,
717
               ssl_params=None, ssl_verify_peer=False):
718
    """Initializes the HTTP server
719

720
    @type mainloop: ganeti.daemon.Mainloop
721
    @param mainloop: Mainloop used to poll for I/O events
722
    @type local_addess: string
723
    @param local_address: Local IP address to bind to
724
    @type port: int
725
    @param port: TCP port to listen on
726
    @type ssl_params: HttpSslParams
727
    @param ssl_params: SSL key and certificate
728
    @type ssl_verify_peer: bool
729
    @param ssl_verify_peer: Whether to require client certificate and compare
730
                            it with our certificate
731

732
    """
733
    _HttpSocketBase.__init__(self)
734

    
735
    self.mainloop = mainloop
736
    self.local_address = local_address
737
    self.port = port
738

    
739
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
740

    
741
    # Allow port to be reused
742
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
743

    
744
    if self._using_ssl:
745
      self._fileio_class = _SSLFileObject
746
    else:
747
      self._fileio_class = socket._fileobject
748

    
749
    self._children = []
750

    
751
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
752
    mainloop.RegisterSignal(self)
753

    
754
  def Start(self):
755
    self.socket.bind((self.local_address, self.port))
756
    self.socket.listen(5)
757

    
758
  def Stop(self):
759
    self.socket.close()
760

    
761
  def OnIO(self, fd, condition):
762
    if condition & select.POLLIN:
763
      self._IncomingConnection()
764

    
765
  def OnSignal(self, signum):
766
    if signum == signal.SIGCHLD:
767
      self._CollectChildren(True)
768

    
769
  def _CollectChildren(self, quick):
770
    """Checks whether any child processes are done
771

772
    @type quick: bool
773
    @param quick: Whether to only use non-blocking functions
774

775
    """
776
    if not quick:
777
      # Don't wait for other processes if it should be a quick check
778
      while len(self._children) > self.MAX_CHILDREN:
779
        try:
780
          # Waiting without a timeout brings us into a potential DoS situation.
781
          # As soon as too many children run, we'll not respond to new
782
          # requests. The real solution would be to add a timeout for children
783
          # and killing them after some time.
784
          pid, status = os.waitpid(0, 0)
785
        except os.error:
786
          pid = None
787
        if pid and pid in self._children:
788
          self._children.remove(pid)
789

    
790
    for child in self._children:
791
      try:
792
        pid, status = os.waitpid(child, os.WNOHANG)
793
      except os.error:
794
        pid = None
795
      if pid and pid in self._children:
796
        self._children.remove(pid)
797

    
798
  def _IncomingConnection(self):
799
    """Called for each incoming connection
800

801
    """
802
    (connection, client_addr) = self.socket.accept()
803

    
804
    self._CollectChildren(False)
805

    
806
    pid = os.fork()
807
    if pid == 0:
808
      # Child process
809
      try:
810
        HttpServerRequestExecutor(self, connection, client_addr,
811
                                  self._fileio_class)
812
      except:
813
        logging.exception("Error while handling request from %s:%s",
814
                          client_addr[0], client_addr[1])
815
        os._exit(1)
816
      os._exit(0)
817
    else:
818
      self._children.append(pid)
819

    
820
  def HandleRequest(self, req):
821
    raise NotImplementedError()
822

    
823

    
824
class HttpClientRequest(object):
825
  def __init__(self, host, port, method, path, headers=None, post_data=None,
826
               ssl_params=None, ssl_verify_peer=False):
827
    """Describes an HTTP request.
828

829
    @type host: string
830
    @param host: Hostname
831
    @type port: int
832
    @param port: Port
833
    @type method: string
834
    @param method: Method name
835
    @type path: string
836
    @param path: Request path
837
    @type headers: dict or None
838
    @param headers: Additional headers to send
839
    @type post_data: string or None
840
    @param post_data: Additional data to send
841
    @type ssl_params: HttpSslParams
842
    @param ssl_params: SSL key and certificate
843
    @type ssl_verify_peer: bool
844
    @param ssl_verify_peer: Whether to compare our certificate with server's
845
                            certificate
846

847
    """
848
    if post_data is not None:
849
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
850
        "Only POST and GET requests support sending data"
851

    
852
    assert path.startswith("/"), "Path must start with slash (/)"
853

    
854
    self.host = host
855
    self.port = port
856
    self.ssl_params = ssl_params
857
    self.ssl_verify_peer = ssl_verify_peer
858
    self.method = method
859
    self.path = path
860
    self.headers = headers
861
    self.post_data = post_data
862

    
863
    self.success = None
864
    self.error = None
865

    
866
    self.resp_status_line = None
867
    self.resp_version = None
868
    self.resp_status = None
869
    self.resp_reason = None
870
    self.resp_headers = None
871
    self.resp_body = None
872

    
873

    
874
class HttpClientRequestExecutor(_HttpSocketBase):
875
  # Default headers
876
  DEFAULT_HEADERS = {
877
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
878
    # TODO: For keep-alive, don't send "Connection: close"
879
    HTTP_CONNECTION: "close",
880
    }
881

    
882
  # Length limits
883
  STATUS_LINE_LENGTH_MAX = 512
884
  HEADER_LENGTH_MAX = 4 * 1024
885

    
886
  # Timeouts in seconds for socket layer
887
  # TODO: Make read timeout configurable per OpCode
888
  CONNECT_TIMEOUT = 5.0
889
  WRITE_TIMEOUT = 10
890
  READ_TIMEOUT = None
891
  CLOSE_TIMEOUT = 1
892

    
893
  # Parser state machine
894
  PS_STATUS_LINE = "status-line"
895
  PS_HEADERS = "headers"
896
  PS_BODY = "body"
897
  PS_COMPLETE = "complete"
898

    
899
  def __init__(self, req):
900
    """Initializes the HttpClientRequestExecutor class.
901

902
    @type req: HttpClientRequest
903
    @param req: Request object
904

905
    """
906
    _HttpSocketBase.__init__(self)
907

    
908
    self.request = req
909

    
910
    self.parser_status = self.PS_STATUS_LINE
911
    self.header_buffer = StringIO()
912
    self.body_buffer = StringIO()
913
    self.content_length = None
914
    self.server_will_close = None
915

    
916
    self.poller = select.poll()
917

    
918
    try:
919
      # TODO: Implement connection caching/keep-alive
920
      self.sock = self._CreateSocket(req.ssl_params,
921
                                     req.ssl_verify_peer)
922

    
923
      # Disable Python's timeout
924
      self.sock.settimeout(None)
925

    
926
      # Operate in non-blocking mode
927
      self.sock.setblocking(0)
928

    
929
      force_close = True
930
      self._Connect()
931
      try:
932
        self._SendRequest()
933
        self._ReadResponse()
934

    
935
        # Only wait for server to close if we didn't have any exception.
936
        force_close = False
937
      finally:
938
        self._CloseConnection(force_close)
939

    
940
      self.sock.close()
941
      self.sock = None
942

    
943
      req.resp_body = self.body_buffer.getvalue()
944

    
945
      req.success = True
946
      req.error = None
947

    
948
    except _HttpClientError, err:
949
      req.success = False
950
      req.error = str(err)
951

    
952
  def _BuildRequest(self):
953
    """Build HTTP request.
954

955
    @rtype: string
956
    @return: Complete request
957

958
    """
959
    # Headers
960
    send_headers = self.DEFAULT_HEADERS.copy()
961

    
962
    if self.request.headers:
963
      send_headers.update(self.request.headers)
964

    
965
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
966

    
967
    if self.request.post_data:
968
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
969

    
970
    buf = StringIO()
971

    
972
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
973
    # keep-alive).
974
    # TODO: For keep-alive, change to HTTP/1.1
975
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
976
                                self.request.path, HTTP_1_0))
977

    
978
    # Add headers
979
    for name, value in send_headers.iteritems():
980
      buf.write("%s: %s\r\n" % (name, value))
981

    
982
    buf.write("\r\n")
983

    
984
    if self.request.post_data:
985
      buf.write(self.request.post_data)
986

    
987
    return buf.getvalue()
988

    
989
  def _ParseStatusLine(self):
990
    """Parses the status line sent by the server.
991

992
    """
993
    line = self.request.resp_status_line
994

    
995
    if not line:
996
      raise _HttpClientError("Empty status line")
997

    
998
    try:
999
      [version, status, reason] = line.split(None, 2)
1000
    except ValueError:
1001
      try:
1002
        [version, status] = line.split(None, 1)
1003
        reason = ""
1004
      except ValueError:
1005
        version = HTTP_9_0
1006

    
1007
    if version:
1008
      version = version.upper()
1009

    
1010
    if version not in (HTTP_1_0, HTTP_1_1):
1011
      # We do not support HTTP/0.9, despite the specification requiring it
1012
      # (RFC2616, section 19.6)
1013
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1014
                             line)
1015

    
1016
    # The status code is a three-digit number
1017
    try:
1018
      status = int(status)
1019
      if status < 100 or status > 999:
1020
        status = -1
1021
    except ValueError:
1022
      status = -1
1023

    
1024
    if status == -1:
1025
      raise _HttpClientError("Invalid status code (%r)" % line)
1026

    
1027
    self.request.resp_version = version
1028
    self.request.resp_status = status
1029
    self.request.resp_reason = reason
1030

    
1031
  def _WillServerCloseConnection(self):
1032
    """Evaluate whether server will close the connection.
1033

1034
    @rtype: bool
1035
    @return: Whether server will close the connection
1036

1037
    """
1038
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1039
    if hdr_connection:
1040
      hdr_connection = hdr_connection.lower()
1041

    
1042
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1043
    if self.request.resp_version == HTTP_1_1:
1044
      return (hdr_connection and "close" in hdr_connection)
1045

    
1046
    # Some HTTP/1.0 implementations have support for persistent connections,
1047
    # using rules different than HTTP/1.1.
1048

    
1049
    # For older HTTP, Keep-Alive indicates persistent connection.
1050
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1051
      return False
1052

    
1053
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1054
    # supposed to be sent by the client.
1055
    if hdr_connection and "keep-alive" in hdr_connection:
1056
      return False
1057

    
1058
    return True
1059

    
1060
  def _ParseHeaders(self):
1061
    """Parses the headers sent by the server.
1062

1063
    This function also adjusts internal variables based on the header values.
1064

1065
    """
1066
    req = self.request
1067

    
1068
    # Parse headers
1069
    self.header_buffer.seek(0, 0)
1070
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
1071

    
1072
    self.server_will_close = self._WillServerCloseConnection()
1073

    
1074
    # Do we have a Content-Length header?
1075
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1076
    if hdr_content_length:
1077
      try:
1078
        self.content_length = int(hdr_content_length)
1079
      except ValueError:
1080
        pass
1081
      if self.content_length is not None and self.content_length < 0:
1082
        self.content_length = None
1083

    
1084
    # does the body have a fixed length? (of zero)
1085
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1086
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1087
      self.content_length = 0
1088

    
1089
    # if the connection remains open and a content-length was not provided,
1090
    # then assume that the connection WILL close.
1091
    if self.content_length is None:
1092
      self.server_will_close = True
1093

    
1094
  def _CheckStatusLineLength(self, length):
1095
    if length > self.STATUS_LINE_LENGTH_MAX:
1096
      raise _HttpClientError("Status line longer than %d chars" %
1097
                             self.STATUS_LINE_LENGTH_MAX)
1098

    
1099
  def _CheckHeaderLength(self, length):
1100
    if length > self.HEADER_LENGTH_MAX:
1101
      raise _HttpClientError("Headers longer than %d chars" %
1102
                             self.HEADER_LENGTH_MAX)
1103

    
1104
  def _ParseBuffer(self, buf, eof):
1105
    """Main function for HTTP response state machine.
1106

1107
    @type buf: string
1108
    @param buf: Receive buffer
1109
    @type eof: bool
1110
    @param eof: Whether we've reached EOF on the socket
1111
    @rtype: string
1112
    @return: Updated receive buffer
1113

1114
    """
1115
    if self.parser_status == self.PS_STATUS_LINE:
1116
      # Expect status line
1117
      idx = buf.find("\r\n")
1118
      if idx >= 0:
1119
        self.request.resp_status_line = buf[:idx]
1120

    
1121
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1122

    
1123
        # Remove status line, including CRLF
1124
        buf = buf[idx + 2:]
1125

    
1126
        self._ParseStatusLine()
1127

    
1128
        self.parser_status = self.PS_HEADERS
1129
      else:
1130
        # Check whether incoming data is getting too large, otherwise we just
1131
        # fill our read buffer.
1132
        self._CheckStatusLineLength(len(buf))
1133

    
1134
    if self.parser_status == self.PS_HEADERS:
1135
      # Wait for header end
1136
      idx = buf.find("\r\n\r\n")
1137
      if idx >= 0:
1138
        self.header_buffer.write(buf[:idx + 2])
1139

    
1140
        self._CheckHeaderLength(self.header_buffer.tell())
1141

    
1142
        # Remove headers, including CRLF
1143
        buf = buf[idx + 4:]
1144

    
1145
        self._ParseHeaders()
1146

    
1147
        self.parser_status = self.PS_BODY
1148
      else:
1149
        # Check whether incoming data is getting too large, otherwise we just
1150
        # fill our read buffer.
1151
        self._CheckHeaderLength(len(buf))
1152

    
1153
    if self.parser_status == self.PS_BODY:
1154
      self.body_buffer.write(buf)
1155
      buf = ""
1156

    
1157
      # Check whether we've read everything
1158
      if (eof or
1159
          (self.content_length is not None and
1160
           self.body_buffer.tell() >= self.content_length)):
1161
        self.parser_status = self.PS_COMPLETE
1162

    
1163
    return buf
1164

    
1165
  def _Connect(self):
1166
    """Non-blocking connect to host with timeout.
1167

1168
    """
1169
    connected = False
1170
    while True:
1171
      try:
1172
        connect_error = self.sock.connect_ex((self.request.host,
1173
                                              self.request.port))
1174
      except socket.gaierror, err:
1175
        raise _HttpClientError("Connection failed: %s" % str(err))
1176

    
1177
      if connect_error == errno.EINTR:
1178
        # Mask signals
1179
        pass
1180

    
1181
      elif connect_error == 0:
1182
        # Connection established
1183
        connected = True
1184
        break
1185

    
1186
      elif connect_error == errno.EINPROGRESS:
1187
        # Connection started
1188
        break
1189

    
1190
      raise _HttpClientError("Connection failed (%s: %s)" %
1191
                             (connect_error, os.strerror(connect_error)))
1192

    
1193
    if not connected:
1194
      # Wait for connection
1195
      event = WaitForSocketCondition(self.poller, self.sock,
1196
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1197
      if event is None:
1198
        raise _HttpClientError("Timeout while connecting to server")
1199

    
1200
      # Get error code
1201
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1202
      if connect_error != 0:
1203
        raise _HttpClientError("Connection failed (%s: %s)" %
1204
                               (connect_error, os.strerror(connect_error)))
1205

    
1206
    # Enable TCP keep-alive
1207
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1208

    
1209
    # If needed, Linux specific options are available to change the TCP
1210
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1211
    # TCP_KEEPINTVL.
1212

    
1213
  def _SendRequest(self):
1214
    """Sends request to server.
1215

1216
    """
1217
    buf = self._BuildRequest()
1218

    
1219
    while buf:
1220
      # Send only 4 KB at a time
1221
      data = buf[:4096]
1222

    
1223
      try:
1224
        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1225
                               self.WRITE_TIMEOUT)
1226
      except _HttpSocketTimeout:
1227
        raise _HttpClientError("Timeout while sending request")
1228
      except socket.error, err:
1229
        raise _HttpClientError("Error sending request: %s" % err)
1230

    
1231
      # Remove sent bytes
1232
      buf = buf[sent:]
1233

    
1234
    assert not buf, "Request wasn't sent completely"
1235

    
1236
  def _ReadResponse(self):
1237
    """Read response from server.
1238

1239
    Calls the parser function after reading a chunk of data.
1240

1241
    """
1242
    buf = ""
1243
    eof = False
1244
    while self.parser_status != self.PS_COMPLETE:
1245
      try:
1246
        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1247
                               self.READ_TIMEOUT)
1248
      except _HttpSocketTimeout:
1249
        raise _HttpClientError("Timeout while reading response")
1250
      except socket.error, err:
1251
        raise _HttpClientError("Error while reading response: %s" % err)
1252

    
1253
      if data:
1254
        buf += data
1255
      else:
1256
        eof = True
1257

    
1258
      # Do some parsing and error checking while more data arrives
1259
      buf = self._ParseBuffer(buf, eof)
1260

    
1261
      # Must be done only after the buffer has been evaluated
1262
      if (eof and
1263
          self.parser_status in (self.PS_STATUS_LINE,
1264
                                 self.PS_HEADERS)):
1265
        raise _HttpClientError("Connection closed prematurely")
1266

    
1267
    # Parse rest
1268
    buf = self._ParseBuffer(buf, True)
1269

    
1270
    assert self.parser_status == self.PS_COMPLETE
1271
    assert not buf, "Parser didn't read full response"
1272

    
1273
  def _CloseConnection(self, force):
1274
    """Closes the connection.
1275

1276
    """
1277
    if self.server_will_close and not force:
1278
      # Wait for server to close
1279
      try:
1280
        # Check whether it's actually closed
1281
        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1282
                               self.CLOSE_TIMEOUT):
1283
          return
1284
      except (socket.error, _HttpClientError, _HttpSocketTimeout):
1285
        # Ignore errors at this stage
1286
        pass
1287

    
1288
    # Close the connection from our side
1289
    try:
1290
      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1291
                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1292
    except _HttpSocketTimeout:
1293
      raise _HttpClientError("Timeout while shutting down connection")
1294
    except socket.error, err:
1295
      raise _HttpClientError("Error while shutting down connection: %s" % err)
1296

    
1297

    
1298
class _HttpClientPendingRequest(object):
1299
  """Data class for pending requests.
1300

1301
  """
1302
  def __init__(self, request):
1303
    self.request = request
1304

    
1305
    # Thread synchronization
1306
    self.done = threading.Event()
1307

    
1308

    
1309
class HttpClientWorker(workerpool.BaseWorker):
1310
  """HTTP client worker class.
1311

1312
  """
1313
  def RunTask(self, pend_req):
1314
    try:
1315
      HttpClientRequestExecutor(pend_req.request)
1316
    finally:
1317
      pend_req.done.set()
1318

    
1319

    
1320
class HttpClientWorkerPool(workerpool.WorkerPool):
1321
  def __init__(self, manager):
1322
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1323
                                   HttpClientWorker)
1324
    self.manager = manager
1325

    
1326

    
1327
class HttpClientManager(object):
1328
  """Manages HTTP requests.
1329

1330
  """
1331
  def __init__(self):
1332
    self._wpool = HttpClientWorkerPool(self)
1333

    
1334
  def __del__(self):
1335
    self.Shutdown()
1336

    
1337
  def ExecRequests(self, requests):
1338
    """Execute HTTP requests.
1339

1340
    This function can be called from multiple threads at the same time.
1341

1342
    @type requests: List of HttpClientRequest instances
1343
    @param requests: The requests to execute
1344
    @rtype: List of HttpClientRequest instances
1345
    @returns: The list of requests passed in
1346

1347
    """
1348
    # _HttpClientPendingRequest is used for internal thread synchronization
1349
    pending = [_HttpClientPendingRequest(req) for req in requests]
1350

    
1351
    try:
1352
      # Add requests to queue
1353
      for pend_req in pending:
1354
        self._wpool.AddTask(pend_req)
1355

    
1356
    finally:
1357
      # In case of an exception we should still wait for the rest, otherwise
1358
      # another thread from the worker pool could modify the request object
1359
      # after we returned.
1360

    
1361
      # And wait for them to finish
1362
      for pend_req in pending:
1363
        pend_req.done.wait()
1364

    
1365
    # Return original list
1366
    return requests
1367

    
1368
  def Shutdown(self):
1369
    self._wpool.Quiesce()
1370
    self._wpool.TerminateWorkers()
1371

    
1372

    
1373
class _SSLFileObject(object):
1374
  """Wrapper around socket._fileobject
1375

1376
  This wrapper is required to handle OpenSSL exceptions.
1377

1378
  """
1379
  def _RequireOpenSocket(fn):
1380
    def wrapper(self, *args, **kwargs):
1381
      if self.closed:
1382
        raise SocketClosed("Socket is closed")
1383
      return fn(self, *args, **kwargs)
1384
    return wrapper
1385

    
1386
  def __init__(self, sock, mode='rb', bufsize=-1):
1387
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1388

    
1389
  def _ConnectionLost(self):
1390
    self._base = None
1391

    
1392
  def _getclosed(self):
1393
    return self._base is None or self._base.closed
1394
  closed = property(_getclosed, doc="True if the file is closed")
1395

    
1396
  @_RequireOpenSocket
1397
  def close(self):
1398
    return self._base.close()
1399

    
1400
  @_RequireOpenSocket
1401
  def flush(self):
1402
    return self._base.flush()
1403

    
1404
  @_RequireOpenSocket
1405
  def fileno(self):
1406
    return self._base.fileno()
1407

    
1408
  @_RequireOpenSocket
1409
  def read(self, size=-1):
1410
    return self._ReadWrapper(self._base.read, size=size)
1411

    
1412
  @_RequireOpenSocket
1413
  def readline(self, size=-1):
1414
    return self._ReadWrapper(self._base.readline, size=size)
1415

    
1416
  def _ReadWrapper(self, fn, *args, **kwargs):
1417
    while True:
1418
      try:
1419
        return fn(*args, **kwargs)
1420

    
1421
      except OpenSSL.SSL.ZeroReturnError, err:
1422
        self._ConnectionLost()
1423
        return ""
1424

    
1425
      except OpenSSL.SSL.WantReadError:
1426
        continue
1427

    
1428
      #except OpenSSL.SSL.WantWriteError:
1429
      # TODO
1430

    
1431
      except OpenSSL.SSL.SysCallError, (retval, desc):
1432
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1433
            or retval > 0):
1434
          self._ConnectionLost()
1435
          return ""
1436

    
1437
        logging.exception("Error in OpenSSL")
1438
        self._ConnectionLost()
1439
        raise socket.error(err.args)
1440

    
1441
      except OpenSSL.SSL.Error, err:
1442
        self._ConnectionLost()
1443
        raise socket.error(err.args)
1444

    
1445
  @_RequireOpenSocket
1446
  def write(self, data):
1447
    return self._WriteWrapper(self._base.write, data)
1448

    
1449
  def _WriteWrapper(self, fn, *args, **kwargs):
1450
    while True:
1451
      try:
1452
        return fn(*args, **kwargs)
1453
      except OpenSSL.SSL.ZeroReturnError, err:
1454
        self._ConnectionLost()
1455
        return 0
1456

    
1457
      except OpenSSL.SSL.WantWriteError:
1458
        continue
1459

    
1460
      #except OpenSSL.SSL.WantReadError:
1461
      # TODO
1462

    
1463
      except OpenSSL.SSL.SysCallError, err:
1464
        if err.args[0] == -1 and data == "":
1465
          # errors when writing empty strings are expected
1466
          # and can be ignored
1467
          return 0
1468

    
1469
        self._ConnectionLost()
1470
        raise socket.error(err.args)
1471

    
1472
      except OpenSSL.SSL.Error, err:
1473
        self._ConnectionLost()
1474
        raise socket.error(err.args)