daemons: handle arguments correctly and uniformly
[ganeti-local] / daemons / ganeti-watcher
index 42a2eaf..6c97d40 100755 (executable)
@@ -27,10 +27,15 @@ by a node reboot.  Run from cron or similar.
 
 """
 
+# pylint: disable-msg=C0103,W0142
+
+# C0103: Invalid name ganeti-watcher
+
 import os
 import sys
 import time
 import logging
+import errno
 from optparse import OptionParser
 
 from ganeti import utils
@@ -70,14 +75,24 @@ def Indent(s, prefix='| '):
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
-def StartMaster():
-  """Try to start the master daemon.
+def ShouldPause():
+  """Check whether we should pause.
 
   """
-  result = utils.RunCmd(['ganeti-masterd'])
+  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+
+
+def EnsureDaemon(name):
+  """Check for and start daemon if not alive.
+
+  """
+  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
   if result.failed:
-    logging.error("Can't start the master daemon: output '%s'", result.output)
-  return not result.failed
+    logging.error("Can't start daemon '%s', failure %s, output: %s",
+                  name, result.fail_reason, result.output)
+    return False
+
+  return True
 
 
 class WatcherState(object):
@@ -104,7 +119,7 @@ class WatcherState(object):
         self._data = {}
       else:
         self._data = serializer.Load(state_data)
-    except Exception, msg:
+    except Exception, msg: # pylint: disable-msg=W0703
       # Ignore errors while loading the file and treat it as empty
       self._data = {}
       logging.warning(("Invalid state file. Using defaults."
@@ -255,10 +270,17 @@ def GetClusterData():
 
   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
+  logging.debug("Got data from cluster, writing instance status file")
+
   result = all_results[0]
   smap = {}
 
   instances = {}
+
+  # write the upfile
+  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
+  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
+
   for fields in result:
     (name, status, autostart, snodes) = fields
 
@@ -291,6 +313,9 @@ class Watcher(object):
     master = client.QueryConfigValues(["master_node"])[0]
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
+    # first archive old jobs
+    self.ArchiveJobs(opts.job_age)
+    # and only then submit new ones
     self.instances, self.bootids, self.smap = GetClusterData()
     self.started_instances = set()
     self.opts = opts
@@ -300,17 +325,17 @@ class Watcher(object):
 
     """
     notepad = self.notepad
-    self.ArchiveJobs(self.opts.job_age)
     self.CheckInstances(notepad)
     self.CheckDisks(notepad)
     self.VerifyDisks()
 
-  def ArchiveJobs(self, age):
+  @staticmethod
+  def ArchiveJobs(age):
     """Archive old jobs.
 
     """
     arch_count, left_count = client.AutoArchiveJobs(age)
-    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
+    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
 
   def CheckDisks(self, notepad):
     """Check all nodes for restarted ones.
@@ -348,7 +373,7 @@ class Watcher(object):
           try:
             logging.info("Activating disks for instance %s", instance.name)
             instance.ActivateDisks()
-          except Exception:
+          except Exception: # pylint: disable-msg=W0703
             logging.exception("Error while activating disks for instance %s",
                               instance.name)
 
@@ -379,7 +404,7 @@ class Watcher(object):
                         instance.name, last)
           instance.Restart()
           self.started_instances.add(instance.name)
-        except Exception:
+        except Exception: # pylint: disable-msg=W0703
           logging.exception("Error while restarting instance %s",
                             instance.name)
 
@@ -409,7 +434,7 @@ class Watcher(object):
       # nothing to do
       return
     logging.debug("Will activate disks for instances %s",
-                  ", ".join(offline_disk_instances))
+                  utils.CommaJoin(offline_disk_instances))
     # we submit only one job, and wait for it. not optimal, but spams
     # less the job queue
     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
@@ -430,9 +455,7 @@ def ParseOptions():
                         version="%%prog (ganeti) %s" %
                         constants.RELEASE_VERSION)
 
-  parser.add_option("-d", "--debug", dest="debug",
-                    help="Write all messages to stderr",
-                    default=False, action="store_true")
+  parser.add_option(cli.DEBUG_OPT)
   parser.add_option("-A", "--job-age", dest="job_age",
                     help="Autoarchive jobs older than this age (default"
                     " 6 hours)", default=6*3600)
@@ -445,15 +468,26 @@ def main():
   """Main function.
 
   """
-  global client
+  global client # pylint: disable-msg=W0603
 
   options, args = ParseOptions()
 
+  if args: # watcher doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                      stderr_logging=options.debug)
 
-  update_file = True
+  if ShouldPause():
+    logging.debug("Pause has been set, exiting")
+    sys.exit(constants.EXIT_SUCCESS)
+
+  update_file = False
   try:
+    # on master or not, try to start the node dameon
+    EnsureDaemon(constants.NODED)
+
     notepad = WatcherState()
     try:
       try:
@@ -461,24 +495,30 @@ def main():
       except errors.OpPrereqError:
         # this is, from cli.GetClient, a not-master case
         logging.debug("Not on master, exiting")
+        update_file = True
         sys.exit(constants.EXIT_SUCCESS)
       except luxi.NoMasterError, err:
         logging.warning("Master seems to be down (%s), trying to restart",
                         str(err))
-        if not StartMaster():
+        if not EnsureDaemon(constants.MASTERD):
           logging.critical("Can't start the master, exiting")
-          update_file = False
           sys.exit(constants.EXIT_FAILURE)
         # else retry the connection
         client = cli.GetClient()
 
+      # we are on master now
+      EnsureDaemon(constants.RAPI)
+
       try:
         watcher = Watcher(options, notepad)
       except errors.ConfigurationError:
         # Just exit if there's no configuration
+        update_file = True
         sys.exit(constants.EXIT_SUCCESS)
 
       watcher.Run()
+      update_file = True
+
     finally:
       if update_file:
         notepad.Save()
@@ -492,6 +532,10 @@ def main():
   except errors.ResolverError, err:
     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
     sys.exit(constants.EXIT_NODESETUP_ERROR)
+  except errors.JobQueueFull:
+    logging.error("Job queue is full, can't query cluster state")
+  except errors.JobQueueDrainError:
+    logging.error("Job queue is drained, can't maintain cluster state")
   except Exception, err:
     logging.error(str(err), exc_info=True)
     sys.exit(constants.EXIT_FAILURE)