# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
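# A typical deployment (an assumption, not enforced here) runs this tool from
# a cron entry such as "*/5 * * * * root ganeti-watcher", so every pass must
# be idempotent and must tolerate overlapping with a previous, slow pass.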

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


#: Maximum number of restart attempts per instance before giving up
MAXTRIES = 5

BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""
84 """Check whether we should pause.
87 return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)
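
# Hooks are plain executables collected from the directory above by
# utils.RunParts(). A minimal sketch of one (path and contents are
# illustrative assumptions, not shipped with Ganeti):
#
#   #!/bin/sh
#   # e.g. <hooks dir>/50-log-watcher-run
#   logger "ganeti-watcher pass started on $(hostname)"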


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
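
# Illustrative use, assuming a luxi client "cl" as returned by GetLuxiClient()
# below:
#   inst = Instance("inst1.example.com", constants.INSTST_ERRORDOWN, False, [])
#   inst.Restart(cl)
# Each method submits a single opcode and waits for the resulting job.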
162 """Data container representing cluster node.
165 def __init__(self, name, bootid, offline, secondaries):
166 """Initializes this class.
171 self.offline = offline
172 self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)
    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)
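
# The boot ID (on Linux, /proc/sys/kernel/random/boot_id) changes on every
# reboot, so a mismatch with the value recorded in the state file reliably
# marks nodes that restarted since the previous watcher pass.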


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param nodes: Dictionary of nodes indexed by name
  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to the RAPI port of hostname and does a simple test. At this time,
  the test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION
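
# Note that a version mismatch, not only a connection failure, counts as "not
# responding": a RAPI daemon speaking the wrong protocol version is as
# unusable to clients as one that is down.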
355 """Parse the command line options.
357 @return: (options, args) as from OptionParser.parse_args()
360 parser = OptionParser(description="Ganeti cluster watcher",
362 version="%%prog (ganeti) %s" %
363 constants.RELEASE_VERSION)
365 parser.add_option(cli.DEBUG_OPT)
366 parser.add_option(cli.NODEGROUP_OPT)
367 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
368 help="Autoarchive jobs older than this age (default"
370 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
371 action="store_true", help="Ignore cluster pause setting")
372 parser.add_option("--wait-children", dest="wait_children",
373 action="store_true", help="Wait for child processes")
374 parser.add_option("--no-wait-children", dest="wait_children",
375 action="store_false",
376 help="Don't wait for child processes")
377 # See optparse documentation for why default values are not set by options
378 parser.set_defaults(wait_children=True)
379 options, args = parser.parse_args()
380 options.job_age = cli.ParseTimespec(options.job_age)
383 parser.error("No arguments expected")
385 return (options, args)
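
# Illustrative invocations (timespec suffix support assumed from
# cli.ParseTimespec):
#   ganeti-watcher                  # global pass, forks one child per group
#   ganeti-watcher -A 12h           # auto-archive jobs older than 12 hours
#   ganeti-watcher --ignore-pause   # run even while the watcher is paused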


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instances whose name and status should be written

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])
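
# line.split(None, 1) splits on arbitrary whitespace into at most two fields,
# matching the "name status" lines written by _WriteInstanceStatus() above.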


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]
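  # For instances that appear in several group files (e.g. after a group
  # move), sorted(..., reverse=True)[0] is the (mtime, status) pair with the
  # newest mtime, so the most recently written per-group entry wins.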

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)

      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  @return: (name-indexed dict of L{Node}, name-indexed dict of L{Instance})

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))
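
# Both mappings returned above are indexed by name, so the per-group checks
# (_CheckInstances, _CheckDisks, _VerifyDisks) can cross-reference nodes and
# instances with simple dictionary lookups.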


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  finally:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
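
# This module is not meant to be executed directly; a thin wrapper script (the
# "ganeti-watcher" command, a sketch assuming the standard layout) is expected
# to do roughly:
#
#   import sys
#   from ganeti import watcher
#   sys.exit(watcher.Main())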