#!/usr/bin/python
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.
-"""
-
-LOGFILE = '/var/log/ganeti/watcher.log'
-MAXTRIES = 5
-BAD_STATES = ['stopped']
-HELPLESS_STATES = ['(node down)']
-NOTICE = 'NOTICE'
-ERROR = 'ERROR'
+"""
import os
import sys
import time
-import fcntl
-import errno
-import socket
+import logging
from optparse import OptionParser
-
from ganeti import utils
from ganeti import constants
-from ganeti import ssconf
+from ganeti import serializer
+from ganeti import errors
+from ganeti import opcodes
+from ganeti import cli
+from ganeti import luxi
+
+
+MAXTRIES = 5
+BAD_STATES = ['ERROR_down']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
+NOTICE = 'NOTICE'
+ERROR = 'ERROR'
+KEY_RESTART_COUNT = "restart_count"
+KEY_RESTART_WHEN = "restart_when"
+KEY_BOOT_ID = "bootid"
-class Error(Exception):
- """Generic custom error class."""
+# Global client object
+client = None
-class NotMasterError(Error):
+class NotMasterError(errors.GenericError):
"""Exception raised when this host is not the master."""
def Indent(s, prefix='| '):
"""Indent a piece of text with a given prefix before each line.
- Args:
- s: The string to indent
- prefix: The string to prepend each line.
+ @param s: the string to indent
+ @param prefix: the string to prepend each line
"""
return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
-def DoCmd(cmd):
- """Run a shell command.
-
- Args:
- cmd: the command to run.
-
- Raises CommandError with verbose commentary on error.
+def StartMaster():
+ """Try to start the master daemon.
"""
- res = utils.RunCmd(cmd)
-
- if res.failed:
- raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
- (repr(cmd),
- Indent(res.fail_reason),
- Indent(res.stdout),
- Indent(res.stderr)))
+ result = utils.RunCmd(['ganeti-masterd'])
+ if result.failed:
+ logging.error("Can't start the master daemon: output '%s'", result.output)
+ return not result.failed
- return res
-
-class RestarterState(object):
+class WatcherState(object):
"""Interface to a state file recording restart attempts.
- Methods:
- Open(): open, lock, read and parse the file.
- Raises StandardError on lock contention.
-
- NumberOfAttempts(name): returns the number of times in succession
- a restart has been attempted of the named instance.
-
- RecordAttempt(name, when): records one restart attempt of name at
- time in when.
-
- Remove(name): remove record given by name, if exists.
-
- Save(name): saves all records to file, releases lock and closes file.
-
"""
def __init__(self):
+ """Open, lock, read and parse the file.
+
+ Raises exception on lock contention.
+
+ """
# The two-step dance below is necessary to allow both opening existing
# file read/write and creating if not existing. Vanilla open will truncate
# an existing file -or- allow creating if not existing.
- f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
- f = os.fdopen(f, 'w+')
+ fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
+ self.statefile = os.fdopen(fd, 'w+')
+
+ utils.LockFile(self.statefile.fileno())
try:
- fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
- except IOError, x:
- if x.errno == errno.EAGAIN:
- raise StandardError("State file already locked")
- raise
+ state_data = self.statefile.read()
+ if not state_data:
+ self._data = {}
+ else:
+ self._data = serializer.Load(state_data)
+ except Exception, msg:
+ # Ignore errors while loading the file and treat it as empty
+ self._data = {}
+ logging.warning(("Invalid state file. Using defaults."
+ " Error message: %s"), msg)
- self.statefile = f
- self.inst_map = {}
+ if "instance" not in self._data:
+ self._data["instance"] = {}
+ if "node" not in self._data:
+ self._data["node"] = {}
- for line in f:
- name, when, count = line.rstrip().split(':')
+ self._orig_data = serializer.Dump(self._data)
- when = int(when)
- count = int(count)
+ def Save(self):
+ """Save state to file, then unlock and close it.
- self.inst_map[name] = (when, count)
+ """
+ assert self.statefile
- def NumberOfAttempts(self, instance):
- """Returns number of previous restart attempts.
+ serialized_form = serializer.Dump(self._data)
+ if self._orig_data == serialized_form:
+ logging.debug("Data didn't change, just touching status file")
+ os.utime(constants.WATCHER_STATEFILE, None)
+ return
- Args:
- instance - the instance to look up.
+ # We need to make sure the file is locked before renaming it, otherwise
+ # starting ganeti-watcher again at the same time will create a conflict.
+ fd = utils.WriteFile(constants.WATCHER_STATEFILE,
+ data=serialized_form,
+ prewrite=utils.LockFile, close=False)
+ self.statefile = os.fdopen(fd, 'w+')
+
+ def Close(self):
+ """Unlock configuration file and close it.
"""
assert self.statefile
- if instance.name in self.inst_map:
- return self.inst_map[instance.name][1]
+ # Files are automatically unlocked when closing them
+ self.statefile.close()
+ self.statefile = None
- return 0
+ def GetNodeBootID(self, name):
+ """Returns the last boot ID of a node or None.
- def RecordAttempt(self, instance):
- """Record a restart attempt.
+ """
+ ndata = self._data["node"]
+
+ if name in ndata and KEY_BOOT_ID in ndata[name]:
+ return ndata[name][KEY_BOOT_ID]
+ return None
- Args:
- instance - the instance being restarted
+ def SetNodeBootID(self, name, bootid):
+ """Sets the boot ID of a node.
"""
- assert self.statefile
+ assert bootid
- when = time.time()
+ ndata = self._data["node"]
- self.inst_map[instance.name] = (when, 1 + self.NumberOfAttempts(instance))
+ if name not in ndata:
+ ndata[name] = {}
- def Remove(self, instance):
- """Update state to reflect that a machine is running, i.e. remove record.
+ ndata[name][KEY_BOOT_ID] = bootid
- Args:
- instance - the instance to remove from books
+ def NumberOfRestartAttempts(self, instance):
+ """Returns number of previous restart attempts.
- This method removes the record for a named instance.
+ @type instance: L{Instance}
+ @param instance: the instance to look up
"""
- assert self.statefile
+ idata = self._data["instance"]
- if instance.name in self.inst_map:
- del self.inst_map[instance.name]
+ if instance.name in idata:
+ return idata[instance.name][KEY_RESTART_COUNT]
- def Save(self):
- """Save records to file, then unlock and close file.
+ return 0
+
+ def RecordRestartAttempt(self, instance):
+ """Record a restart attempt.
+
+ @type instance: L{Instance}
+ @param instance: the instance being restarted
"""
- assert self.statefile
+ idata = self._data["instance"]
- self.statefile.seek(0)
- self.statefile.truncate()
+ if instance.name not in idata:
+ inst = idata[instance.name] = {}
+ else:
+ inst = idata[instance.name]
- for name in self.inst_map:
- print >> self.statefile, "%s:%d:%d" % ((name,) + self.inst_map[name])
+ inst[KEY_RESTART_WHEN] = time.time()
+ inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
- fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
+ def RemoveInstance(self, instance):
+ """Update state to reflect that a machine is running.
- self.statefile.close()
- self.statefile = None
+ This method removes the record for a named instance (as we only
+ track down instances).
+
+ @type instance: L{Instance}
+ @param instance: the instance to remove from books
+
+ """
+ idata = self._data["instance"]
+
+ if instance.name in idata:
+ del idata[instance.name]
class Instance(object):
"""Abstraction for a Virtual Machine instance.
- Methods:
- Restart(): issue a command to restart the represented machine.
-
"""
- def __init__(self, name, state):
+ def __init__(self, name, state, autostart):
self.name = name
self.state = state
+ self.autostart = autostart
def Restart(self):
"""Encapsulates the start of an instance.
- This is currently done using the command line interface and not
- the Ganeti modules.
+ """
+ op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
+ cli.SubmitOpCode(op, cl=client)
+
+ def ActivateDisks(self):
+ """Encapsulates the activation of all disks of an instance.
"""
- DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
+ op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
+ cli.SubmitOpCode(op, cl=client)
-class InstanceList(object):
- """The set of Virtual Machine instances on a cluster.
+def GetClusterData():
+ """Get a list of instances on this cluster.
"""
- cmd = ['gnt-instance', 'list', '--lock-retries=15',
- '-o', 'name,admin_state,oper_state', '--no-headers', '--separator=:']
+ op1_fields = ["name", "status", "admin_state", "snodes"]
+ op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+ use_locking=True)
+ op2_fields = ["name", "bootid", "offline"]
+ op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+ use_locking=True)
- def __init__(self):
- res = DoCmd(self.cmd)
+ job_id = client.SubmitJob([op1, op2])
- lines = res.stdout.splitlines()
+ all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
- self.instances = []
- for line in lines:
- fields = [fld.strip() for fld in line.split(':')]
+ result = all_results[0]
+ smap = {}
- if len(fields) != 3:
- continue
- if fields[1] == "no": #no autostart, we don't care about this instance
- continue
- name, status = fields[0], fields[2]
+ instances = {}
+ for fields in result:
+ (name, status, autostart, snodes) = fields
- self.instances.append(Instance(name, status))
+ # update the secondary node map
+ for node in snodes:
+ if node not in smap:
+ smap[node] = []
+ smap[node].append(name)
- def __iter__(self):
- return self.instances.__iter__()
+ instances[name] = Instance(name, status, autostart)
+ nodes = dict([(name, (bootid, offline))
+ for name, bootid, offline in all_results[1]])
-class Message(object):
- """Encapsulation of a notice or error message.
+ client.ArchiveJob(job_id)
- """
- def __init__(self, level, msg):
- self.level = level
- self.msg = msg
- self.when = time.time()
+ return instances, nodes, smap
- def __str__(self):
- return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg)
-
-class Restarter(object):
+class Watcher(object):
"""Encapsulate the logic for restarting erronously halted virtual machines.
The calling program should periodically instantiate me and call Run().
to restart machines that are down.
"""
- def __init__(self):
- sstore = ssconf.SimpleStore()
- master = sstore.GetMasterNode()
- if master != socket.gethostname():
+ def __init__(self, opts, notepad):
+ self.notepad = notepad
+ master = client.QueryConfigValues(["master_node"])[0]
+ if master != utils.HostInfo().name:
raise NotMasterError("This is not the master node")
- self.instances = InstanceList()
- self.messages = []
+ self.instances, self.bootids, self.smap = GetClusterData()
+ self.started_instances = set()
+ self.opts = opts
def Run(self):
- """Make a pass over the list of instances, restarting downed ones.
+ """Watcher run sequence.
+
+ """
+ notepad = self.notepad
+ self.ArchiveJobs(self.opts.job_age)
+ self.CheckInstances(notepad)
+ self.CheckDisks(notepad)
+ self.VerifyDisks()
+
+ def ArchiveJobs(self, age):
+ """Archive old jobs.
"""
- notepad = RestarterState()
+ arch_count, left_count = client.AutoArchiveJobs(age)
+ logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
+
+ def CheckDisks(self, notepad):
+ """Check all nodes for restarted ones.
- for instance in self.instances:
+ """
+ check_nodes = []
+ for name, (new_id, offline) in self.bootids.iteritems():
+ old = notepad.GetNodeBootID(name)
+ if new_id is None:
+ # Bad node, not returning a boot id
+ if not offline:
+ logging.debug("Node %s missing boot id, skipping secondary checks",
+ name)
+ continue
+ if old != new_id:
+ # Node's boot ID has changed, probably through a reboot.
+ check_nodes.append(name)
+
+ if check_nodes:
+ # Activate disks for all instances with any of the checked nodes as a
+ # secondary node.
+ for node in check_nodes:
+ if node not in self.smap:
+ continue
+ for instance_name in self.smap[node]:
+ instance = self.instances[instance_name]
+ if not instance.autostart:
+ logging.info(("Skipping disk activation for non-autostart"
+ " instance %s"), instance.name)
+ continue
+ if instance.name in self.started_instances:
+ # we already tried to start the instance, which should have
+ # activated its drives (if they can be at all)
+ continue
+ try:
+ logging.info("Activating disks for instance %s", instance.name)
+ instance.ActivateDisks()
+ except Exception:
+ logging.exception("Error while activating disks for instance %s",
+ instance.name)
+
+ # Keep changed boot IDs
+ for name in check_nodes:
+ notepad.SetNodeBootID(name, self.bootids[name][0])
+
+ def CheckInstances(self, notepad):
+ """Make a pass over the list of instances, restarting downed ones.
+
+ """
+ for instance in self.instances.values():
if instance.state in BAD_STATES:
- n = notepad.NumberOfAttempts(instance)
+ n = notepad.NumberOfRestartAttempts(instance)
if n > MAXTRIES:
# stay quiet.
elif n < MAXTRIES:
last = " (Attempt #%d)" % (n + 1)
else:
- notepad.RecordAttempt(instance)
- self.messages.append(Message(ERROR, "Could not restart %s for %d"
- " times, giving up..." %
- (instance.name, MAXTRIES)))
+ notepad.RecordRestartAttempt(instance)
+ logging.error("Could not restart %s after %d attempts, giving up",
+ instance.name, MAXTRIES)
continue
try:
- self.messages.append(Message(NOTICE,
- "Restarting %s%s." %
- (instance.name, last)))
+ logging.info("Restarting %s%s",
+ instance.name, last)
instance.Restart()
- except Error, x:
- self.messages.append(Message(ERROR, str(x)))
+ self.started_instances.add(instance.name)
+ except Exception:
+ logging.exception("Error while restarting instance %s",
+ instance.name)
- notepad.RecordAttempt(instance)
+ notepad.RecordRestartAttempt(instance)
elif instance.state in HELPLESS_STATES:
- if notepad.NumberOfAttempts(instance):
- notepad.Remove(instance)
+ if notepad.NumberOfRestartAttempts(instance):
+ notepad.RemoveInstance(instance)
else:
- if notepad.NumberOfAttempts(instance):
- notepad.Remove(instance)
- msg = Message(NOTICE,
- "Restart of %s succeeded." % instance.name)
- self.messages.append(msg)
-
- notepad.Save()
-
- def WriteReport(self, logfile):
- """Log all messages to file.
+ if notepad.NumberOfRestartAttempts(instance):
+ notepad.RemoveInstance(instance)
+ logging.info("Restart of %s succeeded", instance.name)
- Args:
- logfile: file object open for writing (the log file)
+ @staticmethod
+ def VerifyDisks():
+ """Run gnt-cluster verify-disks.
"""
- for msg in self.messages:
- print >> logfile, str(msg)
+ op = opcodes.OpVerifyDisks()
+ job_id = client.SubmitJob([op])
+ result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+ client.ArchiveJob(job_id)
+ if not isinstance(result, (tuple, list)):
+ logging.error("Can't get a valid result from verify-disks")
+ return
+ offline_disk_instances = result[2]
+ if not offline_disk_instances:
+ # nothing to do
+ return
+ logging.debug("Will activate disks for instances %s",
+ ", ".join(offline_disk_instances))
+ # We submit only one job and wait for it; not optimal, but it spams
+ # the job queue less.
+ job = [opcodes.OpActivateInstanceDisks(instance_name=name)
+ for name in offline_disk_instances]
+ job_id = cli.SendJob(job, cl=client)
+
+ cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
def ParseOptions():
"""Parse the command line options.
- Returns:
- (options, args) as from OptionParser.parse_args()
+ @return: (options, args) as from OptionParser.parse_args()
"""
parser = OptionParser(description="Ganeti cluster watcher",
constants.RELEASE_VERSION)
parser.add_option("-d", "--debug", dest="debug",
- help="Don't redirect messages to the log file",
+ help="Write all messages to stderr",
default=False, action="store_true")
+ parser.add_option("-A", "--job-age", dest="job_age",
+ help="Autoarchive jobs older than this age (default"
+ " 6 hours)", default=6*3600)
options, args = parser.parse_args()
+ options.job_age = cli.ParseTimespec(options.job_age)
return options, args
"""Main function.
"""
+ global client
+
options, args = ParseOptions()
- if not options.debug:
- sys.stderr = sys.stdout = open(LOGFILE, 'a')
+ utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+ stderr_logging=options.debug)
+ update_file = True
try:
- restarter = Restarter()
- restarter.Run()
- restarter.WriteReport(sys.stdout)
+ notepad = WatcherState()
+ try:
+ try:
+ client = cli.GetClient()
+ except errors.OpPrereqError:
+ # this is, from cli.GetClient, a not-master case
+ logging.debug("Not on master, exiting")
+ sys.exit(constants.EXIT_SUCCESS)
+ except luxi.NoMasterError, err:
+ logging.warning("Master seems to be down (%s), trying to restart",
+ str(err))
+ if not StartMaster():
+ logging.critical("Can't start the master, exiting")
+ update_file = False
+ sys.exit(constants.EXIT_FAILURE)
+ # else retry the connection
+ client = cli.GetClient()
+
+ try:
+ watcher = Watcher(options, notepad)
+ except errors.ConfigurationError:
+ # Just exit if there's no configuration
+ sys.exit(constants.EXIT_SUCCESS)
+
+ watcher.Run()
+ finally:
+ if update_file:
+ notepad.Save()
+ else:
+ logging.debug("Not updating status file due to failure")
+ except SystemExit:
+ raise
except NotMasterError:
- if options.debug:
- sys.stderr.write("Not master, exiting.\n")
+ logging.debug("Not master, exiting")
sys.exit(constants.EXIT_NOTMASTER)
- except Error, err:
- print err
+ except errors.ResolverError, err:
+ logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
+ sys.exit(constants.EXIT_NODESETUP_ERROR)
+ except Exception, err:
+ logging.error(str(err), exc_info=True)
+ sys.exit(constants.EXIT_FAILURE)
+
if __name__ == '__main__':
main()