Modify gnt-node add to call external script
[ganeti-local] / daemons / ganeti-watcher
index 71b548f..60ed04a 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -19,7 +19,7 @@
 # 02110-1301, USA.
 
 
-"""Tool to restart erronously downed virtual machines.
+"""Tool to restart erroneously downed virtual machines.
 
 This program and set of classes implement a watchdog to restart
 virtual machines in a Ganeti cluster that have crashed or been killed
@@ -35,7 +35,6 @@ import os
 import sys
 import time
 import logging
-import errno
 from optparse import OptionParser
 
 from ganeti import utils
@@ -45,9 +44,22 @@ from ganeti import errors
 from ganeti import opcodes
 from ganeti import cli
 from ganeti import luxi
+from ganeti import ssconf
+from ganeti import bdev
+from ganeti import hypervisor
+from ganeti import rapi
+from ganeti.confd import client as confd_client
+from ganeti import netutils
+
+import ganeti.rapi.client # pylint: disable-msg=W0611
 
 
 MAXTRIES = 5
+# Delete any record that is older than 8 hours; this value is based on
+# the fact that the current retry counter is 5, and watcher runs every
+# 5 minutes, so it takes around half an hour to exceed the retry
+# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
+RETRY_EXPIRATION = 8 * 3600
 BAD_STATES = ['ERROR_down']
 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
 NOTICE = 'NOTICE'
@@ -65,53 +77,174 @@ class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
 
 
-def Indent(s, prefix='| '):
-  """Indent a piece of text with a given prefix before each line.
+def ShouldPause():
+  """Check whether we should pause.
+
+  """
+  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+
 
-  @param s: the string to indent
-  @param prefix: the string to prepend each line
+def StartNodeDaemons():
+  """Start all the daemons that should be running on all nodes.
 
   """
-  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
+  # on master or not, try to start the node daemon
+  utils.EnsureDaemon(constants.NODED)
+  # start confd as well. On non candidates it will be in disabled mode.
+  utils.EnsureDaemon(constants.CONFD)
 
 
-def ShouldPause():
-  """Check whether we should pause.
+def RunWatcherHooks():
+  """Run the watcher hooks.
 
   """
-  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
+                             constants.HOOKS_NAME_WATCHER)
+  if not os.path.isdir(hooks_dir):
+    return
+
+  try:
+    results = utils.RunParts(hooks_dir)
+  except Exception, msg: # pylint: disable-msg=W0703
+    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
+
+  for (relname, status, runresult) in results:
+    if status == constants.RUNPARTS_SKIP:
+      logging.debug("Watcher hook %s: skipped", relname)
+    elif status == constants.RUNPARTS_ERR:
+      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
+    elif status == constants.RUNPARTS_RUN:
+      if runresult.failed:
+        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
+                        relname, runresult.exit_code, runresult.output)
+      else:
+        logging.debug("Watcher hook %s: success (output: %s)", relname,
+                      runresult.output)
 
 
-def EnsureDaemon(name):
-  """Check for and start daemon if not alive.
+class NodeMaintenance(object):
+  """Talks to confd daemons and possible shutdown instances/drbd devices.
 
   """
-  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
-  if result.failed:
-    logging.error("Can't start daemon '%s', failure %s, output: %s",
-                  name, result.fail_reason, result.output)
-    return False
+  def __init__(self):
+    self.store_cb = confd_client.StoreResultCallback()
+    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
+    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
+
+  @staticmethod
+  def ShouldRun():
+    """Checks whether node maintenance should run.
+
+    """
+    try:
+      return ssconf.SimpleStore().GetMaintainNodeHealth()
+    except errors.ConfigurationError, err:
+      logging.error("Configuration error, not activating node maintenance: %s",
+                    err)
+      return False
+
+  @staticmethod
+  def GetRunningInstances():
+    """Compute list of hypervisor/running instances.
+
+    """
+    hyp_list = ssconf.SimpleStore().GetHypervisorList()
+    results = []
+    for hv_name in hyp_list:
+      try:
+        hv = hypervisor.GetHypervisor(hv_name)
+        ilist = hv.ListInstances()
+        results.extend([(iname, hv_name) for iname in ilist])
+      except: # pylint: disable-msg=W0702
+        logging.error("Error while listing instances for hypervisor %s",
+                      hv_name, exc_info=True)
+    return results
+
+  @staticmethod
+  def GetUsedDRBDs():
+    """Get list of used DRBD minors.
+
+    """
+    return bdev.DRBD8.GetUsedDevs().keys()
+
+  @classmethod
+  def DoMaintenance(cls, role):
+    """Maintain the instance list.
+
+    """
+    if role == constants.CONFD_NODE_ROLE_OFFLINE:
+      inst_running = cls.GetRunningInstances()
+      cls.ShutdownInstances(inst_running)
+      drbd_running = cls.GetUsedDRBDs()
+      cls.ShutdownDRBD(drbd_running)
+    else:
+      logging.debug("Not doing anything for role %s", role)
+
+  @staticmethod
+  def ShutdownInstances(inst_running):
+    """Shutdown running instances.
+
+    """
+    names_running = set([i[0] for i in inst_running])
+    if names_running:
+      logging.info("Following instances should not be running,"
+                   " shutting them down: %s", utils.CommaJoin(names_running))
+      # this dictionary will collapse duplicate instance names (only
+      # xen pvm/vhm) into a single key, which is fine
+      i2h = dict(inst_running)
+      for name in names_running:
+        hv_name = i2h[name]
+        hv = hypervisor.GetHypervisor(hv_name)
+        hv.StopInstance(None, force=True, name=name)
 
-  return True
+  @staticmethod
+  def ShutdownDRBD(drbd_running):
+    """Shutdown active DRBD devices.
+
+    """
+    if drbd_running:
+      logging.info("Following DRBD minors should not be active,"
+                   " shutting them down: %s", utils.CommaJoin(drbd_running))
+      for minor in drbd_running:
+        # pylint: disable-msg=W0212
+        # using the private method as is, pending enhancements to the DRBD
+        # interface
+        bdev.DRBD8._ShutdownAll(minor)
+
+  def Exec(self):
+    """Check node status versus cluster desired state.
+
+    """
+    my_name = netutils.Hostname.GetSysName()
+    req = confd_client.ConfdClientRequest(type=
+                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
+                                          query=my_name)
+    self.confd_client.SendRequest(req, async=False, coverage=-1)
+    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
+    if not timed_out:
+      # should have a valid response
+      status, result = self.store_cb.GetResponse(req.rsalt)
+      assert status, "Missing result but received replies"
+      if not self.filter_cb.consistent[req.rsalt]:
+        logging.warning("Inconsistent replies, not doing anything")
+        return
+      self.DoMaintenance(result.server_reply.answer)
+    else:
+      logging.warning("Confd query timed out, cannot do maintenance actions")
 
 
 class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
   """
-  def __init__(self):
+  def __init__(self, statefile):
     """Open, lock, read and parse the file.
 
-    Raises exception on lock contention.
+    @type statefile: file
+    @param statefile: State file object
 
     """
-    # The two-step dance below is necessary to allow both opening existing
-    # file read/write and creating if not existing.  Vanilla open will truncate
-    # an existing file -or- allow creating if not existing.
-    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
-    self.statefile = os.fdopen(fd, 'w+')
-
-    utils.LockFile(self.statefile.fileno())
+    self.statefile = statefile
 
     try:
       state_data = self.statefile.read()
@@ -198,6 +331,28 @@ class WatcherState(object):
 
     return 0
 
+  def MaintainInstanceList(self, instances):
+    """Perform maintenance on the recorded instances.
+
+    @type instances: list of string
+    @param instances: the list of currently existing instances
+
+    """
+    idict = self._data["instance"]
+    # First, delete obsolete instances
+    obsolete_instances = set(idict).difference(instances)
+    for inst in obsolete_instances:
+      logging.debug("Forgetting obsolete instance %s", inst)
+      del idict[inst]
+
+    # Second, delete expired records
+    earliest = time.time() - RETRY_EXPIRATION
+    expired_instances = [i for i in idict
+                         if idict[i][KEY_RESTART_WHEN] < earliest]
+    for inst in expired_instances:
+      logging.debug("Expiring record for instance %s", inst)
+      del idict[inst]
+
   def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
@@ -301,7 +456,7 @@ def GetClusterData():
 
 
 class Watcher(object):
-  """Encapsulate the logic for restarting erronously halted virtual machines.
+  """Encapsulate the logic for restarting erroneously halted virtual machines.
 
   The calling program should periodically instantiate me and call Run().
   This will traverse the list of instances, and make up to MAXTRIES attempts
@@ -311,7 +466,7 @@ class Watcher(object):
   def __init__(self, opts, notepad):
     self.notepad = notepad
     master = client.QueryConfigValues(["master_node"])[0]
-    if master != utils.HostInfo().name:
+    if master != netutils.Hostname.GetSysName():
       raise NotMasterError("This is not the master node")
     # first archive old jobs
     self.ArchiveJobs(opts.job_age)
@@ -385,12 +540,15 @@ class Watcher(object):
     """Make a pass over the list of instances, restarting downed ones.
 
     """
+    notepad.MaintainInstanceList(self.instances.keys())
+
     for instance in self.instances.values():
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
         if n > MAXTRIES:
-          # stay quiet.
+          logging.warning("Not restarting instance %s, retries exhausted",
+                          instance.name)
           continue
         elif n < MAXTRIES:
           last = " (Attempt #%d)" % (n + 1)
@@ -441,7 +599,61 @@ class Watcher(object):
            for name in offline_disk_instances]
     job_id = cli.SendJob(job, cl=client)
 
-    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+    try:
+      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+    except Exception: # pylint: disable-msg=W0703
+      logging.exception("Error while activating disks")
+
+
+def OpenStateFile(path):
+  """Opens the state file and acquires a lock on it.
+
+  @type path: string
+  @param path: Path to state file
+
+  """
+  # The two-step dance below is necessary to allow both opening existing
+  # file read/write and creating if not existing. Vanilla open will truncate
+  # an existing file -or- allow creating if not existing.
+  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
+
+  # Try to acquire lock on state file. If this fails, another watcher instance
+  # might already be running or another program is temporarily blocking the
+  # watcher from running.
+  try:
+    utils.LockFile(statefile_fd)
+  except errors.LockError, err:
+    logging.error("Can't acquire lock on state file %s: %s", path, err)
+    return None
+
+  return os.fdopen(statefile_fd, "w+")
+
+
+def IsRapiResponding(hostname):
+  """Connects to RAPI port and does a simple test.
+
+  Connects to RAPI port of hostname and does a simple test. At this time, the
+  test is GetVersion.
+
+  @type hostname: string
+  @param hostname: hostname of the node to connect to.
+  @rtype: bool
+  @return: Whether RAPI is working properly
+
+  """
+  curl_config = rapi.client.GenericCurlConfig(cafile=constants.RAPI_CERT_FILE)
+  rapi_client = rapi.client.GanetiRapiClient(hostname,
+                                             curl_config_fn=curl_config)
+  try:
+    master_version = rapi_client.GetVersion()
+  except rapi.client.CertificateError, err:
+    logging.warning("RAPI Error: CertificateError (%s)", err)
+    return False
+  except rapi.client.GanetiApiError, err:
+    logging.warning("RAPI Error: GanetiApiError (%s)", err)
+    return False
+  logging.debug("RAPI Result: master_version is %s", master_version)
+  return master_version == constants.RAPI_VERSION
 
 
 def ParseOptions():
@@ -464,6 +676,7 @@ def ParseOptions():
   return options, args
 
 
+@rapi.client.UsesRapiClient
 def main():
   """Main function.
 
@@ -472,6 +685,10 @@ def main():
 
   options, args = ParseOptions()
 
+  if args: # watcher doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                      stderr_logging=options.debug)
 
@@ -479,12 +696,20 @@ def main():
     logging.debug("Pause has been set, exiting")
     sys.exit(constants.EXIT_SUCCESS)
 
+  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
+  if not statefile:
+    sys.exit(constants.EXIT_FAILURE)
+
   update_file = False
   try:
-    # on master or not, try to start the node dameon
-    EnsureDaemon(constants.NODED)
-
-    notepad = WatcherState()
+    StartNodeDaemons()
+    RunWatcherHooks()
+    # run node maintenance in all cases, even if master, so that old
+    # masters can be properly cleaned up too
+    if NodeMaintenance.ShouldRun():
+      NodeMaintenance().Exec()
+
+    notepad = WatcherState(statefile)
     try:
       try:
         client = cli.GetClient()
@@ -496,14 +721,26 @@ def main():
       except luxi.NoMasterError, err:
         logging.warning("Master seems to be down (%s), trying to restart",
                         str(err))
-        if not EnsureDaemon(constants.MASTERD):
+        if not utils.EnsureDaemon(constants.MASTERD):
           logging.critical("Can't start the master, exiting")
           sys.exit(constants.EXIT_FAILURE)
         # else retry the connection
         client = cli.GetClient()
 
       # we are on master now
-      EnsureDaemon(constants.RAPI)
+      utils.EnsureDaemon(constants.RAPI)
+
+      # If RAPI isn't responding to queries, try one restart.
+      logging.debug("Attempting to talk with RAPI.")
+      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
+                        " Restarting Ganeti RAPI.")
+        utils.StopDaemon(constants.RAPI)
+        utils.EnsureDaemon(constants.RAPI)
+        logging.debug("Second attempt to talk with RAPI")
+        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+          logging.fatal("RAPI is not responding. Please investigate.")
+      logging.debug("Successfully talked to RAPI.")
 
       try:
         watcher = Watcher(options, notepad)
@@ -533,7 +770,7 @@ def main():
   except errors.JobQueueDrainError:
     logging.error("Job queue is drained, can't maintain cluster state")
   except Exception, err:
-    logging.error(str(err), exc_info=True)
+    logging.exception(str(err))
     sys.exit(constants.EXIT_FAILURE)