#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import sys
import time
import logging
+import operator
+import errno
from optparse import OptionParser
from ganeti import utils
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
+from ganeti import qlang
+from ganeti import objects
+from ganeti import ssconf
+from ganeti import ht
+from ganeti import pathutils
-import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client # pylint: disable=W0611
+from ganeti.rapi.client import UsesRapiClient
from ganeti.watcher import nodemaint
from ganeti.watcher import state
MAXTRIES = 5
-
-
-# Global LUXI client object
-client = None
BAD_STATES = frozenset([
constants.INSTST_ERRORDOWN,
])
NOTICE = "NOTICE"
ERROR = "ERROR"
+#: Number of seconds to wait between starting child processes for node groups
+CHILD_PROCESS_DELAY = 1.0
+
+#: How many seconds to wait for instance status file lock
+INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
+
class NotMasterError(errors.GenericError):
"""Exception raised when this host is not the master."""
"""Check whether we should pause.
"""
- return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+ return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
def StartNodeDaemons():
# on master or not, try to start the node daemon
utils.EnsureDaemon(constants.NODED)
# start confd as well. On non candidates it will be in disabled mode.
- utils.EnsureDaemon(constants.CONFD)
+ if constants.ENABLE_CONFD:
+ utils.EnsureDaemon(constants.CONFD)
def RunWatcherHooks():
"""Run the watcher hooks.
"""
- hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
+ hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
constants.HOOKS_NAME_WATCHER)
if not os.path.isdir(hooks_dir):
return
try:
results = utils.RunParts(hooks_dir)
- except Exception: # pylint: disable-msg=W0703
- logging.exception("RunParts %s failed: %s", hooks_dir)
+ except Exception, err: # pylint: disable=W0703
+ logging.exception("RunParts %s failed: %s", hooks_dir, err)
return
for (relname, status, runresult) in results:
self.autostart = autostart
self.snodes = snodes
- def Restart(self):
+ def Restart(self, cl):
"""Encapsulates the start of an instance.
"""
op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
- cli.SubmitOpCode(op, cl=client)
+ cli.SubmitOpCode(op, cl=cl)
- def ActivateDisks(self):
+ def ActivateDisks(self, cl):
"""Encapsulates the activation of all disks of an instance.
"""
op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
- cli.SubmitOpCode(op, cl=client)
+ cli.SubmitOpCode(op, cl=cl)
-def GetClusterData():
- """Get a list of instances on this cluster.
+class Node:
+ """Data container representing cluster node.
"""
- op1_fields = ["name", "status", "admin_state", "snodes"]
- op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
- use_locking=True)
- op2_fields = ["name", "bootid", "offline"]
- op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
- use_locking=True)
-
- job_id = client.SubmitJob([op1, op2])
+ def __init__(self, name, bootid, offline, secondaries):
+ """Initializes this class.
- all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+ """
+ self.name = name
+ self.bootid = bootid
+ self.offline = offline
+ self.secondaries = secondaries
- logging.debug("Got data from cluster, writing instance status file")
- result = all_results[0]
- smap = {}
+def _CheckInstances(cl, notepad, instances):
+ """Make a pass over the list of instances, restarting downed ones.
- instances = {}
+ """
+ notepad.MaintainInstanceList(instances.keys())
- # write the instance status file
- up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
- utils.WriteFile(file_name=constants.INSTANCE_STATUS_FILE, data=up_data)
+ started = set()
- for fields in result:
- (name, status, autostart, snodes) = fields
+ for inst in instances.values():
+ if inst.status in BAD_STATES:
+ n = notepad.NumberOfRestartAttempts(inst.name)
- # update the secondary node map
- for node in snodes:
- if node not in smap:
- smap[node] = []
- smap[node].append(name)
+ if n > MAXTRIES:
+ logging.warning("Not restarting instance '%s', retries exhausted",
+ inst.name)
+ continue
- instances[name] = Instance(name, status, autostart, snodes)
+ if n == MAXTRIES:
+ notepad.RecordRestartAttempt(inst.name)
+ logging.error("Could not restart instance '%s' after %s attempts,"
+ " giving up", inst.name, MAXTRIES)
+ continue
- nodes = dict([(name, (bootid, offline))
- for name, bootid, offline in all_results[1]])
+ try:
+ logging.info("Restarting instance '%s' (attempt #%s)",
+ inst.name, n + 1)
+ inst.Restart(cl)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Error while restarting instance '%s'", inst.name)
+ else:
+ started.add(inst.name)
- client.ArchiveJob(job_id)
+ notepad.RecordRestartAttempt(inst.name)
- return instances, nodes, smap
+ else:
+ if notepad.NumberOfRestartAttempts(inst.name):
+ notepad.RemoveInstance(inst.name)
+ if inst.status not in HELPLESS_STATES:
+ logging.info("Restart of instance '%s' succeeded", inst.name)
+ return started
-class Watcher(object):
- """Encapsulate the logic for restarting erroneously halted virtual machines.
- The calling program should periodically instantiate me and call Run().
- This will traverse the list of instances, and make up to MAXTRIES attempts
- to restart machines that are down.
+def _CheckDisks(cl, notepad, nodes, instances, started):
+ """Check all nodes for restarted ones.
"""
- def __init__(self, opts, notepad):
- self.notepad = notepad
- master = client.QueryConfigValues(["master_node"])[0]
- if master != netutils.Hostname.GetSysName():
- raise NotMasterError("This is not the master node")
- # first archive old jobs
- self.ArchiveJobs(opts.job_age)
- # and only then submit new ones
- self.instances, self.bootids, self.smap = GetClusterData()
- self.started_instances = set()
- self.opts = opts
-
- def Run(self):
- """Watcher run sequence.
-
- """
- notepad = self.notepad
- self.CheckInstances(notepad)
- self.CheckDisks(notepad)
- self.VerifyDisks()
-
- @staticmethod
- def ArchiveJobs(age):
- """Archive old jobs.
-
- """
- arch_count, left_count = client.AutoArchiveJobs(age)
- logging.debug("Archived %s jobs, left %s", arch_count, left_count)
-
- def CheckDisks(self, notepad):
- """Check all nodes for restarted ones.
-
- """
- check_nodes = []
- for name, (new_id, offline) in self.bootids.iteritems():
- old = notepad.GetNodeBootID(name)
- if new_id is None:
- # Bad node, not returning a boot id
- if not offline:
- logging.debug("Node %s missing boot id, skipping secondary checks",
- name)
- continue
- if old != new_id:
- # Node's boot ID has changed, proably through a reboot.
- check_nodes.append(name)
-
- if check_nodes:
- # Activate disks for all instances with any of the checked nodes as a
- # secondary node.
- for node in check_nodes:
- if node not in self.smap:
+ check_nodes = []
+
+ for node in nodes.values():
+ old = notepad.GetNodeBootID(node.name)
+ if not node.bootid:
+ # Bad node, not returning a boot id
+ if not node.offline:
+ logging.debug("Node '%s' missing boot ID, skipping secondary checks",
+ node.name)
+ continue
+
+ if old != node.bootid:
+ # Node's boot ID has changed, probably through a reboot
+ check_nodes.append(node)
+
+ if check_nodes:
+ # Activate disks for all instances with any of the checked nodes as a
+ # secondary node.
+ for node in check_nodes:
+ for instance_name in node.secondaries:
+ try:
+ inst = instances[instance_name]
+ except KeyError:
+ logging.info("Can't find instance '%s', maybe it was ignored",
+ instance_name)
continue
- for instance_name in self.smap[node]:
- instance = self.instances[instance_name]
- if not instance.autostart:
- logging.info(("Skipping disk activation for non-autostart"
- " instance %s"), instance.name)
- continue
- if instance.name in self.started_instances:
- # we already tried to start the instance, which should have
- # activated its drives (if they can be at all)
- logging.debug("Skipping disk activation for instance %s, as"
- " it was already started", instance.name)
- continue
- try:
- logging.info("Activating disks for instance %s", instance.name)
- instance.ActivateDisks()
- except Exception: # pylint: disable-msg=W0703
- logging.exception("Error while activating disks for instance %s",
- instance.name)
-
- # Keep changed boot IDs
- for name in check_nodes:
- notepad.SetNodeBootID(name, self.bootids[name][0])
-
- def CheckInstances(self, notepad):
- """Make a pass over the list of instances, restarting downed ones.
-
- """
- notepad.MaintainInstanceList(self.instances.keys())
- for instance in self.instances.values():
- if instance.status in BAD_STATES:
- n = notepad.NumberOfRestartAttempts(instance)
-
- if n > MAXTRIES:
- logging.warning("Not restarting instance %s, retries exhausted",
- instance.name)
- continue
- elif n < MAXTRIES:
- last = " (Attempt #%d)" % (n + 1)
- else:
- notepad.RecordRestartAttempt(instance)
- logging.error("Could not restart %s after %d attempts, giving up",
- instance.name, MAXTRIES)
+ if not inst.autostart:
+ logging.info("Skipping disk activation for non-autostart"
+ " instance '%s'", inst.name)
continue
- try:
- logging.info("Restarting %s%s", instance.name, last)
- instance.Restart()
- self.started_instances.add(instance.name)
- except Exception: # pylint: disable-msg=W0703
- logging.exception("Error while restarting instance %s",
- instance.name)
-
- notepad.RecordRestartAttempt(instance)
- elif instance.status in HELPLESS_STATES:
- if notepad.NumberOfRestartAttempts(instance):
- notepad.RemoveInstance(instance)
- else:
- if notepad.NumberOfRestartAttempts(instance):
- notepad.RemoveInstance(instance)
- logging.info("Restart of %s succeeded", instance.name)
-
- def _CheckForOfflineNodes(self, instance):
- """Checks if given instances has any secondary in offline status.
-
- @param instance: The instance object
- @return: True if any of the secondary is offline, False otherwise
- """
- bootids = []
- for node in instance.snodes:
- bootids.append(self.bootids[node])
+ if inst.name in started:
+ # we already tried to start the instance, which should have
+ # activated its drives (if they can be at all)
+ logging.debug("Skipping disk activation for instance '%s' as"
+ " it was already started", inst.name)
+ continue
- return compat.any(offline for (_, offline) in bootids)
+ try:
+ logging.info("Activating disks for instance '%s'", inst.name)
+ inst.ActivateDisks(cl)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Error while activating disks for instance '%s'",
+ inst.name)
- def VerifyDisks(self):
- """Run gnt-cluster verify-disks.
+ # Keep changed boot IDs
+ for node in check_nodes:
+ notepad.SetNodeBootID(node.name, node.bootid)
- """
- job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
- result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
- client.ArchiveJob(job_id)
- # Keep track of submitted jobs
- jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
+def _CheckForOfflineNodes(nodes, instance):
+ """Checks if given instances has any secondary in offline status.
- archive_jobs = set()
- for (status, job_id) in result[constants.JOB_IDS_KEY]:
- jex.AddJobId(None, status, job_id)
- if status:
- archive_jobs.add(job_id)
+ @param instance: The instance object
+ @return: True if any of the secondary is offline, False otherwise
- offline_disk_instances = set()
+ """
+ return compat.any(nodes[node_name].offline for node_name in instance.snodes)
- for (status, result) in jex.GetResults():
- if not status:
- logging.error("Verify-disks job failed: %s", result)
- continue
- ((_, instances, _), ) = result
+def _VerifyDisks(cl, uuid, nodes, instances):
+ """Run a per-group "gnt-cluster verify-disks".
- offline_disk_instances.update(instances)
+ """
+ job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
+ ((_, offline_disk_instances, _), ) = \
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+ cl.ArchiveJob(job_id)
+
+ if not offline_disk_instances:
+ # nothing to do
+ logging.debug("Verify-disks reported no offline disks, nothing to do")
+ return
- for job_id in archive_jobs:
- client.ArchiveJob(job_id)
+ logging.debug("Will activate disks for instance(s) %s",
+ utils.CommaJoin(offline_disk_instances))
- if not offline_disk_instances:
- # nothing to do
- logging.debug("verify-disks reported no offline disks, nothing to do")
- return
+ # We submit only one job, and wait for it. Not optimal, but this puts less
+ # load on the job queue.
+ job = []
+ for name in offline_disk_instances:
+ try:
+ inst = instances[name]
+ except KeyError:
+ logging.info("Can't find instance '%s', maybe it was ignored", name)
+ continue
- logging.debug("Will activate disks for instance(s) %s",
- utils.CommaJoin(offline_disk_instances))
+ if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
+ logging.info("Skipping instance '%s' because it is in a helpless state"
+ " or has offline secondaries", name)
+ continue
- # we submit only one job, and wait for it. not optimal, but spams
- # less the job queue
- job = []
- for name in offline_disk_instances:
- instance = self.instances[name]
- if (instance.status in HELPLESS_STATES or
- self._CheckForOfflineNodes(instance)):
- logging.info("Skip instance %s because it is in helpless state or has"
- " one offline secondary", name)
- continue
- job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
+ job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
- if job:
- job_id = cli.SendJob(job, cl=client)
+ if job:
+ job_id = cli.SendJob(job, cl=cl)
- try:
- cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
- except Exception: # pylint: disable-msg=W0703
- logging.exception("Error while activating disks")
+ try:
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Error while activating disks")
def IsRapiResponding(hostname):
try:
master_version = rapi_client.GetVersion()
except rapi.client.CertificateError, err:
- logging.warning("RAPI Error: CertificateError (%s)", err)
+ logging.warning("RAPI certificate error: %s", err)
return False
except rapi.client.GanetiApiError, err:
- logging.warning("RAPI Error: GanetiApiError (%s)", err)
+ logging.warning("RAPI error: %s", err)
return False
- logging.debug("RAPI Result: master_version is %s", master_version)
- return master_version == constants.RAPI_VERSION
+ else:
+ logging.debug("Reported RAPI version %s", master_version)
+ return master_version == constants.RAPI_VERSION
def ParseOptions():
constants.RELEASE_VERSION)
parser.add_option(cli.DEBUG_OPT)
+ parser.add_option(cli.NODEGROUP_OPT)
parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
help="Autoarchive jobs older than this age (default"
" 6 hours)")
parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
action="store_true", help="Ignore cluster pause setting")
+ parser.add_option("--wait-children", dest="wait_children",
+ action="store_true", help="Wait for child processes")
+ parser.add_option("--no-wait-children", dest="wait_children",
+ action="store_false",
+ help="Don't wait for child processes")
+ # See optparse documentation for why default values are not set by options
+ parser.set_defaults(wait_children=True)
options, args = parser.parse_args()
options.job_age = cli.ParseTimespec(options.job_age)
return (options, args)
-@rapi.client.UsesRapiClient
+def _WriteInstanceStatus(filename, data):
+ """Writes the per-group instance status file.
+
+ The entries are sorted.
+
+ @type filename: string
+ @param filename: Path to instance status file
+ @type data: list of tuple; (instance name as string, status as string)
+ @param data: Instance name and status
+
+ """
+ logging.debug("Updating instance status file '%s' with %s instances",
+ filename, len(data))
+
+ utils.WriteFile(filename,
+ data="".join(map(compat.partial(operator.mod, "%s %s\n"),
+ sorted(data))))
+
+
+def _UpdateInstanceStatus(filename, instances):
+ """Writes an instance status file from L{Instance} objects.
+
+ @type filename: string
+ @param filename: Path to status file
+ @type instances: list of L{Instance}
+
+ """
+ _WriteInstanceStatus(filename, [(inst.name, inst.status)
+ for inst in instances])
+
+
+def _ReadInstanceStatus(filename):
+ """Reads an instance status file.
+
+ @type filename: string
+ @param filename: Path to status file
+ @rtype: tuple; (None or number, list of lists containing instance name and
+ status)
+ @return: File's mtime and instance status contained in the file; mtime is
+ C{None} if file can't be read
+
+ """
+ logging.debug("Reading per-group instance status from '%s'", filename)
+
+ statcb = utils.FileStatHelper()
+ try:
+ content = utils.ReadFile(filename, preread=statcb)
+ except EnvironmentError, err:
+ if err.errno == errno.ENOENT:
+ logging.error("Can't read '%s', does not exist (yet)", filename)
+ else:
+ logging.exception("Unable to read '%s', ignoring", filename)
+ return (None, None)
+ else:
+ return (statcb.st.st_mtime, [line.split(None, 1)
+ for line in content.splitlines()])
+
+
+def _MergeInstanceStatus(filename, pergroup_filename, groups):
+ """Merges all per-group instance status files into a global one.
+
+ @type filename: string
+ @param filename: Path to global instance status file
+ @type pergroup_filename: string
+ @param pergroup_filename: Path to per-group status files, must contain "%s"
+ to be replaced with group UUID
+ @type groups: sequence
+ @param groups: UUIDs of known groups
+
+ """
+ # Lock global status file in exclusive mode
+ lock = utils.FileLock.Open(filename)
+ try:
+ lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
+ except errors.LockError, err:
+ # All per-group processes will lock and update the file. None of them
+ # should take longer than 10 seconds (the value of
+ # INSTANCE_STATUS_LOCK_TIMEOUT).
+ logging.error("Can't acquire lock on instance status file '%s', not"
+ " updating: %s", filename, err)
+ return
+
+ logging.debug("Acquired exclusive lock on '%s'", filename)
+
+ data = {}
+
+ # Load instance status from all groups
+ for group_uuid in groups:
+ (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
+
+ if mtime is not None:
+ for (instance_name, status) in instdata:
+ data.setdefault(instance_name, []).append((mtime, status))
+
+ # Select last update based on file mtime
+ inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
+ for (instance_name, status) in data.items()]
+
+ # Write the global status file. Don't touch file after it's been
+ # updated--there is no lock anymore.
+ _WriteInstanceStatus(filename, inststatus)
+
+
+def GetLuxiClient(try_restart):
+ """Tries to connect to the master daemon.
+
+ @type try_restart: bool
+ @param try_restart: Whether to attempt to restart the master daemon
+
+ """
+ try:
+ return cli.GetClient()
+ except errors.OpPrereqError, err:
+ # this is, from cli.GetClient, a not-master case
+ raise NotMasterError("Not on master node (%s)" % err)
+
+ except luxi.NoMasterError, err:
+ if not try_restart:
+ raise
+
+ logging.warning("Master daemon seems to be down (%s), trying to restart",
+ err)
+
+ if not utils.EnsureDaemon(constants.MASTERD):
+ raise errors.GenericError("Can't start the master daemon")
+
+ # Retry the connection
+ return cli.GetClient()
+
+
+def _StartGroupChildren(cl, wait):
+ """Starts a new instance of the watcher for every node group.
+
+ """
+ assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
+ for arg in sys.argv)
+
+ result = cl.QueryGroups([], ["name", "uuid"], False)
+
+ children = []
+
+ for (idx, (name, uuid)) in enumerate(result):
+ args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
+
+ if idx > 0:
+ # Let's not kill the system
+ time.sleep(CHILD_PROCESS_DELAY)
+
+ logging.debug("Spawning child for group '%s' (%s), arguments %s",
+ name, uuid, args)
+
+ try:
+ # TODO: Should utils.StartDaemon be used instead?
+ pid = os.spawnv(os.P_NOWAIT, args[0], args)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Failed to start child for group '%s' (%s)",
+ name, uuid)
+ else:
+ logging.debug("Started with PID %s", pid)
+ children.append(pid)
+
+ if wait:
+ for pid in children:
+ logging.debug("Waiting for child PID %s", pid)
+ try:
+ result = utils.RetryOnSignal(os.waitpid, pid, 0)
+ except EnvironmentError, err:
+ result = str(err)
+
+ logging.debug("Child PID %s exited with status %s", pid, result)
+
+
+def _ArchiveJobs(cl, age):
+ """Archives old jobs.
+
+ """
+ (arch_count, left_count) = cl.AutoArchiveJobs(age)
+ logging.debug("Archived %s jobs, left %s", arch_count, left_count)
+
+
+def _CheckMaster(cl):
+ """Ensures current host is master node.
+
+ """
+ (master, ) = cl.QueryConfigValues(["master_node"])
+ if master != netutils.Hostname.GetSysName():
+ raise NotMasterError("This is not the master node")
+
+
+@UsesRapiClient
+def _GlobalWatcher(opts):
+ """Main function for global watcher.
+
+ At the end child processes are spawned for every node group.
+
+ """
+ StartNodeDaemons()
+ RunWatcherHooks()
+
+ # Run node maintenance in all cases, even if master, so that old masters can
+ # be properly cleaned up
+ if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
+ nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
+
+ try:
+ client = GetLuxiClient(True)
+ except NotMasterError:
+ # Don't proceed on non-master nodes
+ return constants.EXIT_SUCCESS
+
+ # we are on master now
+ utils.EnsureDaemon(constants.RAPI)
+
+ # If RAPI isn't responding to queries, try one restart
+ logging.debug("Attempting to talk to remote API on %s",
+ constants.IP4_ADDRESS_LOCALHOST)
+ if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+ logging.warning("Couldn't get answer from remote API, restaring daemon")
+ utils.StopDaemon(constants.RAPI)
+ utils.EnsureDaemon(constants.RAPI)
+ logging.debug("Second attempt to talk to remote API")
+ if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+ logging.fatal("RAPI is not responding")
+ logging.debug("Successfully talked to remote API")
+
+ _CheckMaster(client)
+ _ArchiveJobs(client, opts.job_age)
+
+ # Spawn child processes for all node groups
+ _StartGroupChildren(client, opts.wait_children)
+
+ return constants.EXIT_SUCCESS
+
+
+def _GetGroupData(cl, uuid):
+ """Retrieves instances and nodes per node group.
+
+ """
+ job = [
+ # Get all primary instances in group
+ opcodes.OpQuery(what=constants.QR_INSTANCE,
+ fields=["name", "status", "admin_state", "snodes",
+ "pnode.group.uuid", "snodes.group.uuid"],
+ qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
+ use_locking=True),
+
+ # Get all nodes in group
+ opcodes.OpQuery(what=constants.QR_NODE,
+ fields=["name", "bootid", "offline"],
+ qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
+ use_locking=True),
+ ]
+
+ job_id = cl.SubmitJob(job)
+ results = map(objects.QueryResponse.FromDict,
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
+ cl.ArchiveJob(job_id)
+
+ results_data = map(operator.attrgetter("data"), results)
+
+ # Ensure results are tuples with two values
+ assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
+
+ # Extract values ignoring result status
+ (raw_instances, raw_nodes) = [[map(compat.snd, values)
+ for values in res]
+ for res in results_data]
+
+ secondaries = {}
+ instances = []
+
+ # Load all instances
+ for (name, status, autostart, snodes, pnode_group_uuid,
+ snodes_group_uuid) in raw_instances:
+ if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
+ logging.error("Ignoring split instance '%s', primary group %s, secondary"
+ " groups %s", name, pnode_group_uuid,
+ utils.CommaJoin(snodes_group_uuid))
+ else:
+ instances.append(Instance(name, status, autostart, snodes))
+
+ for node in snodes:
+ secondaries.setdefault(node, set()).add(name)
+
+ # Load all nodes
+ nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
+ for (name, bootid, offline) in raw_nodes]
+
+ return (dict((node.name, node) for node in nodes),
+ dict((inst.name, inst) for inst in instances))
+
+
+def _LoadKnownGroups():
+ """Returns a list of all node groups known by L{ssconf}.
+
+ """
+ groups = ssconf.SimpleStore().GetNodegroupList()
+
+ result = list(line.split(None, 1)[0] for line in groups
+ if line.strip())
+
+ if not compat.all(map(utils.UUID_RE.match, result)):
+ raise errors.GenericError("Ssconf contains invalid group UUID")
+
+ return result
+
+
+def _GroupWatcher(opts):
+ """Main function for per-group watcher process.
+
+ """
+ group_uuid = opts.nodegroup.lower()
+
+ if not utils.UUID_RE.match(group_uuid):
+ raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
+ " got '%s'" %
+ (cli.NODEGROUP_OPT_NAME, group_uuid))
+
+ logging.info("Watcher for node group '%s'", group_uuid)
+
+ known_groups = _LoadKnownGroups()
+
+ # Check if node group is known
+ if group_uuid not in known_groups:
+ raise errors.GenericError("Node group '%s' is not known by ssconf" %
+ group_uuid)
+
+ # Group UUID has been verified and should not contain any dangerous
+ # characters
+ state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
+ inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
+
+ logging.debug("Using state file %s", state_path)
+
+ # Global watcher
+ statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
+ if not statefile:
+ return constants.EXIT_FAILURE
+
+ notepad = state.WatcherState(statefile) # pylint: disable=E0602
+ try:
+ # Connect to master daemon
+ client = GetLuxiClient(False)
+
+ _CheckMaster(client)
+
+ (nodes, instances) = _GetGroupData(client, group_uuid)
+
+ # Update per-group instance status file
+ _UpdateInstanceStatus(inst_status_path, instances.values())
+
+ _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
+ pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
+ known_groups)
+
+ started = _CheckInstances(client, notepad, instances)
+ _CheckDisks(client, notepad, nodes, instances, started)
+ _VerifyDisks(client, group_uuid, nodes, instances)
+ except Exception, err:
+ logging.info("Not updating status file due to failure: %s", err)
+ raise
+ else:
+ # Save changes for next run
+ notepad.Save(state_path)
+
+ return constants.EXIT_SUCCESS
+
+
def Main():
"""Main function.
"""
- global client # pylint: disable-msg=W0603
-
(options, _) = ParseOptions()
- utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
+ utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
debug=options.debug, stderr_logging=options.debug)
if ShouldPause() and not options.ignore_pause:
logging.debug("Pause has been set, exiting")
return constants.EXIT_SUCCESS
- statefile = \
- state.OpenStateFile(constants.WATCHER_STATEFILE)
- if not statefile:
- return constants.EXIT_FAILURE
-
- update_file = False
+ # Try to acquire global watcher lock in shared mode
+ lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
try:
- StartNodeDaemons()
- RunWatcherHooks()
- # run node maintenance in all cases, even if master, so that old
- # masters can be properly cleaned up too
- if nodemaint.NodeMaintenance.ShouldRun():
- nodemaint.NodeMaintenance().Exec()
-
- notepad = state.WatcherState(statefile)
- try:
- try:
- client = cli.GetClient()
- except errors.OpPrereqError:
- # this is, from cli.GetClient, a not-master case
- logging.debug("Not on master, exiting")
- update_file = True
- return constants.EXIT_SUCCESS
- except luxi.NoMasterError, err:
- logging.warning("Master seems to be down (%s), trying to restart",
- str(err))
- if not utils.EnsureDaemon(constants.MASTERD):
- logging.critical("Can't start the master, exiting")
- return constants.EXIT_FAILURE
- # else retry the connection
- client = cli.GetClient()
-
- # we are on master now
- utils.EnsureDaemon(constants.RAPI)
-
- # If RAPI isn't responding to queries, try one restart.
- logging.debug("Attempting to talk with RAPI.")
- if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
- logging.warning("Couldn't get answer from Ganeti RAPI daemon."
- " Restarting Ganeti RAPI.")
- utils.StopDaemon(constants.RAPI)
- utils.EnsureDaemon(constants.RAPI)
- logging.debug("Second attempt to talk with RAPI")
- if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
- logging.fatal("RAPI is not responding. Please investigate.")
- logging.debug("Successfully talked to RAPI.")
+ lock.Shared(blocking=False)
+ except (EnvironmentError, errors.LockError), err:
+ logging.error("Can't acquire lock on %s: %s",
+ pathutils.WATCHER_LOCK_FILE, err)
+ return constants.EXIT_SUCCESS
- try:
- watcher = Watcher(options, notepad)
- except errors.ConfigurationError:
- # Just exit if there's no configuration
- update_file = True
- return constants.EXIT_SUCCESS
-
- watcher.Run()
- update_file = True
-
- finally:
- if update_file:
- notepad.Save()
- else:
- logging.debug("Not updating status file due to failure")
- except SystemExit:
+ if options.nodegroup is None:
+ fn = _GlobalWatcher
+ else:
+ # Per-nodegroup watcher
+ fn = _GroupWatcher
+
+ try:
+ return fn(options)
+ except (SystemExit, KeyboardInterrupt):
raise
except NotMasterError:
logging.debug("Not master, exiting")