Remove references to utils.debug
[ganeti-local] / daemons / ganeti-watcher
index 5861991..2749de6 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 This program and set of classes implement a watchdog to restart
 virtual machines in a Ganeti cluster that have crashed or been killed
 by a node reboot.  Run from cron or similar.
-"""
-
 
-LOGFILE = '/var/log/ganeti/watcher.log'
-MAXTRIES = 5
-BAD_STATES = ['stopped']
-HELPLESS_STATES = ['(node down)']
-NOTICE = 'NOTICE'
-ERROR = 'ERROR'
+"""
 
 import os
 import sys
 import time
-import fcntl
-import errno
-import socket
+import logging
 from optparse import OptionParser
 
-
 from ganeti import utils
 from ganeti import constants
-from ganeti import ssconf
+from ganeti import serializer
+from ganeti import errors
+from ganeti import opcodes
+from ganeti import cli
+from ganeti import luxi
+
 
+MAXTRIES = 5
+BAD_STATES = ['ERROR_down']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
+NOTICE = 'NOTICE'
+ERROR = 'ERROR'
+KEY_RESTART_COUNT = "restart_count"
+KEY_RESTART_WHEN = "restart_when"
+KEY_BOOT_ID = "bootid"
 
-class Error(Exception):
-  """Generic custom error class."""
 
+# Global client object
+client = None
 
-class NotMasterError(Error):
+
+class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
 
 
 def Indent(s, prefix='| '):
   """Indent a piece of text with a given prefix before each line.
 
-  Args:
-    s: The string to indent
-    prefix: The string to prepend each line.
+  @param s: the string to indent
+  @param prefix: the string to prepend each line
 
   """
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
-def DoCmd(cmd):
-  """Run a shell command.
-
-  Args:
-    cmd: the command to run.
-
-  Raises CommandError with verbose commentary on error.
+def StartMaster():
+  """Try to start the master daemon.
 
   """
-  res = utils.RunCmd(cmd)
-
-  if res.failed:
-    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
-                (repr(cmd),
-                 Indent(res.fail_reason),
-                 Indent(res.stdout),
-                 Indent(res.stderr)))
-
-  return res
+  result = utils.RunCmd(['ganeti-masterd'])
+  if result.failed:
+    logging.error("Can't start the master daemon: output '%s'", result.output)
+  return not result.failed
 
 
-class RestarterState(object):
-  """Interface to a state file recording restart attempts.
-
-  Methods:
-    Open(): open, lock, read and parse the file.
-            Raises StandardError on lock contention.
-
-    NumberOfAttempts(name): returns the number of times in succession
-                            a restart has been attempted of the named instance.
+def EnsureDaemon(daemon):
+  """Check for and start daemon if not alive.
 
-    RecordAttempt(name, when): records one restart attempt of name at
-                               time in when.
+  """
+  pidfile = utils.DaemonPidFileName(daemon)
+  pid = utils.ReadPidFile(pidfile)
+  if pid == 0 or not utils.IsProcessAlive(pid): # no file or dead pid
+    logging.debug("Daemon '%s' not alive, trying to restart", daemon)
+    result = utils.RunCmd([daemon])
+    if not result:
+      logging.error("Can't start daemon '%s', failure %s, output: %s",
+                    daemon, result.fail_reason, result.output)
 
-    Remove(name): remove record given by name, if exists.
 
-    Save(name): saves all records to file, releases lock and closes file.
+class WatcherState(object):
+  """Interface to a state file recording restart attempts.
 
   """
   def __init__(self):
+    """Open, lock, read and parse the file.
+
+    Raises exception on lock contention.
+
+    """
     # The two-step dance below is necessary to allow both opening existing
     # file read/write and creating if not existing.  Vanilla open will truncate
     # an existing file -or- allow creating if not existing.
-    f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
-    f = os.fdopen(f, 'w+')
+    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
+    self.statefile = os.fdopen(fd, 'w+')
+
+    utils.LockFile(self.statefile.fileno())
 
     try:
-      fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
-    except IOError, x:
-      if x.errno == errno.EAGAIN:
-        raise StandardError('State file already locked')
-      raise
+      state_data = self.statefile.read()
+      if not state_data:
+        self._data = {}
+      else:
+        self._data = serializer.Load(state_data)
+    except Exception, msg:
+      # Ignore errors while loading the file and treat it as empty
+      self._data = {}
+      logging.warning(("Invalid state file. Using defaults."
+                       " Error message: %s"), msg)
 
-    self.statefile = f
-    self.inst_map = {}
+    if "instance" not in self._data:
+      self._data["instance"] = {}
+    if "node" not in self._data:
+      self._data["node"] = {}
 
-    for line in f:
-      name, when, count = line.rstrip().split(':')
+    self._orig_data = serializer.Dump(self._data)
 
-      when = int(when)
-      count = int(count)
+  def Save(self):
+    """Save state to file, then unlock and close it.
 
-      self.inst_map[name] = (when, count)
+    """
+    assert self.statefile
 
-  def NumberOfAttempts(self, instance):
-    """Returns number of previous restart attempts.
+    serialized_form = serializer.Dump(self._data)
+    if self._orig_data == serialized_form:
+      logging.debug("Data didn't change, just touching status file")
+      os.utime(constants.WATCHER_STATEFILE, None)
+      return
 
-    Args:
-      instance - the instance to look up.
+    # We need to make sure the file is locked before renaming it, otherwise
+    # starting ganeti-watcher again at the same time will create a conflict.
+    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
+                         data=serialized_form,
+                         prewrite=utils.LockFile, close=False)
+    self.statefile = os.fdopen(fd, 'w+')
+
+  def Close(self):
+    """Unlock configuration file and close it.
 
     """
     assert self.statefile
 
-    if instance.name in self.inst_map:
-      return self.inst_map[instance.name][1]
+    # Files are automatically unlocked when closing them
+    self.statefile.close()
+    self.statefile = None
 
-    return 0
+  def GetNodeBootID(self, name):
+    """Returns the last boot ID of a node or None.
 
-  def RecordAttempt(self, instance):
-    """Record a restart attempt.
+    """
+    ndata = self._data["node"]
+
+    if name in ndata and KEY_BOOT_ID in ndata[name]:
+      return ndata[name][KEY_BOOT_ID]
+    return None
 
-    Args:
-      instance - the instance being restarted
+  def SetNodeBootID(self, name, bootid):
+    """Sets the boot ID of a node.
 
     """
-    assert self.statefile
+    assert bootid
 
-    when = time.time()
+    ndata = self._data["node"]
 
-    self.inst_map[instance.name] = (when, 1 + self.NumberOfAttempts(instance))
+    if name not in ndata:
+      ndata[name] = {}
 
-  def Remove(self, instance):
-    """Update state to reflect that a machine is running, i.e. remove record.
+    ndata[name][KEY_BOOT_ID] = bootid
 
-    Args:
-      instance - the instance to remove from books
+  def NumberOfRestartAttempts(self, instance):
+    """Returns number of previous restart attempts.
 
-    This method removes the record for a named instance.
+    @type instance: L{Instance}
+    @param instance: the instance to look up
 
     """
-    assert self.statefile
+    idata = self._data["instance"]
 
-    if instance.name in self.inst_map:
-      del self.inst_map[instance.name]
+    if instance.name in idata:
+      return idata[instance.name][KEY_RESTART_COUNT]
 
-  def Save(self):
-    """Save records to file, then unlock and close file.
+    return 0
+
+  def RecordRestartAttempt(self, instance):
+    """Record a restart attempt.
+
+    @type instance: L{Instance}
+    @param instance: the instance being restarted
 
     """
-    assert self.statefile
+    idata = self._data["instance"]
 
-    self.statefile.seek(0)
-    self.statefile.truncate()
+    if instance.name not in idata:
+      inst = idata[instance.name] = {}
+    else:
+      inst = idata[instance.name]
 
-    for name in self.inst_map:
-      print >> self.statefile, "%s:%d:%d" % ((name,) + self.inst_map[name])
+    inst[KEY_RESTART_WHEN] = time.time()
+    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
-    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
+  def RemoveInstance(self, instance):
+    """Update state to reflect that a machine is running.
 
-    self.statefile.close()
-    self.statefile = None
+    This method removes the record for a named instance (as we only
+    track down instances).
+
+    @type instance: L{Instance}
+    @param instance: the instance to remove from books
+
+    """
+    idata = self._data["instance"]
+
+    if instance.name in idata:
+      del idata[instance.name]
 
 
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
-  Methods:
-    Restart(): issue a command to restart the represented machine.
-
   """
-  def __init__(self, name, state):
+  def __init__(self, name, state, autostart):
     self.name = name
     self.state = state
+    self.autostart = autostart
 
   def Restart(self):
-    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
+    """Encapsulates the start of an instance.
+
+    """
+    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
+    cli.SubmitOpCode(op, cl=client)
+
+  def ActivateDisks(self):
+    """Encapsulates the activation of all disks of an instance.
+
+    """
+    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
+    cli.SubmitOpCode(op, cl=client)
 
 
-class InstanceList(object):
-  """The set of Virtual Machine instances on a cluster.
+def GetClusterData():
+  """Get a list of instances on this cluster.
 
   """
-  cmd = ['gnt-instance', 'list', '--lock-retries=15',
-         '-o', 'name,admin_state,oper_state', '--no-headers', '--separator=:']
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
-  def __init__(self):
-    res = DoCmd(self.cmd)
+  job_id = client.SubmitJob([op1, op2])
 
-    lines = res.stdout.splitlines()
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
-    self.instances = []
-    for line in lines:
-      fields = [fld.strip() for fld in line.split(':')]
+  logging.debug("Got data from cluster, writing instance status file")
 
-      if len(fields) != 3:
-        continue
-      if fields[1] == "no": #no autostart, we don't care about this instance
-        continue
-      name, status = fields[0], fields[2]
+  result = all_results[0]
+  smap = {}
 
-      self.instances.append(Instance(name, status))
+  instances = {}
 
-  def __iter__(self):
-    return self.instances.__iter__()
+  # write the upfile
+  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
+  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
 
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
-class Message(object):
-  """Encapsulation of a notice or error message.
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
-  """
-  def __init__(self, level, msg):
-    self.level = level
-    self.msg = msg
-    self.when = time.time()
+    instances[name] = Instance(name, status, autostart)
+
+  nodes =  dict([(name, (bootid, offline))
+                 for name, bootid, offline in all_results[1]])
 
-  def __str__(self):
-    return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg)
+  client.ArchiveJob(job_id)
 
+  return instances, nodes, smap
 
-class Restarter(object):
+
+class Watcher(object):
   """Encapsulate the logic for restarting erronously halted virtual machines.
 
   The calling program should periodically instantiate me and call Run().
@@ -254,23 +307,86 @@ class Restarter(object):
   to restart machines that are down.
 
   """
-  def __init__(self):
-    sstore = ssconf.SimpleStore()
-    master = sstore.GetMasterNode()
-    if master != socket.gethostname():
-      raise NotMasterError, ("This is not the master node")
-    self.instances = InstanceList()
-    self.messages = []
+  def __init__(self, opts, notepad):
+    self.notepad = notepad
+    master = client.QueryConfigValues(["master_node"])[0]
+    if master != utils.HostInfo().name:
+      raise NotMasterError("This is not the master node")
+    # first archive old jobs
+    self.ArchiveJobs(opts.job_age)
+    # and only then submit new ones
+    self.instances, self.bootids, self.smap = GetClusterData()
+    self.started_instances = set()
+    self.opts = opts
 
   def Run(self):
-    """Make a pass over the list of instances, restarting downed ones.
+    """Watcher run sequence.
+
+    """
+    notepad = self.notepad
+    self.CheckInstances(notepad)
+    self.CheckDisks(notepad)
+    self.VerifyDisks()
+
+  @staticmethod
+  def ArchiveJobs(age):
+    """Archive old jobs.
 
     """
-    notepad = RestarterState()
+    arch_count, left_count = client.AutoArchiveJobs(age)
+    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
+
+  def CheckDisks(self, notepad):
+    """Check all nodes for restarted ones.
+
+    """
+    check_nodes = []
+    for name, (new_id, offline) in self.bootids.iteritems():
+      old = notepad.GetNodeBootID(name)
+      if new_id is None:
+        # Bad node, not returning a boot id
+        if not offline:
+          logging.debug("Node %s missing boot id, skipping secondary checks",
+                        name)
+        continue
+      if old != new_id:
+        # Node's boot ID has changed, proably through a reboot.
+        check_nodes.append(name)
+
+    if check_nodes:
+      # Activate disks for all instances with any of the checked nodes as a
+      # secondary node.
+      for node in check_nodes:
+        if node not in self.smap:
+          continue
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception:
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
+
+      # Keep changed boot IDs
+      for name in check_nodes:
+        notepad.SetNodeBootID(name, self.bootids[name][0])
+
+  def CheckInstances(self, notepad):
+    """Make a pass over the list of instances, restarting downed ones.
 
-    for instance in self.instances:
+    """
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
-        n = notepad.NumberOfAttempts(instance)
+        n = notepad.NumberOfRestartAttempts(instance)
 
         if n > MAXTRIES:
           # stay quiet.
@@ -278,48 +394,59 @@ class Restarter(object):
         elif n < MAXTRIES:
           last = " (Attempt #%d)" % (n + 1)
         else:
-          notepad.RecordAttempt(instance)
-          self.messages.append(Message(ERROR, "Could not restart %s for %d"
-                                       " times, giving up..." %
-                                       (instance.name, MAXTRIES)))
+          notepad.RecordRestartAttempt(instance)
+          logging.error("Could not restart %s after %d attempts, giving up",
+                        instance.name, MAXTRIES)
           continue
         try:
-          self.messages.append(Message(NOTICE,
-                                       "Restarting %s%s." %
-                                       (instance.name, last)))
+          logging.info("Restarting %s%s",
+                        instance.name, last)
           instance.Restart()
-        except Error, x:
-          self.messages.append(Message(ERROR, str(x)))
+          self.started_instances.add(instance.name)
+        except Exception:
+          logging.exception("Error while restarting instance %s",
+                            instance.name)
 
-        notepad.RecordAttempt(instance)
+        notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
-        if notepad.NumberOfAttempts(instance):
-          notepad.Remove(instance)
+        if notepad.NumberOfRestartAttempts(instance):
+          notepad.RemoveInstance(instance)
       else:
-        if notepad.NumberOfAttempts(instance):
-          notepad.Remove(instance)
-          msg = Message(NOTICE,
-                        "Restart of %s succeeded." % instance.name)
-          self.messages.append(msg)
-
-    notepad.Save()
-
-  def WriteReport(self, logfile):
-    """Log all messages to file.
+        if notepad.NumberOfRestartAttempts(instance):
+          notepad.RemoveInstance(instance)
+          logging.info("Restart of %s succeeded", instance.name)
 
-    Args:
-      logfile: file object open for writing (the log file)
+  @staticmethod
+  def VerifyDisks():
+    """Run gnt-cluster verify-disks.
 
     """
-    for msg in self.messages:
-      print >> logfile, str(msg)
+    op = opcodes.OpVerifyDisks()
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
+    if not isinstance(result, (tuple, list)):
+      logging.error("Can't get a valid result from verify-disks")
+      return
+    offline_disk_instances = result[2]
+    if not offline_disk_instances:
+      # nothing to do
+      return
+    logging.debug("Will activate disks for instances %s",
+                  ", ".join(offline_disk_instances))
+    # we submit only one job, and wait for it. not optimal, but spams
+    # less the job queue
+    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
+           for name in offline_disk_instances]
+    job_id = cli.SendJob(job, cl=client)
+
+    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
 
 def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
@@ -328,9 +455,13 @@ def ParseOptions():
                         constants.RELEASE_VERSION)
 
   parser.add_option("-d", "--debug", dest="debug",
-                    help="Don't redirect messages to the log file",
+                    help="Write all messages to stderr",
                     default=False, action="store_true")
+  parser.add_option("-A", "--job-age", dest="job_age",
+                    help="Autoarchive jobs older than this age (default"
+                    " 6 hours)", default=6*3600)
   options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
   return options, args
 
 
@@ -338,21 +469,71 @@ def main():
   """Main function.
 
   """
+  global client
+
   options, args = ParseOptions()
 
-  if not options.debug:
-    sys.stderr = sys.stdout = open(LOGFILE, 'a')
+  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+                     stderr_logging=options.debug)
 
+  update_file = False
   try:
-    restarter = Restarter()
-    restarter.Run()
-    restarter.WriteReport(sys.stdout)
+    # on master or not, try to start the node dameon (use _PID but is
+    # the same as daemon name)
+    EnsureDaemon(constants.NODED_PID)
+
+    notepad = WatcherState()
+    try:
+      try:
+        client = cli.GetClient()
+      except errors.OpPrereqError:
+        # this is, from cli.GetClient, a not-master case
+        logging.debug("Not on master, exiting")
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+      except luxi.NoMasterError, err:
+        logging.warning("Master seems to be down (%s), trying to restart",
+                        str(err))
+        if not StartMaster():
+          logging.critical("Can't start the master, exiting")
+          sys.exit(constants.EXIT_FAILURE)
+        # else retry the connection
+        client = cli.GetClient()
+
+      # we are on master now (use _PID but is the same as daemon name)
+      EnsureDaemon(constants.RAPI_PID)
+
+      try:
+        watcher = Watcher(options, notepad)
+      except errors.ConfigurationError:
+        # Just exit if there's no configuration
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+
+      watcher.Run()
+      update_file = True
+
+    finally:
+      if update_file:
+        notepad.Save()
+      else:
+        logging.debug("Not updating status file due to failure")
+  except SystemExit:
+    raise
   except NotMasterError:
-    if options.debug:
-      sys.stderr.write("Not master, exiting.\n")
+    logging.debug("Not master, exiting")
     sys.exit(constants.EXIT_NOTMASTER)
-  except Error, err:
-    print err
+  except errors.ResolverError, err:
+    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
+    sys.exit(constants.EXIT_NODESETUP_ERROR)
+  except errors.JobQueueFull:
+    logging.error("Job queue is full, can't query cluster state")
+  except errors.JobQueueDrainError:
+    logging.error("Job queue is drained, can't maintain cluster state")
+  except Exception, err:
+    logging.error(str(err), exc_info=True)
+    sys.exit(constants.EXIT_FAILURE)
+
 
 if __name__ == '__main__':
   main()