X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/78f3bd300c38be60754df2d98b991f4613d073b4..aad81f98ca9fca86aeb5292ae87dca300fc6b83a:/daemons/ganeti-watcher diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher index f16d56f..074fc3b 100755 --- a/daemons/ganeti-watcher +++ b/daemons/ganeti-watcher @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -29,7 +29,6 @@ by a node reboot. Run from cron or similar. import os import sys -import re import time import fcntl import errno @@ -41,6 +40,9 @@ from ganeti import constants from ganeti import serializer from ganeti import ssconf from ganeti import errors +from ganeti import opcodes +from ganeti import logger +from ganeti import cli MAXTRIES = 5 @@ -53,11 +55,11 @@ KEY_RESTART_WHEN = "restart_when" KEY_BOOT_ID = "bootid" -class Error(Exception): - """Generic custom error class.""" +# Global client object +client = None -class NotMasterError(Error): +class NotMasterError(errors.GenericError): """Exception raised when this host is not the master.""" @@ -84,11 +86,12 @@ def DoCmd(cmd): res = utils.RunCmd(cmd) if res.failed: - raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" % - (repr(cmd), - Indent(res.fail_reason), - Indent(res.stdout), - Indent(res.stderr))) + msg = ("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" % + (repr(cmd), + Indent(res.fail_reason), + Indent(res.stdout), + Indent(res.stderr))) + raise errors.CommandError(msg) return res @@ -100,52 +103,58 @@ class WatcherState(object): def __init__(self): """Open, lock, read and parse the file. - Raises StandardError on lock contention. + Raises exception on lock contention. """ # The two-step dance below is necessary to allow both opening existing # file read/write and creating if not existing. Vanilla open will truncate # an existing file -or- allow creating if not existing. - f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT) - f = os.fdopen(f, 'w+') + fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT) + self.statefile = os.fdopen(fd, 'w+') - try: - fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB) - except IOError, x: - if x.errno == errno.EAGAIN: - raise StandardError("State file already locked") - raise - - self.statefile = f + utils.LockFile(self.statefile.fileno()) try: - self.data = serializer.Load(self.statefile.read()) + self._data = serializer.Load(self.statefile.read()) except Exception, msg: # Ignore errors while loading the file and treat it as empty - self.data = {} + self._data = {} logging.warning(("Empty or invalid state file. Using defaults." " Error message: %s"), msg) - if "instance" not in self.data: - self.data["instance"] = {} - if "node" not in self.data: - self.data["node"] = {} + if "instance" not in self._data: + self._data["instance"] = {} + if "node" not in self._data: + self._data["node"] = {} + + self._orig_data = serializer.Dump(self._data) - def __del__(self): - """Called on destruction. + def Save(self): + """Save state to file, then unlock and close it. """ - if self.statefile: - self._Close() + assert self.statefile + + serialized_form = serializer.Dump(self._data) + if self._orig_data == serialized_form: + logging.debug("Data didn't change, just touching status file") + os.utime(constants.WATCHER_STATEFILE, None) + return - def _Close(self): + # We need to make sure the file is locked before renaming it, otherwise + # starting ganeti-watcher again at the same time will create a conflict. + fd = utils.WriteFile(constants.WATCHER_STATEFILE, + data=serialized_form, + prewrite=utils.LockFile, close=False) + self.statefile = os.fdopen(fd, 'w+') + + def Close(self): """Unlock configuration file and close it. """ assert self.statefile - fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN) - + # Files are automatically unlocked when closing them self.statefile.close() self.statefile = None @@ -153,7 +162,7 @@ class WatcherState(object): """Returns the last boot ID of a node or None. """ - ndata = self.data["node"] + ndata = self._data["node"] if name in ndata and KEY_BOOT_ID in ndata[name]: return ndata[name][KEY_BOOT_ID] @@ -165,7 +174,7 @@ class WatcherState(object): """ assert bootid - ndata = self.data["node"] + ndata = self._data["node"] if name not in ndata: ndata[name] = {} @@ -179,7 +188,7 @@ class WatcherState(object): instance - the instance to look up. """ - idata = self.data["instance"] + idata = self._data["instance"] if instance.name in idata: return idata[instance.name][KEY_RESTART_COUNT] @@ -193,7 +202,7 @@ class WatcherState(object): instance - the instance being restarted """ - idata = self.data["instance"] + idata = self._data["instance"] if instance.name not in idata: inst = idata[instance.name] = {} @@ -212,24 +221,11 @@ class WatcherState(object): This method removes the record for a named instance. """ - idata = self.data["instance"] + idata = self._data["instance"] if instance.name in idata: del idata[instance.name] - def Save(self): - """Save state to file, then unlock and close it. - - """ - assert self.statefile - - self.statefile.seek(0) - self.statefile.truncate() - - self.statefile.write(serializer.Dump(self.data)) - - self._Close() - class Instance(object): """Abstraction for a Virtual Machine instance. @@ -247,48 +243,40 @@ class Instance(object): """Encapsulates the start of an instance. """ - DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name]) + op = opcodes.OpStartupInstance(instance_name=self.name, + force=False, + extra_args=None) + cli.SubmitOpCode(op, cl=client) def ActivateDisks(self): """Encapsulates the activation of all disks of an instance. """ - DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name]) - - -def _RunListCmd(cmd): - """Runs a command and parses its output into lists. - - """ - for line in DoCmd(cmd).stdout.splitlines(): - yield line.split(':') + op = opcodes.OpActivateInstanceDisks(instance_name=self.name) + cli.SubmitOpCode(op, cl=client) def GetInstanceList(with_secondaries=None): """Get a list of instances on this cluster. """ - cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers', - '--separator=:'] - - fields = 'name,oper_state,admin_state' + fields = ["name", "oper_state", "admin_state"] if with_secondaries is not None: - fields += ',snodes' + fields.append("snodes") - cmd.append('-o') - cmd.append(fields) + result = client.QueryInstances([], fields) instances = [] - for fields in _RunListCmd(cmd): + for fields in result: if with_secondaries is not None: (name, status, autostart, snodes) = fields - if snodes == "-": + if not snodes: continue for node in with_secondaries: - if node in snodes.split(','): + if node in snodes: break else: continue @@ -296,7 +284,7 @@ def GetInstanceList(with_secondaries=None): else: (name, status, autostart) = fields - instances.append(Instance(name, status, autostart != "no")) + instances.append(Instance(name, status, autostart)) return instances @@ -305,15 +293,8 @@ def GetNodeBootIDs(): """Get a dict mapping nodes to boot IDs. """ - cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers', - '--separator=:', '-o', 'name,bootid'] - - ids = {} - for fields in _RunListCmd(cmd): - (name, bootid) = fields - ids[name] = bootid - - return ids + result = client.QueryNodes([], ["name", "bootid"]) + return dict([(name, bootid) for name, bootid in result]) class Watcher(object): @@ -347,9 +328,9 @@ class Watcher(object): """ check_nodes = [] - for name, id in self.bootids.iteritems(): + for name, new_id in self.bootids.iteritems(): old = notepad.GetNodeBootID(name) - if old != id: + if old != new_id: # Node's boot ID has changed, proably through a reboot. check_nodes.append(name) @@ -368,7 +349,7 @@ class Watcher(object): try: logging.info("Activating disks for instance %s", instance.name) instance.ActivateDisks() - except Error, err: + except Exception, err: logging.error(str(err), exc_info=True) # Keep changed boot IDs @@ -402,7 +383,7 @@ class Watcher(object): instance.name, last) instance.Restart() self.started_instances.add(instance.name) - except Error, err: + except Exception, err: logging.error(str(err), exc_info=True) notepad.RecordRestartAttempt(instance) @@ -418,7 +399,8 @@ class Watcher(object): """Run gnt-cluster verify-disks. """ - result = DoCmd(['gnt-cluster', 'verify-disks', '--lock-retries=15']) + # TODO: What should we do here? + result = DoCmd(['gnt-cluster', 'verify-disks']) if result.output: logging.info(result.output) @@ -442,44 +424,28 @@ def ParseOptions(): return options, args -def SetupLogging(debug): - """Configures the logging module. - - """ - formatter = logging.Formatter("%(asctime)s: %(message)s") - - logfile_handler = logging.FileHandler(constants.LOG_WATCHER) - logfile_handler.setFormatter(formatter) - logfile_handler.setLevel(logging.INFO) - - stderr_handler = logging.StreamHandler() - stderr_handler.setFormatter(formatter) - if debug: - stderr_handler.setLevel(logging.NOTSET) - else: - stderr_handler.setLevel(logging.CRITICAL) - - root_logger = logging.getLogger("") - root_logger.setLevel(logging.NOTSET) - root_logger.addHandler(logfile_handler) - root_logger.addHandler(stderr_handler) - - def main(): """Main function. """ + global client + options, args = ParseOptions() - SetupLogging(options.debug) + logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug) try: + client = cli.GetClient() + try: watcher = Watcher() except errors.ConfigurationError: # Just exit if there's no configuration sys.exit(constants.EXIT_SUCCESS) + watcher.Run() + except SystemExit: + raise except NotMasterError: logging.debug("Not master, exiting") sys.exit(constants.EXIT_NOTMASTER)