Add output of job/opcode timestamps
[ganeti-local] / daemons / ganeti-watcher
index f16d56f..074fc3b 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -29,7 +29,6 @@ by a node reboot.  Run from cron or similar.
 
 import os
 import sys
-import re
 import time
 import fcntl
 import errno
@@ -41,6 +40,9 @@ from ganeti import constants
 from ganeti import serializer
 from ganeti import ssconf
 from ganeti import errors
+from ganeti import opcodes
+from ganeti import logger
+from ganeti import cli
 
 
 MAXTRIES = 5
@@ -53,11 +55,11 @@ KEY_RESTART_WHEN = "restart_when"
 KEY_BOOT_ID = "bootid"
 
 
-class Error(Exception):
-  """Generic custom error class."""
+# Global client object
+client = None
 
 
-class NotMasterError(Error):
+class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
 
 
@@ -84,11 +86,12 @@ def DoCmd(cmd):
   res = utils.RunCmd(cmd)
 
   if res.failed:
-    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
-                (repr(cmd),
-                 Indent(res.fail_reason),
-                 Indent(res.stdout),
-                 Indent(res.stderr)))
+    msg = ("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
+           (repr(cmd),
+            Indent(res.fail_reason),
+            Indent(res.stdout),
+            Indent(res.stderr)))
+    raise errors.CommandError(msg)
 
   return res
 
@@ -100,52 +103,58 @@ class WatcherState(object):
   def __init__(self):
     """Open, lock, read and parse the file.
 
-    Raises StandardError on lock contention.
+    Raises exception on lock contention.
 
     """
     # The two-step dance below is necessary to allow both opening existing
     # file read/write and creating if not existing.  Vanilla open will truncate
     # an existing file -or- allow creating if not existing.
-    f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
-    f = os.fdopen(f, 'w+')
+    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
+    self.statefile = os.fdopen(fd, 'w+')
 
-    try:
-      fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
-    except IOError, x:
-      if x.errno == errno.EAGAIN:
-        raise StandardError("State file already locked")
-      raise
-
-    self.statefile = f
+    utils.LockFile(self.statefile.fileno())
 
     try:
-      self.data = serializer.Load(self.statefile.read())
+      self._data = serializer.Load(self.statefile.read())
     except Exception, msg:
       # Ignore errors while loading the file and treat it as empty
-      self.data = {}
+      self._data = {}
       logging.warning(("Empty or invalid state file. Using defaults."
                        " Error message: %s"), msg)
 
-    if "instance" not in self.data:
-      self.data["instance"] = {}
-    if "node" not in self.data:
-      self.data["node"] = {}
+    if "instance" not in self._data:
+      self._data["instance"] = {}
+    if "node" not in self._data:
+      self._data["node"] = {}
+
+    self._orig_data = serializer.Dump(self._data)
 
-  def __del__(self):
-    """Called on destruction.
+  def Save(self):
+    """Save state to file, then unlock and close it.
 
     """
-    if self.statefile:
-      self._Close()
+    assert self.statefile
+
+    serialized_form = serializer.Dump(self._data)
+    if self._orig_data == serialized_form:
+      logging.debug("Data didn't change, just touching status file")
+      os.utime(constants.WATCHER_STATEFILE, None)
+      return
 
-  def _Close(self):
+    # We need to make sure the file is locked before renaming it, otherwise
+    # starting ganeti-watcher again at the same time will create a conflict.
+    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
+                         data=serialized_form,
+                         prewrite=utils.LockFile, close=False)
+    self.statefile = os.fdopen(fd, 'w+')
+
+  def Close(self):
     """Unlock configuration file and close it.
 
     """
     assert self.statefile
 
-    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
-
+    # Files are automatically unlocked when closing them
     self.statefile.close()
     self.statefile = None
 
@@ -153,7 +162,7 @@ class WatcherState(object):
     """Returns the last boot ID of a node or None.
 
     """
-    ndata = self.data["node"]
+    ndata = self._data["node"]
 
     if name in ndata and KEY_BOOT_ID in ndata[name]:
       return ndata[name][KEY_BOOT_ID]
@@ -165,7 +174,7 @@ class WatcherState(object):
     """
     assert bootid
 
-    ndata = self.data["node"]
+    ndata = self._data["node"]
 
     if name not in ndata:
       ndata[name] = {}
@@ -179,7 +188,7 @@ class WatcherState(object):
       instance - the instance to look up.
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name in idata:
       return idata[instance.name][KEY_RESTART_COUNT]
@@ -193,7 +202,7 @@ class WatcherState(object):
       instance - the instance being restarted
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name not in idata:
       inst = idata[instance.name] = {}
@@ -212,24 +221,11 @@ class WatcherState(object):
     This method removes the record for a named instance.
 
     """
-    idata = self.data["instance"]
+    idata = self._data["instance"]
 
     if instance.name in idata:
       del idata[instance.name]
 
-  def Save(self):
-    """Save state to file, then unlock and close it.
-
-    """
-    assert self.statefile
-
-    self.statefile.seek(0)
-    self.statefile.truncate()
-
-    self.statefile.write(serializer.Dump(self.data))
-
-    self._Close()
-
 
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
@@ -247,48 +243,40 @@ class Instance(object):
     """Encapsulates the start of an instance.
 
     """
-    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
+    op = opcodes.OpStartupInstance(instance_name=self.name,
+                                   force=False,
+                                   extra_args=None)
+    cli.SubmitOpCode(op, cl=client)
 
   def ActivateDisks(self):
     """Encapsulates the activation of all disks of an instance.
 
     """
-    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
-
-
-def _RunListCmd(cmd):
-  """Runs a command and parses its output into lists.
-
-  """
-  for line in DoCmd(cmd).stdout.splitlines():
-    yield line.split(':')
+    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
+    cli.SubmitOpCode(op, cl=client)
 
 
 def GetInstanceList(with_secondaries=None):
   """Get a list of instances on this cluster.
 
   """
-  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
-         '--separator=:']
-
-  fields = 'name,oper_state,admin_state'
+  fields = ["name", "oper_state", "admin_state"]
 
   if with_secondaries is not None:
-    fields += ',snodes'
+    fields.append("snodes")
 
-  cmd.append('-o')
-  cmd.append(fields)
+  result = client.QueryInstances([], fields)
 
   instances = []
-  for fields in _RunListCmd(cmd):
+  for fields in result:
     if with_secondaries is not None:
       (name, status, autostart, snodes) = fields
 
-      if snodes == "-":
+      if not snodes:
         continue
 
       for node in with_secondaries:
-        if node in snodes.split(','):
+        if node in snodes:
           break
       else:
         continue
@@ -296,7 +284,7 @@ def GetInstanceList(with_secondaries=None):
     else:
       (name, status, autostart) = fields
 
-    instances.append(Instance(name, status, autostart != "no"))
+    instances.append(Instance(name, status, autostart))
 
   return instances
 
@@ -305,15 +293,8 @@ def GetNodeBootIDs():
   """Get a dict mapping nodes to boot IDs.
 
   """
-  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
-         '--separator=:', '-o', 'name,bootid']
-
-  ids = {}
-  for fields in _RunListCmd(cmd):
-    (name, bootid) = fields
-    ids[name] = bootid
-
-  return ids
+  result = client.QueryNodes([], ["name", "bootid"])
+  return dict([(name, bootid) for name, bootid in result])
 
 
 class Watcher(object):
@@ -347,9 +328,9 @@ class Watcher(object):
 
     """
     check_nodes = []
-    for name, id in self.bootids.iteritems():
+    for name, new_id in self.bootids.iteritems():
       old = notepad.GetNodeBootID(name)
-      if old != id:
+      if old != new_id:
         # Node's boot ID has changed, proably through a reboot.
         check_nodes.append(name)
 
@@ -368,7 +349,7 @@ class Watcher(object):
         try:
           logging.info("Activating disks for instance %s", instance.name)
           instance.ActivateDisks()
-        except Error, err:
+        except Exception, err:
           logging.error(str(err), exc_info=True)
 
       # Keep changed boot IDs
@@ -402,7 +383,7 @@ class Watcher(object):
                         instance.name, last)
           instance.Restart()
           self.started_instances.add(instance.name)
-        except Error, err:
+        except Exception, err:
           logging.error(str(err), exc_info=True)
 
         notepad.RecordRestartAttempt(instance)
@@ -418,7 +399,8 @@ class Watcher(object):
     """Run gnt-cluster verify-disks.
 
     """
-    result = DoCmd(['gnt-cluster', 'verify-disks', '--lock-retries=15'])
+    # TODO: What should we do here?
+    result = DoCmd(['gnt-cluster', 'verify-disks'])
     if result.output:
       logging.info(result.output)
 
@@ -442,44 +424,28 @@ def ParseOptions():
   return options, args
 
 
-def SetupLogging(debug):
-  """Configures the logging module.
-
-  """
-  formatter = logging.Formatter("%(asctime)s: %(message)s")
-
-  logfile_handler = logging.FileHandler(constants.LOG_WATCHER)
-  logfile_handler.setFormatter(formatter)
-  logfile_handler.setLevel(logging.INFO)
-
-  stderr_handler = logging.StreamHandler()
-  stderr_handler.setFormatter(formatter)
-  if debug:
-    stderr_handler.setLevel(logging.NOTSET)
-  else:
-    stderr_handler.setLevel(logging.CRITICAL)
-
-  root_logger = logging.getLogger("")
-  root_logger.setLevel(logging.NOTSET)
-  root_logger.addHandler(logfile_handler)
-  root_logger.addHandler(stderr_handler)
-
-
 def main():
   """Main function.
 
   """
+  global client
+
   options, args = ParseOptions()
 
-  SetupLogging(options.debug)
+  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug)
 
   try:
+    client = cli.GetClient()
+
     try:
       watcher = Watcher()
     except errors.ConfigurationError:
       # Just exit if there's no configuration
       sys.exit(constants.EXIT_SUCCESS)
+
     watcher.Run()
+  except SystemExit:
+    raise
   except NotMasterError:
     logging.debug("Not master, exiting")
     sys.exit(constants.EXIT_NOTMASTER)