Export backend.GetMasterInfo over the rpc layer
[ganeti-local] / daemons / ganeti-noded
index b826724..3de30d3 100755 (executable)
 import os
 import sys
 import traceback
 import os
 import sys
 import traceback
-import BaseHTTPServer
-import simplejson
+import SocketServer
 import errno
 import errno
+import logging
+import signal
 
 from optparse import OptionParser
 
 
 from optparse import OptionParser
 
-
 from ganeti import backend
 from ganeti import logger
 from ganeti import constants
 from ganeti import objects
 from ganeti import errors
 from ganeti import backend
 from ganeti import logger
 from ganeti import constants
 from ganeti import objects
 from ganeti import errors
+from ganeti import jstore
 from ganeti import ssconf
 from ganeti import ssconf
+from ganeti import http
 from ganeti import utils
 
 
 from ganeti import utils
 
 
-class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
+queue_lock = None
+
+
+def _RequireJobQueueLock(fn):
+  """Decorator for job queue manipulating functions.
+
+  """
+  def wrapper(*args, **kwargs):
+    # Locking in exclusive, blocking mode because there could be several
+    # children running at the same time. Waiting up to 10 seconds.
+    queue_lock.Exclusive(blocking=True, timeout=10)
+    try:
+      return fn(*args, **kwargs)
+    finally:
+      queue_lock.Unlock()
+  return wrapper
+
+
+class NodeDaemonRequestHandler(http.HTTPRequestHandler):
   """The server implementation.
 
   This class holds all methods exposed over the RPC interface.
 
   """
   """The server implementation.
 
   This class holds all methods exposed over the RPC interface.
 
   """
-  def do_PUT(self):
-    """Handle a post request.
+  def HandleRequest(self):
+    """Handle a request.
 
     """
 
     """
+    if self.command.upper() != "PUT":
+      raise http.HTTPBadRequest()
+
     path = self.path
     if path.startswith("/"):
       path = path[1:]
     path = self.path
     if path.startswith("/"):
       path = path[1:]
-    mname = "perspective_%s" % path
-    if not hasattr(self, mname):
-      self.send_error(404)
-      return False
 
 
-    method = getattr(self, mname)
-    try:
-      body_length = int(self.headers.get('Content-Length', '0'))
-    except ValueError:
-      self.send_error(400, 'No Content-Length header or invalid format')
-      return False
+    method = getattr(self, "perspective_%s" % path, None)
+    if method is None:
+      raise httperror.HTTPNotFound()
 
     try:
 
     try:
-      body = self.rfile.read(body_length)
-    except socket.error, err:
-      logger.Error("Socket error while reading: %s" % str(err))
-      return
-    try:
-      params = simplejson.loads(body)
-      result = method(params)
-      payload = simplejson.dumps(result)
-    except Exception, err:
-      self.send_error(500, "Error: %s" % str(err))
-      return False
-    self.send_response(200)
-    self.send_header('Content-Length', str(len(payload)))
-    self.end_headers()
-    self.wfile.write(payload)
-    return True
-
-  def log_message(self, format, *args):
-    """Log a request to the log.
-
-    This is the same as the parent, we just log somewhere else.
-
-    """
-    msg = ("%s - - [%s] %s" %
-           (self.address_string(),
-            self.log_date_time_string(),
-            format % args))
-    logger.Debug(msg)
+      try:
+        return method(self.post_data)
+      except:
+        logging.exception("Error in RPC call")
+        raise
+    except errors.QuitGanetiException, err:
+      # Tell parent to quit
+      os.kill(self.server.noded_pid, signal.SIGTERM)
 
   # the new block devices  --------------------------
 
 
   # the new block devices  --------------------------
 
@@ -434,14 +430,14 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
     """Promote this node to master status.
 
     """
     """Promote this node to master status.
 
     """
-    return backend.StartMaster()
+    return backend.StartMaster(params[0])
 
   @staticmethod
   def perspective_node_stop_master(params):
     """Demote this node from master status.
 
     """
 
   @staticmethod
   def perspective_node_stop_master(params):
     """Demote this node from master status.
 
     """
-    return backend.StopMaster()
+    return backend.StopMaster(params[0])
 
   @staticmethod
   def perspective_node_leave_cluster(params):
 
   @staticmethod
   def perspective_node_leave_cluster(params):
@@ -476,6 +472,12 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
     """
     return backend.UploadFile(*params)
 
     """
     return backend.UploadFile(*params)
 
+  @staticmethod
+  def perspective_master_info(params):
+    """Query master information.
+
+    """
+    return backend.GetMasterInfo()
 
   # os -----------------------
 
 
   # os -----------------------
 
@@ -530,6 +532,8 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
     duration = params[0]
     return utils.TestDelay(duration)
 
     duration = params[0]
     return utils.TestDelay(duration)
 
+  # file storage ---------------
+
   @staticmethod
   def perspective_file_storage_dir_create(params):
     """Create the file storage directory.
   @staticmethod
   def perspective_file_storage_dir_create(params):
     """Create the file storage directory.
@@ -556,6 +560,60 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
     return backend.RenameFileStorageDir(old_file_storage_dir,
                                         new_file_storage_dir)
 
     return backend.RenameFileStorageDir(old_file_storage_dir,
                                         new_file_storage_dir)
 
+  # jobs ------------------------
+
+  @staticmethod
+  @_RequireJobQueueLock
+  def perspective_jobqueue_update(params):
+    """Update job queue.
+
+    """
+    (file_name, content) = params
+    return backend.JobQueueUpdate(file_name, content)
+
+  @staticmethod
+  @_RequireJobQueueLock
+  def perspective_jobqueue_purge(params):
+    """Purge job queue.
+
+    """
+    return backend.JobQueuePurge()
+
+  @staticmethod
+  @_RequireJobQueueLock
+  def perspective_jobqueue_rename(params):
+    """Rename a job queue file.
+
+    """
+    (old, new) = params
+
+    return backend.JobQueueRename(old, new)
+
+
+class NodeDaemonHttpServer(http.HTTPServer):
+  def __init__(self, server_address):
+    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
+    self.noded_pid = os.getpid()
+
+  def serve_forever(self):
+    """Handle requests until told to quit."""
+    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
+    try:
+      while not sighandler.called:
+        self.handle_request()
+      # TODO: There could be children running at this point
+    finally:
+      sighandler.Reset()
+
+
+class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
+  """Forking HTTP Server.
+
+  This inherits from ForkingMixIn and HTTPServer in order to fork for each
+  request we handle. This allows more requests to be handled concurrently.
+
+  """
+
 
 def ParseOptions():
   """Parse the command line options.
 
 def ParseOptions():
   """Parse the command line options.
@@ -583,6 +641,8 @@ def main():
   """Main function for the node daemon.
 
   """
   """Main function for the node daemon.
 
   """
+  global queue_lock
+
   options, args = ParseOptions()
   utils.debug = options.debug
   for fname in (constants.SSL_CERT_FILE,):
   options, args = ParseOptions()
   utils.debug = options.debug
   for fname in (constants.SSL_CERT_FILE,):
@@ -598,29 +658,43 @@ def main():
     print "Cluster configuration incomplete: '%s'" % str(err)
     sys.exit(5)
 
     print "Cluster configuration incomplete: '%s'" % str(err)
     sys.exit(5)
 
-  # create /var/run/ganeti if not existing, in order to take care of
-  # tmpfs /var/run
-  if not os.path.exists(constants.BDEV_CACHE_DIR):
-    try:
-      os.mkdir(constants.BDEV_CACHE_DIR, 0755)
-    except EnvironmentError, err:
-      if err.errno != errno.EEXIST:
-        print ("Node setup wrong, cannot create directory %s: %s" %
-               (constants.BDEV_CACHE_DIR, err))
-        sys.exit(5)
-  if not os.path.isdir(constants.BDEV_CACHE_DIR):
-    print ("Node setup wrong, %s is not a directory" %
-           constants.BDEV_CACHE_DIR)
-    sys.exit(5)
+  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
+  # situation where RUN_DIR is tmpfs
+  for dir_name in constants.SUB_RUN_DIRS:
+    if not os.path.exists(dir_name):
+      try:
+        os.mkdir(dir_name, 0755)
+      except EnvironmentError, err:
+        if err.errno != errno.EEXIST:
+          print ("Node setup wrong, cannot create directory %s: %s" %
+                 (dir_name, err))
+          sys.exit(5)
+    if not os.path.isdir(dir_name):
+      print ("Node setup wrong, %s is not a directory" % dir_name)
+      sys.exit(5)
 
   # become a daemon
   if options.fork:
     utils.Daemonize(logfile=constants.LOG_NODESERVER)
 
 
   # become a daemon
   if options.fork:
     utils.Daemonize(logfile=constants.LOG_NODESERVER)
 
-  logger.SetupLogging(program="ganeti-noded", debug=options.debug)
+  utils.WritePidFile(constants.NODED_PID)
 
 
-  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
-  httpd.serve_forever()
+  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
+                      stderr_logging=not options.fork)
+  logging.info("ganeti node daemon startup")
+
+  # Prepare job queue
+  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
+
+  if options.fork:
+    server = ForkingHTTPServer(('', port))
+  else:
+    server = NodeDaemonHttpServer(('', port))
+
+  try:
+    server.serve_forever()
+  finally:
+    utils.RemovePidFile(constants.NODED_PID)
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':