Statistics
| Branch: | Tag: | Revision:

root / lib / http.py @ 87622829

History | View | Annotate | Download (41.1 kB)

1
#
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful, but
9
# WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11
# General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16
# 02110-1301, USA.
17

    
18
"""HTTP server module.
19

20
"""
21

    
22
import BaseHTTPServer
23
import cgi
24
import logging
25
import mimetools
26
import OpenSSL
27
import os
28
import select
29
import socket
30
import sys
31
import time
32
import signal
33
import logging
34
import errno
35
import threading
36

    
37
from cStringIO import StringIO
38

    
39
from ganeti import constants
40
from ganeti import serializer
41
from ganeti import workerpool
42
from ganeti import utils
43

    
44

    
45
HTTP_CLIENT_THREADS = 10
46

    
47
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
48

    
49
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
50
MONTHNAME = [None,
51
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
53

    
54
# Default error message
55
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56
DEFAULT_ERROR_MESSAGE = """\
57
<head>
58
<title>Error response</title>
59
</head>
60
<body>
61
<h1>Error response</h1>
62
<p>Error code %(code)d.
63
<p>Message: %(message)s.
64
<p>Error code explanation: %(code)s = %(explain)s.
65
</body>
66
"""
67

    
68
HTTP_OK = 200
69
HTTP_NO_CONTENT = 204
70
HTTP_NOT_MODIFIED = 304
71

    
72
HTTP_0_9 = "HTTP/0.9"
73
HTTP_1_0 = "HTTP/1.0"
74
HTTP_1_1 = "HTTP/1.1"
75

    
76
HTTP_GET = "GET"
77
HTTP_HEAD = "HEAD"
78
HTTP_POST = "POST"
79
HTTP_PUT = "PUT"
80

    
81
HTTP_ETAG = "ETag"
82
HTTP_HOST = "Host"
83
HTTP_SERVER = "Server"
84
HTTP_DATE = "Date"
85
HTTP_USER_AGENT = "User-Agent"
86
HTTP_CONTENT_TYPE = "Content-Type"
87
HTTP_CONTENT_LENGTH = "Content-Length"
88
HTTP_CONNECTION = "Connection"
89
HTTP_KEEP_ALIVE = "Keep-Alive"
90

    
91
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
92

    
93

    
94
class SocketClosed(socket.error):
95
  pass
96

    
97

    
98
class _HttpClientError(Exception):
99
  """Internal exception for HTTP client errors.
100

101
  This should only be used for internal error reporting.
102

103
  """
104
  pass
105

    
106

    
107
class HTTPException(Exception):
108
  code = None
109
  message = None
110

    
111
  def __init__(self, message=None):
112
    Exception.__init__(self)
113
    if message is not None:
114
      self.message = message
115

    
116

    
117
class HTTPBadRequest(HTTPException):
118
  code = 400
119

    
120

    
121
class HTTPForbidden(HTTPException):
122
  code = 403
123

    
124

    
125
class HTTPNotFound(HTTPException):
126
  code = 404
127

    
128

    
129
class HTTPGone(HTTPException):
130
  code = 410
131

    
132

    
133
class HTTPLengthRequired(HTTPException):
134
  code = 411
135

    
136

    
137
class HTTPInternalError(HTTPException):
138
  code = 500
139

    
140

    
141
class HTTPNotImplemented(HTTPException):
142
  code = 501
143

    
144

    
145
class HTTPServiceUnavailable(HTTPException):
146
  code = 503
147

    
148

    
149
class HTTPVersionNotSupported(HTTPException):
150
  code = 505
151

    
152

    
153
class ApacheLogfile:
154
  """Utility class to write HTTP server log files.
155

156
  The written format is the "Common Log Format" as defined by Apache:
157
  http://httpd.apache.org/docs/2.2/mod/mod_log_config.html#examples
158

159
  """
160
  def __init__(self, fd):
161
    """Constructor for ApacheLogfile class.
162

163
    Args:
164
    - fd: Open file object
165

166
    """
167
    self._fd = fd
168

    
169
  def LogRequest(self, request, format, *args):
170
    self._fd.write("%s %s %s [%s] %s\n" % (
171
      # Remote host address
172
      request.address_string(),
173

    
174
      # RFC1413 identity (identd)
175
      "-",
176

    
177
      # Remote user
178
      "-",
179

    
180
      # Request time
181
      self._FormatCurrentTime(),
182

    
183
      # Message
184
      format % args,
185
      ))
186
    self._fd.flush()
187

    
188
  def _FormatCurrentTime(self):
189
    """Formats current time in Common Log Format.
190

191
    """
192
    return self._FormatLogTime(time.time())
193

    
194
  def _FormatLogTime(self, seconds):
195
    """Formats time for Common Log Format.
196

197
    All timestamps are logged in the UTC timezone.
198

199
    Args:
200
    - seconds: Time in seconds since the epoch
201

202
    """
203
    (_, month, _, _, _, _, _, _, _) = tm = time.gmtime(seconds)
204
    format = "%d/" + MONTHNAME[month] + "/%Y:%H:%M:%S +0000"
205
    return time.strftime(format, tm)
206

    
207

    
208
class HTTPJsonConverter:
209
  CONTENT_TYPE = "application/json"
210

    
211
  def Encode(self, data):
212
    return serializer.DumpJson(data)
213

    
214
  def Decode(self, data):
215
    return serializer.LoadJson(data)
216

    
217

    
218
class HttpSslParams(object):
219
  """Data class for SSL key and certificate.
220

221
  """
222
  def __init__(self, ssl_key_path, ssl_cert_path):
223
    """Initializes this class.
224

225
    @type ssl_key_path: string
226
    @param ssl_key_path: Path to file containing SSL key in PEM format
227
    @type ssl_cert_path: string
228
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
229

230
    """
231
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
232
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
233

    
234
  def GetKey(self):
235
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
236
                                          self.ssl_key_pem)
237

    
238
  def GetCertificate(self):
239
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
240
                                           self.ssl_cert_pem)
241

    
242

    
243
class _HttpSocketBase(object):
244
  """Base class for HTTP server and client.
245

246
  """
247
  def __init__(self):
248
    self._using_ssl = None
249
    self._ssl_params = None
250
    self._ssl_key = None
251
    self._ssl_cert = None
252

    
253
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
254
    """Creates a TCP socket and initializes SSL if needed.
255

256
    @type ssl_params: HttpSslParams
257
    @param ssl_params: SSL key and certificate
258
    @type ssl_verify_peer: bool
259
    @param ssl_verify_peer: Whether to require client certificate and compare
260
                            it with our certificate
261

262
    """
263
    self._ssl_params = ssl_params
264

    
265
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
266

    
267
    # Should we enable SSL?
268
    self._using_ssl = ssl_params is not None
269

    
270
    if not self._using_ssl:
271
      return sock
272

    
273
    self._ssl_key = ssl_params.GetKey()
274
    self._ssl_cert = ssl_params.GetCertificate()
275

    
276
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
277
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
278

    
279
    ctx.use_privatekey(self._ssl_key)
280
    ctx.use_certificate(self._ssl_cert)
281
    ctx.check_privatekey()
282

    
283
    if ssl_verify_peer:
284
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
285
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
286
                     self._SSLVerifyCallback)
287

    
288
    return OpenSSL.SSL.Connection(ctx, sock)
289

    
290
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
291
    """Verify the certificate provided by the peer
292

293
    We only compare fingerprints. The client must use the same certificate as
294
    we do on our side.
295

296
    """
297
    assert self._ssl_params, "SSL not initialized"
298

    
299
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
300
            self._ssl_cert.digest("md5") == cert.digest("md5"))
301

    
302

    
303
class _HttpConnectionHandler(object):
304
  """Implements server side of HTTP
305

306
  This class implements the server side of HTTP. It's based on code of Python's
307
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
308
  character encodings. Keep-alive connections are not supported.
309

310
  """
311
  # The default request version.  This only affects responses up until
312
  # the point where the request line is parsed, so it mainly decides what
313
  # the client gets back when sending a malformed request line.
314
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
315
  default_request_version = HTTP_0_9
316

    
317
  # Error message settings
318
  error_message_format = DEFAULT_ERROR_MESSAGE
319
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
320

    
321
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
322

    
323
  def __init__(self, server, conn, client_addr, fileio_class):
324
    """Initializes this class.
325

326
    Part of the initialization is reading the request and eventual POST/PUT
327
    data sent by the client.
328

329
    """
330
    self._server = server
331

    
332
    # We default rfile to buffered because otherwise it could be
333
    # really slow for large data (a getc() call per byte); we make
334
    # wfile unbuffered because (a) often after a write() we want to
335
    # read and we need to flush the line; (b) big writes to unbuffered
336
    # files are typically optimized by stdio even when big reads
337
    # aren't.
338
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
339
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
340

    
341
    self.client_addr = client_addr
342

    
343
    self.request_headers = None
344
    self.request_method = None
345
    self.request_path = None
346
    self.request_requestline = None
347
    self.request_version = self.default_request_version
348

    
349
    self.response_body = None
350
    self.response_code = HTTP_OK
351
    self.response_content_type = None
352
    self.response_headers = {}
353

    
354
    self.should_fork = False
355

    
356
    try:
357
      self._ReadRequest()
358
      self._ReadPostData()
359
    except HTTPException, err:
360
      self._SetErrorStatus(err)
361

    
362
  def Close(self):
363
    if not self.wfile.closed:
364
      self.wfile.flush()
365
    self.wfile.close()
366
    self.rfile.close()
367

    
368
  def _DateTimeHeader(self):
369
    """Return the current date and time formatted for a message header.
370

371
    """
372
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
373
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
374
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
375

    
376
  def _SetErrorStatus(self, err):
377
    """Sets the response code and body from a HTTPException.
378

379
    @type err: HTTPException
380
    @param err: Exception instance
381

382
    """
383
    try:
384
      (shortmsg, longmsg) = self.responses[err.code]
385
    except KeyError:
386
      shortmsg = longmsg = "Unknown"
387

    
388
    if err.message:
389
      message = err.message
390
    else:
391
      message = shortmsg
392

    
393
    values = {
394
      "code": err.code,
395
      "message": cgi.escape(message),
396
      "explain": longmsg,
397
      }
398

    
399
    self.response_code = err.code
400
    self.response_content_type = self.error_content_type
401
    self.response_body = self.error_message_format % values
402

    
403
  def HandleRequest(self):
404
    """Handle the actual request.
405

406
    Calls the actual handler function and converts exceptions into HTTP errors.
407

408
    """
409
    # Don't do anything if there's already been a problem
410
    if self.response_code != HTTP_OK:
411
      return
412

    
413
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
414

    
415
    # Check whether client is still there
416
    self.rfile.read(0)
417

    
418
    try:
419
      try:
420
        result = self._server.HandleRequest(self)
421

    
422
        # TODO: Content-type
423
        encoder = HTTPJsonConverter()
424
        body = encoder.Encode(result)
425

    
426
        self.response_content_type = encoder.CONTENT_TYPE
427
        self.response_body = body
428
      except (HTTPException, KeyboardInterrupt, SystemExit):
429
        raise
430
      except Exception, err:
431
        logging.exception("Caught exception")
432
        raise HTTPInternalError(message=str(err))
433
      except:
434
        logging.exception("Unknown exception")
435
        raise HTTPInternalError(message="Unknown error")
436

    
437
    except HTTPException, err:
438
      self._SetErrorStatus(err)
439

    
440
  def SendResponse(self):
441
    """Sends response to the client.
442

443
    """
444
    # Check whether client is still there
445
    self.rfile.read(0)
446

    
447
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
448
                 self.request_requestline, self.response_code)
449

    
450
    if self.response_code in self.responses:
451
      response_message = self.responses[self.response_code][0]
452
    else:
453
      response_message = ""
454

    
455
    if self.request_version != HTTP_0_9:
456
      self.wfile.write("%s %d %s\r\n" %
457
                       (self.request_version, self.response_code,
458
                        response_message))
459
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
460
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
461
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
462
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
463
      for key, val in self.response_headers.iteritems():
464
        self._SendHeader(key, val)
465

    
466
      # We don't support keep-alive at this time
467
      self._SendHeader(HTTP_CONNECTION, "close")
468
      self.wfile.write("\r\n")
469

    
470
    if (self.request_method != HTTP_HEAD and
471
        self.response_code >= HTTP_OK and
472
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
473
      self.wfile.write(self.response_body)
474

    
475
  def _SendHeader(self, name, value):
476
    if self.request_version != HTTP_0_9:
477
      self.wfile.write("%s: %s\r\n" % (name, value))
478

    
479
  def _ReadRequest(self):
480
    """Reads and parses request line
481

482
    """
483
    raw_requestline = self.rfile.readline()
484

    
485
    requestline = raw_requestline
486
    if requestline[-2:] == '\r\n':
487
      requestline = requestline[:-2]
488
    elif requestline[-1:] == '\n':
489
      requestline = requestline[:-1]
490

    
491
    if not requestline:
492
      raise HTTPBadRequest("Empty request line")
493

    
494
    self.request_requestline = requestline
495

    
496
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
497

    
498
    words = requestline.split()
499

    
500
    if len(words) == 3:
501
      [method, path, version] = words
502
      if version[:5] != 'HTTP/':
503
        raise HTTPBadRequest("Bad request version (%r)" % version)
504

    
505
      try:
506
        base_version_number = version.split('/', 1)[1]
507
        version_number = base_version_number.split(".")
508

    
509
        # RFC 2145 section 3.1 says there can be only one "." and
510
        #   - major and minor numbers MUST be treated as
511
        #      separate integers;
512
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
513
        #      turn is lower than HTTP/12.3;
514
        #   - Leading zeros MUST be ignored by recipients.
515
        if len(version_number) != 2:
516
          raise HTTPBadRequest("Bad request version (%r)" % version)
517

    
518
        version_number = int(version_number[0]), int(version_number[1])
519
      except (ValueError, IndexError):
520
        raise HTTPBadRequest("Bad request version (%r)" % version)
521

    
522
      if version_number >= (2, 0):
523
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
524
                                      base_version_number)
525

    
526
    elif len(words) == 2:
527
      version = HTTP_0_9
528
      [method, path] = words
529
      if method != HTTP_GET:
530
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
531

    
532
    else:
533
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
534

    
535
    # Examine the headers and look for a Connection directive
536
    headers = mimetools.Message(self.rfile, 0)
537

    
538
    self.request_method = method
539
    self.request_path = path
540
    self.request_version = version
541
    self.request_headers = headers
542

    
543
  def _ReadPostData(self):
544
    """Reads POST/PUT data
545

546
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
547
    a request is signaled by the inclusion of a Content-Length header field in
548
    the request message headers. HTTP/1.0 requests containing an entity body
549
    must include a valid Content-Length header field."
550

551
    """
552
    # While not according to specification, we only support an entity body for
553
    # POST and PUT.
554
    if (not self.request_method or
555
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
556
      self.request_post_data = None
557
      return
558

    
559
    content_length = None
560
    try:
561
      if HTTP_CONTENT_LENGTH in self.request_headers:
562
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
563
    except TypeError:
564
      pass
565
    except ValueError:
566
      pass
567

    
568
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
569
    if content_length is None:
570
      raise HTTPLengthRequired("Missing Content-Length header or"
571
                               " invalid format")
572

    
573
    data = self.rfile.read(content_length)
574

    
575
    # TODO: Content-type, error handling
576
    if data:
577
      self.request_post_data = HTTPJsonConverter().Decode(data)
578
    else:
579
      self.request_post_data = None
580

    
581
    logging.debug("HTTP POST data: %s", self.request_post_data)
582

    
583

    
584
class HttpServer(_HttpSocketBase):
585
  """Generic HTTP server class
586

587
  Users of this class must subclass it and override the HandleRequest function.
588

589
  """
590
  MAX_CHILDREN = 20
591

    
592
  def __init__(self, mainloop, local_address, port,
593
               ssl_params=None, ssl_verify_peer=False):
594
    """Initializes the HTTP server
595

596
    @type mainloop: ganeti.daemon.Mainloop
597
    @param mainloop: Mainloop used to poll for I/O events
598
    @type local_addess: string
599
    @param local_address: Local IP address to bind to
600
    @type port: int
601
    @param port: TCP port to listen on
602
    @type ssl_params: HttpSslParams
603
    @param ssl_params: SSL key and certificate
604
    @type ssl_verify_peer: bool
605
    @param ssl_verify_peer: Whether to require client certificate and compare
606
                            it with our certificate
607

608
    """
609
    _HttpSocketBase.__init__(self)
610

    
611
    self.mainloop = mainloop
612
    self.local_address = local_address
613
    self.port = port
614

    
615
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
616

    
617
    # Allow port to be reused
618
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
619

    
620
    if self._using_ssl:
621
      self._fileio_class = _SSLFileObject
622
    else:
623
      self._fileio_class = socket._fileobject
624

    
625
    self._children = []
626

    
627
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
628
    mainloop.RegisterSignal(self)
629

    
630
  def Start(self):
631
    self.socket.bind((self.local_address, self.port))
632
    self.socket.listen(5)
633

    
634
  def Stop(self):
635
    self.socket.close()
636

    
637
  def OnIO(self, fd, condition):
638
    if condition & select.POLLIN:
639
      self._IncomingConnection()
640

    
641
  def OnSignal(self, signum):
642
    if signum == signal.SIGCHLD:
643
      self._CollectChildren(True)
644

    
645
  def _CollectChildren(self, quick):
646
    """Checks whether any child processes are done
647

648
    @type quick: bool
649
    @param quick: Whether to only use non-blocking functions
650

651
    """
652
    if not quick:
653
      # Don't wait for other processes if it should be a quick check
654
      while len(self._children) > self.MAX_CHILDREN:
655
        try:
656
          # Waiting without a timeout brings us into a potential DoS situation.
657
          # As soon as too many children run, we'll not respond to new
658
          # requests. The real solution would be to add a timeout for children
659
          # and killing them after some time.
660
          pid, status = os.waitpid(0, 0)
661
        except os.error:
662
          pid = None
663
        if pid and pid in self._children:
664
          self._children.remove(pid)
665

    
666
    for child in self._children:
667
      try:
668
        pid, status = os.waitpid(child, os.WNOHANG)
669
      except os.error:
670
        pid = None
671
      if pid and pid in self._children:
672
        self._children.remove(pid)
673

    
674
  def _IncomingConnection(self):
675
    """Called for each incoming connection
676

677
    """
678
    (connection, client_addr) = self.socket.accept()
679

    
680
    self._CollectChildren(False)
681

    
682
    pid = os.fork()
683
    if pid == 0:
684
      # Child process
685
      logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
686

    
687
      try:
688
        try:
689
          try:
690
            handler = None
691
            try:
692
              # Read, parse and handle request
693
              handler = _HttpConnectionHandler(self, connection, client_addr,
694
                                               self._fileio_class)
695
              handler.HandleRequest()
696
            finally:
697
              # Try to send a response
698
              if handler:
699
                handler.SendResponse()
700
                handler.Close()
701
          except SocketClosed:
702
            pass
703
        finally:
704
          logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
705
      except:
706
        logging.exception("Error while handling request from %s:%s",
707
                          client_addr[0], client_addr[1])
708
        os._exit(1)
709
      os._exit(0)
710
    else:
711
      self._children.append(pid)
712

    
713
  def HandleRequest(self, req):
714
    raise NotImplementedError()
715

    
716

    
717
class HttpClientRequest(object):
718
  def __init__(self, host, port, method, path, headers=None, post_data=None,
719
               ssl_params=None, ssl_verify_peer=False):
720
    """Describes an HTTP request.
721

722
    @type host: string
723
    @param host: Hostname
724
    @type port: int
725
    @param port: Port
726
    @type method: string
727
    @param method: Method name
728
    @type path: string
729
    @param path: Request path
730
    @type headers: dict or None
731
    @param headers: Additional headers to send
732
    @type post_data: string or None
733
    @param post_data: Additional data to send
734
    @type ssl_params: HttpSslParams
735
    @param ssl_params: SSL key and certificate
736
    @type ssl_verify_peer: bool
737
    @param ssl_verify_peer: Whether to compare our certificate with server's
738
                            certificate
739

740
    """
741
    if post_data is not None:
742
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
743
        "Only POST and GET requests support sending data"
744

    
745
    assert path.startswith("/"), "Path must start with slash (/)"
746

    
747
    self.host = host
748
    self.port = port
749
    self.ssl_params = ssl_params
750
    self.ssl_verify_peer = ssl_verify_peer
751
    self.method = method
752
    self.path = path
753
    self.headers = headers
754
    self.post_data = post_data
755

    
756
    self.success = None
757
    self.error = None
758

    
759
    self.resp_status_line = None
760
    self.resp_version = None
761
    self.resp_status = None
762
    self.resp_reason = None
763
    self.resp_headers = None
764
    self.resp_body = None
765

    
766

    
767
class HttpClientRequestExecutor(_HttpSocketBase):
768
  # Default headers
769
  DEFAULT_HEADERS = {
770
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
771
    # TODO: For keep-alive, don't send "Connection: close"
772
    HTTP_CONNECTION: "close",
773
    }
774

    
775
  # Length limits
776
  STATUS_LINE_LENGTH_MAX = 512
777
  HEADER_LENGTH_MAX = 4 * 1024
778

    
779
  # Timeouts in seconds for socket layer
780
  # TODO: Make read timeout configurable per OpCode
781
  CONNECT_TIMEOUT = 5.0
782
  WRITE_TIMEOUT = 10
783
  READ_TIMEOUT = None
784
  CLOSE_TIMEOUT = 1
785

    
786
  # Parser state machine
787
  PS_STATUS_LINE = "status-line"
788
  PS_HEADERS = "headers"
789
  PS_BODY = "body"
790
  PS_COMPLETE = "complete"
791

    
792
  # Socket operations
793
  (OP_SEND,
794
   OP_RECV,
795
   OP_CLOSE_CHECK,
796
   OP_SHUTDOWN) = range(4)
797

    
798
  def __init__(self, req):
799
    """Initializes the HttpClientRequestExecutor class.
800

801
    @type req: HttpClientRequest
802
    @param req: Request object
803

804
    """
805
    _HttpSocketBase.__init__(self)
806

    
807
    self.request = req
808

    
809
    self.parser_status = self.PS_STATUS_LINE
810
    self.header_buffer = StringIO()
811
    self.body_buffer = StringIO()
812
    self.content_length = None
813
    self.server_will_close = None
814

    
815
    self.poller = select.poll()
816

    
817
    try:
818
      # TODO: Implement connection caching/keep-alive
819
      self.sock = self._CreateSocket(req.ssl_params,
820
                                     req.ssl_verify_peer)
821

    
822
      # Disable Python's timeout
823
      self.sock.settimeout(None)
824

    
825
      # Operate in non-blocking mode
826
      self.sock.setblocking(0)
827

    
828
      force_close = True
829
      self._Connect()
830
      try:
831
        self._SendRequest()
832
        self._ReadResponse()
833

    
834
        # Only wait for server to close if we didn't have any exception.
835
        force_close = False
836
      finally:
837
        self._CloseConnection(force_close)
838

    
839
      self.sock.close()
840
      self.sock = None
841

    
842
      req.resp_body = self.body_buffer.getvalue()
843

    
844
      req.success = True
845
      req.error = None
846

    
847
    except _HttpClientError, err:
848
      req.success = False
849
      req.error = str(err)
850

    
851
  def _BuildRequest(self):
852
    """Build HTTP request.
853

854
    @rtype: string
855
    @return: Complete request
856

857
    """
858
    # Headers
859
    send_headers = self.DEFAULT_HEADERS.copy()
860

    
861
    if self.request.headers:
862
      send_headers.update(self.request.headers)
863

    
864
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
865

    
866
    if self.request.post_data:
867
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
868

    
869
    buf = StringIO()
870

    
871
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
872
    # keep-alive).
873
    # TODO: For keep-alive, change to HTTP/1.1
874
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
875
                                self.request.path, HTTP_1_0))
876

    
877
    # Add headers
878
    for name, value in send_headers.iteritems():
879
      buf.write("%s: %s\r\n" % (name, value))
880

    
881
    buf.write("\r\n")
882

    
883
    if self.request.post_data:
884
      buf.write(self.request.post_data)
885

    
886
    return buf.getvalue()
887

    
888
  def _ParseStatusLine(self):
889
    """Parses the status line sent by the server.
890

891
    """
892
    line = self.request.resp_status_line
893

    
894
    if not line:
895
      raise _HttpClientError("Empty status line")
896

    
897
    try:
898
      [version, status, reason] = line.split(None, 2)
899
    except ValueError:
900
      try:
901
        [version, status] = line.split(None, 1)
902
        reason = ""
903
      except ValueError:
904
        version = HTTP_9_0
905

    
906
    if version:
907
      version = version.upper()
908

    
909
    if version not in (HTTP_1_0, HTTP_1_1):
910
      # We do not support HTTP/0.9, despite the specification requiring it
911
      # (RFC2616, section 19.6)
912
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
913
                             line)
914

    
915
    # The status code is a three-digit number
916
    try:
917
      status = int(status)
918
      if status < 100 or status > 999:
919
        status = -1
920
    except ValueError:
921
      status = -1
922

    
923
    if status == -1:
924
      raise _HttpClientError("Invalid status code (%r)" % line)
925

    
926
    self.request.resp_version = version
927
    self.request.resp_status = status
928
    self.request.resp_reason = reason
929

    
930
  def _WillServerCloseConnection(self):
931
    """Evaluate whether server will close the connection.
932

933
    @rtype: bool
934
    @return: Whether server will close the connection
935

936
    """
937
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
938
    if hdr_connection:
939
      hdr_connection = hdr_connection.lower()
940

    
941
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
942
    if self.request.resp_version == HTTP_1_1:
943
      return (hdr_connection and "close" in hdr_connection)
944

    
945
    # Some HTTP/1.0 implementations have support for persistent connections,
946
    # using rules different than HTTP/1.1.
947

    
948
    # For older HTTP, Keep-Alive indicates persistent connection.
949
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
950
      return False
951

    
952
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
953
    # supposed to be sent by the client.
954
    if hdr_connection and "keep-alive" in hdr_connection:
955
      return False
956

    
957
    return True
958

    
959
  def _ParseHeaders(self):
960
    """Parses the headers sent by the server.
961

962
    This function also adjusts internal variables based on the header values.
963

964
    """
965
    req = self.request
966

    
967
    # Parse headers
968
    self.header_buffer.seek(0, 0)
969
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
970

    
971
    self.server_will_close = self._WillServerCloseConnection()
972

    
973
    # Do we have a Content-Length header?
974
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
975
    if hdr_content_length:
976
      try:
977
        self.content_length = int(hdr_content_length)
978
      except ValueError:
979
        pass
980
      if self.content_length is not None and self.content_length < 0:
981
        self.content_length = None
982

    
983
    # does the body have a fixed length? (of zero)
984
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
985
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
986
      self.content_length = 0
987

    
988
    # if the connection remains open and a content-length was not provided,
989
    # then assume that the connection WILL close.
990
    if self.content_length is None:
991
      self.server_will_close = True
992

    
993
  def _CheckStatusLineLength(self, length):
994
    if length > self.STATUS_LINE_LENGTH_MAX:
995
      raise _HttpClientError("Status line longer than %d chars" %
996
                             self.STATUS_LINE_LENGTH_MAX)
997

    
998
  def _CheckHeaderLength(self, length):
999
    if length > self.HEADER_LENGTH_MAX:
1000
      raise _HttpClientError("Headers longer than %d chars" %
1001
                             self.HEADER_LENGTH_MAX)
1002

    
1003
  def _ParseBuffer(self, buf, eof):
1004
    """Main function for HTTP response state machine.
1005

1006
    @type buf: string
1007
    @param buf: Receive buffer
1008
    @type eof: bool
1009
    @param eof: Whether we've reached EOF on the socket
1010
    @rtype: string
1011
    @return: Updated receive buffer
1012

1013
    """
1014
    if self.parser_status == self.PS_STATUS_LINE:
1015
      # Expect status line
1016
      idx = buf.find("\r\n")
1017
      if idx >= 0:
1018
        self.request.resp_status_line = buf[:idx]
1019

    
1020
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1021

    
1022
        # Remove status line, including CRLF
1023
        buf = buf[idx + 2:]
1024

    
1025
        self._ParseStatusLine()
1026

    
1027
        self.parser_status = self.PS_HEADERS
1028
      else:
1029
        # Check whether incoming data is getting too large, otherwise we just
1030
        # fill our read buffer.
1031
        self._CheckStatusLineLength(len(buf))
1032

    
1033
    if self.parser_status == self.PS_HEADERS:
1034
      # Wait for header end
1035
      idx = buf.find("\r\n\r\n")
1036
      if idx >= 0:
1037
        self.header_buffer.write(buf[:idx + 2])
1038

    
1039
        self._CheckHeaderLength(self.header_buffer.tell())
1040

    
1041
        # Remove headers, including CRLF
1042
        buf = buf[idx + 4:]
1043

    
1044
        self._ParseHeaders()
1045

    
1046
        self.parser_status = self.PS_BODY
1047
      else:
1048
        # Check whether incoming data is getting too large, otherwise we just
1049
        # fill our read buffer.
1050
        self._CheckHeaderLength(len(buf))
1051

    
1052
    if self.parser_status == self.PS_BODY:
1053
      self.body_buffer.write(buf)
1054
      buf = ""
1055

    
1056
      # Check whether we've read everything
1057
      if (eof or
1058
          (self.content_length is not None and
1059
           self.body_buffer.tell() >= self.content_length)):
1060
        self.parser_status = self.PS_COMPLETE
1061

    
1062
    return buf
1063

    
1064
  def _WaitForCondition(self, event, timeout):
1065
    """Waits for a condition to occur on the socket.
1066

1067
    @type event: int
1068
    @param event: ORed condition (see select module)
1069
    @type timeout: float or None
1070
    @param timeout: Timeout in seconds
1071
    @rtype: int or None
1072
    @return: None for timeout, otherwise occured conditions
1073

1074
    """
1075
    check = (event | select.POLLPRI |
1076
             select.POLLNVAL | select.POLLHUP | select.POLLERR)
1077

    
1078
    if timeout is not None:
1079
      # Poller object expects milliseconds
1080
      timeout *= 1000
1081

    
1082
    self.poller.register(self.sock, event)
1083
    try:
1084
      while True:
1085
        # TODO: If the main thread receives a signal and we have no timeout, we
1086
        # could wait forever. This should check a global "quit" flag or
1087
        # something every so often.
1088
        io_events = self.poller.poll(timeout)
1089
        if io_events:
1090
          for (evfd, evcond) in io_events:
1091
            if evcond & check:
1092
              return evcond
1093
        else:
1094
          # Timeout
1095
          return None
1096
    finally:
1097
      self.poller.unregister(self.sock)
1098

    
1099
  def _SocketOperation(self, op, arg1, error_msg, timeout_msg):
1100
    """Wrapper around socket functions.
1101

1102
    This function abstracts error handling for socket operations, especially
1103
    for the complicated interaction with OpenSSL.
1104

1105
    """
1106
    if op == self.OP_SEND:
1107
      event_poll = select.POLLOUT
1108
      event_check = select.POLLOUT
1109
      timeout = self.WRITE_TIMEOUT
1110

    
1111
    elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1112
      event_poll = select.POLLIN
1113
      event_check = select.POLLIN | select.POLLPRI
1114
      if op == self.OP_CLOSE_CHECK:
1115
        timeout = self.CLOSE_TIMEOUT
1116
      else:
1117
        timeout = self.READ_TIMEOUT
1118

    
1119
    elif op == self.OP_SHUTDOWN:
1120
      event_poll = None
1121
      event_check = None
1122

    
1123
      # The timeout is only used when OpenSSL requests polling for a condition.
1124
      # It is not advisable to have no timeout for shutdown.
1125
      timeout = self.WRITE_TIMEOUT
1126

    
1127
    else:
1128
      raise AssertionError("Invalid socket operation")
1129

    
1130
    # No override by default
1131
    event_override = 0
1132

    
1133
    while True:
1134
      # Poll only for certain operations and when asked for by an override
1135
      if (event_override or
1136
          op in (self.OP_SEND, self.OP_RECV, self.OP_CLOSE_CHECK)):
1137
        if event_override:
1138
          wait_for_event = event_override
1139
        else:
1140
          wait_for_event = event_poll
1141

    
1142
        event = self._WaitForCondition(wait_for_event, timeout)
1143
        if event is None:
1144
          raise _HttpClientTimeout(timeout_msg)
1145

    
1146
        if (op == self.OP_RECV and
1147
            event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
1148
          return ""
1149

    
1150
        if not event & wait_for_event:
1151
          continue
1152

    
1153
      # Reset override
1154
      event_override = 0
1155

    
1156
      try:
1157
        try:
1158
          if op == self.OP_SEND:
1159
            return self.sock.send(arg1)
1160

    
1161
          elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1162
            return self.sock.recv(arg1)
1163

    
1164
          elif op == self.OP_SHUTDOWN:
1165
            if self._using_ssl:
1166
              # PyOpenSSL's shutdown() doesn't take arguments
1167
              return self.sock.shutdown()
1168
            else:
1169
              return self.sock.shutdown(arg1)
1170

    
1171
        except OpenSSL.SSL.WantWriteError:
1172
          # OpenSSL wants to write, poll for POLLOUT
1173
          event_override = select.POLLOUT
1174
          continue
1175

    
1176
        except OpenSSL.SSL.WantReadError:
1177
          # OpenSSL wants to read, poll for POLLIN
1178
          event_override = select.POLLIN | select.POLLPRI
1179
          continue
1180

    
1181
        except OpenSSL.SSL.WantX509LookupError:
1182
          continue
1183

    
1184
        except OpenSSL.SSL.SysCallError, err:
1185
          if op == self.OP_SEND:
1186
            # arg1 is the data when writing
1187
            if err.args and err.args[0] == -1 and arg1 == "":
1188
              # errors when writing empty strings are expected
1189
              # and can be ignored
1190
              return 0
1191

    
1192
          elif op == self.OP_RECV:
1193
            if err.args == (-1, _SSL_UNEXPECTED_EOF):
1194
              return ""
1195

    
1196
          raise socket.error(err.args)
1197

    
1198
        except OpenSSL.SSL.Error, err:
1199
          raise socket.error(err.args)
1200

    
1201
      except socket.error, err:
1202
        if err.args and err.args[0] == errno.EAGAIN:
1203
          # Ignore EAGAIN
1204
          continue
1205

    
1206
        raise _HttpClientError("%s: %s" % (error_msg, str(err)))
1207

    
1208
  def _Connect(self):
1209
    """Non-blocking connect to host with timeout.
1210

1211
    """
1212
    connected = False
1213
    while True:
1214
      try:
1215
        connect_error = self.sock.connect_ex((self.request.host,
1216
                                              self.request.port))
1217
      except socket.gaierror, err:
1218
        raise _HttpClientError("Connection failed: %s" % str(err))
1219

    
1220
      if connect_error == errno.EINTR:
1221
        # Mask signals
1222
        pass
1223

    
1224
      elif connect_error == 0:
1225
        # Connection established
1226
        connected = True
1227
        break
1228

    
1229
      elif connect_error == errno.EINPROGRESS:
1230
        # Connection started
1231
        break
1232

    
1233
      raise _HttpClientError("Connection failed (%s: %s)" %
1234
                             (connect_error, os.strerror(connect_error)))
1235

    
1236
    if not connected:
1237
      # Wait for connection
1238
      event = self._WaitForCondition(select.POLLOUT, self.CONNECT_TIMEOUT)
1239
      if event is None:
1240
        raise _HttpClientError("Timeout while connecting to server")
1241

    
1242
      # Get error code
1243
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1244
      if connect_error != 0:
1245
        raise _HttpClientError("Connection failed (%s: %s)" %
1246
                               (connect_error, os.strerror(connect_error)))
1247

    
1248
    # Enable TCP keep-alive
1249
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1250

    
1251
    # If needed, Linux specific options are available to change the TCP
1252
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1253
    # TCP_KEEPINTVL.
1254

    
1255
  def _SendRequest(self):
1256
    """Sends request to server.
1257

1258
    """
1259
    buf = self._BuildRequest()
1260

    
1261
    while buf:
1262
      # Send only 4 KB at a time
1263
      data = buf[:4096]
1264

    
1265
      sent = self._SocketOperation(self.OP_SEND, data,
1266
                                   "Error while sending request",
1267
                                   "Timeout while sending request")
1268

    
1269
      # Remove sent bytes
1270
      buf = buf[sent:]
1271

    
1272
    assert not buf, "Request wasn't sent completely"
1273

    
1274
  def _ReadResponse(self):
1275
    """Read response from server.
1276

1277
    Calls the parser function after reading a chunk of data.
1278

1279
    """
1280
    buf = ""
1281
    eof = False
1282
    while self.parser_status != self.PS_COMPLETE:
1283
      data = self._SocketOperation(self.OP_RECV, 4096,
1284
                                   "Error while reading response",
1285
                                   "Timeout while reading response")
1286

    
1287
      if data:
1288
        buf += data
1289
      else:
1290
        eof = True
1291

    
1292
      # Do some parsing and error checking while more data arrives
1293
      buf = self._ParseBuffer(buf, eof)
1294

    
1295
      # Must be done only after the buffer has been evaluated
1296
      if (eof and
1297
          self.parser_status in (self.PS_STATUS_LINE,
1298
                                 self.PS_HEADERS)):
1299
        raise _HttpClientError("Connection closed prematurely")
1300

    
1301
    # Parse rest
1302
    buf = self._ParseBuffer(buf, True)
1303

    
1304
    assert self.parser_status == self.PS_COMPLETE
1305
    assert not buf, "Parser didn't read full response"
1306

    
1307
  def _CloseConnection(self, force):
1308
    """Closes the connection.
1309

1310
    """
1311
    if self.server_will_close and not force:
1312
      # Wait for server to close
1313
      try:
1314
        # Check whether it's actually closed
1315
        if not self._SocketOperation(self.OP_CLOSE_CHECK, 1,
1316
                                     "Error", "Timeout"):
1317
          return
1318
      except (socket.error, _HttpClientError):
1319
        # Ignore errors at this stage
1320
        pass
1321

    
1322
    # Close the connection from our side
1323
    self._SocketOperation(self.OP_SHUTDOWN, socket.SHUT_RDWR,
1324
                          "Error while shutting down connection",
1325
                          "Timeout while shutting down connection")
1326

    
1327

    
1328
class _HttpClientPendingRequest(object):
1329
  """Data class for pending requests.
1330

1331
  """
1332
  def __init__(self, request):
1333
    self.request = request
1334

    
1335
    # Thread synchronization
1336
    self.done = threading.Event()
1337

    
1338

    
1339
class HttpClientWorker(workerpool.BaseWorker):
1340
  """HTTP client worker class.
1341

1342
  """
1343
  def RunTask(self, pend_req):
1344
    try:
1345
      HttpClientRequestExecutor(pend_req.request)
1346
    finally:
1347
      pend_req.done.set()
1348

    
1349

    
1350
class HttpClientWorkerPool(workerpool.WorkerPool):
1351
  def __init__(self, manager):
1352
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1353
                                   HttpClientWorker)
1354
    self.manager = manager
1355

    
1356

    
1357
class HttpClientManager(object):
1358
  """Manages HTTP requests.
1359

1360
  """
1361
  def __init__(self):
1362
    self._wpool = HttpClientWorkerPool(self)
1363

    
1364
  def __del__(self):
1365
    self.Shutdown()
1366

    
1367
  def ExecRequests(self, requests):
1368
    """Execute HTTP requests.
1369

1370
    This function can be called from multiple threads at the same time.
1371

1372
    @type requests: List of HttpClientRequest instances
1373
    @param requests: The requests to execute
1374
    @rtype: List of HttpClientRequest instances
1375
    @returns: The list of requests passed in
1376

1377
    """
1378
    # _HttpClientPendingRequest is used for internal thread synchronization
1379
    pending = [_HttpClientPendingRequest(req) for req in requests]
1380

    
1381
    try:
1382
      # Add requests to queue
1383
      for pend_req in pending:
1384
        self._wpool.AddTask(pend_req)
1385

    
1386
    finally:
1387
      # In case of an exception we should still wait for the rest, otherwise
1388
      # another thread from the worker pool could modify the request object
1389
      # after we returned.
1390

    
1391
      # And wait for them to finish
1392
      for pend_req in pending:
1393
        pend_req.done.wait()
1394

    
1395
    # Return original list
1396
    return requests
1397

    
1398
  def Shutdown(self):
1399
    self._wpool.Quiesce()
1400
    self._wpool.TerminateWorkers()
1401

    
1402

    
1403
class _SSLFileObject(object):
1404
  """Wrapper around socket._fileobject
1405

1406
  This wrapper is required to handle OpenSSL exceptions.
1407

1408
  """
1409
  def _RequireOpenSocket(fn):
1410
    def wrapper(self, *args, **kwargs):
1411
      if self.closed:
1412
        raise SocketClosed("Socket is closed")
1413
      return fn(self, *args, **kwargs)
1414
    return wrapper
1415

    
1416
  def __init__(self, sock, mode='rb', bufsize=-1):
1417
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1418

    
1419
  def _ConnectionLost(self):
1420
    self._base = None
1421

    
1422
  def _getclosed(self):
1423
    return self._base is None or self._base.closed
1424
  closed = property(_getclosed, doc="True if the file is closed")
1425

    
1426
  @_RequireOpenSocket
1427
  def close(self):
1428
    return self._base.close()
1429

    
1430
  @_RequireOpenSocket
1431
  def flush(self):
1432
    return self._base.flush()
1433

    
1434
  @_RequireOpenSocket
1435
  def fileno(self):
1436
    return self._base.fileno()
1437

    
1438
  @_RequireOpenSocket
1439
  def read(self, size=-1):
1440
    return self._ReadWrapper(self._base.read, size=size)
1441

    
1442
  @_RequireOpenSocket
1443
  def readline(self, size=-1):
1444
    return self._ReadWrapper(self._base.readline, size=size)
1445

    
1446
  def _ReadWrapper(self, fn, *args, **kwargs):
1447
    while True:
1448
      try:
1449
        return fn(*args, **kwargs)
1450

    
1451
      except OpenSSL.SSL.ZeroReturnError, err:
1452
        self._ConnectionLost()
1453
        return ""
1454

    
1455
      except OpenSSL.SSL.WantReadError:
1456
        continue
1457

    
1458
      #except OpenSSL.SSL.WantWriteError:
1459
      # TODO
1460

    
1461
      except OpenSSL.SSL.SysCallError, (retval, desc):
1462
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1463
            or retval > 0):
1464
          self._ConnectionLost()
1465
          return ""
1466

    
1467
        logging.exception("Error in OpenSSL")
1468
        self._ConnectionLost()
1469
        raise socket.error(err.args)
1470

    
1471
      except OpenSSL.SSL.Error, err:
1472
        self._ConnectionLost()
1473
        raise socket.error(err.args)
1474

    
1475
  @_RequireOpenSocket
1476
  def write(self, data):
1477
    return self._WriteWrapper(self._base.write, data)
1478

    
1479
  def _WriteWrapper(self, fn, *args, **kwargs):
1480
    while True:
1481
      try:
1482
        return fn(*args, **kwargs)
1483
      except OpenSSL.SSL.ZeroReturnError, err:
1484
        self._ConnectionLost()
1485
        return 0
1486

    
1487
      except OpenSSL.SSL.WantWriteError:
1488
        continue
1489

    
1490
      #except OpenSSL.SSL.WantReadError:
1491
      # TODO
1492

    
1493
      except OpenSSL.SSL.SysCallError, err:
1494
        if err.args[0] == -1 and data == "":
1495
          # errors when writing empty strings are expected
1496
          # and can be ignored
1497
          return 0
1498

    
1499
        self._ConnectionLost()
1500
        raise socket.error(err.args)
1501

    
1502
      except OpenSSL.SSL.Error, err:
1503
        self._ConnectionLost()
1504
        raise socket.error(err.args)