# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.


import os
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
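
# Usage note (a sketch, based on the watcher's pause handling in Main() below):
# the pause file checked above is normally managed with
# "gnt-cluster watcher pause <timespec>" and "gnt-cluster watcher continue";
# while a pause is in effect the watcher exits early without touching
# instances, unless --ignore-pause is given.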


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
158 """Data container representing cluster node.
161 def __init__(self, name, bootid, offline, secondaries):
162 """Initializes this class.
167 self.offline = offline
168 self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started
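
# Worked example of the retry accounting above, with MAXTRIES = 5: an instance
# stuck in ERROR_down is actually restarted on the runs where n is 0..4 (five
# attempts). On the run where n == MAXTRIES the watcher records one final
# attempt and logs "giving up"; on all later runs it only warns that retries
# are exhausted. The counter is dropped (RemoveInstance) as soon as the
# instance leaves the bad states, so a successful restart resets the budget.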


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION
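
# Illustrative standalone use (hypothetical host name; the watcher itself only
# probes localhost from _GlobalWatcher below):
#
#   if not IsRapiResponding("node1.example.com"):
#     logging.warning("RAPI not answering on node1")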
351 """Parse the command line options.
353 @return: (options, args) as from OptionParser.parse_args()
356 parser = OptionParser(description="Ganeti cluster watcher",
358 version="%%prog (ganeti) %s" %
359 constants.RELEASE_VERSION)
361 parser.add_option(cli.DEBUG_OPT)
362 parser.add_option(cli.NODEGROUP_OPT)
363 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
364 help="Autoarchive jobs older than this age (default"
366 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
367 action="store_true", help="Ignore cluster pause setting")
368 parser.add_option("--wait-children", dest="wait_children",
369 action="store_true", help="Wait for child processes")
370 parser.add_option("--no-wait-children", dest="wait_children",
371 action="store_false",
372 help="Don't wait for child processes")
373 # See optparse documentation for why default values are not set by options
374 parser.set_defaults(wait_children=True)
375 options, args = parser.parse_args()
376 options.job_age = cli.ParseTimespec(options.job_age)
379 parser.error("No arguments expected")
381 return (options, args)
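
# Example invocations (a sketch; cli.ParseTimespec is assumed to accept both
# plain seconds and suffixed values such as "1d"):
#
#   ganeti-watcher                           # default run, 6h job archival
#   ganeti-watcher -A 1d --no-wait-children  # keep a day of jobs, don't wait
#   ganeti-watcher --ignore-pause --debug    # run even while cluster is paused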


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))
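
# The file written above holds one "<instance name> <status>" pair per line,
# sorted by instance name; for example (status values are illustrative):
#
#   inst1.example.com running
#   inst2.example.com ERROR_down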


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)
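
# Worked example of the merge: if group A's status file (mtime 1400000100)
# says "inst1 running" while group B's (mtime 1400000200) says
# "inst1 ERROR_down", e.g. right after the instance moved between groups, then
# sorting the (mtime, status) pairs in reverse order picks the pair with the
# newest mtime, so the global file records "inst1 ERROR_down".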


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)
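
# Sketch of what gets spawned: the child command line is the parent's own argv
# plus the node group option, so (assuming cli.NODEGROUP_OPT_NAME is
# "--node-group", and with a hypothetical UUID) each child looks like:
#
#   /usr/sbin/ganeti-watcher --node-group cb978908-0000-0000-0000-000000000000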


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))
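
# For reference, qlang.OP_EQUAL is the "=" operator, so the instance query
# filter built above expands to the plain list:
#
#   ["=", "pnode.group.uuid", uuid]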


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result
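
# The ssconf nodegroup list parsed above holds one group per line, UUID first;
# a sketch of its contents (hypothetical values):
#
#   cb978908-0000-0000-0000-000000000000 default
#   d1f980a2-0000-0000-0000-000000000000 testing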


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  finally:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
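

# Deployment sketch (a packaging assumption, not part of this module): per the
# module docstring, the watcher is meant to run from cron every few minutes,
# along the lines of:
#
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher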