Statistics
| Branch: | Tag: | Revision:

root / lib / http.py @ f22c1cea

History | View | Annotate | Download (40.1 kB)

1
#
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful, but
9
# WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11
# General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16
# 02110-1301, USA.
17

    
18
"""HTTP server module.
19

20
"""
21

    
22
import BaseHTTPServer
23
import cgi
24
import logging
25
import mimetools
26
import OpenSSL
27
import os
28
import select
29
import socket
30
import sys
31
import time
32
import signal
33
import logging
34
import errno
35
import threading
36

    
37
from cStringIO import StringIO
38

    
39
from ganeti import constants
40
from ganeti import serializer
41
from ganeti import workerpool
42
from ganeti import utils
43

    
44

    
45
# NOTE(review): presumably the size of the HTTP client worker pool; it is not
# referenced in the part of the module visible here -- confirm against users.
HTTP_CLIENT_THREADS = 10

# Product token sent as "Server" header by the server side and as
# "User-Agent" header by the client side of this module.
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
48

    
49
# Names used to format the "Date" header. The weekday list is indexed by the
# tm_wday value of time.gmtime() (Monday first), the month list by tm_mon,
# which starts at 1 -- hence the placeholder at index 0.
WEEKDAYNAME = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
MONTHNAME = [
  None,
  "Jan", "Feb", "Mar", "Apr", "May", "Jun",
  "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
  ]

# Default error message; filled in with the keys "code", "message" and
# "explain"
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
DEFAULT_ERROR_MESSAGE = """\
<head>
<title>Error response</title>
</head>
<body>
<h1>Error response</h1>
<p>Error code %(code)d.
<p>Message: %(message)s.
<p>Error code explanation: %(code)s = %(explain)s.
</body>
"""

# Status codes which get special treatment in this module
HTTP_OK = 200
HTTP_NO_CONTENT = 204
HTTP_NOT_MODIFIED = 304

# Protocol version strings
HTTP_0_9 = "HTTP/0.9"
HTTP_1_0 = "HTTP/1.0"
HTTP_1_1 = "HTTP/1.1"

# Request methods
HTTP_GET = "GET"
HTTP_HEAD = "HEAD"
HTTP_POST = "POST"
HTTP_PUT = "PUT"

# Header names
HTTP_ETAG = "ETag"
HTTP_HOST = "Host"
HTTP_SERVER = "Server"
HTTP_DATE = "Date"
HTTP_USER_AGENT = "User-Agent"
HTTP_CONTENT_TYPE = "Content-Type"
HTTP_CONTENT_LENGTH = "Content-Length"
HTTP_CONNECTION = "Connection"
HTTP_KEEP_ALIVE = "Keep-Alive"

# NOTE(review): presumably the error text reported by pyOpenSSL when the peer
# closes the connection unexpectedly; its users are outside the visible part of
# this module -- confirm.
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
92

    
93

    
94
class SocketClosed(socket.error):
  """Socket error signalling that the connection was closed.

  The server silently ignores this exception while handling a request.

  """
96

    
97

    
98
class _HttpClientError(Exception):
  """Internal exception for HTTP client errors.

  Only meant for error reporting inside this module; the request executor
  catches it and stores its text in the request object.

  """
105

    
106

    
107
class HTTPException(Exception):
  """Base class for errors which are reported to the client as HTTP status.

  Subclasses set C{code} to the HTTP status code. C{message} optionally
  carries a more specific text which is shown in the error page instead of the
  standard short message for the status code.

  """
  code = None
  message = None

  def __init__(self, message=None):
    """Initializes this class.

    @type message: string or None
    @param message: Error message; None keeps the class-level default

    """
    if message is None:
      Exception.__init__(self)
    else:
      # Also hand the message to the base class, otherwise str(err) and
      # tracebacks would always show an empty text
      Exception.__init__(self, message)
      self.message = message
115

    
116

    
117
class HTTPBadRequest(HTTPException):
  """HTTP status 400; raised by the server for malformed request lines."""
  code = 400
119

    
120

    
121
class HTTPForbidden(HTTPException):
  """Error reported to the client with HTTP status 403."""
  code = 403
123

    
124

    
125
class HTTPNotFound(HTTPException):
  """Error reported to the client with HTTP status 404."""
  code = 404
127

    
128

    
129
class HTTPGone(HTTPException):
  """Error reported to the client with HTTP status 410."""
  code = 410
131

    
132

    
133
class HTTPLengthRequired(HTTPException):
  """HTTP status 411; raised when POST/PUT data has no usable Content-Length."""
  code = 411
135

    
136

    
137
class HTTPInternalError(HTTPException):
  """HTTP status 500; used when a request handler fails unexpectedly."""
  code = 500
139

    
140

    
141
class HTTPNotImplemented(HTTPException):
  """Error reported to the client with HTTP status 501."""
  code = 501
143

    
144

    
145
class HTTPServiceUnavailable(HTTPException):
  """Error reported to the client with HTTP status 503."""
  code = 503
147

    
148

    
149
class HTTPVersionNotSupported(HTTPException):
  """HTTP status 505; raised for requests using HTTP version 2.0 or higher."""
  code = 505
151

    
152

    
153
class HTTPJsonConverter:
  """Converts message bodies from and to JSON using the serializer module.

  """
  CONTENT_TYPE = "application/json"

  def Encode(self, data):
    """Serializes data via serializer.DumpJson.

    """
    encoded = serializer.DumpJson(data)
    return encoded

  def Decode(self, data):
    """Deserializes data via serializer.LoadJson.

    """
    decoded = serializer.LoadJson(data)
    return decoded
161

    
162

    
163
def WaitForSocketCondition(sock, poller, event, timeout):
  """Blocks until a condition (or an error condition) occurs on the socket.

  The socket is registered with the poller for the duration of the call and
  always unregistered again before returning.

  @type sock: socket
  @param sock: Wait for events on this socket
  @type poller: select.Poller
  @param poller: Poller object as created by select.poll()
  @type event: int
  @param event: ORed condition (see select module)
  @type timeout: float or None
  @param timeout: Timeout in seconds, None to wait without time limit
  @rtype: int or None
  @return: None for timeout, otherwise occurred conditions

  """
  # Besides the requested condition, priority data and the error/hangup
  # conditions also end the wait
  wanted = (event | select.POLLPRI |
            select.POLLNVAL | select.POLLHUP | select.POLLERR)

  # Poller object expects milliseconds
  if timeout is None:
    poll_timeout = None
  else:
    poll_timeout = timeout * 1000

  poller.register(sock, event)
  try:
    while True:
      # TODO: If the main thread receives a signal and we have no timeout, we
      # could wait forever. This should check a global "quit" flag or
      # something every so often.
      ready = poller.poll(poll_timeout)
      if not ready:
        # Timeout
        return None

      for (_, condition) in ready:
        if condition & wanted:
          return condition
  finally:
    poller.unregister(sock)
200

    
201

    
202
class HttpSslParams(object):
  """Holds the PEM data of an SSL key and certificate.

  Both files are read once when the object is created; the OpenSSL objects
  are built from the stored PEM data on every Get* call.

  """
  def __init__(self, ssl_key_path, ssl_cert_path):
    """Reads key and certificate from disk.

    @type ssl_key_path: string
    @param ssl_key_path: Path to file containing SSL key in PEM format
    @type ssl_cert_path: string
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format

    """
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)

  def GetKey(self):
    """Returns the private key as loaded by OpenSSL.crypto.load_privatekey.

    """
    pem_format = OpenSSL.crypto.FILETYPE_PEM
    return OpenSSL.crypto.load_privatekey(pem_format, self.ssl_key_pem)

  def GetCertificate(self):
    """Returns the certificate as loaded by OpenSSL.crypto.load_certificate.

    """
    pem_format = OpenSSL.crypto.FILETYPE_PEM
    return OpenSSL.crypto.load_certificate(pem_format, self.ssl_cert_pem)
225

    
226

    
227
class _HttpSocketBase(object):
  """Common socket and SSL setup shared by the HTTP server and client.

  """
  def __init__(self):
    # All of these are filled in by _CreateSocket
    self._using_ssl = None
    self._ssl_params = None
    self._ssl_key = None
    self._ssl_cert = None

  def _CreateSocket(self, ssl_params, ssl_verify_peer):
    """Creates a TCP socket and wraps it into an SSL connection if requested.

    @type ssl_params: HttpSslParams
    @param ssl_params: SSL key and certificate; None disables SSL
    @type ssl_verify_peer: bool
    @param ssl_verify_peer: Whether to require client certificate and compare
                            it with our certificate
    @return: Plain socket without SSL, otherwise an OpenSSL.SSL.Connection

    """
    self._ssl_params = ssl_params

    plain_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # SSL is enabled by passing SSL parameters
    self._using_ssl = ssl_params is not None

    if self._using_ssl:
      self._ssl_key = ssl_params.GetKey()
      self._ssl_cert = ssl_params.GetCertificate()

      context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
      context.set_options(OpenSSL.SSL.OP_NO_SSLv2)

      context.use_privatekey(self._ssl_key)
      context.use_certificate(self._ssl_cert)
      context.check_privatekey()

      if ssl_verify_peer:
        verify_flags = (OpenSSL.SSL.VERIFY_PEER |
                        OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT)
        context.set_verify(verify_flags, self._SSLVerifyCallback)

      return OpenSSL.SSL.Connection(context, plain_sock)

    return plain_sock

  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
    """Verify the certificate provided by the peer

    Only the SHA1 and MD5 digests are compared; the peer must present the very
    same certificate as the one used on our side.

    """
    assert self._ssl_params, "SSL not initialized"

    for algorithm in ("sha1", "md5"):
      if self._ssl_cert.digest(algorithm) != cert.digest(algorithm):
        return False

    return True
285

    
286

    
287
class _HttpConnectionHandler(object):
288
  """Implements server side of HTTP
289

290
  This class implements the server side of HTTP. It's based on code of Python's
291
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
292
  character encodings. Keep-alive connections are not supported.
293

294
  """
295
  # The default request version.  This only affects responses up until
296
  # the point where the request line is parsed, so it mainly decides what
297
  # the client gets back when sending a malformed request line.
298
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
299
  default_request_version = HTTP_0_9
300

    
301
  # Error message settings
302
  error_message_format = DEFAULT_ERROR_MESSAGE
303
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
304

    
305
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
306

    
307
  def __init__(self, server, conn, client_addr, fileio_class):
308
    """Initializes this class.
309

310
    Part of the initialization is reading the request and eventual POST/PUT
311
    data sent by the client.
312

313
    """
314
    self._server = server
315

    
316
    # We default rfile to buffered because otherwise it could be
317
    # really slow for large data (a getc() call per byte); we make
318
    # wfile unbuffered because (a) often after a write() we want to
319
    # read and we need to flush the line; (b) big writes to unbuffered
320
    # files are typically optimized by stdio even when big reads
321
    # aren't.
322
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
323
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
324

    
325
    self.client_addr = client_addr
326

    
327
    self.request_headers = None
328
    self.request_method = None
329
    self.request_path = None
330
    self.request_requestline = None
331
    self.request_version = self.default_request_version
332

    
333
    self.response_body = None
334
    self.response_code = HTTP_OK
335
    self.response_content_type = None
336
    self.response_headers = {}
337

    
338
    self.should_fork = False
339

    
340
    try:
341
      self._ReadRequest()
342
      self._ReadPostData()
343
    except HTTPException, err:
344
      self._SetErrorStatus(err)
345

    
346
  def Close(self):
347
    if not self.wfile.closed:
348
      self.wfile.flush()
349
    self.wfile.close()
350
    self.rfile.close()
351

    
352
  def _DateTimeHeader(self):
353
    """Return the current date and time formatted for a message header.
354

355
    """
356
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
357
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
358
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
359

    
360
  def _SetErrorStatus(self, err):
361
    """Sets the response code and body from a HTTPException.
362

363
    @type err: HTTPException
364
    @param err: Exception instance
365

366
    """
367
    try:
368
      (shortmsg, longmsg) = self.responses[err.code]
369
    except KeyError:
370
      shortmsg = longmsg = "Unknown"
371

    
372
    if err.message:
373
      message = err.message
374
    else:
375
      message = shortmsg
376

    
377
    values = {
378
      "code": err.code,
379
      "message": cgi.escape(message),
380
      "explain": longmsg,
381
      }
382

    
383
    self.response_code = err.code
384
    self.response_content_type = self.error_content_type
385
    self.response_body = self.error_message_format % values
386

    
387
  def HandleRequest(self):
388
    """Handle the actual request.
389

390
    Calls the actual handler function and converts exceptions into HTTP errors.
391

392
    """
393
    # Don't do anything if there's already been a problem
394
    if self.response_code != HTTP_OK:
395
      return
396

    
397
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
398

    
399
    # Check whether client is still there
400
    self.rfile.read(0)
401

    
402
    try:
403
      try:
404
        result = self._server.HandleRequest(self)
405

    
406
        # TODO: Content-type
407
        encoder = HTTPJsonConverter()
408
        body = encoder.Encode(result)
409

    
410
        self.response_content_type = encoder.CONTENT_TYPE
411
        self.response_body = body
412
      except (HTTPException, KeyboardInterrupt, SystemExit):
413
        raise
414
      except Exception, err:
415
        logging.exception("Caught exception")
416
        raise HTTPInternalError(message=str(err))
417
      except:
418
        logging.exception("Unknown exception")
419
        raise HTTPInternalError(message="Unknown error")
420

    
421
    except HTTPException, err:
422
      self._SetErrorStatus(err)
423

    
424
  def SendResponse(self):
425
    """Sends response to the client.
426

427
    """
428
    # Check whether client is still there
429
    self.rfile.read(0)
430

    
431
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
432
                 self.request_requestline, self.response_code)
433

    
434
    if self.response_code in self.responses:
435
      response_message = self.responses[self.response_code][0]
436
    else:
437
      response_message = ""
438

    
439
    if self.request_version != HTTP_0_9:
440
      self.wfile.write("%s %d %s\r\n" %
441
                       (self.request_version, self.response_code,
442
                        response_message))
443
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
444
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
445
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
446
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
447
      for key, val in self.response_headers.iteritems():
448
        self._SendHeader(key, val)
449

    
450
      # We don't support keep-alive at this time
451
      self._SendHeader(HTTP_CONNECTION, "close")
452
      self.wfile.write("\r\n")
453

    
454
    if (self.request_method != HTTP_HEAD and
455
        self.response_code >= HTTP_OK and
456
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
457
      self.wfile.write(self.response_body)
458

    
459
  def _SendHeader(self, name, value):
460
    if self.request_version != HTTP_0_9:
461
      self.wfile.write("%s: %s\r\n" % (name, value))
462

    
463
  def _ReadRequest(self):
464
    """Reads and parses request line
465

466
    """
467
    raw_requestline = self.rfile.readline()
468

    
469
    requestline = raw_requestline
470
    if requestline[-2:] == '\r\n':
471
      requestline = requestline[:-2]
472
    elif requestline[-1:] == '\n':
473
      requestline = requestline[:-1]
474

    
475
    if not requestline:
476
      raise HTTPBadRequest("Empty request line")
477

    
478
    self.request_requestline = requestline
479

    
480
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
481

    
482
    words = requestline.split()
483

    
484
    if len(words) == 3:
485
      [method, path, version] = words
486
      if version[:5] != 'HTTP/':
487
        raise HTTPBadRequest("Bad request version (%r)" % version)
488

    
489
      try:
490
        base_version_number = version.split('/', 1)[1]
491
        version_number = base_version_number.split(".")
492

    
493
        # RFC 2145 section 3.1 says there can be only one "." and
494
        #   - major and minor numbers MUST be treated as
495
        #      separate integers;
496
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
497
        #      turn is lower than HTTP/12.3;
498
        #   - Leading zeros MUST be ignored by recipients.
499
        if len(version_number) != 2:
500
          raise HTTPBadRequest("Bad request version (%r)" % version)
501

    
502
        version_number = int(version_number[0]), int(version_number[1])
503
      except (ValueError, IndexError):
504
        raise HTTPBadRequest("Bad request version (%r)" % version)
505

    
506
      if version_number >= (2, 0):
507
        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
508
                                      base_version_number)
509

    
510
    elif len(words) == 2:
511
      version = HTTP_0_9
512
      [method, path] = words
513
      if method != HTTP_GET:
514
        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
515

    
516
    else:
517
      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
518

    
519
    # Examine the headers and look for a Connection directive
520
    headers = mimetools.Message(self.rfile, 0)
521

    
522
    self.request_method = method
523
    self.request_path = path
524
    self.request_version = version
525
    self.request_headers = headers
526

    
527
  def _ReadPostData(self):
528
    """Reads POST/PUT data
529

530
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
531
    a request is signaled by the inclusion of a Content-Length header field in
532
    the request message headers. HTTP/1.0 requests containing an entity body
533
    must include a valid Content-Length header field."
534

535
    """
536
    # While not according to specification, we only support an entity body for
537
    # POST and PUT.
538
    if (not self.request_method or
539
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
540
      self.request_post_data = None
541
      return
542

    
543
    content_length = None
544
    try:
545
      if HTTP_CONTENT_LENGTH in self.request_headers:
546
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
547
    except TypeError:
548
      pass
549
    except ValueError:
550
      pass
551

    
552
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
553
    if content_length is None:
554
      raise HTTPLengthRequired("Missing Content-Length header or"
555
                               " invalid format")
556

    
557
    data = self.rfile.read(content_length)
558

    
559
    # TODO: Content-type, error handling
560
    if data:
561
      self.request_post_data = HTTPJsonConverter().Decode(data)
562
    else:
563
      self.request_post_data = None
564

    
565
    logging.debug("HTTP POST data: %s", self.request_post_data)
566

    
567

    
568
class HttpServer(_HttpSocketBase):
569
  """Generic HTTP server class
570

571
  Users of this class must subclass it and override the HandleRequest function.
572

573
  """
574
  MAX_CHILDREN = 20
575

    
576
  def __init__(self, mainloop, local_address, port,
577
               ssl_params=None, ssl_verify_peer=False):
578
    """Initializes the HTTP server
579

580
    @type mainloop: ganeti.daemon.Mainloop
581
    @param mainloop: Mainloop used to poll for I/O events
582
    @type local_addess: string
583
    @param local_address: Local IP address to bind to
584
    @type port: int
585
    @param port: TCP port to listen on
586
    @type ssl_params: HttpSslParams
587
    @param ssl_params: SSL key and certificate
588
    @type ssl_verify_peer: bool
589
    @param ssl_verify_peer: Whether to require client certificate and compare
590
                            it with our certificate
591

592
    """
593
    _HttpSocketBase.__init__(self)
594

    
595
    self.mainloop = mainloop
596
    self.local_address = local_address
597
    self.port = port
598

    
599
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
600

    
601
    # Allow port to be reused
602
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
603

    
604
    if self._using_ssl:
605
      self._fileio_class = _SSLFileObject
606
    else:
607
      self._fileio_class = socket._fileobject
608

    
609
    self._children = []
610

    
611
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
612
    mainloop.RegisterSignal(self)
613

    
614
  def Start(self):
615
    self.socket.bind((self.local_address, self.port))
616
    self.socket.listen(5)
617

    
618
  def Stop(self):
619
    self.socket.close()
620

    
621
  def OnIO(self, fd, condition):
622
    if condition & select.POLLIN:
623
      self._IncomingConnection()
624

    
625
  def OnSignal(self, signum):
626
    if signum == signal.SIGCHLD:
627
      self._CollectChildren(True)
628

    
629
  def _CollectChildren(self, quick):
630
    """Checks whether any child processes are done
631

632
    @type quick: bool
633
    @param quick: Whether to only use non-blocking functions
634

635
    """
636
    if not quick:
637
      # Don't wait for other processes if it should be a quick check
638
      while len(self._children) > self.MAX_CHILDREN:
639
        try:
640
          # Waiting without a timeout brings us into a potential DoS situation.
641
          # As soon as too many children run, we'll not respond to new
642
          # requests. The real solution would be to add a timeout for children
643
          # and killing them after some time.
644
          pid, status = os.waitpid(0, 0)
645
        except os.error:
646
          pid = None
647
        if pid and pid in self._children:
648
          self._children.remove(pid)
649

    
650
    for child in self._children:
651
      try:
652
        pid, status = os.waitpid(child, os.WNOHANG)
653
      except os.error:
654
        pid = None
655
      if pid and pid in self._children:
656
        self._children.remove(pid)
657

    
658
  def _IncomingConnection(self):
659
    """Called for each incoming connection
660

661
    """
662
    (connection, client_addr) = self.socket.accept()
663

    
664
    self._CollectChildren(False)
665

    
666
    pid = os.fork()
667
    if pid == 0:
668
      # Child process
669
      logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
670

    
671
      try:
672
        try:
673
          try:
674
            handler = None
675
            try:
676
              # Read, parse and handle request
677
              handler = _HttpConnectionHandler(self, connection, client_addr,
678
                                               self._fileio_class)
679
              handler.HandleRequest()
680
            finally:
681
              # Try to send a response
682
              if handler:
683
                handler.SendResponse()
684
                handler.Close()
685
          except SocketClosed:
686
            pass
687
        finally:
688
          logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
689
      except:
690
        logging.exception("Error while handling request from %s:%s",
691
                          client_addr[0], client_addr[1])
692
        os._exit(1)
693
      os._exit(0)
694
    else:
695
      self._children.append(pid)
696

    
697
  def HandleRequest(self, req):
698
    raise NotImplementedError()
699

    
700

    
701
class HttpClientRequest(object):
  """Describes an HTTP request and holds its result.

  The request parameters are set on creation. C{success}, C{error} and the
  resp_* attributes start out as None and are filled in by the code executing
  the request.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               ssl_params=None, ssl_verify_peer=False):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path
    @type headers: dict or None
    @param headers: Additional headers to send
    @type post_data: string or None
    @param post_data: Additional data to send
    @type ssl_params: HttpSslParams
    @param ssl_params: SSL key and certificate
    @type ssl_verify_peer: bool
    @param ssl_verify_peer: Whether to compare our certificate with server's
                            certificate

    """
    if post_data is not None:
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
        "Only POST and PUT requests support sending data"

    assert path.startswith("/"), "Path must start with slash (/)"

    # Request parameters
    self.host = host
    self.port = port
    self.ssl_params = ssl_params
    self.ssl_verify_peer = ssl_verify_peer
    self.method = method
    self.path = path
    self.headers = headers
    self.post_data = post_data

    # Overall result
    self.success = None
    self.error = None

    # Response details
    self.resp_status_line = None
    self.resp_version = None
    self.resp_status = None
    self.resp_reason = None
    self.resp_headers = None
    self.resp_body = None
749

    
750

    
751
class HttpClientRequestExecutor(_HttpSocketBase):
752
  # Default headers
753
  DEFAULT_HEADERS = {
754
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
755
    # TODO: For keep-alive, don't send "Connection: close"
756
    HTTP_CONNECTION: "close",
757
    }
758

    
759
  # Length limits
760
  STATUS_LINE_LENGTH_MAX = 512
761
  HEADER_LENGTH_MAX = 4 * 1024
762

    
763
  # Timeouts in seconds for socket layer
764
  # TODO: Make read timeout configurable per OpCode
765
  CONNECT_TIMEOUT = 5.0
766
  WRITE_TIMEOUT = 10
767
  READ_TIMEOUT = None
768
  CLOSE_TIMEOUT = 1
769

    
770
  # Parser state machine
771
  PS_STATUS_LINE = "status-line"
772
  PS_HEADERS = "headers"
773
  PS_BODY = "body"
774
  PS_COMPLETE = "complete"
775

    
776
  # Socket operations
777
  (OP_SEND,
778
   OP_RECV,
779
   OP_CLOSE_CHECK,
780
   OP_SHUTDOWN) = range(4)
781

    
782
  def __init__(self, req):
783
    """Initializes the HttpClientRequestExecutor class.
784

785
    @type req: HttpClientRequest
786
    @param req: Request object
787

788
    """
789
    _HttpSocketBase.__init__(self)
790

    
791
    self.request = req
792

    
793
    self.parser_status = self.PS_STATUS_LINE
794
    self.header_buffer = StringIO()
795
    self.body_buffer = StringIO()
796
    self.content_length = None
797
    self.server_will_close = None
798

    
799
    self.poller = select.poll()
800

    
801
    try:
802
      # TODO: Implement connection caching/keep-alive
803
      self.sock = self._CreateSocket(req.ssl_params,
804
                                     req.ssl_verify_peer)
805

    
806
      # Disable Python's timeout
807
      self.sock.settimeout(None)
808

    
809
      # Operate in non-blocking mode
810
      self.sock.setblocking(0)
811

    
812
      force_close = True
813
      self._Connect()
814
      try:
815
        self._SendRequest()
816
        self._ReadResponse()
817

    
818
        # Only wait for server to close if we didn't have any exception.
819
        force_close = False
820
      finally:
821
        self._CloseConnection(force_close)
822

    
823
      self.sock.close()
824
      self.sock = None
825

    
826
      req.resp_body = self.body_buffer.getvalue()
827

    
828
      req.success = True
829
      req.error = None
830

    
831
    except _HttpClientError, err:
832
      req.success = False
833
      req.error = str(err)
834

    
835
  def _BuildRequest(self):
    """Build HTTP request.

    The request consists of the request line, the default headers updated
    with the headers of the request object, the Host header and, if there is
    any, the request data.

    @rtype: string
    @return: Complete request

    """
    method = self.request.method.upper()
    post_data = self.request.post_data

    # Headers
    send_headers = self.DEFAULT_HEADERS.copy()

    if self.request.headers:
      send_headers.update(self.request.headers)

    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)

    # POST and PUT requests always get a Content-Length header, even without
    # data. The server side of this module answers such requests with "411
    # Length Required" if the header is missing.
    if post_data or method in (HTTP_POST, HTTP_PUT):
      send_headers[HTTP_CONTENT_LENGTH] = len(post_data or "")

    buf = StringIO()

    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
    # keep-alive).
    # TODO: For keep-alive, change to HTTP/1.1
    buf.write("%s %s %s\r\n" % (method, self.request.path, HTTP_1_0))

    # Add headers
    for name, value in send_headers.iteritems():
      buf.write("%s: %s\r\n" % (name, value))

    # Empty line separates headers from data
    buf.write("\r\n")

    if post_data:
      buf.write(post_data)

    return buf.getvalue()
871

    
872
  def _ParseStatusLine(self):
    """Parses the status line sent by the server.

    On success the version, status code and reason are stored in the request
    object.

    @raise _HttpClientError: If the status line is empty, the HTTP version is
        neither 1.0 nor 1.1 or the status code is not a three-digit number

    """
    line = self.request.resp_status_line

    if not line:
      raise _HttpClientError("Empty status line")

    try:
      [version, status, reason] = line.split(None, 2)
    except ValueError:
      try:
        # Status line without reason phrase
        [version, status] = line.split(None, 1)
        reason = ""
      except ValueError:
        # Neither version nor status code can be told apart; this is treated
        # as HTTP/0.9 and rejected by the version check below
        version = HTTP_0_9
        status = None
        reason = None

    if version:
      version = version.upper()

    if version not in (HTTP_1_0, HTTP_1_1):
      # We do not support HTTP/0.9, despite the specification requiring it
      # (RFC2616, section 19.6)
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
                             line)

    # The status code is a three-digit number
    try:
      status = int(status)
      if status < 100 or status > 999:
        status = -1
    except ValueError:
      status = -1

    if status == -1:
      raise _HttpClientError("Invalid status code (%r)" % line)

    self.request.resp_version = version
    self.request.resp_status = status
    self.request.resp_reason = reason
913

    
914
  def _WillServerCloseConnection(self):
    """Evaluate whether server will close the connection.

    The decision is based on the HTTP version of the response and its
    "Connection" and "Keep-Alive" headers.

    @rtype: bool
    @return: Whether server will close the connection

    """
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
    if hdr_connection:
      hdr_connection = hdr_connection.lower()

    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
    if self.request.resp_version == HTTP_1_1:
      # Converted explicitly, as the plain expression evaluates to None or an
      # empty string if there's no usable header
      return bool(hdr_connection and "close" in hdr_connection)

    # Some HTTP/1.0 implementations have support for persistent connections,
    # using rules different than HTTP/1.1.

    # For older HTTP, Keep-Alive indicates persistent connection.
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
      return False

    # At least Akamai returns a "Connection: Keep-Alive" header, which was
    # supposed to be sent by the client.
    if hdr_connection and "keep-alive" in hdr_connection:
      return False

    return True
942

    
943
  def _ParseHeaders(self):
    """Parses the headers sent by the server.

    This function also adjusts internal variables based on the header values:
    C{server_will_close} and C{content_length} are set according to the
    response.

    """
    req = self.request

    # Parse headers
    self.header_buffer.seek(0, 0)
    req.resp_headers = mimetools.Message(self.header_buffer, 0)

    self.server_will_close = self._WillServerCloseConnection()

    # Do we have a Content-Length header?
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
    if hdr_content_length:
      try:
        self.content_length = int(hdr_content_length)
      except ValueError:
        pass
      if self.content_length is not None and self.content_length < 0:
        self.content_length = None

    # does the body have a fixed length? (of zero)
    # The method is compared case-insensitively, as it's also converted to
    # upper case when the request is built
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
        100 <= req.resp_status < 200 or req.method.upper() == HTTP_HEAD):
      self.content_length = 0

    # if the connection remains open and a content-length was not provided,
    # then assume that the connection WILL close.
    if self.content_length is None:
      self.server_will_close = True
976

    
977
  def _CheckStatusLineLength(self, length):
978
    if length > self.STATUS_LINE_LENGTH_MAX:
979
      raise _HttpClientError("Status line longer than %d chars" %
980
                             self.STATUS_LINE_LENGTH_MAX)
981

    
982
  def _CheckHeaderLength(self, length):
983
    if length > self.HEADER_LENGTH_MAX:
984
      raise _HttpClientError("Headers longer than %d chars" %
985
                             self.HEADER_LENGTH_MAX)
986

    
987
  def _ParseBuffer(self, buf, eof):
988
    """Main function for HTTP response state machine.
989

990
    @type buf: string
991
    @param buf: Receive buffer
992
    @type eof: bool
993
    @param eof: Whether we've reached EOF on the socket
994
    @rtype: string
995
    @return: Updated receive buffer
996

997
    """
998
    if self.parser_status == self.PS_STATUS_LINE:
999
      # Expect status line
1000
      idx = buf.find("\r\n")
1001
      if idx >= 0:
1002
        self.request.resp_status_line = buf[:idx]
1003

    
1004
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1005

    
1006
        # Remove status line, including CRLF
1007
        buf = buf[idx + 2:]
1008

    
1009
        self._ParseStatusLine()
1010

    
1011
        self.parser_status = self.PS_HEADERS
1012
      else:
1013
        # Check whether incoming data is getting too large, otherwise we just
1014
        # fill our read buffer.
1015
        self._CheckStatusLineLength(len(buf))
1016

    
1017
    if self.parser_status == self.PS_HEADERS:
1018
      # Wait for header end
1019
      idx = buf.find("\r\n\r\n")
1020
      if idx >= 0:
1021
        self.header_buffer.write(buf[:idx + 2])
1022

    
1023
        self._CheckHeaderLength(self.header_buffer.tell())
1024

    
1025
        # Remove headers, including CRLF
1026
        buf = buf[idx + 4:]
1027

    
1028
        self._ParseHeaders()
1029

    
1030
        self.parser_status = self.PS_BODY
1031
      else:
1032
        # Check whether incoming data is getting too large, otherwise we just
1033
        # fill our read buffer.
1034
        self._CheckHeaderLength(len(buf))
1035

    
1036
    if self.parser_status == self.PS_BODY:
1037
      self.body_buffer.write(buf)
1038
      buf = ""
1039

    
1040
      # Check whether we've read everything
1041
      if (eof or
1042
          (self.content_length is not None and
1043
           self.body_buffer.tell() >= self.content_length)):
1044
        self.parser_status = self.PS_COMPLETE
1045

    
1046
    return buf
1047

    
1048
  def _SocketOperation(self, op, arg1, error_msg, timeout_msg):
1049
    """Wrapper around socket functions.
1050

1051
    This function abstracts error handling for socket operations, especially
1052
    for the complicated interaction with OpenSSL.
1053

1054
    """
1055
    if op == self.OP_SEND:
1056
      event_poll = select.POLLOUT
1057
      event_check = select.POLLOUT
1058
      timeout = self.WRITE_TIMEOUT
1059

    
1060
    elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1061
      event_poll = select.POLLIN
1062
      event_check = select.POLLIN | select.POLLPRI
1063
      if op == self.OP_CLOSE_CHECK:
1064
        timeout = self.CLOSE_TIMEOUT
1065
      else:
1066
        timeout = self.READ_TIMEOUT
1067

    
1068
    elif op == self.OP_SHUTDOWN:
1069
      event_poll = None
1070
      event_check = None
1071

    
1072
      # The timeout is only used when OpenSSL requests polling for a condition.
1073
      # It is not advisable to have no timeout for shutdown.
1074
      timeout = self.WRITE_TIMEOUT
1075

    
1076
    else:
1077
      raise AssertionError("Invalid socket operation")
1078

    
1079
    # No override by default
1080
    event_override = 0
1081

    
1082
    while True:
1083
      # Poll only for certain operations and when asked for by an override
1084
      if (event_override or
1085
          op in (self.OP_SEND, self.OP_RECV, self.OP_CLOSE_CHECK)):
1086
        if event_override:
1087
          wait_for_event = event_override
1088
        else:
1089
          wait_for_event = event_poll
1090

    
1091
        event = WaitForSocketCondition(self.sock, self.poller, wait_for_event,
1092
                                       timeout)
1093
        if event is None:
1094
          raise _HttpClientTimeout(timeout_msg)
1095

    
1096
        if (op == self.OP_RECV and
1097
            event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
1098
          return ""
1099

    
1100
        if not event & wait_for_event:
1101
          continue
1102

    
1103
      # Reset override
1104
      event_override = 0
1105

    
1106
      try:
1107
        try:
1108
          if op == self.OP_SEND:
1109
            return self.sock.send(arg1)
1110

    
1111
          elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1112
            return self.sock.recv(arg1)
1113

    
1114
          elif op == self.OP_SHUTDOWN:
1115
            if self._using_ssl:
1116
              # PyOpenSSL's shutdown() doesn't take arguments
1117
              return self.sock.shutdown()
1118
            else:
1119
              return self.sock.shutdown(arg1)
1120

    
1121
        except OpenSSL.SSL.WantWriteError:
1122
          # OpenSSL wants to write, poll for POLLOUT
1123
          event_override = select.POLLOUT
1124
          continue
1125

    
1126
        except OpenSSL.SSL.WantReadError:
1127
          # OpenSSL wants to read, poll for POLLIN
1128
          event_override = select.POLLIN | select.POLLPRI
1129
          continue
1130

    
1131
        except OpenSSL.SSL.WantX509LookupError:
1132
          continue
1133

    
1134
        except OpenSSL.SSL.SysCallError, err:
1135
          if op == self.OP_SEND:
1136
            # arg1 is the data when writing
1137
            if err.args and err.args[0] == -1 and arg1 == "":
1138
              # errors when writing empty strings are expected
1139
              # and can be ignored
1140
              return 0
1141

    
1142
          elif op == self.OP_RECV:
1143
            if err.args == (-1, _SSL_UNEXPECTED_EOF):
1144
              return ""
1145

    
1146
          raise socket.error(err.args)
1147

    
1148
        except OpenSSL.SSL.Error, err:
1149
          raise socket.error(err.args)
1150

    
1151
      except socket.error, err:
1152
        if err.args and err.args[0] == errno.EAGAIN:
1153
          # Ignore EAGAIN
1154
          continue
1155

    
1156
        raise _HttpClientError("%s: %s" % (error_msg, str(err)))
1157

    
1158
  def _Connect(self):
1159
    """Non-blocking connect to host with timeout.
1160

1161
    """
1162
    connected = False
1163
    while True:
1164
      try:
1165
        connect_error = self.sock.connect_ex((self.request.host,
1166
                                              self.request.port))
1167
      except socket.gaierror, err:
1168
        raise _HttpClientError("Connection failed: %s" % str(err))
1169

    
1170
      if connect_error == errno.EINTR:
1171
        # Mask signals
1172
        pass
1173

    
1174
      elif connect_error == 0:
1175
        # Connection established
1176
        connected = True
1177
        break
1178

    
1179
      elif connect_error == errno.EINPROGRESS:
1180
        # Connection started
1181
        break
1182

    
1183
      raise _HttpClientError("Connection failed (%s: %s)" %
1184
                             (connect_error, os.strerror(connect_error)))
1185

    
1186
    if not connected:
1187
      # Wait for connection
1188
      event = WaitForSocketCondition(self.sock, self.poller,
1189
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1190
      if event is None:
1191
        raise _HttpClientError("Timeout while connecting to server")
1192

    
1193
      # Get error code
1194
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1195
      if connect_error != 0:
1196
        raise _HttpClientError("Connection failed (%s: %s)" %
1197
                               (connect_error, os.strerror(connect_error)))
1198

    
1199
    # Enable TCP keep-alive
1200
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1201

    
1202
    # If needed, Linux specific options are available to change the TCP
1203
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1204
    # TCP_KEEPINTVL.
1205

    
1206
  def _SendRequest(self):
1207
    """Sends request to server.
1208

1209
    """
1210
    buf = self._BuildRequest()
1211

    
1212
    while buf:
1213
      # Send only 4 KB at a time
1214
      data = buf[:4096]
1215

    
1216
      sent = self._SocketOperation(self.OP_SEND, data,
1217
                                   "Error while sending request",
1218
                                   "Timeout while sending request")
1219

    
1220
      # Remove sent bytes
1221
      buf = buf[sent:]
1222

    
1223
    assert not buf, "Request wasn't sent completely"
1224

    
1225
  def _ReadResponse(self):
1226
    """Read response from server.
1227

1228
    Calls the parser function after reading a chunk of data.
1229

1230
    """
1231
    buf = ""
1232
    eof = False
1233
    while self.parser_status != self.PS_COMPLETE:
1234
      data = self._SocketOperation(self.OP_RECV, 4096,
1235
                                   "Error while reading response",
1236
                                   "Timeout while reading response")
1237

    
1238
      if data:
1239
        buf += data
1240
      else:
1241
        eof = True
1242

    
1243
      # Do some parsing and error checking while more data arrives
1244
      buf = self._ParseBuffer(buf, eof)
1245

    
1246
      # Must be done only after the buffer has been evaluated
1247
      if (eof and
1248
          self.parser_status in (self.PS_STATUS_LINE,
1249
                                 self.PS_HEADERS)):
1250
        raise _HttpClientError("Connection closed prematurely")
1251

    
1252
    # Parse rest
1253
    buf = self._ParseBuffer(buf, True)
1254

    
1255
    assert self.parser_status == self.PS_COMPLETE
1256
    assert not buf, "Parser didn't read full response"
1257

    
1258
  def _CloseConnection(self, force):
1259
    """Closes the connection.
1260

1261
    """
1262
    if self.server_will_close and not force:
1263
      # Wait for server to close
1264
      try:
1265
        # Check whether it's actually closed
1266
        if not self._SocketOperation(self.OP_CLOSE_CHECK, 1,
1267
                                     "Error", "Timeout"):
1268
          return
1269
      except (socket.error, _HttpClientError):
1270
        # Ignore errors at this stage
1271
        pass
1272

    
1273
    # Close the connection from our side
1274
    self._SocketOperation(self.OP_SHUTDOWN, socket.SHUT_RDWR,
1275
                          "Error while shutting down connection",
1276
                          "Timeout while shutting down connection")
1277

    
1278

    
1279
class _HttpClientPendingRequest(object):
1280
  """Data class for pending requests.
1281

1282
  """
1283
  def __init__(self, request):
1284
    self.request = request
1285

    
1286
    # Thread synchronization
1287
    self.done = threading.Event()
1288

    
1289

    
1290
class HttpClientWorker(workerpool.BaseWorker):
  """Worker processing pending HTTP client requests.

  """
  def RunTask(self, pend_req):
    """Executes a pending request and signals its completion.

    @param pend_req: Pending request providing the C{request} to execute and
                     the C{done} event to set afterwards

    """
    try:
      request = pend_req.request
      HttpClientRequestExecutor(request)
    finally:
      # The event is set even if the execution raised an exception
      pend_req.done.set()
1299

    
1300

    
1301
class HttpClientWorkerPool(workerpool.WorkerPool):
  """Worker pool for HTTP client requests.

  The pool is set up with C{HTTP_CLIENT_THREADS} and L{HttpClientWorker}.

  """
  def __init__(self, manager):
    """Initializes the pool.

    @param manager: Object stored as C{self.manager}

    """
    workerpool.WorkerPool.__init__(
      self, HTTP_CLIENT_THREADS, HttpClientWorker)

    self.manager = manager
1306

    
1307

    
1308
class HttpClientManager(object):
  """Manages HTTP requests.

  Requests are executed through a worker pool created by the manager.

  """
  def __init__(self):
    self._wpool = HttpClientWorkerPool(self)

  def __del__(self):
    self.Shutdown()

  def ExecRequests(self, requests):
    """Execute HTTP requests.

    This function can be called from multiple threads at the same time.

    @type requests: List of HttpClientRequest instances
    @param requests: The requests to execute
    @rtype: List of HttpClientRequest instances
    @returns: The list of requests passed in

    """
    # _HttpClientPendingRequest is used for internal thread synchronization
    pending = [_HttpClientPendingRequest(req) for req in requests]

    # Requests actually handed over to the worker pool. Only those will ever
    # be marked as done; waiting for a request which was never queued would
    # block forever.
    queued = []

    try:
      # Add requests to queue
      for pend_req in pending:
        self._wpool.AddTask(pend_req)
        queued.append(pend_req)

    finally:
      # In case of an exception we should still wait for the requests already
      # queued, otherwise another thread from the worker pool could modify
      # the request object after we returned.
      for pend_req in queued:
        pend_req.done.wait()

    # Return original list
    return requests

  def Shutdown(self):
    """Quiesces the worker pool and terminates its workers.

    """
    self._wpool.Quiesce()
    self._wpool.TerminateWorkers()
1352

    
1353

    
1354
class _SSLFileObject(object):
1355
  """Wrapper around socket._fileobject
1356

1357
  This wrapper is required to handle OpenSSL exceptions.
1358

1359
  """
1360
  def _RequireOpenSocket(fn):
1361
    def wrapper(self, *args, **kwargs):
1362
      if self.closed:
1363
        raise SocketClosed("Socket is closed")
1364
      return fn(self, *args, **kwargs)
1365
    return wrapper
1366

    
1367
  def __init__(self, sock, mode='rb', bufsize=-1):
1368
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1369

    
1370
  def _ConnectionLost(self):
1371
    self._base = None
1372

    
1373
  def _getclosed(self):
1374
    return self._base is None or self._base.closed
1375
  closed = property(_getclosed, doc="True if the file is closed")
1376

    
1377
  @_RequireOpenSocket
1378
  def close(self):
1379
    return self._base.close()
1380

    
1381
  @_RequireOpenSocket
1382
  def flush(self):
1383
    return self._base.flush()
1384

    
1385
  @_RequireOpenSocket
1386
  def fileno(self):
1387
    return self._base.fileno()
1388

    
1389
  @_RequireOpenSocket
1390
  def read(self, size=-1):
1391
    return self._ReadWrapper(self._base.read, size=size)
1392

    
1393
  @_RequireOpenSocket
1394
  def readline(self, size=-1):
1395
    return self._ReadWrapper(self._base.readline, size=size)
1396

    
1397
  def _ReadWrapper(self, fn, *args, **kwargs):
1398
    while True:
1399
      try:
1400
        return fn(*args, **kwargs)
1401

    
1402
      except OpenSSL.SSL.ZeroReturnError, err:
1403
        self._ConnectionLost()
1404
        return ""
1405

    
1406
      except OpenSSL.SSL.WantReadError:
1407
        continue
1408

    
1409
      #except OpenSSL.SSL.WantWriteError:
1410
      # TODO
1411

    
1412
      except OpenSSL.SSL.SysCallError, (retval, desc):
1413
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1414
            or retval > 0):
1415
          self._ConnectionLost()
1416
          return ""
1417

    
1418
        logging.exception("Error in OpenSSL")
1419
        self._ConnectionLost()
1420
        raise socket.error(err.args)
1421

    
1422
      except OpenSSL.SSL.Error, err:
1423
        self._ConnectionLost()
1424
        raise socket.error(err.args)
1425

    
1426
  @_RequireOpenSocket
1427
  def write(self, data):
1428
    return self._WriteWrapper(self._base.write, data)
1429

    
1430
  def _WriteWrapper(self, fn, *args, **kwargs):
1431
    while True:
1432
      try:
1433
        return fn(*args, **kwargs)
1434
      except OpenSSL.SSL.ZeroReturnError, err:
1435
        self._ConnectionLost()
1436
        return 0
1437

    
1438
      except OpenSSL.SSL.WantWriteError:
1439
        continue
1440

    
1441
      #except OpenSSL.SSL.WantReadError:
1442
      # TODO
1443

    
1444
      except OpenSSL.SSL.SysCallError, err:
1445
        if err.args[0] == -1 and data == "":
1446
          # errors when writing empty strings are expected
1447
          # and can be ignored
1448
          return 0
1449

    
1450
        self._ConnectionLost()
1451
        raise socket.error(err.args)
1452

    
1453
      except OpenSSL.SSL.Error, err:
1454
        self._ConnectionLost()
1455
        raise socket.error(err.args)