daemons: handle arguments correctly and uniformly
[ganeti-local] / daemons / ganeti-watcher
index fc7fd1b..6c97d40 100755 (executable)
@@ -27,25 +27,29 @@ by a node reboot.  Run from cron or similar.
 
 """
 
+# pylint: disable-msg=C0103,W0142
+
+# C0103: Invalid name ganeti-watcher
+
 import os
 import sys
 import time
 import logging
+import errno
 from optparse import OptionParser
 
 from ganeti import utils
 from ganeti import constants
 from ganeti import serializer
-from ganeti import ssconf
 from ganeti import errors
 from ganeti import opcodes
-from ganeti import logger
 from ganeti import cli
+from ganeti import luxi
 
 
 MAXTRIES = 5
 BAD_STATES = ['ERROR_down']
-HELPLESS_STATES = ['ERROR_nodedown']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
 NOTICE = 'NOTICE'
 ERROR = 'ERROR'
 KEY_RESTART_COUNT = "restart_count"
@@ -64,14 +68,33 @@ class NotMasterError(errors.GenericError):
 def Indent(s, prefix='| '):
   """Indent a piece of text with a given prefix before each line.
 
-  Args:
-    s: The string to indent
-    prefix: The string to prepend each line.
+  @param s: the string to indent
+  @param prefix: the string to prepend each line
 
   """
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
+def ShouldPause():
+  """Check whether we should pause.
+
+  """
+  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+
+
+def EnsureDaemon(name):
+  """Check for and start daemon if not alive.
+
+  """
+  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+  if result.failed:
+    logging.error("Can't start daemon '%s', failure %s, output: %s",
+                  name, result.fail_reason, result.output)
+    return False
+
+  return True
+
+
 class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
@@ -91,11 +114,15 @@ class WatcherState(object):
     utils.LockFile(self.statefile.fileno())
 
     try:
-      self._data = serializer.Load(self.statefile.read())
-    except Exception, msg:
+      state_data = self.statefile.read()
+      if not state_data:
+        self._data = {}
+      else:
+        self._data = serializer.Load(state_data)
+    except Exception, msg: # pylint: disable-msg=W0703
       # Ignore errors while loading the file and treat it as empty
       self._data = {}
-      logging.warning(("Empty or invalid state file. Using defaults."
+      logging.warning(("Invalid state file. Using defaults."
                        " Error message: %s"), msg)
 
     if "instance" not in self._data:
@@ -160,8 +187,8 @@ class WatcherState(object):
   def NumberOfRestartAttempts(self, instance):
     """Returns number of previous restart attempts.
 
-    Args:
-      instance - the instance to look up.
+    @type instance: L{Instance}
+    @param instance: the instance to look up
 
     """
     idata = self._data["instance"]
@@ -174,8 +201,8 @@ class WatcherState(object):
   def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
-    Args:
-      instance - the instance being restarted
+    @type instance: L{Instance}
+    @param instance: the instance being restarted
 
     """
     idata = self._data["instance"]
@@ -189,12 +216,13 @@ class WatcherState(object):
     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
   def RemoveInstance(self, instance):
-    """Update state to reflect that a machine is running, i.e. remove record.
+    """Update state to reflect that a machine is running.
 
-    Args:
-      instance - the instance to remove from books
+    This method removes the record for a named instance (as we only
+    track down instances).
 
-    This method removes the record for a named instance.
+    @type instance: L{Instance}
+    @param instance: the instance to remove from books
 
     """
     idata = self._data["instance"]
@@ -206,9 +234,6 @@ class WatcherState(object):
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
-  Methods:
-    Restart(): issue a command to restart the represented machine.
-
   """
   def __init__(self, name, state, autostart):
     self.name = name
@@ -219,9 +244,7 @@ class Instance(object):
     """Encapsulates the start of an instance.
 
     """
-    op = opcodes.OpStartupInstance(instance_name=self.name,
-                                   force=False,
-                                   extra_args=None)
+    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
     cli.SubmitOpCode(op, cl=client)
 
   def ActivateDisks(self):
@@ -232,45 +255,49 @@ class Instance(object):
     cli.SubmitOpCode(op, cl=client)
 
 
-def GetInstanceList(with_secondaries=None):
+def GetClusterData():
   """Get a list of instances on this cluster.
 
   """
-  fields = ["name", "status", "admin_state"]
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
-  if with_secondaries is not None:
-    fields.append("snodes")
+  job_id = client.SubmitJob([op1, op2])
 
-  result = client.QueryInstances([], fields)
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
-  instances = []
-  for fields in result:
-    if with_secondaries is not None:
-      (name, status, autostart, snodes) = fields
+  logging.debug("Got data from cluster, writing instance status file")
 
-      if not snodes:
-        continue
+  result = all_results[0]
+  smap = {}
 
-      for node in with_secondaries:
-        if node in snodes:
-          break
-      else:
-        continue
+  instances = {}
 
-    else:
-      (name, status, autostart) = fields
+  # write the upfile
+  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
+  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
 
-    instances.append(Instance(name, status, autostart))
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
-  return instances
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
+    instances[name] = Instance(name, status, autostart)
 
-def GetNodeBootIDs():
-  """Get a dict mapping nodes to boot IDs.
+  nodes =  dict([(name, (bootid, offline))
+                 for name, bootid, offline in all_results[1]])
 
-  """
-  result = client.QueryNodes([], ["name", "bootid"])
-  return dict([(name, bootid) for name, bootid in result])
+  client.ArchiveJob(job_id)
+
+  return instances, nodes, smap
 
 
 class Watcher(object):
@@ -281,35 +308,47 @@ class Watcher(object):
   to restart machines that are down.
 
   """
-  def __init__(self):
-    sstore = ssconf.SimpleStore()
-    master = sstore.GetMasterNode()
+  def __init__(self, opts, notepad):
+    self.notepad = notepad
+    master = client.QueryConfigValues(["master_node"])[0]
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
-    self.instances = GetInstanceList()
-    self.bootids = GetNodeBootIDs()
+    # first archive old jobs
+    self.ArchiveJobs(opts.job_age)
+    # and only then submit new ones
+    self.instances, self.bootids, self.smap = GetClusterData()
     self.started_instances = set()
+    self.opts = opts
 
   def Run(self):
-    notepad = WatcherState()
-    try:
-      self.CheckInstances(notepad)
-      self.CheckDisks(notepad)
-      self.VerifyDisks()
-    finally:
-      notepad.Save()
+    """Watcher run sequence.
+
+    """
+    notepad = self.notepad
+    self.CheckInstances(notepad)
+    self.CheckDisks(notepad)
+    self.VerifyDisks()
+
+  @staticmethod
+  def ArchiveJobs(age):
+    """Archive old jobs.
+
+    """
+    arch_count, left_count = client.AutoArchiveJobs(age)
+    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
 
   def CheckDisks(self, notepad):
     """Check all nodes for restarted ones.
 
     """
     check_nodes = []
-    for name, new_id in self.bootids.iteritems():
+    for name, (new_id, offline) in self.bootids.iteritems():
       old = notepad.GetNodeBootID(name)
       if new_id is None:
         # Bad node, not returning a boot id
-        logging.debug("Node %s missing boot id, skipping secondary checks",
-                      name)
+        if not offline:
+          logging.debug("Node %s missing boot id, skipping secondary checks",
+                        name)
         continue
       if old != new_id:
         # Node's boot ID has changed, proably through a reboot.
@@ -318,31 +357,35 @@ class Watcher(object):
     if check_nodes:
       # Activate disks for all instances with any of the checked nodes as a
       # secondary node.
-      for instance in GetInstanceList(with_secondaries=check_nodes):
-        if not instance.autostart:
-          logging.info(("Skipping disk activation for non-autostart"
-                        " instance %s"), instance.name)
+      for node in check_nodes:
+        if node not in self.smap:
           continue
-        if instance.name in self.started_instances:
-          # we already tried to start the instance, which should have
-          # activated its drives (if they can be at all)
-          continue
-        try:
-          logging.info("Activating disks for instance %s", instance.name)
-          instance.ActivateDisks()
-        except Exception:
-          logging.exception("Error while activating disks for instance %s",
-                            instance.name)
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception: # pylint: disable-msg=W0703
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
 
       # Keep changed boot IDs
       for name in check_nodes:
-        notepad.SetNodeBootID(name, self.bootids[name])
+        notepad.SetNodeBootID(name, self.bootids[name][0])
 
   def CheckInstances(self, notepad):
     """Make a pass over the list of instances, restarting downed ones.
 
     """
-    for instance in self.instances:
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
@@ -361,8 +404,9 @@ class Watcher(object):
                         instance.name, last)
           instance.Restart()
           self.started_instances.add(instance.name)
-        except Exception:
-          logging.exception("Erro while restarting instance %s", instance.name)
+        except Exception: # pylint: disable-msg=W0703
+          logging.exception("Error while restarting instance %s",
+                            instance.name)
 
         notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
@@ -379,7 +423,9 @@ class Watcher(object):
 
     """
     op = opcodes.OpVerifyDisks()
-    result = cli.SubmitOpCode(op, cl=client)
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
     if not isinstance(result, (tuple, list)):
       logging.error("Can't get a valid result from verify-disks")
       return
@@ -388,7 +434,7 @@ class Watcher(object):
       # nothing to do
       return
     logging.debug("Will activate disks for instances %s",
-                  ", ".join(offline_disk_instances))
+                  utils.CommaJoin(offline_disk_instances))
     # we submit only one job, and wait for it. not optimal, but spams
     # less the job queue
     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
@@ -401,8 +447,7 @@ class Watcher(object):
 def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
@@ -410,10 +455,12 @@ def ParseOptions():
                         version="%%prog (ganeti) %s" %
                         constants.RELEASE_VERSION)
 
-  parser.add_option("-d", "--debug", dest="debug",
-                    help="Write all messages to stderr",
-                    default=False, action="store_true")
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option("-A", "--job-age", dest="job_age",
+                    help="Autoarchive jobs older than this age (default"
+                    " 6 hours)", default=6*3600)
   options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
   return options, args
 
 
@@ -421,23 +468,62 @@ def main():
   """Main function.
 
   """
-  global client
+  global client # pylint: disable-msg=W0603
 
   options, args = ParseOptions()
 
-  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
-                      stderr_logging=options.debug)
+  if args: # watcher doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
+  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+                     stderr_logging=options.debug)
+
+  if ShouldPause():
+    logging.debug("Pause has been set, exiting")
+    sys.exit(constants.EXIT_SUCCESS)
 
+  update_file = False
   try:
-    client = cli.GetClient()
+    # on master or not, try to start the node dameon
+    EnsureDaemon(constants.NODED)
 
+    notepad = WatcherState()
     try:
-      watcher = Watcher()
-    except errors.ConfigurationError:
-      # Just exit if there's no configuration
-      sys.exit(constants.EXIT_SUCCESS)
+      try:
+        client = cli.GetClient()
+      except errors.OpPrereqError:
+        # this is, from cli.GetClient, a not-master case
+        logging.debug("Not on master, exiting")
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+      except luxi.NoMasterError, err:
+        logging.warning("Master seems to be down (%s), trying to restart",
+                        str(err))
+        if not EnsureDaemon(constants.MASTERD):
+          logging.critical("Can't start the master, exiting")
+          sys.exit(constants.EXIT_FAILURE)
+        # else retry the connection
+        client = cli.GetClient()
+
+      # we are on master now
+      EnsureDaemon(constants.RAPI)
+
+      try:
+        watcher = Watcher(options, notepad)
+      except errors.ConfigurationError:
+        # Just exit if there's no configuration
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+
+      watcher.Run()
+      update_file = True
 
-    watcher.Run()
+    finally:
+      if update_file:
+        notepad.Save()
+      else:
+        logging.debug("Not updating status file due to failure")
   except SystemExit:
     raise
   except NotMasterError:
@@ -446,6 +532,10 @@ def main():
   except errors.ResolverError, err:
     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
     sys.exit(constants.EXIT_NODESETUP_ERROR)
+  except errors.JobQueueFull:
+    logging.error("Job queue is full, can't query cluster state")
+  except errors.JobQueueDrainError:
+    logging.error("Job queue is drained, can't maintain cluster state")
   except Exception, err:
     logging.error(str(err), exc_info=True)
     sys.exit(constants.EXIT_FAILURE)