errors: Document arguments to QueryFilterParseError
[ganeti-local] / lib / watcher / __init__.py
index 0e62063..c60f4a1 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -27,52 +27,53 @@ by a node reboot.  Run from cron or similar.
 
 """
 
-# pylint: disable-msg=C0103,W0142
-
-# C0103: Invalid name ganeti-watcher
-
 import os
 import os.path
 import sys
 import time
 import logging
+import operator
+import errno
 from optparse import OptionParser
 
 from ganeti import utils
 from ganeti import constants
 from ganeti import compat
-from ganeti import serializer
 from ganeti import errors
 from ganeti import opcodes
 from ganeti import cli
 from ganeti import luxi
-from ganeti import ssconf
-from ganeti import bdev
-from ganeti import hypervisor
 from ganeti import rapi
-from ganeti.confd import client as confd_client
 from ganeti import netutils
+from ganeti import qlang
+from ganeti import objects
+from ganeti import ssconf
+from ganeti import ht
+from ganeti import pathutils
 
-import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client # pylint: disable=W0611
+from ganeti.rapi.client import UsesRapiClient
+
+from ganeti.watcher import nodemaint
+from ganeti.watcher import state
 
 
 MAXTRIES = 5
-# Delete any record that is older than 8 hours; this value is based on
-# the fact that the current retry counter is 5, and watcher runs every
-# 5 minutes, so it takes around half an hour to exceed the retry
-# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
-RETRY_EXPIRATION = 8 * 3600
-BAD_STATES = [constants.INSTST_ERRORDOWN]
-HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
-NOTICE = 'NOTICE'
-ERROR = 'ERROR'
-KEY_RESTART_COUNT = "restart_count"
-KEY_RESTART_WHEN = "restart_when"
-KEY_BOOT_ID = "bootid"
+BAD_STATES = frozenset([
+  constants.INSTST_ERRORDOWN,
+  ])
+HELPLESS_STATES = frozenset([
+  constants.INSTST_NODEDOWN,
+  constants.INSTST_NODEOFFLINE,
+  ])
+NOTICE = "NOTICE"
+ERROR = "ERROR"
 
+#: Number of seconds to wait between starting child processes for node groups
+CHILD_PROCESS_DELAY = 1.0
 
-# Global LUXI client object
-client = None
+#: How many seconds to wait for instance status file lock
+INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
 
 
 class NotMasterError(errors.GenericError):
@@ -83,7 +84,7 @@ def ShouldPause():
   """Check whether we should pause.
 
   """
-  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
 
 
 def StartNodeDaemons():
@@ -93,22 +94,23 @@ def StartNodeDaemons():
   # on master or not, try to start the node daemon
   utils.EnsureDaemon(constants.NODED)
   # start confd as well. On non candidates it will be in disabled mode.
-  utils.EnsureDaemon(constants.CONFD)
+  if constants.ENABLE_CONFD:
+    utils.EnsureDaemon(constants.CONFD)
 
 
 def RunWatcherHooks():
   """Run the watcher hooks.
 
   """
-  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
+  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                              constants.HOOKS_NAME_WATCHER)
   if not os.path.isdir(hooks_dir):
     return
 
   try:
     results = utils.RunParts(hooks_dir)
-  except Exception: # pylint: disable-msg=W0703
-    logging.exception("RunParts %s failed: %s", hooks_dir)
+  except Exception, err: # pylint: disable=W0703
+    logging.exception("RunParts %s failed: %s", hooks_dir, err)
     return
 
   for (relname, status, runresult) in results:
@@ -123,697 +125,668 @@ def RunWatcherHooks():
       else:
         logging.debug("Watcher hook %s: success (output: %s)", relname,
                       runresult.output)
+    else:
+      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
+                                   status)
 
 
-class NodeMaintenance(object):
-  """Talks to confd daemons and possible shutdown instances/drbd devices.
+class Instance(object):
+  """Abstraction for a Virtual Machine instance.
 
   """
-  def __init__(self):
-    self.store_cb = confd_client.StoreResultCallback()
-    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
-    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
+  def __init__(self, name, status, autostart, snodes):
+    self.name = name
+    self.status = status
+    self.autostart = autostart
+    self.snodes = snodes
 
-  @staticmethod
-  def ShouldRun():
-    """Checks whether node maintenance should run.
+  def Restart(self, cl):
+    """Encapsulates the start of an instance.
 
     """
-    try:
-      return ssconf.SimpleStore().GetMaintainNodeHealth()
-    except errors.ConfigurationError, err:
-      logging.error("Configuration error, not activating node maintenance: %s",
-                    err)
-      return False
+    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
+    cli.SubmitOpCode(op, cl=cl)
 
-  @staticmethod
-  def GetRunningInstances():
-    """Compute list of hypervisor/running instances.
+  def ActivateDisks(self, cl):
+    """Encapsulates the activation of all disks of an instance.
 
     """
-    hyp_list = ssconf.SimpleStore().GetHypervisorList()
-    results = []
-    for hv_name in hyp_list:
-      try:
-        hv = hypervisor.GetHypervisor(hv_name)
-        ilist = hv.ListInstances()
-        results.extend([(iname, hv_name) for iname in ilist])
-      except: # pylint: disable-msg=W0702
-        logging.error("Error while listing instances for hypervisor %s",
-                      hv_name, exc_info=True)
-    return results
-
-  @staticmethod
-  def GetUsedDRBDs():
-    """Get list of used DRBD minors.
+    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
+    cli.SubmitOpCode(op, cl=cl)
 
-    """
-    return bdev.DRBD8.GetUsedDevs().keys()
 
-  @classmethod
-  def DoMaintenance(cls, role):
-    """Maintain the instance list.
+class Node:
+  """Data container representing cluster node.
+
+  """
+  def __init__(self, name, bootid, offline, secondaries):
+    """Initializes this class.
 
     """
-    if role == constants.CONFD_NODE_ROLE_OFFLINE:
-      inst_running = cls.GetRunningInstances()
-      cls.ShutdownInstances(inst_running)
-      drbd_running = cls.GetUsedDRBDs()
-      cls.ShutdownDRBD(drbd_running)
-    else:
-      logging.debug("Not doing anything for role %s", role)
+    self.name = name
+    self.bootid = bootid
+    self.offline = offline
+    self.secondaries = secondaries
 
-  @staticmethod
-  def ShutdownInstances(inst_running):
-    """Shutdown running instances.
 
-    """
-    names_running = set([i[0] for i in inst_running])
-    if names_running:
-      logging.info("Following instances should not be running,"
-                   " shutting them down: %s", utils.CommaJoin(names_running))
-      # this dictionary will collapse duplicate instance names (only
-      # xen pvm/vhm) into a single key, which is fine
-      i2h = dict(inst_running)
-      for name in names_running:
-        hv_name = i2h[name]
-        hv = hypervisor.GetHypervisor(hv_name)
-        hv.StopInstance(None, force=True, name=name)
-
-  @staticmethod
-  def ShutdownDRBD(drbd_running):
-    """Shutdown active DRBD devices.
+def _CheckInstances(cl, notepad, instances):
+  """Make a pass over the list of instances, restarting downed ones.
 
-    """
-    if drbd_running:
-      logging.info("Following DRBD minors should not be active,"
-                   " shutting them down: %s", utils.CommaJoin(drbd_running))
-      for minor in drbd_running:
-        # pylint: disable-msg=W0212
-        # using the private method as is, pending enhancements to the DRBD
-        # interface
-        bdev.DRBD8._ShutdownAll(minor)
-
-  def Exec(self):
-    """Check node status versus cluster desired state.
+  """
+  notepad.MaintainInstanceList(instances.keys())
+
+  started = set()
+
+  for inst in instances.values():
+    if inst.status in BAD_STATES:
+      n = notepad.NumberOfRestartAttempts(inst.name)
+
+      if n > MAXTRIES:
+        logging.warning("Not restarting instance '%s', retries exhausted",
+                        inst.name)
+        continue
+
+      if n == MAXTRIES:
+        notepad.RecordRestartAttempt(inst.name)
+        logging.error("Could not restart instance '%s' after %s attempts,"
+                      " giving up", inst.name, MAXTRIES)
+        continue
+
+      try:
+        logging.info("Restarting instance '%s' (attempt #%s)",
+                     inst.name, n + 1)
+        inst.Restart(cl)
+      except Exception: # pylint: disable=W0703
+        logging.exception("Error while restarting instance '%s'", inst.name)
+      else:
+        started.add(inst.name)
+
+      notepad.RecordRestartAttempt(inst.name)
 
-    """
-    my_name = netutils.Hostname.GetSysName()
-    req = confd_client.ConfdClientRequest(type=
-                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
-                                          query=my_name)
-    self.confd_client.SendRequest(req, async=False, coverage=-1)
-    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
-    if not timed_out:
-      # should have a valid response
-      status, result = self.store_cb.GetResponse(req.rsalt)
-      assert status, "Missing result but received replies"
-      if not self.filter_cb.consistent[req.rsalt]:
-        logging.warning("Inconsistent replies, not doing anything")
-        return
-      self.DoMaintenance(result.server_reply.answer)
     else:
-      logging.warning("Confd query timed out, cannot do maintenance actions")
+      if notepad.NumberOfRestartAttempts(inst.name):
+        notepad.RemoveInstance(inst.name)
+        if inst.status not in HELPLESS_STATES:
+          logging.info("Restart of instance '%s' succeeded", inst.name)
+
+  return started
 
 
-class WatcherState(object):
-  """Interface to a state file recording restart attempts.
+def _CheckDisks(cl, notepad, nodes, instances, started):
+  """Check all nodes for restarted ones.
 
   """
-  def __init__(self, statefile):
-    """Open, lock, read and parse the file.
+  check_nodes = []
+
+  for node in nodes.values():
+    old = notepad.GetNodeBootID(node.name)
+    if not node.bootid:
+      # Bad node, not returning a boot id
+      if not node.offline:
+        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
+                      node.name)
+      continue
+
+    if old != node.bootid:
+      # Node's boot ID has changed, probably through a reboot
+      check_nodes.append(node)
+
+  if check_nodes:
+    # Activate disks for all instances with any of the checked nodes as a
+    # secondary node.
+    for node in check_nodes:
+      for instance_name in node.secondaries:
+        try:
+          inst = instances[instance_name]
+        except KeyError:
+          logging.info("Can't find instance '%s', maybe it was ignored",
+                       instance_name)
+          continue
 
-    @type statefile: file
-    @param statefile: State file object
+        if not inst.autostart:
+          logging.info("Skipping disk activation for non-autostart"
+                       " instance '%s'", inst.name)
+          continue
 
-    """
-    self.statefile = statefile
+        if inst.name in started:
+          # we already tried to start the instance, which should have
+          # activated its drives (if they can be at all)
+          logging.debug("Skipping disk activation for instance '%s' as"
+                        " it was already started", inst.name)
+          continue
 
-    try:
-      state_data = self.statefile.read()
-      if not state_data:
-        self._data = {}
-      else:
-        self._data = serializer.Load(state_data)
-    except Exception, msg: # pylint: disable-msg=W0703
-      # Ignore errors while loading the file and treat it as empty
-      self._data = {}
-      logging.warning(("Invalid state file. Using defaults."
-                       " Error message: %s"), msg)
+        try:
+          logging.info("Activating disks for instance '%s'", inst.name)
+          inst.ActivateDisks(cl)
+        except Exception: # pylint: disable=W0703
+          logging.exception("Error while activating disks for instance '%s'",
+                            inst.name)
 
-    if "instance" not in self._data:
-      self._data["instance"] = {}
-    if "node" not in self._data:
-      self._data["node"] = {}
+    # Keep changed boot IDs
+    for node in check_nodes:
+      notepad.SetNodeBootID(node.name, node.bootid)
 
-    self._orig_data = serializer.Dump(self._data)
 
-  def Save(self):
-    """Save state to file, then unlock and close it.
+def _CheckForOfflineNodes(nodes, instance):
+  """Checks if given instances has any secondary in offline status.
 
-    """
-    assert self.statefile
+  @param instance: The instance object
+  @return: True if any of the secondary is offline, False otherwise
 
-    serialized_form = serializer.Dump(self._data)
-    if self._orig_data == serialized_form:
-      logging.debug("Data didn't change, just touching status file")
-      os.utime(constants.WATCHER_STATEFILE, None)
-      return
+  """
+  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
 
-    # We need to make sure the file is locked before renaming it, otherwise
-    # starting ganeti-watcher again at the same time will create a conflict.
-    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
-                         data=serialized_form,
-                         prewrite=utils.LockFile, close=False)
-    self.statefile = os.fdopen(fd, 'w+')
 
-  def Close(self):
-    """Unlock configuration file and close it.
+def _VerifyDisks(cl, uuid, nodes, instances):
+  """Run a per-group "gnt-cluster verify-disks".
 
-    """
-    assert self.statefile
+  """
+  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
+  ((_, offline_disk_instances, _), ) = \
+    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+  cl.ArchiveJob(job_id)
+
+  if not offline_disk_instances:
+    # nothing to do
+    logging.debug("Verify-disks reported no offline disks, nothing to do")
+    return
 
-    # Files are automatically unlocked when closing them
-    self.statefile.close()
-    self.statefile = None
+  logging.debug("Will activate disks for instance(s) %s",
+                utils.CommaJoin(offline_disk_instances))
 
-  def GetNodeBootID(self, name):
-    """Returns the last boot ID of a node or None.
+  # We submit only one job, and wait for it. Not optimal, but this puts less
+  # load on the job queue.
+  job = []
+  for name in offline_disk_instances:
+    try:
+      inst = instances[name]
+    except KeyError:
+      logging.info("Can't find instance '%s', maybe it was ignored", name)
+      continue
 
-    """
-    ndata = self._data["node"]
+    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
+      logging.info("Skipping instance '%s' because it is in a helpless state"
+                   " or has offline secondaries", name)
+      continue
 
-    if name in ndata and KEY_BOOT_ID in ndata[name]:
-      return ndata[name][KEY_BOOT_ID]
-    return None
+    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
 
-  def SetNodeBootID(self, name, bootid):
-    """Sets the boot ID of a node.
+  if job:
+    job_id = cli.SendJob(job, cl=cl)
 
-    """
-    assert bootid
+    try:
+      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+    except Exception: # pylint: disable=W0703
+      logging.exception("Error while activating disks")
 
-    ndata = self._data["node"]
 
-    if name not in ndata:
-      ndata[name] = {}
+def IsRapiResponding(hostname):
+  """Connects to RAPI port and does a simple test.
 
-    ndata[name][KEY_BOOT_ID] = bootid
+  Connects to RAPI port of hostname and does a simple test. At this time, the
+  test is GetVersion.
 
-  def NumberOfRestartAttempts(self, instance):
-    """Returns number of previous restart attempts.
+  @type hostname: string
+  @param hostname: hostname of the node to connect to.
+  @rtype: bool
+  @return: Whether RAPI is working properly
 
-    @type instance: L{Instance}
-    @param instance: the instance to look up
+  """
+  curl_config = rapi.client.GenericCurlConfig()
+  rapi_client = rapi.client.GanetiRapiClient(hostname,
+                                             curl_config_fn=curl_config)
+  try:
+    master_version = rapi_client.GetVersion()
+  except rapi.client.CertificateError, err:
+    logging.warning("RAPI certificate error: %s", err)
+    return False
+  except rapi.client.GanetiApiError, err:
+    logging.warning("RAPI error: %s", err)
+    return False
+  else:
+    logging.debug("Reported RAPI version %s", master_version)
+    return master_version == constants.RAPI_VERSION
 
-    """
-    idata = self._data["instance"]
 
-    if instance.name in idata:
-      return idata[instance.name][KEY_RESTART_COUNT]
+def ParseOptions():
+  """Parse the command line options.
 
-    return 0
+  @return: (options, args) as from OptionParser.parse_args()
 
-  def MaintainInstanceList(self, instances):
-    """Perform maintenance on the recorded instances.
+  """
+  parser = OptionParser(description="Ganeti cluster watcher",
+                        usage="%prog [-d]",
+                        version="%%prog (ganeti) %s" %
+                        constants.RELEASE_VERSION)
 
-    @type instances: list of string
-    @param instances: the list of currently existing instances
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option(cli.NODEGROUP_OPT)
+  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
+                    help="Autoarchive jobs older than this age (default"
+                          " 6 hours)")
+  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
+                    action="store_true", help="Ignore cluster pause setting")
+  parser.add_option("--wait-children", dest="wait_children",
+                    action="store_true", help="Wait for child processes")
+  parser.add_option("--no-wait-children", dest="wait_children",
+                    action="store_false",
+                    help="Don't wait for child processes")
+  # See optparse documentation for why default values are not set by options
+  parser.set_defaults(wait_children=True)
+  options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
 
-    """
-    idict = self._data["instance"]
-    # First, delete obsolete instances
-    obsolete_instances = set(idict).difference(instances)
-    for inst in obsolete_instances:
-      logging.debug("Forgetting obsolete instance %s", inst)
-      del idict[inst]
-
-    # Second, delete expired records
-    earliest = time.time() - RETRY_EXPIRATION
-    expired_instances = [i for i in idict
-                         if idict[i][KEY_RESTART_WHEN] < earliest]
-    for inst in expired_instances:
-      logging.debug("Expiring record for instance %s", inst)
-      del idict[inst]
-
-  def RecordRestartAttempt(self, instance):
-    """Record a restart attempt.
-
-    @type instance: L{Instance}
-    @param instance: the instance being restarted
+  if args:
+    parser.error("No arguments expected")
 
-    """
-    idata = self._data["instance"]
+  return (options, args)
 
-    if instance.name not in idata:
-      inst = idata[instance.name] = {}
-    else:
-      inst = idata[instance.name]
 
-    inst[KEY_RESTART_WHEN] = time.time()
-    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
+def _WriteInstanceStatus(filename, data):
+  """Writes the per-group instance status file.
 
-  def RemoveInstance(self, instance):
-    """Update state to reflect that a machine is running.
+  The entries are sorted.
 
-    This method removes the record for a named instance (as we only
-    track down instances).
+  @type filename: string
+  @param filename: Path to instance status file
+  @type data: list of tuple; (instance name as string, status as string)
+  @param data: Instance name and status
 
-    @type instance: L{Instance}
-    @param instance: the instance to remove from books
+  """
+  logging.debug("Updating instance status file '%s' with %s instances",
+                filename, len(data))
 
-    """
-    idata = self._data["instance"]
+  utils.WriteFile(filename,
+                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
+                                   sorted(data))))
 
-    if instance.name in idata:
-      del idata[instance.name]
 
+def _UpdateInstanceStatus(filename, instances):
+  """Writes an instance status file from L{Instance} objects.
 
-class Instance(object):
-  """Abstraction for a Virtual Machine instance.
+  @type filename: string
+  @param filename: Path to status file
+  @type instances: list of L{Instance}
 
   """
-  def __init__(self, name, state, autostart, snodes):
-    self.name = name
-    self.state = state
-    self.autostart = autostart
-    self.snodes = snodes
+  _WriteInstanceStatus(filename, [(inst.name, inst.status)
+                                  for inst in instances])
 
-  def Restart(self):
-    """Encapsulates the start of an instance.
 
-    """
-    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
-    cli.SubmitOpCode(op, cl=client)
+def _ReadInstanceStatus(filename):
+  """Reads an instance status file.
 
-  def ActivateDisks(self):
-    """Encapsulates the activation of all disks of an instance.
+  @type filename: string
+  @param filename: Path to status file
+  @rtype: tuple; (None or number, list of lists containing instance name and
+    status)
+  @return: File's mtime and instance status contained in the file; mtime is
+    C{None} if file can't be read
+
+  """
+  logging.debug("Reading per-group instance status from '%s'", filename)
+
+  statcb = utils.FileStatHelper()
+  try:
+    content = utils.ReadFile(filename, preread=statcb)
+  except EnvironmentError, err:
+    if err.errno == errno.ENOENT:
+      logging.error("Can't read '%s', does not exist (yet)", filename)
+    else:
+      logging.exception("Unable to read '%s', ignoring", filename)
+    return (None, None)
+  else:
+    return (statcb.st.st_mtime, [line.split(None, 1)
+                                 for line in content.splitlines()])
 
-    """
-    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
-    cli.SubmitOpCode(op, cl=client)
 
+def _MergeInstanceStatus(filename, pergroup_filename, groups):
+  """Merges all per-group instance status files into a global one.
 
-def GetClusterData():
-  """Get a list of instances on this cluster.
+  @type filename: string
+  @param filename: Path to global instance status file
+  @type pergroup_filename: string
+  @param pergroup_filename: Path to per-group status files, must contain "%s"
+    to be replaced with group UUID
+  @type groups: sequence
+  @param groups: UUIDs of known groups
 
   """
-  op1_fields = ["name", "status", "admin_state", "snodes"]
-  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
-                                use_locking=True)
-  op2_fields = ["name", "bootid", "offline"]
-  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
-                            use_locking=True)
+  # Lock global status file in exclusive mode
+  lock = utils.FileLock.Open(filename)
+  try:
+    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
+  except errors.LockError, err:
+    # All per-group processes will lock and update the file. None of them
+    # should take longer than 10 seconds (the value of
+    # INSTANCE_STATUS_LOCK_TIMEOUT).
+    logging.error("Can't acquire lock on instance status file '%s', not"
+                  " updating: %s", filename, err)
+    return
 
-  job_id = client.SubmitJob([op1, op2])
+  logging.debug("Acquired exclusive lock on '%s'", filename)
 
-  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+  data = {}
 
-  logging.debug("Got data from cluster, writing instance status file")
+  # Load instance status from all groups
+  for group_uuid in groups:
+    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
 
-  result = all_results[0]
-  smap = {}
+    if mtime is not None:
+      for (instance_name, status) in instdata:
+        data.setdefault(instance_name, []).append((mtime, status))
 
-  instances = {}
+  # Select last update based on file mtime
+  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
+                for (instance_name, status) in data.items()]
 
-  # write the upfile
-  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
-  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
+  # Write the global status file. Don't touch file after it's been
+  # updated--there is no lock anymore.
+  _WriteInstanceStatus(filename, inststatus)
 
-  for fields in result:
-    (name, status, autostart, snodes) = fields
 
-    # update the secondary node map
-    for node in snodes:
-      if node not in smap:
-        smap[node] = []
-      smap[node].append(name)
+def GetLuxiClient(try_restart):
+  """Tries to connect to the master daemon.
 
-    instances[name] = Instance(name, status, autostart, snodes)
+  @type try_restart: bool
+  @param try_restart: Whether to attempt to restart the master daemon
 
-  nodes =  dict([(name, (bootid, offline))
-                 for name, bootid, offline in all_results[1]])
+  """
+  try:
+    return cli.GetClient()
+  except errors.OpPrereqError, err:
+    # this is, from cli.GetClient, a not-master case
+    raise NotMasterError("Not on master node (%s)" % err)
+
+  except luxi.NoMasterError, err:
+    if not try_restart:
+      raise
 
-  client.ArchiveJob(job_id)
+    logging.warning("Master daemon seems to be down (%s), trying to restart",
+                    err)
 
-  return instances, nodes, smap
+    if not utils.EnsureDaemon(constants.MASTERD):
+      raise errors.GenericError("Can't start the master daemon")
 
+    # Retry the connection
+    return cli.GetClient()
 
-class Watcher(object):
-  """Encapsulate the logic for restarting erroneously halted virtual machines.
 
-  The calling program should periodically instantiate me and call Run().
-  This will traverse the list of instances, and make up to MAXTRIES attempts
-  to restart machines that are down.
+def _StartGroupChildren(cl, wait):
+  """Starts a new instance of the watcher for every node group.
 
   """
-  def __init__(self, opts, notepad):
-    self.notepad = notepad
-    master = client.QueryConfigValues(["master_node"])[0]
-    if master != netutils.Hostname.GetSysName():
-      raise NotMasterError("This is not the master node")
-    # first archive old jobs
-    self.ArchiveJobs(opts.job_age)
-    # and only then submit new ones
-    self.instances, self.bootids, self.smap = GetClusterData()
-    self.started_instances = set()
-    self.opts = opts
-
-  def Run(self):
-    """Watcher run sequence.
+  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
+                        for arg in sys.argv)
 
-    """
-    notepad = self.notepad
-    self.CheckInstances(notepad)
-    self.CheckDisks(notepad)
-    self.VerifyDisks()
+  result = cl.QueryGroups([], ["name", "uuid"], False)
 
-  @staticmethod
-  def ArchiveJobs(age):
-    """Archive old jobs.
+  children = []
 
-    """
-    arch_count, left_count = client.AutoArchiveJobs(age)
-    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
+  for (idx, (name, uuid)) in enumerate(result):
+    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
 
-  def CheckDisks(self, notepad):
-    """Check all nodes for restarted ones.
+    if idx > 0:
+      # Let's not kill the system
+      time.sleep(CHILD_PROCESS_DELAY)
 
-    """
-    check_nodes = []
-    for name, (new_id, offline) in self.bootids.iteritems():
-      old = notepad.GetNodeBootID(name)
-      if new_id is None:
-        # Bad node, not returning a boot id
-        if not offline:
-          logging.debug("Node %s missing boot id, skipping secondary checks",
-                        name)
-        continue
-      if old != new_id:
-        # Node's boot ID has changed, proably through a reboot.
-        check_nodes.append(name)
-
-    if check_nodes:
-      # Activate disks for all instances with any of the checked nodes as a
-      # secondary node.
-      for node in check_nodes:
-        if node not in self.smap:
-          continue
-        for instance_name in self.smap[node]:
-          instance = self.instances[instance_name]
-          if not instance.autostart:
-            logging.info(("Skipping disk activation for non-autostart"
-                          " instance %s"), instance.name)
-            continue
-          if instance.name in self.started_instances:
-            # we already tried to start the instance, which should have
-            # activated its drives (if they can be at all)
-            logging.debug("Skipping disk activation for instance %s, as"
-                          " it was already started", instance.name)
-            continue
-          try:
-            logging.info("Activating disks for instance %s", instance.name)
-            instance.ActivateDisks()
-          except Exception: # pylint: disable-msg=W0703
-            logging.exception("Error while activating disks for instance %s",
-                              instance.name)
-
-      # Keep changed boot IDs
-      for name in check_nodes:
-        notepad.SetNodeBootID(name, self.bootids[name][0])
-
-  def CheckInstances(self, notepad):
-    """Make a pass over the list of instances, restarting downed ones.
+    logging.debug("Spawning child for group '%s' (%s), arguments %s",
+                  name, uuid, args)
 
-    """
-    notepad.MaintainInstanceList(self.instances.keys())
+    try:
+      # TODO: Should utils.StartDaemon be used instead?
+      pid = os.spawnv(os.P_NOWAIT, args[0], args)
+    except Exception: # pylint: disable=W0703
+      logging.exception("Failed to start child for group '%s' (%s)",
+                        name, uuid)
+    else:
+      logging.debug("Started with PID %s", pid)
+      children.append(pid)
 
-    for instance in self.instances.values():
-      if instance.state in BAD_STATES:
-        n = notepad.NumberOfRestartAttempts(instance)
+  if wait:
+    for pid in children:
+      logging.debug("Waiting for child PID %s", pid)
+      try:
+        result = utils.RetryOnSignal(os.waitpid, pid, 0)
+      except EnvironmentError, err:
+        result = str(err)
 
-        if n > MAXTRIES:
-          logging.warning("Not restarting instance %s, retries exhausted",
-                          instance.name)
-          continue
-        elif n < MAXTRIES:
-          last = " (Attempt #%d)" % (n + 1)
-        else:
-          notepad.RecordRestartAttempt(instance)
-          logging.error("Could not restart %s after %d attempts, giving up",
-                        instance.name, MAXTRIES)
-          continue
-        try:
-          logging.info("Restarting %s%s", instance.name, last)
-          instance.Restart()
-          self.started_instances.add(instance.name)
-        except Exception: # pylint: disable-msg=W0703
-          logging.exception("Error while restarting instance %s",
-                            instance.name)
-
-        notepad.RecordRestartAttempt(instance)
-      elif instance.state in HELPLESS_STATES:
-        if notepad.NumberOfRestartAttempts(instance):
-          notepad.RemoveInstance(instance)
-      else:
-        if notepad.NumberOfRestartAttempts(instance):
-          notepad.RemoveInstance(instance)
-          logging.info("Restart of %s succeeded", instance.name)
+      logging.debug("Child PID %s exited with status %s", pid, result)
 
-  def _CheckForOfflineNodes(self, instance):
-    """Checks if given instances has any secondary in offline status.
 
-    @param instance: The instance object
-    @return: True if any of the secondary is offline, False otherwise
+def _ArchiveJobs(cl, age):
+  """Archives old jobs.
 
-    """
-    bootids = []
-    for node in instance.snodes:
-      bootids.append(self.bootids[node])
+  """
+  (arch_count, left_count) = cl.AutoArchiveJobs(age)
+  logging.debug("Archived %s jobs, left %s", arch_count, left_count)
 
-    return compat.any(offline for (_, offline) in bootids)
 
-  def VerifyDisks(self):
-    """Run gnt-cluster verify-disks.
+def _CheckMaster(cl):
+  """Ensures current host is master node.
 
-    """
-    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
-    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
-    client.ArchiveJob(job_id)
+  """
+  (master, ) = cl.QueryConfigValues(["master_node"])
+  if master != netutils.Hostname.GetSysName():
+    raise NotMasterError("This is not the master node")
 
-    # Keep track of submitted jobs
-    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
 
-    archive_jobs = set()
-    for (status, job_id) in result[constants.JOB_IDS_KEY]:
-      jex.AddJobId(None, status, job_id)
-      if status:
-        archive_jobs.add(job_id)
+@UsesRapiClient
+def _GlobalWatcher(opts):
+  """Main function for global watcher.
 
-    offline_disk_instances = set()
+  At the end child processes are spawned for every node group.
 
-    for (status, result) in jex.GetResults():
-      if not status:
-        logging.error("Verify-disks job failed: %s", result)
-        continue
+  """
+  StartNodeDaemons()
+  RunWatcherHooks()
 
-      ((_, instances, _), ) = result
+  # Run node maintenance in all cases, even if master, so that old masters can
+  # be properly cleaned up
+  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
+    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
 
-      offline_disk_instances.update(instances)
+  try:
+    client = GetLuxiClient(True)
+  except NotMasterError:
+    # Don't proceed on non-master nodes
+    return constants.EXIT_SUCCESS
 
-    for job_id in archive_jobs:
-      client.ArchiveJob(job_id)
+  # we are on master now
+  utils.EnsureDaemon(constants.RAPI)
 
-    if not offline_disk_instances:
-      # nothing to do
-      logging.debug("verify-disks reported no offline disks, nothing to do")
-      return
+  # If RAPI isn't responding to queries, try one restart
+  logging.debug("Attempting to talk to remote API on %s",
+                constants.IP4_ADDRESS_LOCALHOST)
+  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+    logging.warning("Couldn't get answer from remote API, restaring daemon")
+    utils.StopDaemon(constants.RAPI)
+    utils.EnsureDaemon(constants.RAPI)
+    logging.debug("Second attempt to talk to remote API")
+    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+      logging.fatal("RAPI is not responding")
+  logging.debug("Successfully talked to remote API")
 
-    logging.debug("Will activate disks for instance(s) %s",
-                  utils.CommaJoin(offline_disk_instances))
+  _CheckMaster(client)
+  _ArchiveJobs(client, opts.job_age)
 
-    # we submit only one job, and wait for it. not optimal, but spams
-    # less the job queue
-    job = []
-    for name in offline_disk_instances:
-      instance = self.instances[name]
-      if (instance.state in HELPLESS_STATES or
-          self._CheckForOfflineNodes(instance)):
-        logging.info("Skip instance %s because it is in helpless state or has"
-                     " one offline secondary", name)
-        continue
-      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
+  # Spawn child processes for all node groups
+  _StartGroupChildren(client, opts.wait_children)
 
-    if job:
-      job_id = cli.SendJob(job, cl=client)
+  return constants.EXIT_SUCCESS
 
-      try:
-        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
-      except Exception: # pylint: disable-msg=W0703
-        logging.exception("Error while activating disks")
 
+def _GetGroupData(cl, uuid):
+  """Retrieves instances and nodes per node group.
 
-def OpenStateFile(path):
-  """Opens the state file and acquires a lock on it.
+  """
+  job = [
+    # Get all primary instances in group
+    opcodes.OpQuery(what=constants.QR_INSTANCE,
+                    fields=["name", "status", "admin_state", "snodes",
+                            "pnode.group.uuid", "snodes.group.uuid"],
+                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
+                    use_locking=True),
+
+    # Get all nodes in group
+    opcodes.OpQuery(what=constants.QR_NODE,
+                    fields=["name", "bootid", "offline"],
+                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
+                    use_locking=True),
+    ]
+
+  job_id = cl.SubmitJob(job)
+  results = map(objects.QueryResponse.FromDict,
+                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
+  cl.ArchiveJob(job_id)
+
+  results_data = map(operator.attrgetter("data"), results)
+
+  # Ensure results are tuples with two values
+  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
+
+  # Extract values ignoring result status
+  (raw_instances, raw_nodes) = [[map(compat.snd, values)
+                                 for values in res]
+                                for res in results_data]
+
+  secondaries = {}
+  instances = []
+
+  # Load all instances
+  for (name, status, autostart, snodes, pnode_group_uuid,
+       snodes_group_uuid) in raw_instances:
+    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
+      logging.error("Ignoring split instance '%s', primary group %s, secondary"
+                    " groups %s", name, pnode_group_uuid,
+                    utils.CommaJoin(snodes_group_uuid))
+    else:
+      instances.append(Instance(name, status, autostart, snodes))
 
-  @type path: string
-  @param path: Path to state file
+      for node in snodes:
+        secondaries.setdefault(node, set()).add(name)
+
+  # Load all nodes
+  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
+           for (name, bootid, offline) in raw_nodes]
+
+  return (dict((node.name, node) for node in nodes),
+          dict((inst.name, inst) for inst in instances))
+
+
+def _LoadKnownGroups():
+  """Returns a list of all node groups known by L{ssconf}.
 
   """
-  # The two-step dance below is necessary to allow both opening existing
-  # file read/write and creating if not existing. Vanilla open will truncate
-  # an existing file -or- allow creating if not existing.
-  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
-
-  # Try to acquire lock on state file. If this fails, another watcher instance
-  # might already be running or another program is temporarily blocking the
-  # watcher from running.
-  try:
-    utils.LockFile(statefile_fd)
-  except errors.LockError, err:
-    logging.error("Can't acquire lock on state file %s: %s", path, err)
-    return None
+  groups = ssconf.SimpleStore().GetNodegroupList()
 
-  return os.fdopen(statefile_fd, "w+")
+  result = list(line.split(None, 1)[0] for line in groups
+                if line.strip())
 
+  if not compat.all(map(utils.UUID_RE.match, result)):
+    raise errors.GenericError("Ssconf contains invalid group UUID")
 
-def IsRapiResponding(hostname):
-  """Connects to RAPI port and does a simple test.
+  return result
 
-  Connects to RAPI port of hostname and does a simple test. At this time, the
-  test is GetVersion.
 
-  @type hostname: string
-  @param hostname: hostname of the node to connect to.
-  @rtype: bool
-  @return: Whether RAPI is working properly
+def _GroupWatcher(opts):
+  """Main function for per-group watcher process.
 
   """
-  curl_config = rapi.client.GenericCurlConfig()
-  rapi_client = rapi.client.GanetiRapiClient(hostname,
-                                             curl_config_fn=curl_config)
-  try:
-    master_version = rapi_client.GetVersion()
-  except rapi.client.CertificateError, err:
-    logging.warning("RAPI Error: CertificateError (%s)", err)
-    return False
-  except rapi.client.GanetiApiError, err:
-    logging.warning("RAPI Error: GanetiApiError (%s)", err)
-    return False
-  logging.debug("RAPI Result: master_version is %s", master_version)
-  return master_version == constants.RAPI_VERSION
+  group_uuid = opts.nodegroup.lower()
 
+  if not utils.UUID_RE.match(group_uuid):
+    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
+                              " got '%s'" %
+                              (cli.NODEGROUP_OPT_NAME, group_uuid))
 
-def ParseOptions():
-  """Parse the command line options.
+  logging.info("Watcher for node group '%s'", group_uuid)
 
-  @return: (options, args) as from OptionParser.parse_args()
+  known_groups = _LoadKnownGroups()
 
-  """
-  parser = OptionParser(description="Ganeti cluster watcher",
-                        usage="%prog [-d]",
-                        version="%%prog (ganeti) %s" %
-                        constants.RELEASE_VERSION)
+  # Check if node group is known
+  if group_uuid not in known_groups:
+    raise errors.GenericError("Node group '%s' is not known by ssconf" %
+                              group_uuid)
 
-  parser.add_option(cli.DEBUG_OPT)
-  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
-                    help="Autoarchive jobs older than this age (default"
-                          " 6 hours)")
-  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
-                    action="store_true", help="Ignore cluster pause setting")
-  options, args = parser.parse_args()
-  options.job_age = cli.ParseTimespec(options.job_age)
+  # Group UUID has been verified and should not contain any dangerous
+  # characters
+  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
+  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
 
-  if args:
-    parser.error("No arguments expected")
+  logging.debug("Using state file %s", state_path)
 
-  return (options, args)
+  # Global watcher
+  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
+  if not statefile:
+    return constants.EXIT_FAILURE
+
+  notepad = state.WatcherState(statefile) # pylint: disable=E0602
+  try:
+    # Connect to master daemon
+    client = GetLuxiClient(False)
+
+    _CheckMaster(client)
+
+    (nodes, instances) = _GetGroupData(client, group_uuid)
+
+    # Update per-group instance status file
+    _UpdateInstanceStatus(inst_status_path, instances.values())
+
+    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
+                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
+                         known_groups)
+
+    started = _CheckInstances(client, notepad, instances)
+    _CheckDisks(client, notepad, nodes, instances, started)
+    _VerifyDisks(client, group_uuid, nodes, instances)
+  except Exception, err:
+    logging.info("Not updating status file due to failure: %s", err)
+    raise
+  else:
+    # Save changes for next run
+    notepad.Save(state_path)
+
+  return constants.EXIT_SUCCESS
 
 
-@rapi.client.UsesRapiClient
 def Main():
   """Main function.
 
   """
-  global client # pylint: disable-msg=W0603
-
   (options, _) = ParseOptions()
 
-  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
+  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                      debug=options.debug, stderr_logging=options.debug)
 
   if ShouldPause() and not options.ignore_pause:
     logging.debug("Pause has been set, exiting")
     return constants.EXIT_SUCCESS
 
-  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
-  if not statefile:
-    return constants.EXIT_FAILURE
-
-  update_file = False
+  # Try to acquire global watcher lock in shared mode
+  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
   try:
-    StartNodeDaemons()
-    RunWatcherHooks()
-    # run node maintenance in all cases, even if master, so that old
-    # masters can be properly cleaned up too
-    if NodeMaintenance.ShouldRun():
-      NodeMaintenance().Exec()
-
-    notepad = WatcherState(statefile)
-    try:
-      try:
-        client = cli.GetClient()
-      except errors.OpPrereqError:
-        # this is, from cli.GetClient, a not-master case
-        logging.debug("Not on master, exiting")
-        update_file = True
-        return constants.EXIT_SUCCESS
-      except luxi.NoMasterError, err:
-        logging.warning("Master seems to be down (%s), trying to restart",
-                        str(err))
-        if not utils.EnsureDaemon(constants.MASTERD):
-          logging.critical("Can't start the master, exiting")
-          return constants.EXIT_FAILURE
-        # else retry the connection
-        client = cli.GetClient()
-
-      # we are on master now
-      utils.EnsureDaemon(constants.RAPI)
-
-      # If RAPI isn't responding to queries, try one restart.
-      logging.debug("Attempting to talk with RAPI.")
-      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
-        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
-                        " Restarting Ganeti RAPI.")
-        utils.StopDaemon(constants.RAPI)
-        utils.EnsureDaemon(constants.RAPI)
-        logging.debug("Second attempt to talk with RAPI")
-        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
-          logging.fatal("RAPI is not responding. Please investigate.")
-      logging.debug("Successfully talked to RAPI.")
+    lock.Shared(blocking=False)
+  except (EnvironmentError, errors.LockError), err:
+    logging.error("Can't acquire lock on %s: %s",
+                  pathutils.WATCHER_LOCK_FILE, err)
+    return constants.EXIT_SUCCESS
 
-      try:
-        watcher = Watcher(options, notepad)
-      except errors.ConfigurationError:
-        # Just exit if there's no configuration
-        update_file = True
-        return constants.EXIT_SUCCESS
-
-      watcher.Run()
-      update_file = True
-
-    finally:
-      if update_file:
-        notepad.Save()
-      else:
-        logging.debug("Not updating status file due to failure")
-  except SystemExit:
+  if options.nodegroup is None:
+    fn = _GlobalWatcher
+  else:
+    # Per-nodegroup watcher
+    fn = _GroupWatcher
+
+  try:
+    return fn(options)
+  except (SystemExit, KeyboardInterrupt):
     raise
   except NotMasterError:
     logging.debug("Not master, exiting")
     return constants.EXIT_NOTMASTER
   except errors.ResolverError, err:
-    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
+    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
     return constants.EXIT_NODESETUP_ERROR
   except errors.JobQueueFull:
     logging.error("Job queue is full, can't query cluster state")