Watcher: automatic shutdown of orphan resources
author  Iustin Pop <iustin@google.com>
Mon, 22 Mar 2010 15:10:03 +0000 (16:10 +0100)
committer  Iustin Pop <iustin@google.com>
Thu, 8 Apr 2010 15:50:37 +0000 (17:50 +0200)
This patch changes the watcher so that, on every node, it maintains the
list of running instances and active DRBD devices by shutting down the
ones that the confd daemons indicate should not be running on that node.
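
In outline, the maintenance pass works as in the condensed sketch below.
This only restates the code added by this patch (the helper name
run_node_maintenance is illustrative, and it assumes a node with the
Ganeti Python modules installed and a reachable confd):

  from ganeti import ssconf, hypervisor, bdev, constants, utils, errors
  from ganeti.confd import client as confd_client

  def run_node_maintenance():
    # Only act if the cluster-wide "maintain node health" flag is set.
    try:
      if not ssconf.SimpleStore().GetMaintainNodeHealth():
        return
    except errors.ConfigurationError:
      return
    # Ask the confd daemons for this node's current role.
    store_cb = confd_client.StoreResultCallback()
    filter_cb = confd_client.ConfdFilterCallback(store_cb)
    client = confd_client.GetConfdClient(filter_cb)
    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=utils.HostInfo().name)
    client.SendRequest(req, async=False)
    timed_out, _, _ = client.WaitForReply(req.rsalt)
    if timed_out or not filter_cb.consistent[req.rsalt]:
      return
    _, result = store_cb.GetResponse(req.rsalt)
    if result.server_reply.answer != constants.CONFD_NODE_ROLE_OFFLINE:
      return
    # The node is marked offline: stop any instances still running here
    # and deactivate any DRBD minors still in use.
    for hv_name in ssconf.SimpleStore().GetHypervisorList():
      hv = hypervisor.GetHypervisor(hv_name)
      for iname in hv.ListInstances():
        hv.StopInstance(None, force=True, name=iname)
    for minor in bdev.DRBD8.GetUsedDevs().keys():
      bdev.DRBD8._ShutdownAll(minor)

The patch itself splits this flow across the NodeMaintenance class shown
in the diff below, with logging and error handling around each step.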

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>

daemons/ganeti-watcher

index 8e216c2..034cc97 100755 (executable)
@@ -44,6 +44,10 @@ from ganeti import errors
 from ganeti import opcodes
 from ganeti import cli
 from ganeti import luxi
+from ganeti import ssconf
+from ganeti import bdev
+from ganeti import hypervisor
+from ganeti.confd import client as confd_client
 
 
 MAXTRIES = 5
@@ -109,6 +113,117 @@ def RunWatcherHooks():
                       runresult.output)
 
 
+class NodeMaintenance(object):
+  """Talks to confd daemons and possible shutdown instances/drbd devices.
+
+  """
+  def __init__(self):
+    self.store_cb = confd_client.StoreResultCallback()
+    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
+    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
+
+  @staticmethod
+  def ShouldRun():
+    """Checks whether node maintenance should run.
+
+    """
+    try:
+      return ssconf.SimpleStore().GetMaintainNodeHealth()
+    except errors.ConfigurationError, err:
+      logging.error("Configuration error, not activating node maintenance: %s",
+                    err)
+      return False
+
+  @staticmethod
+  def GetRunningInstances():
+    """Compute list of hypervisor/running instances.
+
+    """
+    hyp_list = ssconf.SimpleStore().GetHypervisorList()
+    results = []
+    for hv_name in hyp_list:
+      try:
+        hv = hypervisor.GetHypervisor(hv_name)
+        ilist = hv.ListInstances()
+        results.extend([(iname, hv_name) for iname in ilist])
+      except: # pylint: disable-msg=W0702
+        logging.error("Error while listing instances for hypervisor %s",
+                      hv_name, exc_info=True)
+    return results
+
+  @staticmethod
+  def GetUsedDRBDs():
+    """Get list of used DRBD minors.
+
+    """
+    return bdev.DRBD8.GetUsedDevs().keys()
+
+  @classmethod
+  def DoMaintenance(cls, role):
+    """Maintain the instance list.
+
+    """
+    if role == constants.CONFD_NODE_ROLE_OFFLINE:
+      inst_running = cls.GetRunningInstances()
+      cls.ShutdownInstances(inst_running)
+      drbd_running = cls.GetUsedDRBDs()
+      cls.ShutdownDRBD(drbd_running)
+    else:
+      logging.debug("Not doing anything for role %s", role)
+
+  @staticmethod
+  def ShutdownInstances(inst_running):
+    """Shutdown running instances.
+
+    """
+    names_running = set([i[0] for i in inst_running])
+    if names_running:
+      logging.info("Following instances should not be running,"
+                   " shutting them down: %s", utils.CommaJoin(names_running))
+      # this dictionary collapses duplicate instance names (possible only
+      # with Xen PVM/HVM) into a single key, which is fine
+      i2h = dict(inst_running)
+      for name in names_running:
+        hv_name = i2h[name]
+        hv = hypervisor.GetHypervisor(hv_name)
+        hv.StopInstance(None, force=True, name=name)
+
+  @staticmethod
+  def ShutdownDRBD(drbd_running):
+    """Shutdown active DRBD devices.
+
+    """
+    if drbd_running:
+      logging.info("Following DRBD minors should not be active,"
+                   " shutting them down: %s", utils.CommaJoin(drbd_running))
+      for minor in drbd_running:
+        # pylint: disable-msg=W0212
+        # using the private method as is, pending enhancements to the DRBD
+        # interface
+        bdev.DRBD8._ShutdownAll(minor)
+
+  def Exec(self):
+    """Check node status versus cluster desired state.
+
+    """
+    my_name = utils.HostInfo().name
+    req = confd_client.ConfdClientRequest(
+      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
+      query=my_name)
+    self.confd_client.SendRequest(req, async=False)
+    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
+    if not timed_out:
+      # should have a valid response
+      status, result = self.store_cb.GetResponse(req.rsalt)
+      assert status, "Missing result but received replies"
+      if not self.filter_cb.consistent[req.rsalt]:
+        logging.warning("Inconsistent replies, not doing anything")
+        return
+      self.DoMaintenance(result.server_reply.answer)
+    else:
+      logging.warning("Confd query timed out, cannot do maintenance actions")
+
+
 class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
@@ -527,6 +642,10 @@ def main():
   try:
     StartNodeDaemons()
     RunWatcherHooks()
+    # run node maintenance in all cases, even if master, so that old
+    # masters can be properly cleaned up too
+    if NodeMaintenance.ShouldRun():
+      NodeMaintenance().Exec()
 
     notepad = WatcherState(statefile)
     try: