Statistics
| Branch: | Tag: | Revision:

root / lib / http / __init__.py @ 02cab3e7

History | View | Annotate | Download (51.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP module.
22

23
"""
24

    
25
import BaseHTTPServer
26
import cgi
27
import logging
28
import mimetools
29
import OpenSSL
30
import os
31
import select
32
import socket
33
import time
34
import signal
35
import errno
36
import threading
37

    
38
from cStringIO import StringIO
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import utils
44

    
45

    
46
HTTP_CLIENT_THREADS = 10
47

    
48
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
49

    
50
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
51
MONTHNAME = [None,
52
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
53
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
54

    
55
# Default error message
56
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
57
DEFAULT_ERROR_MESSAGE = """\
58
<head>
59
<title>Error response</title>
60
</head>
61
<body>
62
<h1>Error response</h1>
63
<p>Error code %(code)d.
64
<p>Message: %(message)s.
65
<p>Error code explanation: %(code)s = %(explain)s.
66
</body>
67
"""
68

    
69
HTTP_OK = 200
70
HTTP_NO_CONTENT = 204
71
HTTP_NOT_MODIFIED = 304
72

    
73
HTTP_0_9 = "HTTP/0.9"
74
HTTP_1_0 = "HTTP/1.0"
75
HTTP_1_1 = "HTTP/1.1"
76

    
77
HTTP_GET = "GET"
78
HTTP_HEAD = "HEAD"
79
HTTP_POST = "POST"
80
HTTP_PUT = "PUT"
81

    
82
HTTP_ETAG = "ETag"
83
HTTP_HOST = "Host"
84
HTTP_SERVER = "Server"
85
HTTP_DATE = "Date"
86
HTTP_USER_AGENT = "User-Agent"
87
HTTP_CONTENT_TYPE = "Content-Type"
88
HTTP_CONTENT_LENGTH = "Content-Length"
89
HTTP_CONNECTION = "Connection"
90
HTTP_KEEP_ALIVE = "Keep-Alive"
91

    
92
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
93

    
94
# Socket operations
95
(SOCKOP_SEND,
96
 SOCKOP_RECV,
97
 SOCKOP_SHUTDOWN) = range(3)
98

    
99

    
100
class SocketClosed(socket.error):
101
  pass
102

    
103

    
104
class HttpError(Exception):
105
  """Internal exception for HTTP errors.
106

107
  This should only be used for internal error reporting.
108

109
  """
110

    
111

    
112
class _HttpClientError(Exception):
113
  """Internal exception for HTTP client errors.
114

115
  This should only be used for internal error reporting.
116

117
  """
118

    
119

    
120
class HttpSocketTimeout(Exception):
121
  """Internal exception for socket timeouts.
122

123
  This should only be used for internal error reporting.
124

125
  """
126

    
127

    
128
class HttpException(Exception):
129
  code = None
130
  message = None
131

    
132
  def __init__(self, message=None):
133
    Exception.__init__(self)
134
    if message is not None:
135
      self.message = message
136

    
137

    
138
class HttpBadRequest(HttpException):
139
  code = 400
140

    
141

    
142
class HttpForbidden(HttpException):
143
  code = 403
144

    
145

    
146
class HttpNotFound(HttpException):
147
  code = 404
148

    
149

    
150
class HttpGone(HttpException):
151
  code = 410
152

    
153

    
154
class HttpLengthRequired(HttpException):
155
  code = 411
156

    
157

    
158
class HttpInternalError(HttpException):
159
  code = 500
160

    
161

    
162
class HttpNotImplemented(HttpException):
163
  code = 501
164

    
165

    
166
class HttpServiceUnavailable(HttpException):
167
  code = 503
168

    
169

    
170
class HttpVersionNotSupported(HttpException):
171
  code = 505
172

    
173

    
174
class HttpJsonConverter:
175
  CONTENT_TYPE = "application/json"
176

    
177
  def Encode(self, data):
178
    return serializer.DumpJson(data)
179

    
180
  def Decode(self, data):
181
    return serializer.LoadJson(data)
182

    
183

    
184
def WaitForSocketCondition(poller, sock, event, timeout):
185
  """Waits for a condition to occur on the socket.
186

187
  @type poller: select.Poller
188
  @param poller: Poller object as created by select.poll()
189
  @type sock: socket
190
  @param socket: Wait for events on this socket
191
  @type event: int
192
  @param event: ORed condition (see select module)
193
  @type timeout: float or None
194
  @param timeout: Timeout in seconds
195
  @rtype: int or None
196
  @return: None for timeout, otherwise occured conditions
197

198
  """
199
  check = (event | select.POLLPRI |
200
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
201

    
202
  if timeout is not None:
203
    # Poller object expects milliseconds
204
    timeout *= 1000
205

    
206
  poller.register(sock, event)
207
  try:
208
    while True:
209
      # TODO: If the main thread receives a signal and we have no timeout, we
210
      # could wait forever. This should check a global "quit" flag or
211
      # something every so often.
212
      io_events = poller.poll(timeout)
213
      if not io_events:
214
        # Timeout
215
        return None
216
      for (evfd, evcond) in io_events:
217
        if evcond & check:
218
          return evcond
219
  finally:
220
    poller.unregister(sock)
221

    
222

    
223
def SocketOperation(poller, sock, op, arg1, timeout):
224
  """Wrapper around socket functions.
225

226
  This function abstracts error handling for socket operations, especially
227
  for the complicated interaction with OpenSSL.
228

229
  @type poller: select.Poller
230
  @param poller: Poller object as created by select.poll()
231
  @type sock: socket
232
  @param socket: Socket for the operation
233
  @type op: int
234
  @param op: Operation to execute (SOCKOP_* constants)
235
  @type arg1: any
236
  @param arg1: Parameter for function (if needed)
237
  @type timeout: None or float
238
  @param timeout: Timeout in seconds or None
239

240
  """
241
  # TODO: event_poll/event_check/override
242
  if op == SOCKOP_SEND:
243
    event_poll = select.POLLOUT
244
    event_check = select.POLLOUT
245

    
246
  elif op == SOCKOP_RECV:
247
    event_poll = select.POLLIN
248
    event_check = select.POLLIN | select.POLLPRI
249

    
250
  elif op == SOCKOP_SHUTDOWN:
251
    event_poll = None
252
    event_check = None
253

    
254
    # The timeout is only used when OpenSSL requests polling for a condition.
255
    # It is not advisable to have no timeout for shutdown.
256
    assert timeout
257

    
258
  else:
259
    raise AssertionError("Invalid socket operation")
260

    
261
  # No override by default
262
  event_override = 0
263

    
264
  while True:
265
    # Poll only for certain operations and when asked for by an override
266
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
267
      if event_override:
268
        wait_for_event = event_override
269
      else:
270
        wait_for_event = event_poll
271

    
272
      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
273
      if event is None:
274
        raise HttpSocketTimeout()
275

    
276
      if (op == SOCKOP_RECV and
277
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
278
        return ""
279

    
280
      if not event & wait_for_event:
281
        continue
282

    
283
    # Reset override
284
    event_override = 0
285

    
286
    try:
287
      try:
288
        if op == SOCKOP_SEND:
289
          return sock.send(arg1)
290

    
291
        elif op == SOCKOP_RECV:
292
          return sock.recv(arg1)
293

    
294
        elif op == SOCKOP_SHUTDOWN:
295
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
296
            # PyOpenSSL's shutdown() doesn't take arguments
297
            return sock.shutdown()
298
          else:
299
            return sock.shutdown(arg1)
300

    
301
      except OpenSSL.SSL.WantWriteError:
302
        # OpenSSL wants to write, poll for POLLOUT
303
        event_override = select.POLLOUT
304
        continue
305

    
306
      except OpenSSL.SSL.WantReadError:
307
        # OpenSSL wants to read, poll for POLLIN
308
        event_override = select.POLLIN | select.POLLPRI
309
        continue
310

    
311
      except OpenSSL.SSL.WantX509LookupError:
312
        continue
313

    
314
      except OpenSSL.SSL.SysCallError, err:
315
        if op == SOCKOP_SEND:
316
          # arg1 is the data when writing
317
          if err.args and err.args[0] == -1 and arg1 == "":
318
            # errors when writing empty strings are expected
319
            # and can be ignored
320
            return 0
321

    
322
        elif op == SOCKOP_RECV:
323
          if err.args == (-1, _SSL_UNEXPECTED_EOF):
324
            return ""
325

    
326
        raise socket.error(err.args)
327

    
328
      except OpenSSL.SSL.Error, err:
329
        raise socket.error(err.args)
330

    
331
    except socket.error, err:
332
      if err.args and err.args[0] == errno.EAGAIN:
333
        # Ignore EAGAIN
334
        continue
335

    
336
      raise
337

    
338

    
339
def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
340
                       force):
341
  """Closes the connection.
342

343
  """
344
  poller = select.poll()
345

    
346
  #print msgreader.peer_will_close, force
347
  if msgreader and msgreader.peer_will_close and not force:
348
    # Wait for peer to close
349
    try:
350
      # Check whether it's actually closed
351
      if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
352
        return
353
    except (socket.error, HttpError, HttpSocketTimeout):
354
      # Ignore errors at this stage
355
      pass
356

    
357
  # Close the connection from our side
358
  try:
359
    SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
360
                    write_timeout)
361
  except HttpSocketTimeout:
362
    raise HttpError("Timeout while shutting down connection")
363
  except socket.error, err:
364
    raise HttpError("Error while shutting down connection: %s" % err)
365

    
366

    
367
class HttpSslParams(object):
368
  """Data class for SSL key and certificate.
369

370
  """
371
  def __init__(self, ssl_key_path, ssl_cert_path):
372
    """Initializes this class.
373

374
    @type ssl_key_path: string
375
    @param ssl_key_path: Path to file containing SSL key in PEM format
376
    @type ssl_cert_path: string
377
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
378

379
    """
380
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
381
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
382

    
383
  def GetKey(self):
384
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
385
                                          self.ssl_key_pem)
386

    
387
  def GetCertificate(self):
388
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
389
                                           self.ssl_cert_pem)
390

    
391

    
392
class HttpSocketBase(object):
393
  """Base class for HTTP server and client.
394

395
  """
396
  def __init__(self):
397
    self._using_ssl = None
398
    self._ssl_params = None
399
    self._ssl_key = None
400
    self._ssl_cert = None
401

    
402
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
403
    """Creates a TCP socket and initializes SSL if needed.
404

405
    @type ssl_params: HttpSslParams
406
    @param ssl_params: SSL key and certificate
407
    @type ssl_verify_peer: bool
408
    @param ssl_verify_peer: Whether to require client certificate and compare
409
                            it with our certificate
410

411
    """
412
    self._ssl_params = ssl_params
413

    
414
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
415

    
416
    # Should we enable SSL?
417
    self._using_ssl = ssl_params is not None
418

    
419
    if not self._using_ssl:
420
      return sock
421

    
422
    self._ssl_key = ssl_params.GetKey()
423
    self._ssl_cert = ssl_params.GetCertificate()
424

    
425
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
426
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
427

    
428
    ctx.use_privatekey(self._ssl_key)
429
    ctx.use_certificate(self._ssl_cert)
430
    ctx.check_privatekey()
431

    
432
    if ssl_verify_peer:
433
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
434
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
435
                     self._SSLVerifyCallback)
436

    
437
    return OpenSSL.SSL.Connection(ctx, sock)
438

    
439
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
440
    """Verify the certificate provided by the peer
441

442
    We only compare fingerprints. The client must use the same certificate as
443
    we do on our side.
444

445
    """
446
    assert self._ssl_params, "SSL not initialized"
447

    
448
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
449
            self._ssl_cert.digest("md5") == cert.digest("md5"))
450

    
451

    
452
class HttpServerRequestExecutor(object):
453
  """Implements server side of HTTP
454

455
  This class implements the server side of HTTP. It's based on code of Python's
456
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
457
  character encodings. Keep-alive connections are not supported.
458

459
  """
460
  # The default request version.  This only affects responses up until
461
  # the point where the request line is parsed, so it mainly decides what
462
  # the client gets back when sending a malformed request line.
463
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
464
  default_request_version = HTTP_0_9
465

    
466
  # Error message settings
467
  error_message_format = DEFAULT_ERROR_MESSAGE
468
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
469

    
470
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
471

    
472
  def __init__(self, server, conn, client_addr, fileio_class):
473
    """Initializes this class.
474

475
    Part of the initialization is reading the request and eventual POST/PUT
476
    data sent by the client.
477

478
    """
479
    self._server = server
480

    
481
    # We default rfile to buffered because otherwise it could be
482
    # really slow for large data (a getc() call per byte); we make
483
    # wfile unbuffered because (a) often after a write() we want to
484
    # read and we need to flush the line; (b) big writes to unbuffered
485
    # files are typically optimized by stdio even when big reads
486
    # aren't.
487
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
488
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
489

    
490
    self.client_addr = client_addr
491

    
492
    self.request_headers = None
493
    self.request_method = None
494
    self.request_path = None
495
    self.request_requestline = None
496
    self.request_version = self.default_request_version
497

    
498
    self.response_body = None
499
    self.response_code = HTTP_OK
500
    self.response_content_type = None
501
    self.response_headers = {}
502

    
503
    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
504
    try:
505
      try:
506
        try:
507
          try:
508
            # Read, parse and handle request
509
            self._ReadRequest()
510
            self._ReadPostData()
511
            self._HandleRequest()
512
          except HttpException, err:
513
            self._SetErrorStatus(err)
514
        finally:
515
          # Try to send a response
516
          self._SendResponse()
517
          self._Close()
518
      except SocketClosed:
519
        pass
520
    finally:
521
      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
522

    
523
  def _Close(self):
524
    if not self.wfile.closed:
525
      self.wfile.flush()
526
    self.wfile.close()
527
    self.rfile.close()
528

    
529
  def _DateTimeHeader(self):
530
    """Return the current date and time formatted for a message header.
531

532
    """
533
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
534
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
535
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
536

    
537
  def _SetErrorStatus(self, err):
538
    """Sets the response code and body from a HttpException.
539

540
    @type err: HttpException
541
    @param err: Exception instance
542

543
    """
544
    try:
545
      (shortmsg, longmsg) = self.responses[err.code]
546
    except KeyError:
547
      shortmsg = longmsg = "Unknown"
548

    
549
    if err.message:
550
      message = err.message
551
    else:
552
      message = shortmsg
553

    
554
    values = {
555
      "code": err.code,
556
      "message": cgi.escape(message),
557
      "explain": longmsg,
558
      }
559

    
560
    self.response_code = err.code
561
    self.response_content_type = self.error_content_type
562
    self.response_body = self.error_message_format % values
563

    
564
  def _HandleRequest(self):
565
    """Handle the actual request.
566

567
    Calls the actual handler function and converts exceptions into HTTP errors.
568

569
    """
570
    # Don't do anything if there's already been a problem
571
    if self.response_code != HTTP_OK:
572
      return
573

    
574
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
575

    
576
    # Check whether client is still there
577
    self.rfile.read(0)
578

    
579
    try:
580
      try:
581
        result = self._server.HandleRequest(self)
582

    
583
        # TODO: Content-type
584
        encoder = HttpJsonConverter()
585
        body = encoder.Encode(result)
586

    
587
        self.response_content_type = encoder.CONTENT_TYPE
588
        self.response_body = body
589
      except (HttpException, KeyboardInterrupt, SystemExit):
590
        raise
591
      except Exception, err:
592
        logging.exception("Caught exception")
593
        raise HttpInternalError(message=str(err))
594
      except:
595
        logging.exception("Unknown exception")
596
        raise HttpInternalError(message="Unknown error")
597

    
598
    except HttpException, err:
599
      self._SetErrorStatus(err)
600

    
601
  def _SendResponse(self):
602
    """Sends response to the client.
603

604
    """
605
    # Check whether client is still there
606
    self.rfile.read(0)
607

    
608
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
609
                 self.request_requestline, self.response_code)
610

    
611
    if self.response_code in self.responses:
612
      response_message = self.responses[self.response_code][0]
613
    else:
614
      response_message = ""
615

    
616
    if self.request_version != HTTP_0_9:
617
      self.wfile.write("%s %d %s\r\n" %
618
                       (self.request_version, self.response_code,
619
                        response_message))
620
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
621
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
622
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
623
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
624
      for key, val in self.response_headers.iteritems():
625
        self._SendHeader(key, val)
626

    
627
      # We don't support keep-alive at this time
628
      self._SendHeader(HTTP_CONNECTION, "close")
629
      self.wfile.write("\r\n")
630

    
631
    if (self.request_method != HTTP_HEAD and
632
        self.response_code >= HTTP_OK and
633
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
634
      self.wfile.write(self.response_body)
635

    
636
  def _SendHeader(self, name, value):
637
    if self.request_version != HTTP_0_9:
638
      self.wfile.write("%s: %s\r\n" % (name, value))
639

    
640
  def _ReadRequest(self):
641
    """Reads and parses request line
642

643
    """
644
    raw_requestline = self.rfile.readline()
645

    
646
    requestline = raw_requestline
647
    if requestline[-2:] == '\r\n':
648
      requestline = requestline[:-2]
649
    elif requestline[-1:] == '\n':
650
      requestline = requestline[:-1]
651

    
652
    if not requestline:
653
      raise HttpBadRequest("Empty request line")
654

    
655
    self.request_requestline = requestline
656

    
657
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
658

    
659
    words = requestline.split()
660

    
661
    if len(words) == 3:
662
      [method, path, version] = words
663
      if version[:5] != 'HTTP/':
664
        raise HttpBadRequest("Bad request version (%r)" % version)
665

    
666
      try:
667
        base_version_number = version.split('/', 1)[1]
668
        version_number = base_version_number.split(".")
669

    
670
        # RFC 2145 section 3.1 says there can be only one "." and
671
        #   - major and minor numbers MUST be treated as
672
        #      separate integers;
673
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
674
        #      turn is lower than HTTP/12.3;
675
        #   - Leading zeros MUST be ignored by recipients.
676
        if len(version_number) != 2:
677
          raise HttpBadRequest("Bad request version (%r)" % version)
678

    
679
        version_number = int(version_number[0]), int(version_number[1])
680
      except (ValueError, IndexError):
681
        raise HttpBadRequest("Bad request version (%r)" % version)
682

    
683
      if version_number >= (2, 0):
684
        raise HttpVersionNotSupported("Invalid HTTP Version (%s)" %
685
                                      base_version_number)
686

    
687
    elif len(words) == 2:
688
      version = HTTP_0_9
689
      [method, path] = words
690
      if method != HTTP_GET:
691
        raise HttpBadRequest("Bad HTTP/0.9 request type (%r)" % method)
692

    
693
    else:
694
      raise HttpBadRequest("Bad request syntax (%r)" % requestline)
695

    
696
    # Examine the headers and look for a Connection directive
697
    headers = mimetools.Message(self.rfile, 0)
698

    
699
    self.request_method = method
700
    self.request_path = path
701
    self.request_version = version
702
    self.request_headers = headers
703

    
704
  def _ReadPostData(self):
705
    """Reads POST/PUT data
706

707
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
708
    a request is signaled by the inclusion of a Content-Length header field in
709
    the request message headers. HTTP/1.0 requests containing an entity body
710
    must include a valid Content-Length header field."
711

712
    """
713
    # While not according to specification, we only support an entity body for
714
    # POST and PUT.
715
    if (not self.request_method or
716
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
717
      self.request_post_data = None
718
      return
719

    
720
    content_length = None
721
    try:
722
      if HTTP_CONTENT_LENGTH in self.request_headers:
723
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
724
    except TypeError:
725
      pass
726
    except ValueError:
727
      pass
728

    
729
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
730
    if content_length is None:
731
      raise HttpLengthRequired("Missing Content-Length header or"
732
                               " invalid format")
733

    
734
    data = self.rfile.read(content_length)
735

    
736
    # TODO: Content-type, error handling
737
    if data:
738
      self.request_post_data = HttpJsonConverter().Decode(data)
739
    else:
740
      self.request_post_data = None
741

    
742
    logging.debug("HTTP POST data: %s", self.request_post_data)
743

    
744

    
745
class HttpServer(HttpSocketBase):
746
  """Generic HTTP server class
747

748
  Users of this class must subclass it and override the HandleRequest function.
749

750
  """
751
  MAX_CHILDREN = 20
752

    
753
  def __init__(self, mainloop, local_address, port,
754
               ssl_params=None, ssl_verify_peer=False):
755
    """Initializes the HTTP server
756

757
    @type mainloop: ganeti.daemon.Mainloop
758
    @param mainloop: Mainloop used to poll for I/O events
759
    @type local_addess: string
760
    @param local_address: Local IP address to bind to
761
    @type port: int
762
    @param port: TCP port to listen on
763
    @type ssl_params: HttpSslParams
764
    @param ssl_params: SSL key and certificate
765
    @type ssl_verify_peer: bool
766
    @param ssl_verify_peer: Whether to require client certificate and compare
767
                            it with our certificate
768

769
    """
770
    HttpSocketBase.__init__(self)
771

    
772
    self.mainloop = mainloop
773
    self.local_address = local_address
774
    self.port = port
775

    
776
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
777

    
778
    # Allow port to be reused
779
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
780

    
781
    if self._using_ssl:
782
      self._fileio_class = _SSLFileObject
783
    else:
784
      self._fileio_class = socket._fileobject
785

    
786
    self._children = []
787

    
788
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
789
    mainloop.RegisterSignal(self)
790

    
791
  def Start(self):
792
    self.socket.bind((self.local_address, self.port))
793
    self.socket.listen(5)
794

    
795
  def Stop(self):
796
    self.socket.close()
797

    
798
  def OnIO(self, fd, condition):
799
    if condition & select.POLLIN:
800
      self._IncomingConnection()
801

    
802
  def OnSignal(self, signum):
803
    if signum == signal.SIGCHLD:
804
      self._CollectChildren(True)
805

    
806
  def _CollectChildren(self, quick):
807
    """Checks whether any child processes are done
808

809
    @type quick: bool
810
    @param quick: Whether to only use non-blocking functions
811

812
    """
813
    if not quick:
814
      # Don't wait for other processes if it should be a quick check
815
      while len(self._children) > self.MAX_CHILDREN:
816
        try:
817
          # Waiting without a timeout brings us into a potential DoS situation.
818
          # As soon as too many children run, we'll not respond to new
819
          # requests. The real solution would be to add a timeout for children
820
          # and killing them after some time.
821
          pid, status = os.waitpid(0, 0)
822
        except os.error:
823
          pid = None
824
        if pid and pid in self._children:
825
          self._children.remove(pid)
826

    
827
    for child in self._children:
828
      try:
829
        pid, status = os.waitpid(child, os.WNOHANG)
830
      except os.error:
831
        pid = None
832
      if pid and pid in self._children:
833
        self._children.remove(pid)
834

    
835
  def _IncomingConnection(self):
836
    """Called for each incoming connection
837

838
    """
839
    (connection, client_addr) = self.socket.accept()
840

    
841
    self._CollectChildren(False)
842

    
843
    pid = os.fork()
844
    if pid == 0:
845
      # Child process
846
      try:
847
        HttpServerRequestExecutor(self, connection, client_addr,
848
                                  self._fileio_class)
849
      except:
850
        logging.exception("Error while handling request from %s:%s",
851
                          client_addr[0], client_addr[1])
852
        os._exit(1)
853
      os._exit(0)
854
    else:
855
      self._children.append(pid)
856

    
857
  def HandleRequest(self, req):
858
    raise NotImplementedError()
859

    
860

    
861
class HttpClientRequest(object):
862
  def __init__(self, host, port, method, path, headers=None, post_data=None,
863
               ssl_params=None, ssl_verify_peer=False):
864
    """Describes an HTTP request.
865

866
    @type host: string
867
    @param host: Hostname
868
    @type port: int
869
    @param port: Port
870
    @type method: string
871
    @param method: Method name
872
    @type path: string
873
    @param path: Request path
874
    @type headers: dict or None
875
    @param headers: Additional headers to send
876
    @type post_data: string or None
877
    @param post_data: Additional data to send
878
    @type ssl_params: HttpSslParams
879
    @param ssl_params: SSL key and certificate
880
    @type ssl_verify_peer: bool
881
    @param ssl_verify_peer: Whether to compare our certificate with server's
882
                            certificate
883

884
    """
885
    if post_data is not None:
886
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
887
        "Only POST and GET requests support sending data"
888

    
889
    assert path.startswith("/"), "Path must start with slash (/)"
890

    
891
    self.host = host
892
    self.port = port
893
    self.ssl_params = ssl_params
894
    self.ssl_verify_peer = ssl_verify_peer
895
    self.method = method
896
    self.path = path
897
    self.headers = headers
898
    self.post_data = post_data
899

    
900
    self.success = None
901
    self.error = None
902

    
903
    self.resp_status_line = None
904
    self.resp_version = None
905
    self.resp_status = None
906
    self.resp_reason = None
907
    self.resp_headers = None
908
    self.resp_body = None
909

    
910

    
911
class HttpClientRequestExecutor(HttpSocketBase):
912
  # Default headers
913
  DEFAULT_HEADERS = {
914
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
915
    # TODO: For keep-alive, don't send "Connection: close"
916
    HTTP_CONNECTION: "close",
917
    }
918

    
919
  # Length limits
920
  STATUS_LINE_LENGTH_MAX = 512
921
  HEADER_LENGTH_MAX = 4 * 1024
922

    
923
  # Timeouts in seconds for socket layer
924
  # TODO: Make read timeout configurable per OpCode
925
  CONNECT_TIMEOUT = 5.0
926
  WRITE_TIMEOUT = 10
927
  READ_TIMEOUT = None
928
  CLOSE_TIMEOUT = 1
929

    
930
  # Parser state machine
931
  PS_STATUS_LINE = "status-line"
932
  PS_HEADERS = "headers"
933
  PS_BODY = "body"
934
  PS_COMPLETE = "complete"
935

    
936
  def __init__(self, req):
937
    """Initializes the HttpClientRequestExecutor class.
938

939
    @type req: HttpClientRequest
940
    @param req: Request object
941

942
    """
943
    HttpSocketBase.__init__(self)
944

    
945
    self.request = req
946

    
947
    self.parser_status = self.PS_STATUS_LINE
948
    self.header_buffer = StringIO()
949
    self.body_buffer = StringIO()
950
    self.content_length = None
951
    self.server_will_close = None
952

    
953
    self.poller = select.poll()
954

    
955
    try:
956
      # TODO: Implement connection caching/keep-alive
957
      self.sock = self._CreateSocket(req.ssl_params,
958
                                     req.ssl_verify_peer)
959

    
960
      # Disable Python's timeout
961
      self.sock.settimeout(None)
962

    
963
      # Operate in non-blocking mode
964
      self.sock.setblocking(0)
965

    
966
      force_close = True
967
      self._Connect()
968
      try:
969
        self._SendRequest()
970
        self._ReadResponse()
971

    
972
        # Only wait for server to close if we didn't have any exception.
973
        force_close = False
974
      finally:
975
        self._CloseConnection(force_close)
976

    
977
      self.sock.close()
978
      self.sock = None
979

    
980
      req.resp_body = self.body_buffer.getvalue()
981

    
982
      req.success = True
983
      req.error = None
984

    
985
    except _HttpClientError, err:
986
      req.success = False
987
      req.error = str(err)
988

    
989
  def _BuildRequest(self):
990
    """Build HTTP request.
991

992
    @rtype: string
993
    @return: Complete request
994

995
    """
996
    # Headers
997
    send_headers = self.DEFAULT_HEADERS.copy()
998

    
999
    if self.request.headers:
1000
      send_headers.update(self.request.headers)
1001

    
1002
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
1003

    
1004
    if self.request.post_data:
1005
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
1006

    
1007
    buf = StringIO()
1008

    
1009
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
1010
    # keep-alive).
1011
    # TODO: For keep-alive, change to HTTP/1.1
1012
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
1013
                                self.request.path, HTTP_1_0))
1014

    
1015
    # Add headers
1016
    for name, value in send_headers.iteritems():
1017
      buf.write("%s: %s\r\n" % (name, value))
1018

    
1019
    buf.write("\r\n")
1020

    
1021
    if self.request.post_data:
1022
      buf.write(self.request.post_data)
1023

    
1024
    return buf.getvalue()
1025

    
1026
  def _ParseStatusLine(self):
1027
    """Parses the status line sent by the server.
1028

1029
    """
1030
    line = self.request.resp_status_line
1031

    
1032
    if not line:
1033
      raise _HttpClientError("Empty status line")
1034

    
1035
    try:
1036
      [version, status, reason] = line.split(None, 2)
1037
    except ValueError:
1038
      try:
1039
        [version, status] = line.split(None, 1)
1040
        reason = ""
1041
      except ValueError:
1042
        version = HTTP_9_0
1043

    
1044
    if version:
1045
      version = version.upper()
1046

    
1047
    if version not in (HTTP_1_0, HTTP_1_1):
1048
      # We do not support HTTP/0.9, despite the specification requiring it
1049
      # (RFC2616, section 19.6)
1050
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1051
                             line)
1052

    
1053
    # The status code is a three-digit number
1054
    try:
1055
      status = int(status)
1056
      if status < 100 or status > 999:
1057
        status = -1
1058
    except ValueError:
1059
      status = -1
1060

    
1061
    if status == -1:
1062
      raise _HttpClientError("Invalid status code (%r)" % line)
1063

    
1064
    self.request.resp_version = version
1065
    self.request.resp_status = status
1066
    self.request.resp_reason = reason
1067

    
1068
  def _WillServerCloseConnection(self):
1069
    """Evaluate whether server will close the connection.
1070

1071
    @rtype: bool
1072
    @return: Whether server will close the connection
1073

1074
    """
1075
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1076
    if hdr_connection:
1077
      hdr_connection = hdr_connection.lower()
1078

    
1079
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1080
    if self.request.resp_version == HTTP_1_1:
1081
      return (hdr_connection and "close" in hdr_connection)
1082

    
1083
    # Some HTTP/1.0 implementations have support for persistent connections,
1084
    # using rules different than HTTP/1.1.
1085

    
1086
    # For older HTTP, Keep-Alive indicates persistent connection.
1087
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1088
      return False
1089

    
1090
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1091
    # supposed to be sent by the client.
1092
    if hdr_connection and "keep-alive" in hdr_connection:
1093
      return False
1094

    
1095
    return True
1096

    
1097
  def _ParseHeaders(self):
1098
    """Parses the headers sent by the server.
1099

1100
    This function also adjusts internal variables based on the header values.
1101

1102
    """
1103
    req = self.request
1104

    
1105
    # Parse headers
1106
    self.header_buffer.seek(0, 0)
1107
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
1108

    
1109
    self.server_will_close = self._WillServerCloseConnection()
1110

    
1111
    # Do we have a Content-Length header?
1112
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1113
    if hdr_content_length:
1114
      try:
1115
        self.content_length = int(hdr_content_length)
1116
      except ValueError:
1117
        pass
1118
      if self.content_length is not None and self.content_length < 0:
1119
        self.content_length = None
1120

    
1121
    # does the body have a fixed length? (of zero)
1122
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1123
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1124
      self.content_length = 0
1125

    
1126
    # if the connection remains open and a content-length was not provided,
1127
    # then assume that the connection WILL close.
1128
    if self.content_length is None:
1129
      self.server_will_close = True
1130

    
1131
  def _CheckStatusLineLength(self, length):
1132
    if length > self.STATUS_LINE_LENGTH_MAX:
1133
      raise _HttpClientError("Status line longer than %d chars" %
1134
                             self.STATUS_LINE_LENGTH_MAX)
1135

    
1136
  def _CheckHeaderLength(self, length):
1137
    if length > self.HEADER_LENGTH_MAX:
1138
      raise _HttpClientError("Headers longer than %d chars" %
1139
                             self.HEADER_LENGTH_MAX)
1140

    
1141
  def _ParseBuffer(self, buf, eof):
1142
    """Main function for HTTP response state machine.
1143

1144
    @type buf: string
1145
    @param buf: Receive buffer
1146
    @type eof: bool
1147
    @param eof: Whether we've reached EOF on the socket
1148
    @rtype: string
1149
    @return: Updated receive buffer
1150

1151
    """
1152
    if self.parser_status == self.PS_STATUS_LINE:
1153
      # Expect status line
1154
      idx = buf.find("\r\n")
1155
      if idx >= 0:
1156
        self.request.resp_status_line = buf[:idx]
1157

    
1158
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1159

    
1160
        # Remove status line, including CRLF
1161
        buf = buf[idx + 2:]
1162

    
1163
        self._ParseStatusLine()
1164

    
1165
        self.parser_status = self.PS_HEADERS
1166
      else:
1167
        # Check whether incoming data is getting too large, otherwise we just
1168
        # fill our read buffer.
1169
        self._CheckStatusLineLength(len(buf))
1170

    
1171
    if self.parser_status == self.PS_HEADERS:
1172
      # Wait for header end
1173
      idx = buf.find("\r\n\r\n")
1174
      if idx >= 0:
1175
        self.header_buffer.write(buf[:idx + 2])
1176

    
1177
        self._CheckHeaderLength(self.header_buffer.tell())
1178

    
1179
        # Remove headers, including CRLF
1180
        buf = buf[idx + 4:]
1181

    
1182
        self._ParseHeaders()
1183

    
1184
        self.parser_status = self.PS_BODY
1185
      else:
1186
        # Check whether incoming data is getting too large, otherwise we just
1187
        # fill our read buffer.
1188
        self._CheckHeaderLength(len(buf))
1189

    
1190
    if self.parser_status == self.PS_BODY:
1191
      self.body_buffer.write(buf)
1192
      buf = ""
1193

    
1194
      # Check whether we've read everything
1195
      if (eof or
1196
          (self.content_length is not None and
1197
           self.body_buffer.tell() >= self.content_length)):
1198
        self.parser_status = self.PS_COMPLETE
1199

    
1200
    return buf
1201

    
1202
  def _Connect(self):
1203
    """Non-blocking connect to host with timeout.
1204

1205
    """
1206
    connected = False
1207
    while True:
1208
      try:
1209
        connect_error = self.sock.connect_ex((self.request.host,
1210
                                              self.request.port))
1211
      except socket.gaierror, err:
1212
        raise _HttpClientError("Connection failed: %s" % str(err))
1213

    
1214
      if connect_error == errno.EINTR:
1215
        # Mask signals
1216
        pass
1217

    
1218
      elif connect_error == 0:
1219
        # Connection established
1220
        connected = True
1221
        break
1222

    
1223
      elif connect_error == errno.EINPROGRESS:
1224
        # Connection started
1225
        break
1226

    
1227
      raise _HttpClientError("Connection failed (%s: %s)" %
1228
                             (connect_error, os.strerror(connect_error)))
1229

    
1230
    if not connected:
1231
      # Wait for connection
1232
      event = WaitForSocketCondition(self.poller, self.sock,
1233
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1234
      if event is None:
1235
        raise _HttpClientError("Timeout while connecting to server")
1236

    
1237
      # Get error code
1238
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1239
      if connect_error != 0:
1240
        raise _HttpClientError("Connection failed (%s: %s)" %
1241
                               (connect_error, os.strerror(connect_error)))
1242

    
1243
    # Enable TCP keep-alive
1244
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1245

    
1246
    # If needed, Linux specific options are available to change the TCP
1247
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1248
    # TCP_KEEPINTVL.
1249

    
1250
  def _SendRequest(self):
1251
    """Sends request to server.
1252

1253
    """
1254
    buf = self._BuildRequest()
1255

    
1256
    while buf:
1257
      # Send only 4 KB at a time
1258
      data = buf[:4096]
1259

    
1260
      try:
1261
        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1262
                               self.WRITE_TIMEOUT)
1263
      except HttpSocketTimeout:
1264
        raise _HttpClientError("Timeout while sending request")
1265
      except socket.error, err:
1266
        raise _HttpClientError("Error sending request: %s" % err)
1267

    
1268
      # Remove sent bytes
1269
      buf = buf[sent:]
1270

    
1271
    assert not buf, "Request wasn't sent completely"
1272

    
1273
  def _ReadResponse(self):
1274
    """Read response from server.
1275

1276
    Calls the parser function after reading a chunk of data.
1277

1278
    """
1279
    buf = ""
1280
    eof = False
1281
    while self.parser_status != self.PS_COMPLETE:
1282
      try:
1283
        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1284
                               self.READ_TIMEOUT)
1285
      except HttpSocketTimeout:
1286
        raise _HttpClientError("Timeout while reading response")
1287
      except socket.error, err:
1288
        raise _HttpClientError("Error while reading response: %s" % err)
1289

    
1290
      if data:
1291
        buf += data
1292
      else:
1293
        eof = True
1294

    
1295
      # Do some parsing and error checking while more data arrives
1296
      buf = self._ParseBuffer(buf, eof)
1297

    
1298
      # Must be done only after the buffer has been evaluated
1299
      if (eof and
1300
          self.parser_status in (self.PS_STATUS_LINE,
1301
                                 self.PS_HEADERS)):
1302
        raise _HttpClientError("Connection closed prematurely")
1303

    
1304
    # Parse rest
1305
    buf = self._ParseBuffer(buf, True)
1306

    
1307
    assert self.parser_status == self.PS_COMPLETE
1308
    assert not buf, "Parser didn't read full response"
1309

    
1310
  def _CloseConnection(self, force):
1311
    """Closes the connection.
1312

1313
    """
1314
    if self.server_will_close and not force:
1315
      # Wait for server to close
1316
      try:
1317
        # Check whether it's actually closed
1318
        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1319
                               self.CLOSE_TIMEOUT):
1320
          return
1321
      except (socket.error, _HttpClientError, HttpSocketTimeout):
1322
        # Ignore errors at this stage
1323
        pass
1324

    
1325
    # Close the connection from our side
1326
    try:
1327
      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1328
                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1329
    except HttpSocketTimeout:
1330
      raise _HttpClientError("Timeout while shutting down connection")
1331
    except socket.error, err:
1332
      raise _HttpClientError("Error while shutting down connection: %s" % err)
1333

    
1334

    
1335
class _HttpClientPendingRequest(object):
1336
  """Data class for pending requests.
1337

1338
  """
1339
  def __init__(self, request):
1340
    self.request = request
1341

    
1342
    # Thread synchronization
1343
    self.done = threading.Event()
1344

    
1345

    
1346
class HttpClientWorker(workerpool.BaseWorker):
1347
  """HTTP client worker class.
1348

1349
  """
1350
  def RunTask(self, pend_req):
1351
    try:
1352
      HttpClientRequestExecutor(pend_req.request)
1353
    finally:
1354
      pend_req.done.set()
1355

    
1356

    
1357
class HttpClientWorkerPool(workerpool.WorkerPool):
1358
  def __init__(self, manager):
1359
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1360
                                   HttpClientWorker)
1361
    self.manager = manager
1362

    
1363

    
1364
class HttpClientManager(object):
1365
  """Manages HTTP requests.
1366

1367
  """
1368
  def __init__(self):
1369
    self._wpool = HttpClientWorkerPool(self)
1370

    
1371
  def __del__(self):
1372
    self.Shutdown()
1373

    
1374
  def ExecRequests(self, requests):
1375
    """Execute HTTP requests.
1376

1377
    This function can be called from multiple threads at the same time.
1378

1379
    @type requests: List of HttpClientRequest instances
1380
    @param requests: The requests to execute
1381
    @rtype: List of HttpClientRequest instances
1382
    @returns: The list of requests passed in
1383

1384
    """
1385
    # _HttpClientPendingRequest is used for internal thread synchronization
1386
    pending = [_HttpClientPendingRequest(req) for req in requests]
1387

    
1388
    try:
1389
      # Add requests to queue
1390
      for pend_req in pending:
1391
        self._wpool.AddTask(pend_req)
1392

    
1393
    finally:
1394
      # In case of an exception we should still wait for the rest, otherwise
1395
      # another thread from the worker pool could modify the request object
1396
      # after we returned.
1397

    
1398
      # And wait for them to finish
1399
      for pend_req in pending:
1400
        pend_req.done.wait()
1401

    
1402
    # Return original list
1403
    return requests
1404

    
1405
  def Shutdown(self):
1406
    self._wpool.Quiesce()
1407
    self._wpool.TerminateWorkers()
1408

    
1409

    
1410
class _SSLFileObject(object):
1411
  """Wrapper around socket._fileobject
1412

1413
  This wrapper is required to handle OpenSSL exceptions.
1414

1415
  """
1416
  def _RequireOpenSocket(fn):
1417
    def wrapper(self, *args, **kwargs):
1418
      if self.closed:
1419
        raise SocketClosed("Socket is closed")
1420
      return fn(self, *args, **kwargs)
1421
    return wrapper
1422

    
1423
  def __init__(self, sock, mode='rb', bufsize=-1):
1424
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1425

    
1426
  def _ConnectionLost(self):
1427
    self._base = None
1428

    
1429
  def _getclosed(self):
1430
    return self._base is None or self._base.closed
1431
  closed = property(_getclosed, doc="True if the file is closed")
1432

    
1433
  @_RequireOpenSocket
1434
  def close(self):
1435
    return self._base.close()
1436

    
1437
  @_RequireOpenSocket
1438
  def flush(self):
1439
    return self._base.flush()
1440

    
1441
  @_RequireOpenSocket
1442
  def fileno(self):
1443
    return self._base.fileno()
1444

    
1445
  @_RequireOpenSocket
1446
  def read(self, size=-1):
1447
    return self._ReadWrapper(self._base.read, size=size)
1448

    
1449
  @_RequireOpenSocket
1450
  def readline(self, size=-1):
1451
    return self._ReadWrapper(self._base.readline, size=size)
1452

    
1453
  def _ReadWrapper(self, fn, *args, **kwargs):
1454
    while True:
1455
      try:
1456
        return fn(*args, **kwargs)
1457

    
1458
      except OpenSSL.SSL.ZeroReturnError, err:
1459
        self._ConnectionLost()
1460
        return ""
1461

    
1462
      except OpenSSL.SSL.WantReadError:
1463
        continue
1464

    
1465
      #except OpenSSL.SSL.WantWriteError:
1466
      # TODO
1467

    
1468
      except OpenSSL.SSL.SysCallError, (retval, desc):
1469
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1470
            or retval > 0):
1471
          self._ConnectionLost()
1472
          return ""
1473

    
1474
        logging.exception("Error in OpenSSL")
1475
        self._ConnectionLost()
1476
        raise socket.error(err.args)
1477

    
1478
      except OpenSSL.SSL.Error, err:
1479
        self._ConnectionLost()
1480
        raise socket.error(err.args)
1481

    
1482
  @_RequireOpenSocket
1483
  def write(self, data):
1484
    return self._WriteWrapper(self._base.write, data)
1485

    
1486
  def _WriteWrapper(self, fn, *args, **kwargs):
1487
    while True:
1488
      try:
1489
        return fn(*args, **kwargs)
1490
      except OpenSSL.SSL.ZeroReturnError, err:
1491
        self._ConnectionLost()
1492
        return 0
1493

    
1494
      except OpenSSL.SSL.WantWriteError:
1495
        continue
1496

    
1497
      #except OpenSSL.SSL.WantReadError:
1498
      # TODO
1499

    
1500
      except OpenSSL.SSL.SysCallError, err:
1501
        if err.args[0] == -1 and data == "":
1502
          # errors when writing empty strings are expected
1503
          # and can be ignored
1504
          return 0
1505

    
1506
        self._ConnectionLost()
1507
        raise socket.error(err.args)
1508

    
1509
      except OpenSSL.SSL.Error, err:
1510
        self._ConnectionLost()
1511
        raise socket.error(err.args)
1512

    
1513

    
1514
class HttpMessage(object):
1515
  """Data structure for HTTP message.
1516

1517
  """
1518
  def __init__(self):
1519
    self.start_line = None
1520
    self.headers = None
1521
    self.body = None
1522
    self.decoded_body = None
1523

    
1524

    
1525
class HttpClientToServerStartLine(object):
1526
  """Data structure for HTTP request start line.
1527

1528
  """
1529
  def __init__(self, method, path, version):
1530
    self.method = method
1531
    self.path = path
1532
    self.version = version
1533

    
1534
  def __str__(self):
1535
    return "%s %s %s" % (self.method, self.path, self.version)
1536

    
1537

    
1538
class HttpServerToClientStartLine(object):
1539
  """Data structure for HTTP response start line.
1540

1541
  """
1542
  def __init__(self, version, code, reason):
1543
    self.version = version
1544
    self.code = code
1545
    self.reason = reason
1546

    
1547
  def __str__(self):
1548
    return "%s %s %s" % (self.version, self.code, self.reason)
1549

    
1550

    
1551
class HttpMessageWriter(object):
1552
  """Writes an HTTP message to a socket.
1553

1554
  """
1555
  def __init__(self, sock, msg, write_timeout):
1556
    self._msg = msg
1557

    
1558
    self._PrepareMessage()
1559

    
1560
    buf = self._FormatMessage()
1561

    
1562
    poller = select.poll()
1563
    while buf:
1564
      # Send only 4 KB at a time
1565
      data = buf[:4096]
1566

    
1567
      sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
1568
                             write_timeout)
1569

    
1570
      # Remove sent bytes
1571
      buf = buf[sent:]
1572

    
1573
    assert not buf, "Message wasn't sent completely"
1574

    
1575
  def _PrepareMessage(self):
1576
    """Prepares the HTTP message by setting mandatory headers.
1577

1578
    """
1579
    # RFC2616, section 4.3: "The presence of a message-body in a request is
1580
    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
1581
    # field in the request's message-headers."
1582
    if self._msg.body:
1583
      self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
1584

    
1585
  def _FormatMessage(self):
1586
    """Serializes the HTTP message into a string.
1587

1588
    """
1589
    buf = StringIO()
1590

    
1591
    # Add start line
1592
    buf.write(str(self._msg.start_line))
1593
    buf.write("\r\n")
1594

    
1595
    # Add headers
1596
    if self._msg.start_line.version != HTTP_0_9:
1597
      for name, value in self._msg.headers.iteritems():
1598
        buf.write("%s: %s\r\n" % (name, value))
1599

    
1600
    buf.write("\r\n")
1601

    
1602
    # Add message body if needed
1603
    if self.HasMessageBody():
1604
      buf.write(self._msg.body)
1605

    
1606
    elif self._msg.body:
1607
      logging.warning("Ignoring message body")
1608

    
1609
    return buf.getvalue()
1610

    
1611
  def HasMessageBody(self):
1612
    """Checks whether the HTTP message contains a body.
1613

1614
    Can be overriden by subclasses.
1615

1616
    """
1617
    return bool(self._msg.body)
1618

    
1619

    
1620
class HttpMessageReader(object):
1621
  """Reads HTTP message from socket.
1622

1623
  """
1624
  # Length limits
1625
  START_LINE_LENGTH_MAX = None
1626
  HEADER_LENGTH_MAX = None
1627

    
1628
  # Parser state machine
1629
  PS_START_LINE = "start-line"
1630
  PS_HEADERS = "headers"
1631
  PS_BODY = "entity-body"
1632
  PS_COMPLETE = "complete"
1633

    
1634
  def __init__(self, sock, msg, read_timeout):
1635
    self.sock = sock
1636
    self.msg = msg
1637

    
1638
    self.poller = select.poll()
1639
    self.start_line_buffer = None
1640
    self.header_buffer = StringIO()
1641
    self.body_buffer = StringIO()
1642
    self.parser_status = self.PS_START_LINE
1643
    self.content_length = None
1644
    self.peer_will_close = None
1645

    
1646
    buf = ""
1647
    eof = False
1648
    while self.parser_status != self.PS_COMPLETE:
1649
      data = SocketOperation(self.poller, sock, SOCKOP_RECV, 4096,
1650
                             read_timeout)
1651

    
1652
      if data:
1653
        buf += data
1654
      else:
1655
        eof = True
1656

    
1657
      # Do some parsing and error checking while more data arrives
1658
      buf = self._ContinueParsing(buf, eof)
1659

    
1660
      # Must be done only after the buffer has been evaluated
1661
      # TODO: Connection-length < len(data read) and connection closed
1662
      if (eof and
1663
          self.parser_status in (self.PS_START_LINE,
1664
                                 self.PS_HEADERS)):
1665
        raise HttpError("Connection closed prematurely")
1666

    
1667
    # Parse rest
1668
    buf = self._ContinueParsing(buf, True)
1669

    
1670
    assert self.parser_status == self.PS_COMPLETE
1671
    assert not buf, "Parser didn't read full response"
1672

    
1673
    msg.body = self.body_buffer.getvalue()
1674

    
1675
    # TODO: Content-type, error handling
1676
    if msg.body:
1677
      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
1678
    else:
1679
      msg.decoded_body = None
1680

    
1681
    if msg.decoded_body:
1682
      logging.debug("Message body: %s", msg.decoded_body)
1683

    
1684
  def _ContinueParsing(self, buf, eof):
1685
    """Main function for HTTP message state machine.
1686

1687
    @type buf: string
1688
    @param buf: Receive buffer
1689
    @type eof: bool
1690
    @param eof: Whether we've reached EOF on the socket
1691
    @rtype: string
1692
    @return: Updated receive buffer
1693

1694
    """
1695
    if self.parser_status == self.PS_START_LINE:
1696
      # Expect start line
1697
      while True:
1698
        idx = buf.find("\r\n")
1699

    
1700
        # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
1701
        # ignore any empty line(s) received where a Request-Line is expected.
1702
        # In other words, if the server is reading the protocol stream at the
1703
        # beginning of a message and receives a CRLF first, it should ignore
1704
        # the CRLF."
1705
        if idx == 0:
1706
          # TODO: Limit number of CRLFs for safety?
1707
          buf = buf[:2]
1708
          continue
1709

    
1710
        if idx > 0:
1711
          self.start_line_buffer = buf[:idx]
1712

    
1713
          self._CheckStartLineLength(len(self.start_line_buffer))
1714

    
1715
          # Remove status line, including CRLF
1716
          buf = buf[idx + 2:]
1717

    
1718
          self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
1719

    
1720
          self.parser_status = self.PS_HEADERS
1721
        else:
1722
          # Check whether incoming data is getting too large, otherwise we just
1723
          # fill our read buffer.
1724
          self._CheckStartLineLength(len(buf))
1725

    
1726
        break
1727

    
1728
    # TODO: Handle messages without headers
1729
    if self.parser_status == self.PS_HEADERS:
1730
      # Wait for header end
1731
      idx = buf.find("\r\n\r\n")
1732
      if idx >= 0:
1733
        self.header_buffer.write(buf[:idx + 2])
1734

    
1735
        self._CheckHeaderLength(self.header_buffer.tell())
1736

    
1737
        # Remove headers, including CRLF
1738
        buf = buf[idx + 4:]
1739

    
1740
        self._ParseHeaders()
1741

    
1742
        self.parser_status = self.PS_BODY
1743
      else:
1744
        # Check whether incoming data is getting too large, otherwise we just
1745
        # fill our read buffer.
1746
        self._CheckHeaderLength(len(buf))
1747

    
1748
    if self.parser_status == self.PS_BODY:
1749
      # TODO: Implement max size for body_buffer
1750
      self.body_buffer.write(buf)
1751
      buf = ""
1752

    
1753
      # Check whether we've read everything
1754
      #
1755
      # RFC2616, section 4.4: "When a message-body is included with a message,
1756
      # the transfer-length of that body is determined by one of the following
1757
      # [...] 5. By the server closing the connection. (Closing the connection
1758
      # cannot be used to indicate the end of a request body, since that would
1759
      # leave no possibility for the server to send back a response.)"
1760
      if (eof or
1761
          self.content_length is None or
1762
          (self.content_length is not None and
1763
           self.body_buffer.tell() >= self.content_length)):
1764
        self.parser_status = self.PS_COMPLETE
1765

    
1766
    return buf
1767

    
1768
  def _CheckStartLineLength(self, length):
1769
    """Limits the start line buffer size.
1770

1771
    @type length: int
1772
    @param length: Buffer size
1773

1774
    """
1775
    if (self.START_LINE_LENGTH_MAX is not None and
1776
        length > self.START_LINE_LENGTH_MAX):
1777
      raise HttpError("Start line longer than %d chars" %
1778
                       self.START_LINE_LENGTH_MAX)
1779

    
1780
  def _CheckHeaderLength(self, length):
1781
    """Limits the header buffer size.
1782

1783
    @type length: int
1784
    @param length: Buffer size
1785

1786
    """
1787
    if (self.HEADER_LENGTH_MAX is not None and
1788
        length > self.HEADER_LENGTH_MAX):
1789
      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
1790

    
1791
  def ParseStartLine(self, start_line):
1792
    """Parses the start line of a message.
1793

1794
    Must be overriden by subclass.
1795

1796
    @type start_line: string
1797
    @param start_line: Start line string
1798

1799
    """
1800
    raise NotImplementedError()
1801

    
1802
  def _WillPeerCloseConnection(self):
1803
    """Evaluate whether peer will close the connection.
1804

1805
    @rtype: bool
1806
    @return: Whether peer will close the connection
1807

1808
    """
1809
    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
1810
    # for the sender to signal that the connection will be closed after
1811
    # completion of the response. For example,
1812
    #
1813
    #        Connection: close
1814
    #
1815
    # in either the request or the response header fields indicates that the
1816
    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
1817
    # current request/response is complete."
1818

    
1819
    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
1820
    if hdr_connection:
1821
      hdr_connection = hdr_connection.lower()
1822

    
1823
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1824
    if self.msg.start_line.version == HTTP_1_1:
1825
      return (hdr_connection and "close" in hdr_connection)
1826

    
1827
    # Some HTTP/1.0 implementations have support for persistent connections,
1828
    # using rules different than HTTP/1.1.
1829

    
1830
    # For older HTTP, Keep-Alive indicates persistent connection.
1831
    if self.msg.headers.get(HTTP_KEEP_ALIVE):
1832
      return False
1833

    
1834
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1835
    # supposed to be sent by the client.
1836
    if hdr_connection and "keep-alive" in hdr_connection:
1837
      return False
1838

    
1839
    return True
1840

    
1841
  def _ParseHeaders(self):
1842
    """Parses the headers.
1843

1844
    This function also adjusts internal variables based on header values.
1845

1846
    RFC2616, section 4.3: "The presence of a message-body in a request is
1847
    signaled by the inclusion of a Content-Length or Transfer-Encoding header
1848
    field in the request's message-headers."
1849

1850
    """
1851
    # Parse headers
1852
    self.header_buffer.seek(0, 0)
1853
    self.msg.headers = mimetools.Message(self.header_buffer, 0)
1854

    
1855
    self.peer_will_close = self._WillPeerCloseConnection()
1856

    
1857
    # Do we have a Content-Length header?
1858
    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
1859
    if hdr_content_length:
1860
      try:
1861
        self.content_length = int(hdr_content_length)
1862
      except ValueError:
1863
        self.content_length = None
1864
      if self.content_length is not None and self.content_length < 0:
1865
        self.content_length = None
1866

    
1867
    # if the connection remains open and a content-length was not provided,
1868
    # then assume that the connection WILL close.
1869
    if self.content_length is None:
1870
      self.peer_will_close = True