# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils
import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5

BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])

HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


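# Design note: the watcher runs in two phases. The global process (see
# _GlobalWatcher below) ensures the node daemons are running, checks RAPI
# and archives old jobs, then re-executes itself once per node group; the
# per-group children (_GroupWatcher) do the actual instance restarts and
# disk checks.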
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


84 """Check whether we should pause.
87 return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


159 """Data container representing cluster node.
162 def __init__(self, name, bootid, offline, secondaries):
163 """Initializes this class.
168 self.offline = offline
169 self.secondaries = secondaries
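# The "notepad" argument used below is a state.WatcherState object; it
# persists restart attempts and node boot IDs across watcher runs, which is
# what makes the MAXTRIES give-up logic possible.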
def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue
      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue
      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)
      notepad.RecordRestartAttempt(inst.name)
    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


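# A changed boot ID is how the watcher detects a node reboot: the ID is
# regenerated by the kernel on every boot (on Linux typically
# /proc/sys/kernel/random/boot_id), so a value differing from the stored one
# means the node restarted and its instances' secondary disks may be down.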
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


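# While _CheckDisks reacts to detected node reboots, _VerifyDisks asks the
# master to verify the whole group and reactivates any disks it reports as
# offline, catching problems with no corresponding boot ID change.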
def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


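# A minimal liveness probe: RAPI is considered healthy only if it answers
# GetVersion with the expected protocol version; TLS errors, API errors and
# version mismatches all count as "not responding".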
def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


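# Example invocations (assuming the usual "ganeti-watcher" entry point and
# that cli.ParseTimespec accepts suffixed values such as "1d"):
#   ganeti-watcher                     # global run, spawns per-group children
#   ganeti-watcher -A 1d               # archive jobs older than one day
#   ganeti-watcher --no-wait-children  # don't wait for children to finish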
352 """Parse the command line options.
354 @return: (options, args) as from OptionParser.parse_args()
357 parser = OptionParser(description="Ganeti cluster watcher",
359 version="%%prog (ganeti) %s" %
360 constants.RELEASE_VERSION)
362 parser.add_option(cli.DEBUG_OPT)
363 parser.add_option(cli.NODEGROUP_OPT)
364 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
365 help="Autoarchive jobs older than this age (default"
367 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
368 action="store_true", help="Ignore cluster pause setting")
369 parser.add_option("--wait-children", dest="wait_children",
370 action="store_true", help="Wait for child processes")
371 parser.add_option("--no-wait-children", dest="wait_children",
372 action="store_false",
373 help="Don't wait for child processes")
374 # See optparse documentation for why default values are not set by options
375 parser.set_defaults(wait_children=True)
376 options, args = parser.parse_args()
377 options.job_age = cli.ParseTimespec(options.job_age)
380 parser.error("No arguments expected")
382 return (options, args)
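# The status files written below contain one "<instance> <status>" pair per
# line, sorted by instance name, e.g. (status strings are illustrative):
#   inst1.example.com ERROR_down
#   inst2.example.com running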
def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instance objects

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


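# If several per-group files report a different status for the same instance
# (e.g. shortly after an instance was moved between groups), the merge below
# keeps the entry from the most recently written file.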
def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


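# Each child spawned below re-executes the watcher with the same arguments
# plus the node group option (cli.NODEGROUP_OPT_NAME) and a group UUID, so
# it ends up in _GroupWatcher for exactly one group.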
def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []
  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)
      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


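# The two OpQuery opcodes below are submitted as a single job: one queries
# the group's primary instances, the other its nodes. Each returned field is
# a (status, value) pair, hence the compat.snd calls to drop the status.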
def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()
  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())
  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")
  return result


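# Every node group gets its own state and status file ("%s" in the path
# constants is replaced by the group UUID), so concurrent per-group watchers
# never write to the same file; only the merged global status file needs
# locking.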
def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the state file for this group
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS