masterd: Log PID, UID and GID of connected client
[ganeti-local] / daemons / ganeti-watcher
index 78de6c9..1f82db8 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -19,7 +19,7 @@
 # 02110-1301, USA.
 
 
-"""Tool to restart erronously downed virtual machines.
+"""Tool to restart erroneously downed virtual machines.
 
 This program and set of classes implement a watchdog to restart
 virtual machines in a Ganeti cluster that have crashed or been killed
@@ -27,24 +27,32 @@ by a node reboot.  Run from cron or similar.
 
 """
 
+# pylint: disable-msg=C0103,W0142
+
+# C0103: Invalid name ganeti-watcher
+
 import os
 import sys
-import re
 import time
-import fcntl
-import errno
-import simplejson
+import logging
 from optparse import OptionParser
 
 from ganeti import utils
 from ganeti import constants
-from ganeti import ssconf
+from ganeti import serializer
 from ganeti import errors
+from ganeti import opcodes
+from ganeti import cli
+from ganeti import luxi
+from ganeti import ssconf
+from ganeti import bdev
+from ganeti import hypervisor
+from ganeti.confd import client as confd_client
 
 
 MAXTRIES = 5
-BAD_STATES = ['stopped']
-HELPLESS_STATES = ['(node down)']
+BAD_STATES = ['ERROR_down']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
 NOTICE = 'NOTICE'
 ERROR = 'ERROR'
 KEY_RESTART_COUNT = "restart_count"
@@ -52,99 +60,228 @@ KEY_RESTART_WHEN = "restart_when"
 KEY_BOOT_ID = "bootid"
 
 
-class Error(Exception):
-  """Generic custom error class."""
+# Global client object
+client = None
 
 
-class NotMasterError(Error):
+class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
 
 
-def Indent(s, prefix='| '):
-  """Indent a piece of text with a given prefix before each line.
+def ShouldPause():
+  """Check whether we should pause.
+
+  """
+  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+
 
-  Args:
-    s: The string to indent
-    prefix: The string to prepend each line.
+def StartNodeDaemons():
+  """Start all the daemons that should be running on all nodes.
 
   """
-  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
+  # on master or not, try to start the node daemon
+  utils.EnsureDaemon(constants.NODED)
+  # start confd as well. On non candidates it will be in disabled mode.
+  utils.EnsureDaemon(constants.CONFD)
 
 
-def DoCmd(cmd):
-  """Run a shell command.
+def RunWatcherHooks():
+  """Run the watcher hooks.
+
+  """
+  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
+                             constants.HOOKS_NAME_WATCHER)
+  if not os.path.isdir(hooks_dir):
+    return
+
+  try:
+    results = utils.RunParts(hooks_dir)
+  except Exception, msg: # pylint: disable-msg=W0703
+    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
+
+  for (relname, status, runresult) in results:
+    if status == constants.RUNPARTS_SKIP:
+      logging.debug("Watcher hook %s: skipped", relname)
+    elif status == constants.RUNPARTS_ERR:
+      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
+    elif status == constants.RUNPARTS_RUN:
+      if runresult.failed:
+        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
+                        relname, runresult.exit_code, runresult.output)
+      else:
+        logging.debug("Watcher hook %s: success (output: %s)", relname,
+                      runresult.output)
 
-  Args:
-    cmd: the command to run.
 
-  Raises CommandError with verbose commentary on error.
+class NodeMaintenance(object):
+  """Talks to confd daemons and possible shutdown instances/drbd devices.
 
   """
-  res = utils.RunCmd(cmd)
+  def __init__(self):
+    self.store_cb = confd_client.StoreResultCallback()
+    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
+    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
 
-  if res.failed:
-    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
-                (repr(cmd),
-                 Indent(res.fail_reason),
-                 Indent(res.stdout),
-                 Indent(res.stderr)))
+  @staticmethod
+  def ShouldRun():
+    """Checks whether node maintenance should run.
 
-  return res
+    """
+    try:
+      return ssconf.SimpleStore().GetMaintainNodeHealth()
+    except errors.ConfigurationError, err:
+      logging.error("Configuration error, not activating node maintenance: %s",
+                    err)
+      return False
+
+  @staticmethod
+  def GetRunningInstances():
+    """Compute list of hypervisor/running instances.
+
+    """
+    hyp_list = ssconf.SimpleStore().GetHypervisorList()
+    results = []
+    for hv_name in hyp_list:
+      try:
+        hv = hypervisor.GetHypervisor(hv_name)
+        ilist = hv.ListInstances()
+        results.extend([(iname, hv_name) for iname in ilist])
+      except: # pylint: disable-msg=W0702
+        logging.error("Error while listing instances for hypervisor %s",
+                      hv_name, exc_info=True)
+    return results
+
+  @staticmethod
+  def GetUsedDRBDs():
+    """Get list of used DRBD minors.
+
+    """
+    return bdev.DRBD8.GetUsedDevs().keys()
+
+  @classmethod
+  def DoMaintenance(cls, role):
+    """Maintain the instance list.
+
+    """
+    if role == constants.CONFD_NODE_ROLE_OFFLINE:
+      inst_running = cls.GetRunningInstances()
+      cls.ShutdownInstances(inst_running)
+      drbd_running = cls.GetUsedDRBDs()
+      cls.ShutdownDRBD(drbd_running)
+    else:
+      logging.debug("Not doing anything for role %s", role)
+
+  @staticmethod
+  def ShutdownInstances(inst_running):
+    """Shutdown running instances.
+
+    """
+    names_running = set([i[0] for i in inst_running])
+    if names_running:
+      logging.info("Following instances should not be running,"
+                   " shutting them down: %s", utils.CommaJoin(names_running))
+      # this dictionary will collapse duplicate instance names (only
+      # xen pvm/vhm) into a single key, which is fine
+      i2h = dict(inst_running)
+      for name in names_running:
+        hv_name = i2h[name]
+        hv = hypervisor.GetHypervisor(hv_name)
+        hv.StopInstance(None, force=True, name=name)
+
+  @staticmethod
+  def ShutdownDRBD(drbd_running):
+    """Shutdown active DRBD devices.
+
+    """
+    if drbd_running:
+      logging.info("Following DRBD minors should not be active,"
+                   " shutting them down: %s", utils.CommaJoin(drbd_running))
+      for minor in drbd_running:
+        # pylint: disable-msg=W0212
+        # using the private method as is, pending enhancements to the DRBD
+        # interface
+        bdev.DRBD8._ShutdownAll(minor)
+
+  def Exec(self):
+    """Check node status versus cluster desired state.
+
+    """
+    my_name = utils.HostInfo().name
+    req = confd_client.ConfdClientRequest(type=
+                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
+                                          query=my_name)
+    self.confd_client.SendRequest(req, async=False, coverage=-1)
+    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
+    if not timed_out:
+      # should have a valid response
+      status, result = self.store_cb.GetResponse(req.rsalt)
+      assert status, "Missing result but received replies"
+      if not self.filter_cb.consistent[req.rsalt]:
+        logging.warning("Inconsistent replies, not doing anything")
+        return
+      self.DoMaintenance(result.server_reply.answer)
+    else:
+      logging.warning("Confd query timed out, cannot do maintenance actions")
 
 
 class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
   """
-  def __init__(self):
+  def __init__(self, statefile):
     """Open, lock, read and parse the file.
 
-    Raises StandardError on lock contention.
+    @type statefile: file
+    @param statefile: State file object
 
     """
-    # The two-step dance below is necessary to allow both opening existing
-    # file read/write and creating if not existing.  Vanilla open will truncate
-    # an existing file -or- allow creating if not existing.
-    f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
-    f = os.fdopen(f, 'w+')
-
-    try:
-      fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
-    except IOError, x:
-      if x.errno == errno.EAGAIN:
-        raise StandardError("State file already locked")
-      raise
-
-    self.statefile = f
+    self.statefile = statefile
 
     try:
-      self.data = simplejson.load(self.statefile)
-    except Exception, msg:
+      state_data = self.statefile.read()
+      if not state_data:
+        self._data = {}
+      else:
+        self._data = serializer.Load(state_data)
+    except Exception, msg: # pylint: disable-msg=W0703
       # Ignore errors while loading the file and treat it as empty
-      self.data = {}
-      sys.stderr.write("Empty or invalid state file. "
-          "Using defaults. Error message: %s\n" % msg)
+      self._data = {}
+      logging.warning(("Invalid state file. Using defaults."
+                       " Error message: %s"), msg)
 
-    if "instance" not in self.data:
-      self.data["instance"] = {}
-    if "node" not in self.data:
-      self.data["node"] = {}
+    if "instance" not in self._data:
+      self._data["instance"] = {}
+    if "node" not in self._data:
+      self._data["node"] = {}
 
-  def __del__(self):
-    """Called on destruction.
+    self._orig_data = serializer.Dump(self._data)
+
+  def Save(self):
+    """Save state to file, then unlock and close it.
 
     """
-    if self.statefile:
-      self._Close()
+    assert self.statefile
+
+    serialized_form = serializer.Dump(self._data)
+    if self._orig_data == serialized_form:
+      logging.debug("Data didn't change, just touching status file")
+      os.utime(constants.WATCHER_STATEFILE, None)
+      return
+
+    # We need to make sure the file is locked before renaming it, otherwise
+    # starting ganeti-watcher again at the same time will create a conflict.
+    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
+                         data=serialized_form,
+                         prewrite=utils.LockFile, close=False)
+    self.statefile = os.fdopen(fd, 'w+')
 
-  def _Close(self):
+  def Close(self):
     """Unlock configuration file and close it.
 
     """
     assert self.statefile
 
-    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
-
+    # Files are automatically unlocked when closing them
     self.statefile.close()
     self.statefile = None
 
@@ -152,7 +289,7 @@ class WatcherState(object):
     """Returns the last boot ID of a node or None.
 
     """
-    ndata = self.data["node"]
+    ndata = self._data["node"]
 
     if name in ndata and KEY_BOOT_ID in ndata[name]:
       return ndata[name][KEY_BOOT_ID]
@@ -164,7 +301,7 @@ class WatcherState(object):
     """
     assert bootid
 
-    ndata = self.data["node"]
+    ndata = self._data["node"]
 
     if name not in ndata:
       ndata[name] = {}
@@ -174,11 +311,11 @@ class WatcherState(object):
   def NumberOfRestartAttempts(self, instance):
     """Returns number of previous restart attempts.
 
-    Args:
-      instance - the instance to look up.
+    @type instance: L{Instance}
+    @param instance: the instance to look up
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name in idata:
       return idata[instance.name][KEY_RESTART_COUNT]
@@ -188,11 +325,11 @@ class WatcherState(object):
   def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
-    Args:
-      instance - the instance being restarted
+    @type instance: L{Instance}
+    @param instance: the instance being restarted
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name not in idata:
       inst = idata[instance.name] = {}
@@ -203,39 +340,24 @@ class WatcherState(object):
     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
   def RemoveInstance(self, instance):
-    """Update state to reflect that a machine is running, i.e. remove record.
+    """Update state to reflect that a machine is running.
 
-    Args:
-      instance - the instance to remove from books
+    This method removes the record for a named instance (as we only
+    track down instances).
 
-    This method removes the record for a named instance.
+    @type instance: L{Instance}
+    @param instance: the instance to remove from books
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name in idata:
       del idata[instance.name]
 
-  def Save(self):
-    """Save state to file, then unlock and close it.
-
-    """
-    assert self.statefile
-
-    self.statefile.seek(0)
-    self.statefile.truncate()
-
-    simplejson.dump(self.data, self.statefile)
-
-    self._Close()
-
 
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
-  Methods:
-    Restart(): issue a command to restart the represented machine.
-
   """
   def __init__(self, name, state, autostart):
     self.name = name
@@ -246,147 +368,148 @@ class Instance(object):
     """Encapsulates the start of an instance.
 
     """
-    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
+    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
+    cli.SubmitOpCode(op, cl=client)
 
   def ActivateDisks(self):
     """Encapsulates the activation of all disks of an instance.
 
     """
-    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
-
+    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
+    cli.SubmitOpCode(op, cl=client)
 
-def _RunListCmd(cmd):
-  """Runs a command and parses its output into lists.
 
-  """
-  for line in DoCmd(cmd).stdout.splitlines():
-    yield line.split(':')
-
-
-def GetInstanceList(with_secondaries=None):
+def GetClusterData():
   """Get a list of instances on this cluster.
 
   """
-  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
-         '--separator=:']
-
-  fields = 'name,oper_state,admin_state'
-
-  if with_secondaries is not None:
-    fields += ',snodes'
-
-  cmd.append('-o')
-  cmd.append(fields)
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
-  instances = []
-  for fields in _RunListCmd(cmd):
-    if with_secondaries is not None:
-      (name, status, autostart, snodes) = fields
+  job_id = client.SubmitJob([op1, op2])
 
-      if snodes == "-":
-        continue
-
-      for node in with_secondaries:
-        if node in snodes.split(','):
-          break
-      else:
-        continue
-
-    else:
-      (name, status, autostart) = fields
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
-    instances.append(Instance(name, status, autostart != "no"))
+  logging.debug("Got data from cluster, writing instance status file")
 
-  return instances
+  result = all_results[0]
+  smap = {}
 
+  instances = {}
 
-def GetNodeBootIDs():
-  """Get a dict mapping nodes to boot IDs.
+  # write the upfile
+  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
+  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
 
-  """
-  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
-         '--separator=:', '-o', 'name,bootid']
-
-  ids = {}
-  for fields in _RunListCmd(cmd):
-    (name, bootid) = fields
-    ids[name] = bootid
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
-  return ids
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
+    instances[name] = Instance(name, status, autostart)
 
-class Message(object):
-  """Encapsulation of a notice or error message.
+  nodes =  dict([(name, (bootid, offline))
+                 for name, bootid, offline in all_results[1]])
 
-  """
-  def __init__(self, level, msg):
-    self.level = level
-    self.msg = msg
-    self.when = time.time()
+  client.ArchiveJob(job_id)
 
-  def __str__(self):
-    return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg)
+  return instances, nodes, smap
 
 
 class Watcher(object):
-  """Encapsulate the logic for restarting erronously halted virtual machines.
+  """Encapsulate the logic for restarting erroneously halted virtual machines.
 
   The calling program should periodically instantiate me and call Run().
   This will traverse the list of instances, and make up to MAXTRIES attempts
   to restart machines that are down.
 
   """
-  def __init__(self):
-    sstore = ssconf.SimpleStore()
-    master = sstore.GetMasterNode()
+  def __init__(self, opts, notepad):
+    self.notepad = notepad
+    master = client.QueryConfigValues(["master_node"])[0]
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
-    self.instances = GetInstanceList()
-    self.bootids = GetNodeBootIDs()
-    self.messages = []
+    # first archive old jobs
+    self.ArchiveJobs(opts.job_age)
+    # and only then submit new ones
+    self.instances, self.bootids, self.smap = GetClusterData()
+    self.started_instances = set()
+    self.opts = opts
 
   def Run(self):
-    notepad = WatcherState()
+    """Watcher run sequence.
+
+    """
+    notepad = self.notepad
     self.CheckInstances(notepad)
     self.CheckDisks(notepad)
-    notepad.Save()
+    self.VerifyDisks()
+
+  @staticmethod
+  def ArchiveJobs(age):
+    """Archive old jobs.
+
+    """
+    arch_count, left_count = client.AutoArchiveJobs(age)
+    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
 
   def CheckDisks(self, notepad):
     """Check all nodes for restarted ones.
 
     """
     check_nodes = []
-    for name, id in self.bootids.iteritems():
+    for name, (new_id, offline) in self.bootids.iteritems():
       old = notepad.GetNodeBootID(name)
-      if old != id:
+      if new_id is None:
+        # Bad node, not returning a boot id
+        if not offline:
+          logging.debug("Node %s missing boot id, skipping secondary checks",
+                        name)
+        continue
+      if old != new_id:
         # Node's boot ID has changed, proably through a reboot.
         check_nodes.append(name)
 
     if check_nodes:
       # Activate disks for all instances with any of the checked nodes as a
       # secondary node.
-      for instance in GetInstanceList(with_secondaries=check_nodes):
-        try:
-          self.messages.append(Message(NOTICE,
-                                       "Activating disks for %s." %
-                                       instance.name))
-          instance.ActivateDisks()
-        except Error, x:
-          self.messages.append(Message(ERROR, str(x)))
+      for node in check_nodes:
+        if node not in self.smap:
+          continue
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception: # pylint: disable-msg=W0703
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
 
       # Keep changed boot IDs
       for name in check_nodes:
-        notepad.SetNodeBootID(name, self.bootids[name])
+        notepad.SetNodeBootID(name, self.bootids[name][0])
 
   def CheckInstances(self, notepad):
     """Make a pass over the list of instances, restarting downed ones.
 
     """
-    for instance in self.instances:
-      # Don't care about manually stopped instances
-      if not instance.autostart:
-        continue
-
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
@@ -397,17 +520,17 @@ class Watcher(object):
           last = " (Attempt #%d)" % (n + 1)
         else:
           notepad.RecordRestartAttempt(instance)
-          self.messages.append(Message(ERROR, "Could not restart %s for %d"
-                                       " times, giving up..." %
-                                       (instance.name, MAXTRIES)))
+          logging.error("Could not restart %s after %d attempts, giving up",
+                        instance.name, MAXTRIES)
           continue
         try:
-          self.messages.append(Message(NOTICE,
-                                       "Restarting %s%s." %
-                                       (instance.name, last)))
+          logging.info("Restarting %s%s",
+                        instance.name, last)
           instance.Restart()
-        except Error, x:
-          self.messages.append(Message(ERROR, str(x)))
+          self.started_instances.add(instance.name)
+        except Exception: # pylint: disable-msg=W0703
+          logging.exception("Error while restarting instance %s",
+                            instance.name)
 
         notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
@@ -416,26 +539,66 @@ class Watcher(object):
       else:
         if notepad.NumberOfRestartAttempts(instance):
           notepad.RemoveInstance(instance)
-          msg = Message(NOTICE,
-                        "Restart of %s succeeded." % instance.name)
-          self.messages.append(msg)
-
-  def WriteReport(self, logfile):
-    """Log all messages to file.
+          logging.info("Restart of %s succeeded", instance.name)
 
-    Args:
-      logfile: file object open for writing (the log file)
+  @staticmethod
+  def VerifyDisks():
+    """Run gnt-cluster verify-disks.
 
     """
-    for msg in self.messages:
-      print >> logfile, str(msg)
+    op = opcodes.OpVerifyDisks()
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
+    if not isinstance(result, (tuple, list)):
+      logging.error("Can't get a valid result from verify-disks")
+      return
+    offline_disk_instances = result[2]
+    if not offline_disk_instances:
+      # nothing to do
+      return
+    logging.debug("Will activate disks for instances %s",
+                  utils.CommaJoin(offline_disk_instances))
+    # we submit only one job, and wait for it. not optimal, but spams
+    # less the job queue
+    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
+           for name in offline_disk_instances]
+    job_id = cli.SendJob(job, cl=client)
+
+    try:
+      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+    except Exception: # pylint: disable-msg=W0703
+      logging.exception("Error while activating disks")
+
+
+def OpenStateFile(path):
+  """Opens the state file and acquires a lock on it.
+
+  @type path: string
+  @param path: Path to state file
+
+  """
+  # The two-step dance below is necessary to allow both opening existing
+  # file read/write and creating if not existing. Vanilla open will truncate
+  # an existing file -or- allow creating if not existing.
+  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
+
+  # Try to acquire lock on state file. If this fails, another watcher instance
+  # might already be running or another program is temporarily blocking the
+  # watcher from running.
+  try:
+    utils.LockFile(statefile_fd)
+  except errors.LockError, err:
+    logging.error("Can't acquire lock on state file %s: %s", path, err)
+    return None
+
+  return os.fdopen(statefile_fd, "w+")
 
 
 def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
@@ -443,10 +606,12 @@ def ParseOptions():
                         version="%%prog (ganeti) %s" %
                         constants.RELEASE_VERSION)
 
-  parser.add_option("-d", "--debug", dest="debug",
-                    help="Don't redirect messages to the log file",
-                    default=False, action="store_true")
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option("-A", "--job-age", dest="job_age",
+                    help="Autoarchive jobs older than this age (default"
+                    " 6 hours)", default=6*3600)
   options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
   return options, args
 
 
@@ -454,28 +619,85 @@ def main():
   """Main function.
 
   """
+  global client # pylint: disable-msg=W0603
+
   options, args = ParseOptions()
 
-  if not options.debug:
-    sys.stderr = sys.stdout = open(constants.LOG_WATCHER, 'a')
+  if args: # watcher doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
+  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+                     stderr_logging=options.debug)
 
+  if ShouldPause():
+    logging.debug("Pause has been set, exiting")
+    sys.exit(constants.EXIT_SUCCESS)
+
+  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
+  if not statefile:
+    sys.exit(constants.EXIT_FAILURE)
+
+  update_file = False
   try:
+    StartNodeDaemons()
+    RunWatcherHooks()
+    # run node maintenance in all cases, even if master, so that old
+    # masters can be properly cleaned up too
+    if NodeMaintenance.ShouldRun():
+      NodeMaintenance().Exec()
+
+    notepad = WatcherState(statefile)
     try:
-      watcher = Watcher()
-    except errors.ConfigurationError:
-      # Just exit if there's no configuration
-      sys.exit(constants.EXIT_SUCCESS)
-    watcher.Run()
-    watcher.WriteReport(sys.stdout)
+      try:
+        client = cli.GetClient()
+      except errors.OpPrereqError:
+        # this is, from cli.GetClient, a not-master case
+        logging.debug("Not on master, exiting")
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+      except luxi.NoMasterError, err:
+        logging.warning("Master seems to be down (%s), trying to restart",
+                        str(err))
+        if not utils.EnsureDaemon(constants.MASTERD):
+          logging.critical("Can't start the master, exiting")
+          sys.exit(constants.EXIT_FAILURE)
+        # else retry the connection
+        client = cli.GetClient()
+
+      # we are on master now
+      utils.EnsureDaemon(constants.RAPI)
+
+      try:
+        watcher = Watcher(options, notepad)
+      except errors.ConfigurationError:
+        # Just exit if there's no configuration
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+
+      watcher.Run()
+      update_file = True
+
+    finally:
+      if update_file:
+        notepad.Save()
+      else:
+        logging.debug("Not updating status file due to failure")
+  except SystemExit:
+    raise
   except NotMasterError:
-    if options.debug:
-      sys.stderr.write("Not master, exiting.\n")
+    logging.debug("Not master, exiting")
     sys.exit(constants.EXIT_NOTMASTER)
   except errors.ResolverError, err:
-    sys.stderr.write("Cannot resolve hostname '%s', exiting.\n" % err.args[0])
+    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
     sys.exit(constants.EXIT_NODESETUP_ERROR)
-  except Error, err:
-    print err
+  except errors.JobQueueFull:
+    logging.error("Job queue is full, can't query cluster state")
+  except errors.JobQueueDrainError:
+    logging.error("Job queue is drained, can't maintain cluster state")
+  except Exception, err:
+    logging.exception(str(err))
+    sys.exit(constants.EXIT_FAILURE)
 
 
 if __name__ == '__main__':