Revision cc28af80

b/daemons/ganeti-noded
40 40
from ganeti import objects
41 41
from ganeti import errors
42 42
from ganeti import jstore
43
from ganeti import daemon
43 44
from ganeti import http
44 45
from ganeti import utils
45 46

  
......
65 66
  return wrapper
66 67

  
67 68

  
68
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
69
class NodeHttpServer(http.HttpServer):
69 70
  """The server implementation.
70 71

  
71 72
  This class holds all methods exposed over the RPC interface.
72 73

  
73 74
  """
74
  def HandleRequest(self):
75
  def __init__(self, *args, **kwargs):
76
    http.HttpServer.__init__(self, *args, **kwargs)
77
    self.noded_pid = os.getpid()
78

  
79
  def HandleRequest(self, req):
75 80
    """Handle a request.
76 81

  
77 82
    """
78
    if self.command.upper() != "PUT":
83
    if req.request_method.upper() != "PUT":
79 84
      raise http.HTTPBadRequest()
80 85

  
81
    path = self.path
86
    path = req.request_path
82 87
    if path.startswith("/"):
83 88
      path = path[1:]
84 89

  
85 90
    method = getattr(self, "perspective_%s" % path, None)
86 91
    if method is None:
87
      raise httperror.HTTPNotFound()
92
      raise http.HTTPNotFound()
88 93

  
89 94
    try:
90 95
      try:
91
        return method(self.post_data)
96
        return method(req.request_post_data)
92 97
      except:
93 98
        logging.exception("Error in RPC call")
94 99
        raise
95 100
    except errors.QuitGanetiException, err:
96 101
      # Tell parent to quit
97
      os.kill(self.server.noded_pid, signal.SIGTERM)
102
      os.kill(self.noded_pid, signal.SIGTERM)
98 103

  
99 104
  # the new block devices  --------------------------
100 105

  
......
594 599
    return backend.JobQueueRename(old, new)
595 600

  
596 601

  
597
class NodeDaemonHttpServer(http.HTTPServer):
598
  def __init__(self, server_address):
599
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
600
    self.noded_pid = os.getpid()
601

  
602
  def serve_forever(self):
603
    """Handle requests until told to quit."""
604
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
605
    try:
606
      while not sighandler.called:
607
        self.handle_request()
608
      # TODO: There could be children running at this point
609
    finally:
610
      sighandler.Reset()
611

  
612

  
613
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
614
  """Forking HTTP Server.
615

  
616
  This inherits from ForkingMixIn and HTTPServer in order to fork for each
617
  request we handle. This allows more requests to be handled concurrently.
618

  
619
  """
620

  
621

  
622 602
def ParseOptions():
623 603
  """Parse the command line options.
624 604

  
......
681 661
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
682 662

  
683 663
  utils.WritePidFile(constants.NODED_PID)
664
  try:
665
    logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
666
                        stderr_logging=not options.fork)
667
    logging.info("ganeti node daemon startup")
684 668

  
685
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
686
                      stderr_logging=not options.fork)
687
  logging.info("ganeti node daemon startup")
688

  
689
  # Prepare job queue
690
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
691

  
692
  if options.fork:
693
    server = ForkingHTTPServer(('', port))
694
  else:
695
    server = NodeDaemonHttpServer(('', port))
669
    # Prepare job queue
670
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
696 671

  
697
  try:
698
    server.serve_forever()
672
    mainloop = daemon.Mainloop()
673
    server = NodeHttpServer(mainloop, ("", port))
674
    server.Start()
675
    try:
676
      mainloop.Run()
677
    finally:
678
      server.Stop()
699 679
  finally:
700 680
    utils.RemovePidFile(constants.NODED_PID)
701 681

  

Also available in: Unified diff