UploadFile: allow ancillary files
[ganeti-local] / daemons / ganeti-watcher
index 39250e5..42a2eaf 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 This program and set of classes implement a watchdog to restart
 virtual machines in a Ganeti cluster that have crashed or been killed
 by a node reboot.  Run from cron or similar.
-"""
-
 
-LOGFILE = '/var/log/ganeti/watcher.log'
-MAXTRIES = 5
-BAD_STATES = ['stopped']
-HELPLESS_STATES = ['(node down)']
-NOTICE = 'NOTICE'
-ERROR = 'ERROR'
+"""
 
 import os
 import sys
 import time
-import fcntl
-import errno
+import logging
 from optparse import OptionParser
 
-
 from ganeti import utils
 from ganeti import constants
+from ganeti import serializer
+from ganeti import errors
+from ganeti import opcodes
+from ganeti import cli
+from ganeti import luxi
+
+
+MAXTRIES = 5
+BAD_STATES = ['ERROR_down']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
+NOTICE = 'NOTICE'
+ERROR = 'ERROR'
+KEY_RESTART_COUNT = "restart_count"
+KEY_RESTART_WHEN = "restart_when"
+KEY_BOOT_ID = "bootid"
+
+
+# Global client object
+client = None
 
 
-class Error(Exception):
-  """Generic custom error class."""
-  pass
+class NotMasterError(errors.GenericError):
+  """Exception raised when this host is not the master."""
 
 
 def Indent(s, prefix='| '):
   """Indent a piece of text with a given prefix before each line.
 
-  Args:
-    s: The string to indent
-    prefix: The string to prepend each line.
+  @param s: the string to indent
+  @param prefix: the string to prepend each line
+
   """
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
-def DoCmd(cmd):
-  """Run a shell command.
-
-  Args:
-    cmd: the command to run.
+def StartMaster():
+  """Try to start the master daemon.
 
-  Raises CommandError with verbose commentary on error.
   """
-  res = utils.RunCmd(cmd)
-
-  if res.failed:
-    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
-                (repr(cmd),
-                 Indent(res.fail_reason),
-                 Indent(res.stdout),
-                 Indent(res.stderr)))
+  result = utils.RunCmd(['ganeti-masterd'])
+  if result.failed:
+    logging.error("Can't start the master daemon: output '%s'", result.output)
+  return not result.failed
 
-  return res
 
-
-class RestarterState(object):
+class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
-  Methods:
-    Open(): open, lock, read and parse the file.
-            Raises StandardError on lock contention.
-
-    NumberOfAttempts(name): returns the number of times in succession
-                            a restart has been attempted of the named instance.
-
-    RecordAttempt(name, when): records one restart attempt of name at
-                               time in when.
-
-    Remove(name): remove record given by name, if exists.
-
-    Save(name): saves all records to file, releases lock and closes file.
   """
   def __init__(self):
+    """Open, lock, read and parse the file.
+
+    Raises exception on lock contention.
+
+    """
     # The two-step dance below is necessary to allow both opening existing
     # file read/write and creating if not existing.  Vanilla open will truncate
     # an existing file -or- allow creating if not existing.
-    f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
-    f = os.fdopen(f, 'w+')
+    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
+    self.statefile = os.fdopen(fd, 'w+')
+
+    utils.LockFile(self.statefile.fileno())
 
     try:
-      fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
-    except IOError, x:
-      if x.errno == errno.EAGAIN:
-        raise StandardError('State file already locked')
-      raise
+      state_data = self.statefile.read()
+      if not state_data:
+        self._data = {}
+      else:
+        self._data = serializer.Load(state_data)
+    except Exception, msg:
+      # Ignore errors while loading the file and treat it as empty
+      self._data = {}
+      logging.warning(("Invalid state file. Using defaults."
+                       " Error message: %s"), msg)
 
-    self.statefile = f
-    self.inst_map = {}
+    if "instance" not in self._data:
+      self._data["instance"] = {}
+    if "node" not in self._data:
+      self._data["node"] = {}
 
-    for line in f:
-      name, when, count = line.rstrip().split(':')
+    self._orig_data = serializer.Dump(self._data)
 
-      when = int(when)
-      count = int(count)
+  def Save(self):
+    """Save state to file, then unlock and close it.
 
-      self.inst_map[name] = (when, count)
+    """
+    assert self.statefile
 
-  def NumberOfAttempts(self, instance):
-    """Returns number of previous restart attempts.
+    serialized_form = serializer.Dump(self._data)
+    if self._orig_data == serialized_form:
+      logging.debug("Data didn't change, just touching status file")
+      os.utime(constants.WATCHER_STATEFILE, None)
+      return
+
+    # We need to make sure the file is locked before renaming it, otherwise
+    # starting ganeti-watcher again at the same time will create a conflict.
+    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
+                         data=serialized_form,
+                         prewrite=utils.LockFile, close=False)
+    self.statefile = os.fdopen(fd, 'w+')
+
+  def Close(self):
+    """Unlock configuration file and close it.
 
-    Args:
-      instance - the instance to look up.
     """
     assert self.statefile
 
-    if instance.name in self.inst_map:
-      return self.inst_map[instance.name][1]
+    # Files are automatically unlocked when closing them
+    self.statefile.close()
+    self.statefile = None
 
-    return 0
+  def GetNodeBootID(self, name):
+    """Returns the last boot ID of a node or None.
 
-  def RecordAttempt(self, instance):
-    """Record a restart attempt.
+    """
+    ndata = self._data["node"]
+
+    if name in ndata and KEY_BOOT_ID in ndata[name]:
+      return ndata[name][KEY_BOOT_ID]
+    return None
+
+  def SetNodeBootID(self, name, bootid):
+    """Sets the boot ID of a node.
 
-    Args:
-      instance - the instance being restarted
     """
-    assert self.statefile
+    assert bootid
 
-    when = time.time()
+    ndata = self._data["node"]
 
-    self.inst_map[instance.name] = (when, 1 + self.NumberOfAttempts(instance))
+    if name not in ndata:
+      ndata[name] = {}
 
-  def Remove(self, instance):
-    """Update state to reflect that a machine is running, i.e. remove record
+    ndata[name][KEY_BOOT_ID] = bootid
 
-    Args:
-      instance - the instance to remove from books
+  def NumberOfRestartAttempts(self, instance):
+    """Returns number of previous restart attempts.
+
+    @type instance: L{Instance}
+    @param instance: the instance to look up
 
-    This method removes the record for a named instance
     """
-    assert self.statefile
+    idata = self._data["instance"]
 
-    if instance.name in self.inst_map:
-      del self.inst_map[instance.name]
+    if instance.name in idata:
+      return idata[instance.name][KEY_RESTART_COUNT]
+
+    return 0
+
+  def RecordRestartAttempt(self, instance):
+    """Record a restart attempt.
+
+    @type instance: L{Instance}
+    @param instance: the instance being restarted
 
-  def Save(self):
-    """Save records to file, then unlock and close file.
     """
-    assert self.statefile
+    idata = self._data["instance"]
 
-    self.statefile.seek(0)
-    self.statefile.truncate()
+    if instance.name not in idata:
+      inst = idata[instance.name] = {}
+    else:
+      inst = idata[instance.name]
 
-    for name in self.inst_map:
-      print >> self.statefile, "%s:%d:%d" % ((name,) + self.inst_map[name])
+    inst[KEY_RESTART_WHEN] = time.time()
+    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
-    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
+  def RemoveInstance(self, instance):
+    """Update state to reflect that a machine is running.
 
-    self.statefile.close()
-    self.statefile = None
+    This method removes the record for a named instance (as we only
+    track down instances).
+
+    @type instance: L{Instance}
+    @param instance: the instance to remove from books
+
+    """
+    idata = self._data["instance"]
+
+    if instance.name in idata:
+      del idata[instance.name]
 
 
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
-  Methods:
-    Restart(): issue a command to restart the represented machine.
   """
-  def __init__(self, name, state):
+  def __init__(self, name, state, autostart):
     self.name = name
     self.state = state
+    self.autostart = autostart
 
   def Restart(self):
-    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
+    """Encapsulates the start of an instance.
+
+    """
+    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
+    cli.SubmitOpCode(op, cl=client)
+
+  def ActivateDisks(self):
+    """Encapsulates the activation of all disks of an instance.
 
+    """
+    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
+    cli.SubmitOpCode(op, cl=client)
+
+
+def GetClusterData():
+  """Get a list of instances on this cluster.
 
-class InstanceList(object):
-  """The set of Virtual Machine instances on a cluster.
   """
-  cmd = ['gnt-instance', 'list', '--lock-retries=15',
-         '-o', 'name,admin_state,oper_state', '--no-headers', '--separator=:']
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
-  def __init__(self):
-    res = DoCmd(self.cmd)
+  job_id = client.SubmitJob([op1, op2])
 
-    lines = res.stdout.splitlines()
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
-    self.instances = []
-    for line in lines:
-      fields = [fld.strip() for fld in line.split(':')]
+  result = all_results[0]
+  smap = {}
 
-      if len(fields) != 3:
-        continue
-      if fields[1] == "no": #no autostart, we don't care about this instance
-        continue
-      name, status = fields[0], fields[2]
+  instances = {}
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
-      self.instances.append(Instance(name, status))
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
-  def __iter__(self):
-    return self.instances.__iter__()
+    instances[name] = Instance(name, status, autostart)
 
+  nodes =  dict([(name, (bootid, offline))
+                 for name, bootid, offline in all_results[1]])
 
-class Message(object):
-  """Encapsulation of a notice or error message.
-  """
-  def __init__(self, level, msg):
-    self.level = level
-    self.msg = msg
-    self.when = time.time()
+  client.ArchiveJob(job_id)
 
-  def __str__(self):
-    return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg)
+  return instances, nodes, smap
 
 
-class Restarter(object):
+class Watcher(object):
   """Encapsulate the logic for restarting erronously halted virtual machines.
 
   The calling program should periodically instantiate me and call Run().
   This will traverse the list of instances, and make up to MAXTRIES attempts
   to restart machines that are down.
+
   """
-  def __init__(self):
-    self.instances = InstanceList()
-    self.messages = []
+  def __init__(self, opts, notepad):
+    self.notepad = notepad
+    master = client.QueryConfigValues(["master_node"])[0]
+    if master != utils.HostInfo().name:
+      raise NotMasterError("This is not the master node")
+    self.instances, self.bootids, self.smap = GetClusterData()
+    self.started_instances = set()
+    self.opts = opts
 
   def Run(self):
-    """Make a pass over the list of instances, restarting downed ones.
+    """Watcher run sequence.
+
     """
-    notepad = RestarterState()
+    notepad = self.notepad
+    self.ArchiveJobs(self.opts.job_age)
+    self.CheckInstances(notepad)
+    self.CheckDisks(notepad)
+    self.VerifyDisks()
+
+  def ArchiveJobs(self, age):
+    """Archive old jobs.
 
-    for instance in self.instances:
+    """
+    arch_count, left_count = client.AutoArchiveJobs(age)
+    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
+
+  def CheckDisks(self, notepad):
+    """Check all nodes for restarted ones.
+
+    """
+    check_nodes = []
+    for name, (new_id, offline) in self.bootids.iteritems():
+      old = notepad.GetNodeBootID(name)
+      if new_id is None:
+        # Bad node, not returning a boot id
+        if not offline:
+          logging.debug("Node %s missing boot id, skipping secondary checks",
+                        name)
+        continue
+      if old != new_id:
+        # Node's boot ID has changed, proably through a reboot.
+        check_nodes.append(name)
+
+    if check_nodes:
+      # Activate disks for all instances with any of the checked nodes as a
+      # secondary node.
+      for node in check_nodes:
+        if node not in self.smap:
+          continue
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception:
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
+
+      # Keep changed boot IDs
+      for name in check_nodes:
+        notepad.SetNodeBootID(name, self.bootids[name][0])
+
+  def CheckInstances(self, notepad):
+    """Make a pass over the list of instances, restarting downed ones.
+
+    """
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
-        n = notepad.NumberOfAttempts(instance)
+        n = notepad.NumberOfRestartAttempts(instance)
 
         if n > MAXTRIES:
           # stay quiet.
@@ -257,48 +370,59 @@ class Restarter(object):
         elif n < MAXTRIES:
           last = " (Attempt #%d)" % (n + 1)
         else:
-          notepad.RecordAttempt(instance)
-          self.messages.append(Message(ERROR, "Could not restart %s for %d"
-                                       " times, giving up..." %
-                                       (instance.name, MAXTRIES)))
+          notepad.RecordRestartAttempt(instance)
+          logging.error("Could not restart %s after %d attempts, giving up",
+                        instance.name, MAXTRIES)
           continue
         try:
-          self.messages.append(Message(NOTICE,
-                                       "Restarting %s%s." %
-                                       (instance.name, last)))
+          logging.info("Restarting %s%s",
+                        instance.name, last)
           instance.Restart()
-        except Error, x:
-          self.messages.append(Message(ERROR, str(x)))
+          self.started_instances.add(instance.name)
+        except Exception:
+          logging.exception("Error while restarting instance %s",
+                            instance.name)
 
-        notepad.RecordAttempt(instance)
+        notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
-        if notepad.NumberOfAttempts(instance):
-          notepad.Remove(instance)
+        if notepad.NumberOfRestartAttempts(instance):
+          notepad.RemoveInstance(instance)
       else:
-        if notepad.NumberOfAttempts(instance):
-          notepad.Remove(instance)
-          msg = Message(NOTICE,
-                        "Restart of %s succeeded." % instance.name)
-          self.messages.append(msg)
-
-    notepad.Save()
+        if notepad.NumberOfRestartAttempts(instance):
+          notepad.RemoveInstance(instance)
+          logging.info("Restart of %s succeeded", instance.name)
 
-  def WriteReport(self, logfile):
-    """
-    Log all messages to file.
+  @staticmethod
+  def VerifyDisks():
+    """Run gnt-cluster verify-disks.
 
-    Args:
-      logfile: file object open for writing (the log file)
     """
-    for msg in self.messages:
-      print >> logfile, str(msg)
+    op = opcodes.OpVerifyDisks()
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
+    if not isinstance(result, (tuple, list)):
+      logging.error("Can't get a valid result from verify-disks")
+      return
+    offline_disk_instances = result[2]
+    if not offline_disk_instances:
+      # nothing to do
+      return
+    logging.debug("Will activate disks for instances %s",
+                  ", ".join(offline_disk_instances))
+    # we submit only one job, and wait for it. not optimal, but spams
+    # less the job queue
+    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
+           for name in offline_disk_instances]
+    job_id = cli.SendJob(job, cl=client)
+
+    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
 
 def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
@@ -307,9 +431,13 @@ def ParseOptions():
                         constants.RELEASE_VERSION)
 
   parser.add_option("-d", "--debug", dest="debug",
-                    help="Don't redirect messages to the log file",
+                    help="Write all messages to stderr",
                     default=False, action="store_true")
+  parser.add_option("-A", "--job-age", dest="job_age",
+                    help="Autoarchive jobs older than this age (default"
+                    " 6 hours)", default=6*3600)
   options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
   return options, args
 
 
@@ -317,17 +445,57 @@ def main():
   """Main function.
 
   """
+  global client
+
   options, args = ParseOptions()
 
-  if not options.debug:
-    sys.stderr = sys.stdout = open(LOGFILE, 'a')
+  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+                     stderr_logging=options.debug)
 
+  update_file = True
   try:
-    restarter = Restarter()
-    restarter.Run()
-    restarter.WriteReport(sys.stdout)
-  except Error, err:
-    print err
+    notepad = WatcherState()
+    try:
+      try:
+        client = cli.GetClient()
+      except errors.OpPrereqError:
+        # this is, from cli.GetClient, a not-master case
+        logging.debug("Not on master, exiting")
+        sys.exit(constants.EXIT_SUCCESS)
+      except luxi.NoMasterError, err:
+        logging.warning("Master seems to be down (%s), trying to restart",
+                        str(err))
+        if not StartMaster():
+          logging.critical("Can't start the master, exiting")
+          update_file = False
+          sys.exit(constants.EXIT_FAILURE)
+        # else retry the connection
+        client = cli.GetClient()
+
+      try:
+        watcher = Watcher(options, notepad)
+      except errors.ConfigurationError:
+        # Just exit if there's no configuration
+        sys.exit(constants.EXIT_SUCCESS)
+
+      watcher.Run()
+    finally:
+      if update_file:
+        notepad.Save()
+      else:
+        logging.debug("Not updating status file due to failure")
+  except SystemExit:
+    raise
+  except NotMasterError:
+    logging.debug("Not master, exiting")
+    sys.exit(constants.EXIT_NOTMASTER)
+  except errors.ResolverError, err:
+    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
+    sys.exit(constants.EXIT_NODESETUP_ERROR)
+  except Exception, err:
+    logging.error(str(err), exc_info=True)
+    sys.exit(constants.EXIT_FAILURE)
+
 
 if __name__ == '__main__':
   main()