Add new “daemon-util” script to start/stop Ganeti daemons
[ganeti-local] / daemons / ganeti-masterd
index 57fa9ec..4e77c71 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/python -u
+#!/usr/bin/python
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -35,7 +35,6 @@ import collections
 import signal
 import logging
 
-from cStringIO import StringIO
 from optparse import OptionParser
 
 from ganeti import config
@@ -108,14 +107,17 @@ class IOServer(SocketServer.UnixStreamServer):
     """
     self.request_workers.AddTask(self, request, client_address)
 
-  def serve_forever(self):
+  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
+  def serve_forever(self, signal_handlers=None):
     """Handle one request at a time until told to quit."""
-    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
-    try:
-      while not sighandler.called:
-        self.handle_request()
-    finally:
-      sighandler.Reset()
+    assert isinstance(signal_handlers, dict) and \
+           len(signal_handlers) > 0, \
+           "Broken SignalHandled decorator"
+    # Since we use SignalHandled only once, the resulting dict will map all
+    # signals to the same handler. We'll just use the first one.
+    sighandler = signal_handlers.values()[0]
+    while not sighandler.called:
+      self.handle_request()
 
   def server_cleanup(self):
     """Cleanup the server.
@@ -168,7 +170,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
         success = True
       except errors.GenericError, err:
         success = False
-        result = (err.__class__.__name__, err.args)
+        result = errors.EncodeException(err)
       except:
         logging.error("Unexpected exception", exc_info=True)
         err = sys.exc_info()
@@ -254,7 +256,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received instance query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                     use_locking=use_locking)
       return self._Query(op)
@@ -263,7 +266,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received node query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                 use_locking=use_locking)
       return self._Query(op)
@@ -271,7 +275,8 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_EXPORTS:
       nodes, use_locking = args
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       logging.info("Received exports query request")
       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
       return self._Query(op)
@@ -293,20 +298,32 @@ class ClientOps:
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)
 
+    elif method == luxi.REQ_SET_WATCHER_PAUSE:
+      (until, ) = args
+
+      if until is None:
+        logging.info("Received request to no longer pause the watcher")
+      else:
+        if not isinstance(until, (int, float)):
+          raise TypeError("Duration must be an integer or float")
+
+        if until < time.time():
+          raise errors.GenericError("Unable to set pause end time in the past")
+
+        logging.info("Received request to pause the watcher until %s", until)
+
+      return _SetWatcherPause(until)
+
     else:
       logging.info("Received invalid request '%s'", method)
       raise ValueError("Invalid operation '%s'" % method)
 
-  def _DummyLog(self, *args):
-    pass
-
   def _Query(self, op):
     """Runs the specified opcode and returns the result.
 
     """
     proc = mcpu.Processor(self.server.context)
-    # TODO: Where should log messages go?
-    return proc.ExecOpCode(op, self._DummyLog, None)
+    return proc.ExecOpCode(op, None)
 
 
 class GanetiContext(object):
@@ -381,6 +398,22 @@ class GanetiContext(object):
     self.glm.remove(locking.LEVEL_NODE, name)
 
 
+def _SetWatcherPause(until):
+  """Creates or removes the watcher pause file.
+
+  @type until: None or int
+  @param until: Unix timestamp saying until when the watcher shouldn't run
+
+  """
+  if until is None:
+    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+  else:
+    utils.WriteFile(constants.WATCHER_PAUSEFILE,
+                    data="%d\n" % (until, ))
+
+  return until
+
+
 def CheckAgreement():
   """Check the agreement on who is the master.
 
@@ -424,6 +457,7 @@ def CheckAgreement():
   # here a real node is at the top of the list
   all_votes = sum(item[1] for item in votes)
   top_node, top_votes = votes[0]
+
   result = False
   if top_node != myself:
     logging.critical("It seems we are not the master (top-voted node"
@@ -437,33 +471,91 @@ def CheckAgreement():
 
   return result
 
-def CheckMASTERD(options, args):
-  """Initial checks whether to run or exit with a failure
 
-  """
+def CheckAgreementWithRpc():
   rpc.Init()
   try:
-    ssconf.CheckMaster(options.debug)
-
-    # we believe we are the master, let's ask the other nodes...
-    if options.no_voting and not options.yes_do_it:
-      sys.stdout.write("The 'no voting' option has been selected.\n")
-      sys.stdout.write("This is dangerous, please confirm by"
-                       " typing uppercase 'yes': ")
-      sys.stdout.flush()
-      confirmation = sys.stdin.readline().strip()
-      if confirmation != "YES":
-        print "Aborting."
-        return
-    elif not options.no_voting:
-      if not CheckAgreement():
-        return
+    return CheckAgreement()
   finally:
     rpc.Shutdown()
 
 
-def ExecMASTERD(options, args):
-  """Main MASTERD function, executed with the pidfile held.
+def _RunInSeparateProcess(fn):
+  """Runs a function in a separate process.
+
+  Note: Only boolean return values are supported.
+
+  @type fn: callable
+  @param fn: Function to be called
+  @rtype: bool
+
+  """
+  pid = os.fork()
+  if pid == 0:
+    # Child process
+    try:
+      # Call function
+      result = int(bool(fn()))
+      assert result in (0, 1)
+    except:
+      logging.exception("Error while calling function in separate process")
+      # 0 and 1 are reserved for the return value
+      result = 33
+
+    os._exit(result)
+
+  # Parent process
+
+  # Avoid zombies and check exit code
+  (_, status) = os.waitpid(pid, 0)
+
+  if os.WIFSIGNALED(status):
+    signum = os.WTERMSIG(status)
+    exitcode = None
+  else:
+    signum = None
+    exitcode = os.WEXITSTATUS(status)
+
+  if not (exitcode in (0, 1) and signum is None):
+    logging.error("Child program failed (code=%s, signal=%s)",
+                  exitcode, signum)
+    sys.exit(constants.EXIT_FAILURE)
+
+  return bool(exitcode)
+
+
+def CheckMasterd(options, args):
+  """Initial checks whether to run or exit with a failure.
+
+  """
+  ssconf.CheckMaster(options.debug)
+
+  # If CheckMaster didn't fail we believe we are the master, but we have to
+  # confirm with the other nodes.
+  if options.no_voting:
+    if options.yes_do_it:
+      return
+
+    sys.stdout.write("The 'no voting' option has been selected.\n")
+    sys.stdout.write("This is dangerous, please confirm by"
+                     " typing uppercase 'yes': ")
+    sys.stdout.flush()
+
+    confirmation = sys.stdin.readline().strip()
+    if confirmation != "YES":
+      print >>sys.stderr, "Aborting."
+      sys.exit(constants.EXIT_FAILURE)
+
+    return
+
+  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
+  # process before we call utils.Daemonize in the current process.
+  if not _RunInSeparateProcess(CheckAgreementWithRpc):
+    sys.exit(constants.EXIT_FAILURE)
+
+
+def ExecMasterd (options, args):
+  """Main master daemon function, executed with the PID file held.
 
   """
   # This is safe to do as the pid file guarantees against
@@ -477,7 +569,7 @@ def ExecMASTERD(options, args):
       # activate ip
       master_node = ssconf.SimpleStore().GetMasterNode()
       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
-      msg = result.RemoteFailMsg()
+      msg = result.fail_msg
       if msg:
         logging.error("Can't activate master IP address: %s", msg)
 
@@ -509,7 +601,8 @@ def main():
           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
          ]
   daemon.GenericMain(constants.MASTERD, parser, dirs,
-                     CheckMASTERD, ExecMASTERD)
+                     CheckMasterd, ExecMasterd)
+
 
 if __name__ == "__main__":
   main()