Cluster: add nicparams, and update them on upgrade
[ganeti-local] / daemons / ganeti-watcher
index 7e46067..42a2eaf 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -29,23 +29,22 @@ by a node reboot.  Run from cron or similar.
 
 import os
 import sys
 
 import os
 import sys
-import re
 import time
 import time
-import fcntl
-import errno
 import logging
 from optparse import OptionParser
 
 from ganeti import utils
 from ganeti import constants
 from ganeti import serializer
 import logging
 from optparse import OptionParser
 
 from ganeti import utils
 from ganeti import constants
 from ganeti import serializer
-from ganeti import ssconf
 from ganeti import errors
 from ganeti import errors
+from ganeti import opcodes
+from ganeti import cli
+from ganeti import luxi
 
 
 MAXTRIES = 5
 
 
 MAXTRIES = 5
-BAD_STATES = ['stopped']
-HELPLESS_STATES = ['(node down)']
+BAD_STATES = ['ERROR_down']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
 NOTICE = 'NOTICE'
 ERROR = 'ERROR'
 KEY_RESTART_COUNT = "restart_count"
 NOTICE = 'NOTICE'
 ERROR = 'ERROR'
 KEY_RESTART_COUNT = "restart_count"
@@ -53,44 +52,32 @@ KEY_RESTART_WHEN = "restart_when"
 KEY_BOOT_ID = "bootid"
 
 
 KEY_BOOT_ID = "bootid"
 
 
-class Error(Exception):
-  """Generic custom error class."""
+# Global client object
+client = None
 
 
 
 
-class NotMasterError(Error):
+class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
 
 
 def Indent(s, prefix='| '):
   """Indent a piece of text with a given prefix before each line.
 
   """Exception raised when this host is not the master."""
 
 
 def Indent(s, prefix='| '):
   """Indent a piece of text with a given prefix before each line.
 
-  Args:
-    s: The string to indent
-    prefix: The string to prepend each line.
+  @param s: the string to indent
+  @param prefix: the string to prepend each line
 
   """
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
 
   """
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
-def DoCmd(cmd):
-  """Run a shell command.
-
-  Args:
-    cmd: the command to run.
-
-  Raises CommandError with verbose commentary on error.
+def StartMaster():
+  """Try to start the master daemon.
 
   """
 
   """
-  res = utils.RunCmd(cmd)
-
-  if res.failed:
-    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
-                (repr(cmd),
-                 Indent(res.fail_reason),
-                 Indent(res.stdout),
-                 Indent(res.stderr)))
-
-  return res
+  result = utils.RunCmd(['ganeti-masterd'])
+  if result.failed:
+    logging.error("Can't start the master daemon: output '%s'", result.output)
+  return not result.failed
 
 
 class WatcherState(object):
 
 
 class WatcherState(object):
@@ -100,52 +87,62 @@ class WatcherState(object):
   def __init__(self):
     """Open, lock, read and parse the file.
 
   def __init__(self):
     """Open, lock, read and parse the file.
 
-    Raises StandardError on lock contention.
+    Raises exception on lock contention.
 
     """
     # The two-step dance below is necessary to allow both opening existing
     # file read/write and creating if not existing.  Vanilla open will truncate
     # an existing file -or- allow creating if not existing.
 
     """
     # The two-step dance below is necessary to allow both opening existing
     # file read/write and creating if not existing.  Vanilla open will truncate
     # an existing file -or- allow creating if not existing.
-    f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
-    f = os.fdopen(f, 'w+')
+    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
+    self.statefile = os.fdopen(fd, 'w+')
 
 
-    try:
-      fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
-    except IOError, x:
-      if x.errno == errno.EAGAIN:
-        raise StandardError("State file already locked")
-      raise
-
-    self.statefile = f
+    utils.LockFile(self.statefile.fileno())
 
     try:
 
     try:
-      self.data = serializer.Load(self.statefile.read())
+      state_data = self.statefile.read()
+      if not state_data:
+        self._data = {}
+      else:
+        self._data = serializer.Load(state_data)
     except Exception, msg:
       # Ignore errors while loading the file and treat it as empty
     except Exception, msg:
       # Ignore errors while loading the file and treat it as empty
-      self.data = {}
-      logging.warning(("Empty or invalid state file. Using defaults."
+      self._data = {}
+      logging.warning(("Invalid state file. Using defaults."
                        " Error message: %s"), msg)
 
                        " Error message: %s"), msg)
 
-    if "instance" not in self.data:
-      self.data["instance"] = {}
-    if "node" not in self.data:
-      self.data["node"] = {}
+    if "instance" not in self._data:
+      self._data["instance"] = {}
+    if "node" not in self._data:
+      self._data["node"] = {}
 
 
-  def __del__(self):
-    """Called on destruction.
+    self._orig_data = serializer.Dump(self._data)
+
+  def Save(self):
+    """Save state to file, then unlock and close it.
 
     """
 
     """
-    if self.statefile:
-      self._Close()
+    assert self.statefile
+
+    serialized_form = serializer.Dump(self._data)
+    if self._orig_data == serialized_form:
+      logging.debug("Data didn't change, just touching status file")
+      os.utime(constants.WATCHER_STATEFILE, None)
+      return
 
 
-  def _Close(self):
+    # We need to make sure the file is locked before renaming it, otherwise
+    # starting ganeti-watcher again at the same time will create a conflict.
+    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
+                         data=serialized_form,
+                         prewrite=utils.LockFile, close=False)
+    self.statefile = os.fdopen(fd, 'w+')
+
+  def Close(self):
     """Unlock configuration file and close it.
 
     """
     assert self.statefile
 
     """Unlock configuration file and close it.
 
     """
     assert self.statefile
 
-    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
-
+    # Files are automatically unlocked when closing them
     self.statefile.close()
     self.statefile = None
 
     self.statefile.close()
     self.statefile = None
 
@@ -153,7 +150,7 @@ class WatcherState(object):
     """Returns the last boot ID of a node or None.
 
     """
     """Returns the last boot ID of a node or None.
 
     """
-    ndata = self.data["node"]
+    ndata = self._data["node"]
 
     if name in ndata and KEY_BOOT_ID in ndata[name]:
       return ndata[name][KEY_BOOT_ID]
 
     if name in ndata and KEY_BOOT_ID in ndata[name]:
       return ndata[name][KEY_BOOT_ID]
@@ -165,7 +162,7 @@ class WatcherState(object):
     """
     assert bootid
 
     """
     assert bootid
 
-    ndata = self.data["node"]
+    ndata = self._data["node"]
 
     if name not in ndata:
       ndata[name] = {}
 
     if name not in ndata:
       ndata[name] = {}
@@ -175,11 +172,11 @@ class WatcherState(object):
   def NumberOfRestartAttempts(self, instance):
     """Returns number of previous restart attempts.
 
   def NumberOfRestartAttempts(self, instance):
     """Returns number of previous restart attempts.
 
-    Args:
-      instance - the instance to look up.
+    @type instance: L{Instance}
+    @param instance: the instance to look up
 
     """
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name in idata:
       return idata[instance.name][KEY_RESTART_COUNT]
 
     if instance.name in idata:
       return idata[instance.name][KEY_RESTART_COUNT]
@@ -189,11 +186,11 @@ class WatcherState(object):
   def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
   def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
-    Args:
-      instance - the instance being restarted
+    @type instance: L{Instance}
+    @param instance: the instance being restarted
 
     """
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name not in idata:
       inst = idata[instance.name] = {}
 
     if instance.name not in idata:
       inst = idata[instance.name] = {}
@@ -204,39 +201,24 @@ class WatcherState(object):
     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
   def RemoveInstance(self, instance):
     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
   def RemoveInstance(self, instance):
-    """Update state to reflect that a machine is running, i.e. remove record.
+    """Update state to reflect that a machine is running.
 
 
-    Args:
-      instance - the instance to remove from books
+    This method removes the record for a named instance (as we only
+    track down instances).
 
 
-    This method removes the record for a named instance.
+    @type instance: L{Instance}
+    @param instance: the instance to remove from books
 
     """
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name in idata:
       del idata[instance.name]
 
 
     if instance.name in idata:
       del idata[instance.name]
 
-  def Save(self):
-    """Save state to file, then unlock and close it.
-
-    """
-    assert self.statefile
-
-    self.statefile.seek(0)
-    self.statefile.truncate()
-
-    self.statefile.write(serializer.Dump(self.data))
-
-    self._Close()
-
 
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
 
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
-  Methods:
-    Restart(): issue a command to restart the represented machine.
-
   """
   def __init__(self, name, state, autostart):
     self.name = name
   """
   def __init__(self, name, state, autostart):
     self.name = name
@@ -247,73 +229,53 @@ class Instance(object):
     """Encapsulates the start of an instance.
 
     """
     """Encapsulates the start of an instance.
 
     """
-    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
+    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
+    cli.SubmitOpCode(op, cl=client)
 
   def ActivateDisks(self):
     """Encapsulates the activation of all disks of an instance.
 
     """
 
   def ActivateDisks(self):
     """Encapsulates the activation of all disks of an instance.
 
     """
-    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
-
+    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
+    cli.SubmitOpCode(op, cl=client)
 
 
-def _RunListCmd(cmd):
-  """Runs a command and parses its output into lists.
-
-  """
-  for line in DoCmd(cmd).stdout.splitlines():
-    yield line.split(':')
 
 
-
-def GetInstanceList(with_secondaries=None):
+def GetClusterData():
   """Get a list of instances on this cluster.
 
   """
   """Get a list of instances on this cluster.
 
   """
-  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
-         '--separator=:']
-
-  fields = 'name,oper_state,admin_state'
-
-  if with_secondaries is not None:
-    fields += ',snodes'
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
 
-  cmd.append('-o')
-  cmd.append(fields)
+  job_id = client.SubmitJob([op1, op2])
 
 
-  instances = []
-  for fields in _RunListCmd(cmd):
-    if with_secondaries is not None:
-      (name, status, autostart, snodes) = fields
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
 
-      if snodes == "-":
-        continue
-
-      for node in with_secondaries:
-        if node in snodes.split(','):
-          break
-      else:
-        continue
+  result = all_results[0]
+  smap = {}
 
 
-    else:
-      (name, status, autostart) = fields
-
-    instances.append(Instance(name, status, autostart != "no"))
+  instances = {}
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
 
-  return instances
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
 
+    instances[name] = Instance(name, status, autostart)
 
 
-def GetNodeBootIDs():
-  """Get a dict mapping nodes to boot IDs.
-
-  """
-  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
-         '--separator=:', '-o', 'name,bootid']
+  nodes =  dict([(name, (bootid, offline))
+                 for name, bootid, offline in all_results[1]])
 
 
-  ids = {}
-  for fields in _RunListCmd(cmd):
-    (name, bootid) = fields
-    ids[name] = bootid
+  client.ArchiveJob(job_id)
 
 
-  return ids
+  return instances, nodes, smap
 
 
 class Watcher(object):
 
 
 class Watcher(object):
@@ -324,64 +286,81 @@ class Watcher(object):
   to restart machines that are down.
 
   """
   to restart machines that are down.
 
   """
-  def __init__(self):
-    sstore = ssconf.SimpleStore()
-    master = sstore.GetMasterNode()
+  def __init__(self, opts, notepad):
+    self.notepad = notepad
+    master = client.QueryConfigValues(["master_node"])[0]
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
-    self.instances = GetInstanceList()
-    self.bootids = GetNodeBootIDs()
+    self.instances, self.bootids, self.smap = GetClusterData()
     self.started_instances = set()
     self.started_instances = set()
+    self.opts = opts
 
   def Run(self):
 
   def Run(self):
-    notepad = WatcherState()
+    """Watcher run sequence.
+
+    """
+    notepad = self.notepad
+    self.ArchiveJobs(self.opts.job_age)
     self.CheckInstances(notepad)
     self.CheckDisks(notepad)
     self.VerifyDisks()
     self.CheckInstances(notepad)
     self.CheckDisks(notepad)
     self.VerifyDisks()
-    notepad.Save()
+
+  def ArchiveJobs(self, age):
+    """Archive old jobs.
+
+    """
+    arch_count, left_count = client.AutoArchiveJobs(age)
+    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
 
   def CheckDisks(self, notepad):
     """Check all nodes for restarted ones.
 
     """
     check_nodes = []
 
   def CheckDisks(self, notepad):
     """Check all nodes for restarted ones.
 
     """
     check_nodes = []
-    for name, id in self.bootids.iteritems():
+    for name, (new_id, offline) in self.bootids.iteritems():
       old = notepad.GetNodeBootID(name)
       old = notepad.GetNodeBootID(name)
-      if old != id:
+      if new_id is None:
+        # Bad node, not returning a boot id
+        if not offline:
+          logging.debug("Node %s missing boot id, skipping secondary checks",
+                        name)
+        continue
+      if old != new_id:
         # Node's boot ID has changed, proably through a reboot.
         check_nodes.append(name)
 
     if check_nodes:
       # Activate disks for all instances with any of the checked nodes as a
       # secondary node.
         # Node's boot ID has changed, proably through a reboot.
         check_nodes.append(name)
 
     if check_nodes:
       # Activate disks for all instances with any of the checked nodes as a
       # secondary node.
-      for instance in GetInstanceList(with_secondaries=check_nodes):
-        if not instance.autostart:
-          logging.info(("Skipping disk activation for non-autostart"
-                        " instance %s"), instance.name)
-          continue
-        if instance.name in self.started_instances:
-          # we already tried to start the instance, which should have
-          # activated its drives (if they can be at all)
+      for node in check_nodes:
+        if node not in self.smap:
           continue
           continue
-        try:
-          logging.info("Activating disks for instance %s", instance.name)
-          instance.ActivateDisks()
-        except Error, err:
-          logging.error(str(err), exc_info=True)
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception:
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
 
       # Keep changed boot IDs
       for name in check_nodes:
 
       # Keep changed boot IDs
       for name in check_nodes:
-        notepad.SetNodeBootID(name, self.bootids[name])
+        notepad.SetNodeBootID(name, self.bootids[name][0])
 
   def CheckInstances(self, notepad):
     """Make a pass over the list of instances, restarting downed ones.
 
     """
 
   def CheckInstances(self, notepad):
     """Make a pass over the list of instances, restarting downed ones.
 
     """
-    for instance in self.instances:
-      # Don't care about manually stopped instances
-      if not instance.autostart:
-        continue
-
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
@@ -400,8 +379,9 @@ class Watcher(object):
                         instance.name, last)
           instance.Restart()
           self.started_instances.add(instance.name)
                         instance.name, last)
           instance.Restart()
           self.started_instances.add(instance.name)
-        except Error, err:
-          logging.error(str(err), exc_info=True)
+        except Exception:
+          logging.exception("Error while restarting instance %s",
+                            instance.name)
 
         notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
 
         notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
@@ -412,20 +392,37 @@ class Watcher(object):
           notepad.RemoveInstance(instance)
           logging.info("Restart of %s succeeded", instance.name)
 
           notepad.RemoveInstance(instance)
           logging.info("Restart of %s succeeded", instance.name)
 
-  def VerifyDisks(self):
+  @staticmethod
+  def VerifyDisks():
     """Run gnt-cluster verify-disks.
 
     """
     """Run gnt-cluster verify-disks.
 
     """
-    result = DoCmd(['gnt-cluster', 'verify-disks', '--lock-retries=15'])
-    if result.output:
-      logging.info(result.output)
+    op = opcodes.OpVerifyDisks()
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
+    if not isinstance(result, (tuple, list)):
+      logging.error("Can't get a valid result from verify-disks")
+      return
+    offline_disk_instances = result[2]
+    if not offline_disk_instances:
+      # nothing to do
+      return
+    logging.debug("Will activate disks for instances %s",
+                  ", ".join(offline_disk_instances))
+    # we submit only one job, and wait for it. not optimal, but spams
+    # less the job queue
+    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
+           for name in offline_disk_instances]
+    job_id = cli.SendJob(job, cl=client)
+
+    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
 
 def ParseOptions():
   """Parse the command line options.
 
 
 
 def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
@@ -436,48 +433,59 @@ def ParseOptions():
   parser.add_option("-d", "--debug", dest="debug",
                     help="Write all messages to stderr",
                     default=False, action="store_true")
   parser.add_option("-d", "--debug", dest="debug",
                     help="Write all messages to stderr",
                     default=False, action="store_true")
+  parser.add_option("-A", "--job-age", dest="job_age",
+                    help="Autoarchive jobs older than this age (default"
+                    " 6 hours)", default=6*3600)
   options, args = parser.parse_args()
   options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
   return options, args
 
 
   return options, args
 
 
-def SetupLogging(debug):
-  """Configures the logging module.
-
-  """
-  formatter = logging.Formatter("%(asctime)s: %(message)s")
-
-  logfile_handler = logging.FileHandler(constants.LOG_WATCHER)
-  logfile_handler.setFormatter(formatter)
-  logfile_handler.setLevel(logging.INFO)
-
-  stderr_handler = logging.StreamHandler()
-  stderr_handler.setFormatter(formatter)
-  if debug:
-    stderr_handler.setLevel(logging.NOTSET)
-  else:
-    stderr_handler.setLevel(logging.CRITICAL)
-
-  root_logger = logging.getLogger("")
-  root_logger.setLevel(logging.NOTSET)
-  root_logger.addHandler(logfile_handler)
-  root_logger.addHandler(stderr_handler)
-
-
 def main():
   """Main function.
 
   """
 def main():
   """Main function.
 
   """
+  global client
+
   options, args = ParseOptions()
 
   options, args = ParseOptions()
 
-  SetupLogging(options.debug)
+  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+                     stderr_logging=options.debug)
 
 
+  update_file = True
   try:
   try:
+    notepad = WatcherState()
     try:
     try:
-      watcher = Watcher()
-    except errors.ConfigurationError:
-      # Just exit if there's no configuration
-      sys.exit(constants.EXIT_SUCCESS)
-    watcher.Run()
+      try:
+        client = cli.GetClient()
+      except errors.OpPrereqError:
+        # this is, from cli.GetClient, a not-master case
+        logging.debug("Not on master, exiting")
+        sys.exit(constants.EXIT_SUCCESS)
+      except luxi.NoMasterError, err:
+        logging.warning("Master seems to be down (%s), trying to restart",
+                        str(err))
+        if not StartMaster():
+          logging.critical("Can't start the master, exiting")
+          update_file = False
+          sys.exit(constants.EXIT_FAILURE)
+        # else retry the connection
+        client = cli.GetClient()
+
+      try:
+        watcher = Watcher(options, notepad)
+      except errors.ConfigurationError:
+        # Just exit if there's no configuration
+        sys.exit(constants.EXIT_SUCCESS)
+
+      watcher.Run()
+    finally:
+      if update_file:
+        notepad.Save()
+      else:
+        logging.debug("Not updating status file due to failure")
+  except SystemExit:
+    raise
   except NotMasterError:
     logging.debug("Not master, exiting")
     sys.exit(constants.EXIT_NOTMASTER)
   except NotMasterError:
     logging.debug("Not master, exiting")
     sys.exit(constants.EXIT_NOTMASTER)