Statistics
| Branch: | Tag: | Revision:

root / lib / http.py @ cb4e8387

History | View | Annotate | Download (36.2 kB)

1
#
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful, but
9
# WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11
# General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16
# 02110-1301, USA.
17

    
18
"""HTTP server module.
19

20
"""
21

    
22
import BaseHTTPServer
23
import cgi
24
import logging
25
import mimetools
26
import OpenSSL
27
import os
28
import select
29
import socket
30
import sys
31
import time
32
import signal
33
import logging
34
import errno
35

    
36
from cStringIO import StringIO
37

    
38
from ganeti import constants
39
from ganeti import serializer
40
from ganeti import workerpool
41
from ganeti import utils
42

    
43

    
44
HTTP_CLIENT_THREADS = 10
45

    
46
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
47

    
48
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
49
MONTHNAME = [None,
50
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
51
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
52

    
53
# Default error message
54
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
55
DEFAULT_ERROR_MESSAGE = """\
56
<head>
57
<title>Error response</title>
58
</head>
59
<body>
60
<h1>Error response</h1>
61
<p>Error code %(code)d.
62
<p>Message: %(message)s.
63
<p>Error code explanation: %(code)s = %(explain)s.
64
</body>
65
"""
66

    
67
HTTP_OK = 200
68
HTTP_NO_CONTENT = 204
69
HTTP_NOT_MODIFIED = 304
70

    
71
HTTP_0_9 = "HTTP/0.9"
72
HTTP_1_0 = "HTTP/1.0"
73
HTTP_1_1 = "HTTP/1.1"
74

    
75
HTTP_GET = "GET"
76
HTTP_HEAD = "HEAD"
77
HTTP_POST = "POST"
78
HTTP_PUT = "PUT"
79

    
80
HTTP_ETAG = "ETag"
81
HTTP_HOST = "Host"
82
HTTP_SERVER = "Server"
83
HTTP_DATE = "Date"
84
HTTP_USER_AGENT = "User-Agent"
85
HTTP_CONTENT_TYPE = "Content-Type"
86
HTTP_CONTENT_LENGTH = "Content-Length"
87
HTTP_CONNECTION = "Connection"
88
HTTP_KEEP_ALIVE = "Keep-Alive"
89

    
90
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
91

    
92

    
93
class SocketClosed(socket.error):
94
  pass
95

    
96

    
97
class ResponseError(Exception):
98
  pass
99

    
100

    
101
class HTTPException(Exception):
102
  code = None
103
  message = None
104

    
105
  def __init__(self, message=None):
106
    Exception.__init__(self)
107
    if message is not None:
108
      self.message = message
109

    
110

    
111
class HTTPBadRequest(HTTPException):
112
  code = 400
113

    
114

    
115
class HTTPForbidden(HTTPException):
116
  code = 403
117

    
118

    
119
class HTTPNotFound(HTTPException):
120
  code = 404
121

    
122

    
123
class HTTPGone(HTTPException):
124
  code = 410
125

    
126

    
127
class HTTPLengthRequired(HTTPException):
128
  code = 411
129

    
130

    
131
class HTTPInternalError(HTTPException):
132
  code = 500
133

    
134

    
135
class HTTPNotImplemented(HTTPException):
136
  code = 501
137

    
138

    
139
class HTTPServiceUnavailable(HTTPException):
140
  code = 503
141

    
142

    
143
class HTTPVersionNotSupported(HTTPException):
144
  code = 505
145

    
146

    
147
class ApacheLogfile:
148
  """Utility class to write HTTP server log files.
149

150
  The written format is the "Common Log Format" as defined by Apache:
151
  http://httpd.apache.org/docs/2.2/mod/mod_log_config.html#examples
152

153
  """
154
  def __init__(self, fd):
155
    """Constructor for ApacheLogfile class.
156

157
    Args:
158
    - fd: Open file object
159

160
    """
161
    self._fd = fd
162

    
163
  def LogRequest(self, request, format, *args):
164
    self._fd.write("%s %s %s [%s] %s\n" % (
165
      # Remote host address
166
      request.address_string(),
167

    
168
      # RFC1413 identity (identd)
169
      "-",
170

    
171
      # Remote user
172
      "-",
173

    
174
      # Request time
175
      self._FormatCurrentTime(),
176

    
177
      # Message
178
      format % args,
179
      ))
180
    self._fd.flush()
181

    
182
  def _FormatCurrentTime(self):
183
    """Formats current time in Common Log Format.
184

185
    """
186
    return self._FormatLogTime(time.time())
187

    
188
  def _FormatLogTime(self, seconds):
189
    """Formats time for Common Log Format.
190

191
    All timestamps are logged in the UTC timezone.
192

193
    Args:
194
    - seconds: Time in seconds since the epoch
195

196
    """
197
    (_, month, _, _, _, _, _, _, _) = tm = time.gmtime(seconds)
198
    format = "%d/" + MONTHNAME[month] + "/%Y:%H:%M:%S +0000"
199
    return time.strftime(format, tm)
200

    
201

    
202
class HTTPJsonConverter:
203
  CONTENT_TYPE = "application/json"
204

    
205
  def Encode(self, data):
206
    return serializer.DumpJson(data)
207

    
208
  def Decode(self, data):
209
    return serializer.LoadJson(data)
210

    
211

    
212
class _HttpSocketBase(object):
213
  """Base class for HTTP server and client.
214

215
  """
216
  def __init__(self):
217
    self._using_ssl = None
218
    self._ssl_cert = None
219
    self._ssl_key = None
220

    
221
  def _CreateSocket(self, ssl_key_path, ssl_cert_path, ssl_verify_peer):
222
    """Creates a TCP socket and initializes SSL if needed.
223

224
    @type ssl_key_path: string
225
    @param ssl_key_path: Path to file containing SSL key in PEM format
226
    @type ssl_cert_path: string
227
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
228
    @type ssl_verify_peer: bool
229
    @param ssl_verify_peer: Whether to require client certificate and compare
230
                            it with our certificate
231

232
    """
233
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
234

    
235
    # Should we enable SSL?
236
    self._using_ssl = (ssl_cert_path and ssl_key_path)
237

    
238
    if not self._using_ssl:
239
      return sock
240

    
241
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
242
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
243

    
244
    ssl_key_pem = utils.ReadFile(ssl_key_path)
245
    ssl_cert_pem = utils.ReadFile(ssl_cert_path)
246

    
247
    cr = OpenSSL.crypto
248
    self._ssl_cert = cr.load_certificate(cr.FILETYPE_PEM, ssl_cert_pem)
249
    self._ssl_key = cr.load_privatekey(cr.FILETYPE_PEM, ssl_key_pem)
250
    del cr
251

    
252
    ctx.use_privatekey(self._ssl_key)
253
    ctx.use_certificate(self._ssl_cert)
254
    ctx.check_privatekey()
255

    
256
    if ssl_verify_peer:
257
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
258
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
259
                     self._SSLVerifyCallback)
260

    
261
    return OpenSSL.SSL.Connection(ctx, sock)
262

    
263
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
264
    """Verify the certificate provided by the peer
265

266
    We only compare fingerprints. The client must use the same certificate as
267
    we do on our side.
268

269
    """
270
    assert self._ssl_cert and self._ssl_key, "SSL not initialized"
271

    
272
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
273
            self._ssl_cert.digest("md5") == cert.digest("md5"))
274

    
275

    
276
class _HttpConnectionHandler(object):
277
  """Implements server side of HTTP
278

279
  This class implements the server side of HTTP. It's based on code of Python's
280
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
281
  character encodings. Keep-alive connections are not supported.
282

283
  """
284
  # The default request version.  This only affects responses up until
285
  # the point where the request line is parsed, so it mainly decides what
286
  # the client gets back when sending a malformed request line.
287
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
288
  default_request_version = HTTP_0_9
289

    
290
  # Error message settings
291
  error_message_format = DEFAULT_ERROR_MESSAGE
292
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
293

    
294
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
295

    
296
  def __init__(self, server, conn, client_addr, fileio_class):
297
    """Initializes this class.
298

299
    Part of the initialization is reading the request and eventual POST/PUT
300
    data sent by the client.
301

302
    """
303
    self._server = server
304

    
305
    # We default rfile to buffered because otherwise it could be
306
    # really slow for large data (a getc() call per byte); we make
307
    # wfile unbuffered because (a) often after a write() we want to
308
    # read and we need to flush the line; (b) big writes to unbuffered
309
    # files are typically optimized by stdio even when big reads
310
    # aren't.
311
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
312
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
313

    
314
    self.client_addr = client_addr
315

    
316
    self.request_headers = None
317
    self.request_method = None
318
    self.request_path = None
319
    self.request_requestline = None
320
    self.request_version = self.default_request_version
321

    
322
    self.response_body = None
323
    self.response_code = HTTP_OK
324
    self.response_content_type = None
325
    self.response_headers = {}
326

    
327
    self.should_fork = False
328

    
329
    try:
330
      self._ReadRequest()
331
      self._ReadPostData()
332
    except HTTPException, err:
333
      self._SetErrorStatus(err)
334

    
335
  def Close(self):
336
    if not self.wfile.closed:
337
      self.wfile.flush()
338
    self.wfile.close()
339
    self.rfile.close()
340

    
341
  def _DateTimeHeader(self):
342
    """Return the current date and time formatted for a message header.
343

344
    """
345
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
346
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
347
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
348

    
349
  def _SetErrorStatus(self, err):
350
    """Sets the response code and body from a HTTPException.
351

352
    @type err: HTTPException
353
    @param err: Exception instance
354

355
    """
356
    try:
357
      (shortmsg, longmsg) = self.responses[err.code]
358
    except KeyError:
359
      shortmsg = longmsg = "Unknown"
360

    
361
    if err.message:
362
      message = err.message
363
    else:
364
      message = shortmsg
365

    
366
    values = {
367
      "code": err.code,
368
      "message": cgi.escape(message),
369
      "explain": longmsg,
370
      }
371

    
372
    self.response_code = err.code
373
    self.response_content_type = self.error_content_type
374
    self.response_body = self.error_message_format % values
375

    
376
  def HandleRequest(self):
377
    """Handle the actual request.
378

379
    Calls the actual handler function and converts exceptions into HTTP errors.
380

381
    """
382
    # Don't do anything if there's already been a problem
383
    if self.response_code != HTTP_OK:
384
      return
385

    
386
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
387

    
388
    # Check whether client is still there
389
    self.rfile.read(0)
390

    
391
    try:
392
      try:
393
        result = self._server.HandleRequest(self)
394

    
395
        # TODO: Content-type
396
        encoder = HTTPJsonConverter()
397
        body = encoder.Encode(result)
398

    
399
        self.response_content_type = encoder.CONTENT_TYPE
400
        self.response_body = body
401
      except (HTTPException, KeyboardInterrupt, SystemExit):
402
        raise
403
      except Exception, err:
404
        logging.exception("Caught exception")
405
        raise HTTPInternalError(message=str(err))
406
      except:
407
        logging.exception("Unknown exception")
408
        raise HTTPInternalError(message="Unknown error")
409

    
410
    except HTTPException, err:
411
      self._SetErrorStatus(err)
412

    
413
  def SendResponse(self):
414
    """Sends response to the client.
415

416
    """
417
    # Check whether client is still there
418
    self.rfile.read(0)
419

    
420
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
421
                 self.request_requestline, self.response_code)
422

    
423
    if self.response_code in self.responses:
424
      response_message = self.responses[self.response_code][0]
425
    else:
426
      response_message = ""
427

    
428
    if self.request_version != HTTP_0_9:
429
      self.wfile.write("%s %d %s\r\n" %
430
                       (self.request_version, self.response_code,
431
                        response_message))
432
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
433
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
434
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
435
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
436
      for key, val in self.response_headers.iteritems():
437
        self._SendHeader(key, val)
438

    
439
      # We don't support keep-alive at this time
440
      self._SendHeader(HTTP_CONNECTION, "close")
441
      self.wfile.write("\r\n")
442

    
443
    if (self.request_method != HTTP_HEAD and
444
        self.response_code >= HTTP_OK and
445
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
446
      self.wfile.write(self.response_body)
447

    
448
  def _SendHeader(self, name, value):
449
    if self.request_version != HTTP_0_9:
450
      self.wfile.write("%s: %s\r\n" % (name, value))
451

    
452
  def _ReadRequest(self):
453
    """Reads and parses request line
454

455
    """
456
    raw_requestline = self.rfile.readline()
457

    
458
    requestline = raw_requestline
459
    if requestline[-2:] == '\r\n':
460
      requestline = requestline[:-2]
461
    elif requestline[-1:] == '\n':
462
      requestline = requestline[:-1]
463

    
464
    if not requestline:
465
      raise HTTPBadRequest("Empty request line")
466

    
467
    self.request_requestline = requestline
468

    
469
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
470

    
471
    words = requestline.split()
472

    
473
    if len(words) == 3:
474
      [method, path, version] = words
475
      if version[:5] != 'HTTP/':
476
        raise HTTPBadRequest("Bad request version (%r)" % version)
477

    
478
      try:
479
        base_version_number = version.split('/', 1)[1]
480
        version_number = base_version_number.split(".")
481

    
482
        # RFC 2145 section 3.1 says there can be only one "." and
483
        #   - major and minor numbers MUST be treated as
484
        #      separate integers;
485
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
486
        #      turn is lower than HTTP/12.3;
487
        #   - Leading zeros MUST be ignored by recipients.
488
        if len(version_number) != 2:
489
          raise HTTPBadRequest("Bad request version (%r)" % version)
490

    
491
        version_number = int(version_number[0]), int(version_number[1])
492
      except (ValueError, IndexError):
493
        raise HTTPBadRequest("Bad request version (%r)" % version)
494

    
495
      if version_number >= (2, 0):
496
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
497
                                      base_version_number)
498

    
499
    elif len(words) == 2:
500
      version = HTTP_0_9
501
      [method, path] = words
502
      if method != HTTP_GET:
503
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
504

    
505
    else:
506
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
507

    
508
    # Examine the headers and look for a Connection directive
509
    headers = mimetools.Message(self.rfile, 0)
510

    
511
    self.request_method = method
512
    self.request_path = path
513
    self.request_version = version
514
    self.request_headers = headers
515

    
516
  def _ReadPostData(self):
517
    """Reads POST/PUT data
518

519
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
520
    a request is signaled by the inclusion of a Content-Length header field in
521
    the request message headers. HTTP/1.0 requests containing an entity body
522
    must include a valid Content-Length header field."
523

524
    """
525
    # While not according to specification, we only support an entity body for
526
    # POST and PUT.
527
    if (not self.request_method or
528
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
529
      self.request_post_data = None
530
      return
531

    
532
    content_length = None
533
    try:
534
      if HTTP_CONTENT_LENGTH in self.request_headers:
535
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
536
    except TypeError:
537
      pass
538
    except ValueError:
539
      pass
540

    
541
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
542
    if content_length is None:
543
      raise HTTPLengthRequired("Missing Content-Length header or"
544
                               " invalid format")
545

    
546
    data = self.rfile.read(content_length)
547

    
548
    # TODO: Content-type, error handling
549
    self.request_post_data = HTTPJsonConverter().Decode(data)
550

    
551
    logging.debug("HTTP POST data: %s", self.request_post_data)
552

    
553

    
554
class HttpServer(_HttpSocketBase):
555
  """Generic HTTP server class
556

557
  Users of this class must subclass it and override the HandleRequest function.
558

559
  """
560
  MAX_CHILDREN = 20
561

    
562
  def __init__(self, mainloop, local_address, port,
563
               ssl_key_path=None, ssl_cert_path=None, ssl_verify_peer=False):
564
    """Initializes the HTTP server
565

566
    @type mainloop: ganeti.daemon.Mainloop
567
    @param mainloop: Mainloop used to poll for I/O events
568
    @type local_addess: string
569
    @param local_address: Local IP address to bind to
570
    @type port: int
571
    @param port: TCP port to listen on
572
    @type ssl_key_path: string
573
    @param ssl_key_path: Path to file containing SSL key in PEM format
574
    @type ssl_cert_path: string
575
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
576
    @type ssl_verify_peer: bool
577
    @param ssl_verify_peer: Whether to require client certificate and compare
578
                            it with our certificate
579

580
    """
581
    _HttpSocketBase.__init__(self)
582

    
583
    self.mainloop = mainloop
584
    self.local_address = local_address
585
    self.port = port
586

    
587
    self.socket = self._CreateSocket(ssl_key_path, ssl_cert_path, ssl_verify_peer)
588

    
589
    # Allow port to be reused
590
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
591

    
592
    if self._using_ssl:
593
      self._fileio_class = _SSLFileObject
594
    else:
595
      self._fileio_class = socket._fileobject
596

    
597
    self._children = []
598

    
599
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
600
    mainloop.RegisterSignal(self)
601

    
602
  def Start(self):
603
    self.socket.bind((self.local_address, self.port))
604
    self.socket.listen(5)
605

    
606
  def Stop(self):
607
    self.socket.close()
608

    
609
  def OnIO(self, fd, condition):
610
    if condition & select.POLLIN:
611
      self._IncomingConnection()
612

    
613
  def OnSignal(self, signum):
614
    if signum == signal.SIGCHLD:
615
      self._CollectChildren(True)
616

    
617
  def _CollectChildren(self, quick):
618
    """Checks whether any child processes are done
619

620
    @type quick: bool
621
    @param quick: Whether to only use non-blocking functions
622

623
    """
624
    if not quick:
625
      # Don't wait for other processes if it should be a quick check
626
      while len(self._children) > self.MAX_CHILDREN:
627
        try:
628
          # Waiting without a timeout brings us into a potential DoS situation.
629
          # As soon as too many children run, we'll not respond to new
630
          # requests. The real solution would be to add a timeout for children
631
          # and killing them after some time.
632
          pid, status = os.waitpid(0, 0)
633
        except os.error:
634
          pid = None
635
        if pid and pid in self._children:
636
          self._children.remove(pid)
637

    
638
    for child in self._children:
639
      try:
640
        pid, status = os.waitpid(child, os.WNOHANG)
641
      except os.error:
642
        pid = None
643
      if pid and pid in self._children:
644
        self._children.remove(pid)
645

    
646
  def _IncomingConnection(self):
647
    """Called for each incoming connection
648

649
    """
650
    (connection, client_addr) = self.socket.accept()
651

    
652
    self._CollectChildren(False)
653

    
654
    pid = os.fork()
655
    if pid == 0:
656
      # Child process
657
      logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
658

    
659
      try:
660
        try:
661
          try:
662
            handler = None
663
            try:
664
              # Read, parse and handle request
665
              handler = _HttpConnectionHandler(self, connection, client_addr,
666
                                               self._fileio_class)
667
              handler.HandleRequest()
668
            finally:
669
              # Try to send a response
670
              if handler:
671
                handler.SendResponse()
672
                handler.Close()
673
          except SocketClosed:
674
            pass
675
        finally:
676
          logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
677
      except:
678
        logging.exception("Error while handling request from %s:%s",
679
                          client_addr[0], client_addr[1])
680
        os._exit(1)
681
      os._exit(0)
682
    else:
683
      self._children.append(pid)
684

    
685
  def HandleRequest(self, req):
686
    raise NotImplementedError()
687

    
688

    
689
class HttpClientRequest(object):
690
  def __init__(self, host, port, method, path, headers=None, post_data=None):
691
    """Describes an HTTP request.
692

693
    @type host: string
694
    @param host: Hostname
695
    @type port: int
696
    @param port: Port
697
    @type method: string
698
    @param method: Method name
699
    @type path: string
700
    @param path: Request path
701
    @type headers: dict or None
702
    @param headers: Additional headers to send
703
    @type post_data: string or None
704
    @param post_data: Additional data to send
705

706
    """
707
    if post_data is not None:
708
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
709
        "Only POST and GET requests support sending data"
710

    
711
    assert path.startswith("/"), "Path must start with slash (/)"
712

    
713
    self.host = host
714
    self.port = port
715
    self.method = method
716
    self.path = path
717
    self.headers = headers
718
    self.post_data = post_data
719

    
720
    self.success = None
721
    self.error = None
722

    
723
    self.resp_status_line = None
724
    self.resp_version = None
725
    self.resp_status = None
726
    self.resp_reason = None
727
    self.resp_headers = None
728
    self.resp_body = None
729

    
730

    
731
class HttpClientRequestExecutor(object):
732
  # Default headers
733
  DEFAULT_HEADERS = {
734
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
735
    # TODO: For keep-alive, don't send "Connection: close"
736
    HTTP_CONNECTION: "close",
737
    }
738

    
739
  # Length limits
740
  STATUS_LINE_LENGTH_MAX = 512
741
  HEADER_LENGTH_MAX = 4 * 1024
742

    
743
  # Timeouts in seconds
744
  # TODO: Make read timeout configurable per OpCode
745
  CONNECT_TIMEOUT = 5.0
746
  WRITE_TIMEOUT = 10
747
  READ_TIMEOUT = None
748
  CLOSE_TIMEOUT = 1
749

    
750
  # Parser state machine
751
  PS_STATUS_LINE = "status-line"
752
  PS_HEADERS = "headers"
753
  PS_BODY = "body"
754
  PS_COMPLETE = "complete"
755

    
756
  def __init__(self, req):
757
    """Initializes the HttpClientRequestExecutor class.
758

759
    @type req: HttpClientRequest
760
    @param req: Request object
761

762
    """
763
    self.request = req
764

    
765
    self.parser_status = self.PS_STATUS_LINE
766
    self.header_buffer = StringIO()
767
    self.body_buffer = StringIO()
768
    self.content_length = None
769
    self.server_will_close = None
770

    
771
    self.poller = select.poll()
772

    
773
    # TODO: SSL
774

    
775
    try:
776
      # TODO: Implement connection caching/keep-alive
777
      self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
778

    
779
      # Disable Python's timeout
780
      self.sock.settimeout(None)
781

    
782
      # Operate in non-blocking mode
783
      self.sock.setblocking(0)
784

    
785
      force_close = True
786
      self._Connect()
787
      try:
788
        self._SendRequest()
789
        self._ReadResponse()
790

    
791
        # Only wait for server to close if we didn't have any exception.
792
        force_close = False
793
      finally:
794
        self._CloseConnection(force_close)
795

    
796
      self.sock.close()
797
      self.sock = None
798

    
799
      req.resp_body = self.body_buffer.getvalue()
800

    
801
      req.success = True
802
      req.error = None
803

    
804
    except ResponseError, err:
805
      req.success = False
806
      req.error = str(err)
807

    
808
  def _BuildRequest(self):
809
    """Build HTTP request.
810

811
    @rtype: string
812
    @return: Complete request
813

814
    """
815
    # Headers
816
    send_headers = self.DEFAULT_HEADERS.copy()
817

    
818
    if self.request.headers:
819
      send_headers.update(req.headers)
820

    
821
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
822

    
823
    if self.request.post_data:
824
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
825

    
826
    buf = StringIO()
827

    
828
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
829
    # keep-alive).
830
    # TODO: For keep-alive, change to HTTP/1.1
831
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
832
                                self.request.path, HTTP_1_0))
833

    
834
    # Add headers
835
    for name, value in send_headers.iteritems():
836
      buf.write("%s: %s\r\n" % (name, value))
837

    
838
    buf.write("\r\n")
839

    
840
    if self.request.post_data:
841
      buf.write(self.request.post_data)
842

    
843
    return buf.getvalue()
844

    
845
  def _ParseStatusLine(self):
846
    """Parses the status line sent by the server.
847

848
    """
849
    line = self.request.resp_status_line
850

    
851
    if not line:
852
      raise ResponseError("Empty status line")
853

    
854
    try:
855
      [version, status, reason] = line.split(None, 2)
856
    except ValueError:
857
      try:
858
        [version, status] = line.split(None, 1)
859
        reason = ""
860
      except ValueError:
861
        version = HTTP_9_0
862

    
863
    if version:
864
      version = version.upper()
865

    
866
    if version not in (HTTP_1_0, HTTP_1_1):
867
      # We do not support HTTP/0.9, despite the specification requiring it
868
      # (RFC2616, section 19.6)
869
      raise ResponseError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
870
                          line)
871

    
872
    # The status code is a three-digit number
873
    try:
874
      status = int(status)
875
      if status < 100 or status > 999:
876
        status = -1
877
    except ValueError:
878
      status = -1
879

    
880
    if status == -1:
881
      raise ResponseError("Invalid status code (%r)" % line)
882

    
883
    self.request.resp_version = version
884
    self.request.resp_status = status
885
    self.request.resp_reason = reason
886

    
887
  def _WillServerCloseConnection(self):
888
    """Evaluate whether server will close the connection.
889

890
    @rtype: bool
891
    @return: Whether server will close the connection
892

893
    """
894
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
895
    if hdr_connection:
896
      hdr_connection = hdr_connection.lower()
897

    
898
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
899
    if self.request.resp_version == HTTP_1_1:
900
      return (hdr_connection and "close" in hdr_connection)
901

    
902
    # Some HTTP/1.0 implementations have support for persistent connections,
903
    # using rules different than HTTP/1.1.
904

    
905
    # For older HTTP, Keep-Alive indicates persistent connection.
906
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
907
      return False
908

    
909
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
910
    # supposed to be sent by the client.
911
    if hdr_connection and "keep-alive" in hdr_connection:
912
      return False
913

    
914
    return True
915

    
916
  def _ParseHeaders(self):
917
    """Parses the headers sent by the server.
918

919
    This function also adjusts internal variables based on the header values.
920

921
    """
922
    req = self.request
923

    
924
    # Parse headers
925
    self.header_buffer.seek(0, 0)
926
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
927

    
928
    self.server_will_close = self._WillServerCloseConnection()
929

    
930
    # Do we have a Content-Length header?
931
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
932
    if hdr_content_length:
933
      try:
934
        self.content_length = int(hdr_content_length)
935
      except ValueError:
936
        pass
937
      if self.content_length is not None and self.content_length < 0:
938
        self.content_length = None
939

    
940
    # does the body have a fixed length? (of zero)
941
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
942
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
943
      self.content_length = 0
944

    
945
    # if the connection remains open and a content-length was not provided,
946
    # then assume that the connection WILL close.
947
    if self.content_length is None:
948
      self.server_will_close = True
949

    
950
  def _CheckStatusLineLength(self, length):
951
    if length > self.STATUS_LINE_LENGTH_MAX:
952
      raise ResponseError("Status line longer than %d chars" %
953
                          self.STATUS_LINE_LENGTH_MAX)
954

    
955
  def _CheckHeaderLength(self, length):
956
    if length > self.HEADER_LENGTH_MAX:
957
      raise ResponseError("Headers longer than %d chars" %
958
                          self.HEADER_LENGTH_MAX)
959

    
960
  def _ParseBuffer(self, buf, eof):
961
    """Main function for HTTP response state machine.
962

963
    @type buf: string
964
    @param buf: Receive buffer
965
    @type eof: bool
966
    @param eof: Whether we've reached EOF on the socket
967
    @rtype: string
968
    @return: Updated receive buffer
969

970
    """
971
    if self.parser_status == self.PS_STATUS_LINE:
972
      # Expect status line
973
      idx = buf.find("\r\n")
974
      if idx >= 0:
975
        self.request.resp_status_line = buf[:idx]
976

    
977
        self._CheckStatusLineLength(len(self.request.resp_status_line))
978

    
979
        # Remove status line, including CRLF
980
        buf = buf[idx + 2:]
981

    
982
        self._ParseStatusLine()
983

    
984
        self.parser_status = self.PS_HEADERS
985
      else:
986
        # Check whether incoming data is getting too large, otherwise we just
987
        # fill our read buffer.
988
        self._CheckStatusLineLength(len(buf))
989

    
990
    if self.parser_status == self.PS_HEADERS:
991
      # Wait for header end
992
      idx = buf.find("\r\n\r\n")
993
      if idx >= 0:
994
        self.header_buffer.write(buf[:idx + 2])
995

    
996
        self._CheckHeaderLength(self.header_buffer.tell())
997

    
998
        # Remove headers, including CRLF
999
        buf = buf[idx + 4:]
1000

    
1001
        self._ParseHeaders()
1002

    
1003
        self.parser_status = self.PS_BODY
1004
      else:
1005
        # Check whether incoming data is getting too large, otherwise we just
1006
        # fill our read buffer.
1007
        self._CheckHeaderLength(len(buf))
1008

    
1009
    if self.parser_status == self.PS_BODY:
1010
      self.body_buffer.write(buf)
1011
      buf = ""
1012

    
1013
      # Check whether we've read everything
1014
      if (eof or
1015
          (self.content_length is not None and
1016
           self.body_buffer.tell() >= self.content_length)):
1017
        self.parser_status = self.PS_COMPLETE
1018

    
1019
    return buf
1020

    
1021
  def _WaitForCondition(self, event, timeout):
1022
    """Waits for a condition to occur on the socket.
1023

1024
    @type event: int
1025
    @param event: ORed condition (see select module)
1026
    @type timeout: float or None
1027
    @param timeout: Timeout in seconds
1028
    @rtype: int or None
1029
    @return: None for timeout, otherwise occured conditions
1030

1031
    """
1032
    check = (event | select.POLLPRI |
1033
             select.POLLNVAL | select.POLLHUP | select.POLLERR)
1034

    
1035
    if timeout is not None:
1036
      # Poller object expects milliseconds
1037
      timeout *= 1000
1038

    
1039
    self.poller.register(self.sock, event)
1040
    try:
1041
      while True:
1042
        # TODO: If the main thread receives a signal and we have no timeout, we
1043
        # could wait forever. This should check a global "quit" flag or
1044
        # something every so often.
1045
        io_events = self.poller.poll(timeout)
1046
        if io_events:
1047
          for (evfd, evcond) in io_events:
1048
            if evcond & check:
1049
              return evcond
1050
        else:
1051
          # Timeout
1052
          return None
1053
    finally:
1054
      self.poller.unregister(self.sock)
1055

    
1056
  def _Connect(self):
1057
    """Non-blocking connect to host with timeout.
1058

1059
    """
1060
    connected = False
1061
    while True:
1062
      connect_error = self.sock.connect_ex((self.request.host,
1063
                                            self.request.port))
1064
      if connect_error == errno.EINTR:
1065
        # Mask signals
1066
        pass
1067

    
1068
      elif connect_error == 0:
1069
        # Connection established
1070
        connected = True
1071
        break
1072

    
1073
      elif connect_error == errno.EINPROGRESS:
1074
        # Connection started
1075
        break
1076

    
1077
      raise ResponseError("Connection failed (%s: %s)" %
1078
                          (connect_error, os.strerror(connect_error)))
1079

    
1080
    if not connected:
1081
      # Wait for connection
1082
      event = self._WaitForCondition(select.POLLOUT, self.CONNECT_TIMEOUT)
1083
      if event is None:
1084
        raise ResponseError("Timeout while connecting to server")
1085

    
1086
      # Get error code
1087
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1088
      if connect_error != 0:
1089
        raise ResponseError("Connection failed (%s: %s)" %
1090
                            (connect_error, os.strerror(connect_error)))
1091

    
1092
    # Enable TCP keep-alive
1093
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1094

    
1095
    # If needed, Linux specific options are available to change the TCP
1096
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1097
    # TCP_KEEPINTVL.
1098

    
1099
  def _SendRequest(self):
1100
    """Sends request to server.
1101

1102
    """
1103
    buf = self._BuildRequest()
1104

    
1105
    while buf:
1106
      event = self._WaitForCondition(select.POLLOUT, self.WRITE_TIMEOUT)
1107
      if event is None:
1108
        raise ResponseError("Timeout while sending request")
1109

    
1110
      try:
1111
        # Only send 4 KB at a time
1112
        data = buf[:4096]
1113

    
1114
        sent = self.sock.send(data)
1115
      except socket.error, err:
1116
        if err.args and err.args[0] == errno.EAGAIN:
1117
          # Ignore EAGAIN
1118
          continue
1119

    
1120
        raise ResponseError("Sending request failed: %s" % str(err))
1121

    
1122
      # Remove sent bytes
1123
      buf = buf[sent:]
1124

    
1125
    assert not buf, "Request wasn't sent completely"
1126

    
1127
  def _ReadResponse(self):
1128
    """Read response from server.
1129

1130
    Calls the parser function after reading a chunk of data.
1131

1132
    """
1133
    buf = ""
1134
    eof = False
1135
    while self.parser_status != self.PS_COMPLETE:
1136
      event = self._WaitForCondition(select.POLLIN, self.READ_TIMEOUT)
1137
      if event is None:
1138
        raise ResponseError("Timeout while reading response")
1139

    
1140
      if event & (select.POLLIN | select.POLLPRI):
1141
        try:
1142
          data = self.sock.recv(4096)
1143
        except socket.error, err:
1144
          if err.args and err.args[0] == errno.EAGAIN:
1145
            # Ignore EAGAIN
1146
            continue
1147

    
1148
          raise ResponseError("Reading response failed: %s" % str(err))
1149

    
1150
        if data:
1151
          buf += data
1152
        else:
1153
          eof = True
1154

    
1155
      if event & (select.POLLNVAL | select.POLLHUP | select.POLLERR):
1156
        eof = True
1157

    
1158
      # Do some parsing and error checking while more data arrives
1159
      buf = self._ParseBuffer(buf, eof)
1160

    
1161
      # Must be done only after the buffer has been evaluated
1162
      if (eof and
1163
          self.parser_status in (self.PS_STATUS_LINE,
1164
                                 self.PS_HEADERS)):
1165
        raise ResponseError("Connection closed prematurely")
1166

    
1167
    # Parse rest
1168
    buf = self._ParseBuffer(buf, True)
1169

    
1170
    assert self.parser_status == self.PS_COMPLETE
1171
    assert not buf, "Parser didn't read full response"
1172

    
1173
  def _CloseConnection(self, force):
1174
    """Closes the connection.
1175

1176
    """
1177
    if self.server_will_close and not force:
1178
      # Wait for server to close
1179
      event = self._WaitForCondition(select.POLLIN, self.CLOSE_TIMEOUT)
1180
      if event is None:
1181
        # Server didn't close connection within CLOSE_TIMEOUT
1182
        pass
1183
      else:
1184
        try:
1185
          # Check whether it's actually closed
1186
          if not self.sock.recv(1):
1187
            return
1188
        except socket.error, err:
1189
          # Ignore errors at this stage
1190
          pass
1191

    
1192
    # Close the connection from our side
1193
    self.sock.shutdown(socket.SHUT_RDWR)
1194

    
1195

    
1196
class HttpClientWorker(workerpool.BaseWorker):
1197
  """HTTP client worker class.
1198

1199
  """
1200
  def RunTask(self, req):
1201
    HttpClientRequestExecutor(req)
1202

    
1203

    
1204
class HttpClientWorkerPool(workerpool.WorkerPool):
1205
  def __init__(self, manager):
1206
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1207
                                   HttpClientWorker)
1208
    self.manager = manager
1209

    
1210

    
1211
class HttpClientManager(object):
1212
  def __init__(self):
1213
    self._wpool = HttpClientWorkerPool(self)
1214

    
1215
  def __del__(self):
1216
    self.Shutdown()
1217

    
1218
  def ExecRequests(self, requests):
1219
    # Add requests to queue
1220
    for req in requests:
1221
      self._wpool.AddTask(req)
1222

    
1223
    # And wait for them to finish
1224
    self._wpool.Quiesce()
1225

    
1226
    return requests
1227

    
1228
  def Shutdown(self):
1229
    self._wpool.TerminateWorkers()
1230

    
1231

    
1232
class _SSLFileObject(object):
1233
  """Wrapper around socket._fileobject
1234

1235
  This wrapper is required to handle OpenSSL exceptions.
1236

1237
  """
1238
  def _RequireOpenSocket(fn):
1239
    def wrapper(self, *args, **kwargs):
1240
      if self.closed:
1241
        raise SocketClosed("Socket is closed")
1242
      return fn(self, *args, **kwargs)
1243
    return wrapper
1244

    
1245
  def __init__(self, sock, mode='rb', bufsize=-1):
1246
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1247

    
1248
  def _ConnectionLost(self):
1249
    self._base = None
1250

    
1251
  def _getclosed(self):
1252
    return self._base is None or self._base.closed
1253
  closed = property(_getclosed, doc="True if the file is closed")
1254

    
1255
  @_RequireOpenSocket
1256
  def close(self):
1257
    return self._base.close()
1258

    
1259
  @_RequireOpenSocket
1260
  def flush(self):
1261
    return self._base.flush()
1262

    
1263
  @_RequireOpenSocket
1264
  def fileno(self):
1265
    return self._base.fileno()
1266

    
1267
  @_RequireOpenSocket
1268
  def read(self, size=-1):
1269
    return self._ReadWrapper(self._base.read, size=size)
1270

    
1271
  @_RequireOpenSocket
1272
  def readline(self, size=-1):
1273
    return self._ReadWrapper(self._base.readline, size=size)
1274

    
1275
  def _ReadWrapper(self, fn, *args, **kwargs):
1276
    while True:
1277
      try:
1278
        return fn(*args, **kwargs)
1279

    
1280
      except OpenSSL.SSL.ZeroReturnError, err:
1281
        self._ConnectionLost()
1282
        return ""
1283

    
1284
      except OpenSSL.SSL.WantReadError:
1285
        continue
1286

    
1287
      #except OpenSSL.SSL.WantWriteError:
1288
      # TODO
1289

    
1290
      except OpenSSL.SSL.SysCallError, (retval, desc):
1291
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1292
            or retval > 0):
1293
          self._ConnectionLost()
1294
          return ""
1295

    
1296
        logging.exception("Error in OpenSSL")
1297
        self._ConnectionLost()
1298
        raise socket.error(err.args)
1299

    
1300
      except OpenSSL.SSL.Error, err:
1301
        self._ConnectionLost()
1302
        raise socket.error(err.args)
1303

    
1304
  @_RequireOpenSocket
1305
  def write(self, data):
1306
    return self._WriteWrapper(self._base.write, data)
1307

    
1308
  def _WriteWrapper(self, fn, *args, **kwargs):
1309
    while True:
1310
      try:
1311
        return fn(*args, **kwargs)
1312
      except OpenSSL.SSL.ZeroReturnError, err:
1313
        self._ConnectionLost()
1314
        return 0
1315

    
1316
      except OpenSSL.SSL.WantWriteError:
1317
        continue
1318

    
1319
      #except OpenSSL.SSL.WantReadError:
1320
      # TODO
1321

    
1322
      except OpenSSL.SSL.SysCallError, err:
1323
        if err.args[0] == -1 and data == "":
1324
          # errors when writing empty strings are expected
1325
          # and can be ignored
1326
          return 0
1327

    
1328
        self._ConnectionLost()
1329
        raise socket.error(err.args)
1330

    
1331
      except OpenSSL.SSL.Error, err:
1332
        self._ConnectionLost()
1333
        raise socket.error(err.args)