"""
+# pylint: disable-msg=C0103,W0142
+
+# C0103: Invalid name ganeti-watcher
+
import os
import sys
import time
import logging
+import errno
from optparse import OptionParser
from ganeti import utils
from ganeti import constants
from ganeti import serializer
-from ganeti import ssconf
from ganeti import errors
from ganeti import opcodes
-from ganeti import logger
from ganeti import cli
+from ganeti import luxi
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
-HELPLESS_STATES = ['ERROR_nodedown']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
def Indent(s, prefix='| '):
"""Indent a piece of text with a given prefix before each line.
- Args:
- s: The string to indent
- prefix: The string to prepend each line.
+ @param s: the string to indent
+ @param prefix: the string to prepend each line
"""
return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
+def ShouldPause():
+ """Check whether we should pause.
+
+ """
+ return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+
+
+def EnsureDaemon(name):
+ """Check for and start daemon if not alive.
+
+ """
+ result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+ if result.failed:
+ logging.error("Can't start daemon '%s', failure %s, output: %s",
+ name, result.fail_reason, result.output)
+ return False
+
+ return True
+
+
class WatcherState(object):
"""Interface to a state file recording restart attempts.
utils.LockFile(self.statefile.fileno())
try:
- self._data = serializer.Load(self.statefile.read())
- except Exception, msg:
+ state_data = self.statefile.read()
+ if not state_data:
+ self._data = {}
+ else:
+ self._data = serializer.Load(state_data)
+ except Exception, msg: # pylint: disable-msg=W0703
# Ignore errors while loading the file and treat it as empty
self._data = {}
- logging.warning(("Empty or invalid state file. Using defaults."
+ logging.warning(("Invalid state file. Using defaults."
" Error message: %s"), msg)
if "instance" not in self._data:
def NumberOfRestartAttempts(self, instance):
"""Returns number of previous restart attempts.
- Args:
- instance - the instance to look up.
+ @type instance: L{Instance}
+ @param instance: the instance to look up
"""
idata = self._data["instance"]
def RecordRestartAttempt(self, instance):
"""Record a restart attempt.
- Args:
- instance - the instance being restarted
+ @type instance: L{Instance}
+ @param instance: the instance being restarted
"""
idata = self._data["instance"]
inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
def RemoveInstance(self, instance):
- """Update state to reflect that a machine is running, i.e. remove record.
+ """Update state to reflect that a machine is running.
- Args:
- instance - the instance to remove from books
+    This method removes the record for a named instance (we only
+    track instances that are down).
- This method removes the record for a named instance.
+ @type instance: L{Instance}
+ @param instance: the instance to remove from books
"""
idata = self._data["instance"]
class Instance(object):
"""Abstraction for a Virtual Machine instance.
- Methods:
- Restart(): issue a command to restart the represented machine.
-
"""
def __init__(self, name, state, autostart):
self.name = name
"""Encapsulates the start of an instance.
"""
- op = opcodes.OpStartupInstance(instance_name=self.name,
- force=False,
- extra_args=None)
+ op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
cli.SubmitOpCode(op, cl=client)
def ActivateDisks(self):
cli.SubmitOpCode(op, cl=client)
-def GetInstanceList(with_secondaries=None):
+def GetClusterData():
"""Get a list of instances on this cluster.
"""
- fields = ["name", "status", "admin_state"]
+ op1_fields = ["name", "status", "admin_state", "snodes"]
+ op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+ use_locking=True)
+ op2_fields = ["name", "bootid", "offline"]
+ op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+ use_locking=True)
- if with_secondaries is not None:
- fields.append("snodes")
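+  # both queries go into a single two-opcode job, so we only wait on (and
+  # later archive) one job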
+ job_id = client.SubmitJob([op1, op2])
- result = client.QueryInstances([], fields)
+ all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
- instances = []
- for fields in result:
- if with_secondaries is not None:
- (name, status, autostart, snodes) = fields
+ logging.debug("Got data from cluster, writing instance status file")
- if not snodes:
- continue
+ result = all_results[0]
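+  # maps a node name to the instances that use it as a secondary node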
+ smap = {}
- for node in with_secondaries:
- if node in snodes:
- break
- else:
- continue
+ instances = {}
- else:
- (name, status, autostart) = fields
+ # write the upfile
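+  # (one "<instance name> <status>" line per instance)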
+ up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
+ utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
- instances.append(Instance(name, status, autostart))
+ for fields in result:
+ (name, status, autostart, snodes) = fields
- return instances
+ # update the secondary node map
+ for node in snodes:
+ if node not in smap:
+ smap[node] = []
+ smap[node].append(name)
+ instances[name] = Instance(name, status, autostart)
-def GetNodeBootIDs():
- """Get a dict mapping nodes to boot IDs.
+ nodes = dict([(name, (bootid, offline))
+ for name, bootid, offline in all_results[1]])
- """
- result = client.QueryNodes([], ["name", "bootid"])
- return dict([(name, bootid) for name, bootid in result])
+ client.ArchiveJob(job_id)
+
+ return instances, nodes, smap
class Watcher(object):
to restart machines that are down.
"""
- def __init__(self):
- sstore = ssconf.SimpleStore()
- master = sstore.GetMasterNode()
+ def __init__(self, opts, notepad):
+ self.notepad = notepad
+ master = client.QueryConfigValues(["master_node"])[0]
if master != utils.HostInfo().name:
raise NotMasterError("This is not the master node")
- self.instances = GetInstanceList()
- self.bootids = GetNodeBootIDs()
+ # first archive old jobs
+ self.ArchiveJobs(opts.job_age)
+ # and only then submit new ones
+ self.instances, self.bootids, self.smap = GetClusterData()
self.started_instances = set()
+ self.opts = opts
def Run(self):
- notepad = WatcherState()
- try:
- self.CheckInstances(notepad)
- self.CheckDisks(notepad)
- self.VerifyDisks()
- finally:
- notepad.Save()
+ """Watcher run sequence.
+
+ """
+ notepad = self.notepad
+ self.CheckInstances(notepad)
+ self.CheckDisks(notepad)
+ self.VerifyDisks()
+
+ @staticmethod
+ def ArchiveJobs(age):
+ """Archive old jobs.
+
+ """
+ arch_count, left_count = client.AutoArchiveJobs(age)
+ logging.debug("Archived %s jobs, left %s", arch_count, left_count)
def CheckDisks(self, notepad):
"""Check all nodes for restarted ones.
"""
check_nodes = []
- for name, new_id in self.bootids.iteritems():
+ for name, (new_id, offline) in self.bootids.iteritems():
old = notepad.GetNodeBootID(name)
if new_id is None:
# Bad node, not returning a boot id
- logging.debug("Node %s missing boot id, skipping secondary checks",
- name)
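+        # offline nodes legitimately have no boot id, so only log for online ones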
+ if not offline:
+ logging.debug("Node %s missing boot id, skipping secondary checks",
+ name)
continue
if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
if check_nodes:
# Activate disks for all instances with any of the checked nodes as a
# secondary node.
- for instance in GetInstanceList(with_secondaries=check_nodes):
- if not instance.autostart:
- logging.info(("Skipping disk activation for non-autostart"
- " instance %s"), instance.name)
+ for node in check_nodes:
+ if node not in self.smap:
continue
- if instance.name in self.started_instances:
- # we already tried to start the instance, which should have
- # activated its drives (if they can be at all)
- continue
- try:
- logging.info("Activating disks for instance %s", instance.name)
- instance.ActivateDisks()
- except Exception:
- logging.exception("Error while activating disks for instance %s",
- instance.name)
+ for instance_name in self.smap[node]:
+ instance = self.instances[instance_name]
+ if not instance.autostart:
+ logging.info(("Skipping disk activation for non-autostart"
+ " instance %s"), instance.name)
+ continue
+ if instance.name in self.started_instances:
+ # we already tried to start the instance, which should have
+ # activated its drives (if they can be at all)
+ continue
+ try:
+ logging.info("Activating disks for instance %s", instance.name)
+ instance.ActivateDisks()
+ except Exception: # pylint: disable-msg=W0703
+ logging.exception("Error while activating disks for instance %s",
+ instance.name)
# Keep changed boot IDs
for name in check_nodes:
- notepad.SetNodeBootID(name, self.bootids[name])
+ notepad.SetNodeBootID(name, self.bootids[name][0])
def CheckInstances(self, notepad):
"""Make a pass over the list of instances, restarting downed ones.
"""
- for instance in self.instances:
+ for instance in self.instances.values():
if instance.state in BAD_STATES:
n = notepad.NumberOfRestartAttempts(instance)
instance.name, last)
instance.Restart()
self.started_instances.add(instance.name)
- except Exception:
- logging.exception("Erro while restarting instance %s", instance.name)
+ except Exception: # pylint: disable-msg=W0703
+ logging.exception("Error while restarting instance %s",
+ instance.name)
notepad.RecordRestartAttempt(instance)
elif instance.state in HELPLESS_STATES:
"""
op = opcodes.OpVerifyDisks()
- result = cli.SubmitOpCode(op, cl=client)
+ job_id = client.SubmitJob([op])
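+    # the job holds a single opcode, so its first (only) result is ours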
+ result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+ client.ArchiveJob(job_id)
if not isinstance(result, (tuple, list)):
logging.error("Can't get a valid result from verify-disks")
return
# nothing to do
return
logging.debug("Will activate disks for instances %s",
- ", ".join(offline_disk_instances))
+ utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
job = [opcodes.OpActivateInstanceDisks(instance_name=name)
def ParseOptions():
"""Parse the command line options.
- Returns:
- (options, args) as from OptionParser.parse_args()
+ @return: (options, args) as from OptionParser.parse_args()
"""
parser = OptionParser(description="Ganeti cluster watcher",
version="%%prog (ganeti) %s" %
constants.RELEASE_VERSION)
- parser.add_option("-d", "--debug", dest="debug",
- help="Write all messages to stderr",
- default=False, action="store_true")
+ parser.add_option(cli.DEBUG_OPT)
+ parser.add_option("-A", "--job-age", dest="job_age",
+ help="Autoarchive jobs older than this age (default"
+ " 6 hours)", default=6*3600)
options, args = parser.parse_args()
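+  # normalize the --job-age value into a number of seconds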
+ options.job_age = cli.ParseTimespec(options.job_age)
return options, args
"""Main function.
"""
- global client
+ global client # pylint: disable-msg=W0603
options, args = ParseOptions()
- logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
- stderr_logging=options.debug)
+ if args: # watcher doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-d] [-A age]" % sys.argv[0])
+ sys.exit(constants.EXIT_FAILURE)
+
+ utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+ stderr_logging=options.debug)
+
+ if ShouldPause():
+ logging.debug("Pause has been set, exiting")
+ sys.exit(constants.EXIT_SUCCESS)
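+  # the state file is only saved when the run below gets far enough (see the
+  # "finally" clause further down)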
+ update_file = False
try:
- client = cli.GetClient()
+    # on master or not, try to start the node daemon
+ EnsureDaemon(constants.NODED)
+ notepad = WatcherState()
try:
- watcher = Watcher()
- except errors.ConfigurationError:
- # Just exit if there's no configuration
- sys.exit(constants.EXIT_SUCCESS)
+ try:
+ client = cli.GetClient()
+ except errors.OpPrereqError:
+        # cli.GetClient raises this when the local node is not the master
+ logging.debug("Not on master, exiting")
+ update_file = True
+ sys.exit(constants.EXIT_SUCCESS)
+ except luxi.NoMasterError, err:
+ logging.warning("Master seems to be down (%s), trying to restart",
+ str(err))
+ if not EnsureDaemon(constants.MASTERD):
+ logging.critical("Can't start the master, exiting")
+ sys.exit(constants.EXIT_FAILURE)
+ # else retry the connection
+ client = cli.GetClient()
+
+ # we are on master now
+ EnsureDaemon(constants.RAPI)
+
+ try:
+ watcher = Watcher(options, notepad)
+ except errors.ConfigurationError:
+ # Just exit if there's no configuration
+ update_file = True
+ sys.exit(constants.EXIT_SUCCESS)
+
+ watcher.Run()
+ update_file = True
- watcher.Run()
+ finally:
+ if update_file:
+ notepad.Save()
+ else:
+ logging.debug("Not updating status file due to failure")
except SystemExit:
raise
except NotMasterError:
except errors.ResolverError, err:
logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
sys.exit(constants.EXIT_NODESETUP_ERROR)
+ except errors.JobQueueFull:
+ logging.error("Job queue is full, can't query cluster state")
+ except errors.JobQueueDrainError:
+ logging.error("Job queue is drained, can't maintain cluster state")
except Exception, err:
logging.error(str(err), exc_info=True)
sys.exit(constants.EXIT_FAILURE)