errors: Document arguments to QueryFilterParseError
[ganeti-local] / lib / watcher / __init__.py
index 82682d7..c60f4a1 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -49,8 +49,10 @@ from ganeti import qlang
 from ganeti import objects
 from ganeti import ssconf
 from ganeti import ht
+from ganeti import pathutils
 
-import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client # pylint: disable=W0611
+from ganeti.rapi.client import UsesRapiClient
 
 from ganeti.watcher import nodemaint
 from ganeti.watcher import state
@@ -82,7 +84,7 @@ def ShouldPause():
   """Check whether we should pause.
 
   """
-  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
 
 
 def StartNodeDaemons():
@@ -92,22 +94,23 @@ def StartNodeDaemons():
   # on master or not, try to start the node daemon
   utils.EnsureDaemon(constants.NODED)
   # start confd as well. On non candidates it will be in disabled mode.
-  utils.EnsureDaemon(constants.CONFD)
+  if constants.ENABLE_CONFD:
+    utils.EnsureDaemon(constants.CONFD)
 
 
 def RunWatcherHooks():
   """Run the watcher hooks.
 
   """
-  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
+  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                              constants.HOOKS_NAME_WATCHER)
   if not os.path.isdir(hooks_dir):
     return
 
   try:
     results = utils.RunParts(hooks_dir)
-  except Exception: # pylint: disable-msg=W0703
-    logging.exception("RunParts %s failed: %s", hooks_dir)
+  except Exception, err: # pylint: disable=W0703
+    logging.exception("RunParts %s failed: %s", hooks_dir, err)
     return
 
   for (relname, status, runresult) in results:
@@ -193,7 +196,7 @@ def _CheckInstances(cl, notepad, instances):
         logging.info("Restarting instance '%s' (attempt #%s)",
                      inst.name, n + 1)
         inst.Restart(cl)
-      except Exception: # pylint: disable-msg=W0703
+      except Exception: # pylint: disable=W0703
         logging.exception("Error while restarting instance '%s'", inst.name)
       else:
         started.add(inst.name)
@@ -255,7 +258,7 @@ def _CheckDisks(cl, notepad, nodes, instances, started):
         try:
           logging.info("Activating disks for instance '%s'", inst.name)
           inst.ActivateDisks(cl)
-        except Exception: # pylint: disable-msg=W0703
+        except Exception: # pylint: disable=W0703
           logging.exception("Error while activating disks for instance '%s'",
                             inst.name)
 
@@ -302,8 +305,8 @@ def _VerifyDisks(cl, uuid, nodes, instances):
       continue
 
     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
-      logging.info("Skipping instance '%s' because it is in a helpless state or"
-                   " has offline secondaries", name)
+      logging.info("Skipping instance '%s' because it is in a helpless state"
+                   " or has offline secondaries", name)
       continue
 
     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
@@ -313,7 +316,7 @@ def _VerifyDisks(cl, uuid, nodes, instances):
 
     try:
       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
-    except Exception: # pylint: disable-msg=W0703
+    except Exception: # pylint: disable=W0703
       logging.exception("Error while activating disks")
 
 
@@ -363,8 +366,13 @@ def ParseOptions():
                           " 6 hours)")
   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                     action="store_true", help="Ignore cluster pause setting")
-  parser.add_option("--wait-children", dest="wait_children", default=False,
+  parser.add_option("--wait-children", dest="wait_children",
                     action="store_true", help="Wait for child processes")
+  parser.add_option("--no-wait-children", dest="wait_children",
+                    action="store_false",
+                    help="Don't wait for child processes")
+  # See optparse documentation for why default values are not set by options
+  parser.set_defaults(wait_children=True)
   options, args = parser.parse_args()
   options.job_age = cli.ParseTimespec(options.job_age)
 
@@ -405,23 +413,6 @@ def _UpdateInstanceStatus(filename, instances):
                                   for inst in instances])
 
 
-class _StatCb:
-  """Helper to store file handle's C{fstat}.
-
-  """
-  def __init__(self):
-    """Initializes this class.
-
-    """
-    self.st = None
-
-  def __call__(self, fh):
-    """Calls C{fstat} on file handle.
-
-    """
-    self.st = os.fstat(fh.fileno())
-
-
 def _ReadInstanceStatus(filename):
   """Reads an instance status file.
 
@@ -435,7 +426,7 @@ def _ReadInstanceStatus(filename):
   """
   logging.debug("Reading per-group instance status from '%s'", filename)
 
-  statcb = _StatCb()
+  statcb = utils.FileStatHelper()
   try:
     content = utils.ReadFile(filename, preread=statcb)
   except EnvironmentError, err:
@@ -445,7 +436,7 @@ def _ReadInstanceStatus(filename):
       logging.exception("Unable to read '%s', ignoring", filename)
     return (None, None)
   else:
-    return (statcb.st.st_mtime, [line.split(1)
+    return (statcb.st.st_mtime, [line.split(None, 1)
                                  for line in content.splitlines()])
 
 
@@ -545,7 +536,7 @@ def _StartGroupChildren(cl, wait):
     try:
       # TODO: Should utils.StartDaemon be used instead?
       pid = os.spawnv(os.P_NOWAIT, args[0], args)
-    except Exception: # pylint: disable-msg=W0703
+    except Exception: # pylint: disable=W0703
       logging.exception("Failed to start child for group '%s' (%s)",
                         name, uuid)
     else:
@@ -580,7 +571,7 @@ def _CheckMaster(cl):
     raise NotMasterError("This is not the master node")
 
 
-@rapi.client.UsesRapiClient
+@UsesRapiClient
 def _GlobalWatcher(opts):
   """Main function for global watcher.
 
@@ -592,8 +583,8 @@ def _GlobalWatcher(opts):
 
   # Run node maintenance in all cases, even if master, so that old masters can
   # be properly cleaned up
-  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
-    nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
+  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
+    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
 
   try:
     client = GetLuxiClient(True)
@@ -629,18 +620,19 @@ def _GetGroupData(cl, uuid):
   """Retrieves instances and nodes per node group.
 
   """
-  # TODO: Implement locking
   job = [
     # Get all primary instances in group
     opcodes.OpQuery(what=constants.QR_INSTANCE,
                     fields=["name", "status", "admin_state", "snodes",
                             "pnode.group.uuid", "snodes.group.uuid"],
-                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
+                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
+                    use_locking=True),
 
     # Get all nodes in group
     opcodes.OpQuery(what=constants.QR_NODE,
                     fields=["name", "bootid", "offline"],
-                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
+                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
+                    use_locking=True),
     ]
 
   job_id = cl.SubmitJob(job)
@@ -717,18 +709,19 @@ def _GroupWatcher(opts):
     raise errors.GenericError("Node group '%s' is not known by ssconf" %
                               group_uuid)
 
-  # Group UUID has been verified and should not contain any dangerous characters
-  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
-  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
+  # Group UUID has been verified and should not contain any dangerous
+  # characters
+  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
+  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
 
   logging.debug("Using state file %s", state_path)
 
   # Global watcher
-  statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
+  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
   if not statefile:
     return constants.EXIT_FAILURE
 
-  notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
+  notepad = state.WatcherState(statefile) # pylint: disable=E0602
   try:
     # Connect to master daemon
     client = GetLuxiClient(False)
@@ -740,8 +733,8 @@ def _GroupWatcher(opts):
     # Update per-group instance status file
     _UpdateInstanceStatus(inst_status_path, instances.values())
 
-    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
-                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
+    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
+                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                          known_groups)
 
     started = _CheckInstances(client, notepad, instances)
@@ -763,7 +756,7 @@ def Main():
   """
   (options, _) = ParseOptions()
 
-  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
+  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                      debug=options.debug, stderr_logging=options.debug)
 
   if ShouldPause() and not options.ignore_pause:
@@ -771,12 +764,12 @@ def Main():
     return constants.EXIT_SUCCESS
 
   # Try to acquire global watcher lock in shared mode
-  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
+  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
   try:
     lock.Shared(blocking=False)
   except (EnvironmentError, errors.LockError), err:
     logging.error("Can't acquire lock on %s: %s",
-                  constants.WATCHER_LOCK_FILE, err)
+                  pathutils.WATCHER_LOCK_FILE, err)
     return constants.EXIT_SUCCESS
 
   if options.nodegroup is None: