# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


82 """Check whether we should pause.
85 return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
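# Illustrative note (semantics assumed from the utils API): the pause file
# holds an end-of-pause timestamp, so ShouldPause() stays true only until
# that time passes. Operators would normally manage it via
# "gnt-cluster watcher pause <duration>" / "gnt-cluster watcher continue"
# rather than by editing the file directly.

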
def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


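# A hypothetical hooks directory, to illustrate the run-parts handling above
# (names made up):
#
#   $HOOKS_BASE_DIR/$HOOKS_NAME_WATCHER/
#     50-check-storage    executable; runs and is logged as RUNPARTS_RUN
#     60-maintenance      non-executable; reported as RUNPARTS_SKIP
#
# A hook exiting non-zero is only logged as failed; it does not abort the
# watcher run.

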
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


157 """Data container representing cluster node.
160 def __init__(self, name, bootid, offline, secondaries):
161 """Initializes this class.
166 self.offline = offline
167 self.secondaries = secondaries
def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)
    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


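# Worked example of the retry accounting above (hypothetical instance): with
# MAXTRIES = 5, an instance stuck in ERROR_down gets restart attempts #1-#5
# on successive watcher runs; on the run where n == MAXTRIES one more attempt
# is recorded only so the "giving up" message fires once, and from then on
# n > MAXTRIES makes every pass skip the instance until it runs again and its
# notepad entry is removed.

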
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


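# Hypothetical example: if node2 reboots, its boot ID changes (say from
# "abc123" to "f00d42"), so disks are reactivated for every instance that
# lists node2 as a secondary; the new boot ID is then stored so the same
# reboot is not acted upon again on the next watcher run.

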
def _CheckForOfflineNodes(nodes, instance):
  """Checks if given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


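# Usage sketch (mirroring _GlobalWatcher below): a version mismatch is
# treated the same as no answer, so a responding but incompatible RAPI
# daemon also triggers the restart logic.
#
#   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
#     utils.StopDaemon(constants.RAPI)
#     utils.EnsureDaemon(constants.RAPI)

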
350 """Parse the command line options.
352 @return: (options, args) as from OptionParser.parse_args()
355 parser = OptionParser(description="Ganeti cluster watcher",
357 version="%%prog (ganeti) %s" %
358 constants.RELEASE_VERSION)
360 parser.add_option(cli.DEBUG_OPT)
361 parser.add_option(cli.NODEGROUP_OPT)
362 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
363 help="Autoarchive jobs older than this age (default"
365 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
366 action="store_true", help="Ignore cluster pause setting")
367 parser.add_option("--wait-children", dest="wait_children",
368 action="store_true", help="Wait for child processes")
369 parser.add_option("--no-wait-children", dest="wait_children",
370 action="store_false", help="Don't wait for child processes")
371 # See optparse documentation for why default values are not set by options
372 parser.set_defaults(wait_children=True)
373 options, args = parser.parse_args()
374 options.job_age = cli.ParseTimespec(options.job_age)
377 parser.error("No arguments expected")
379 return (options, args)
def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instance objects whose name and status will be written

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


414 """Helper to store file handle's C{fstat}.
418 """Initializes this class.
423 def __call__(self, fh):
424 """Calls C{fstat} on file handle.
427 self.st = os.fstat(fh.fileno())
def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


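# Example return value for the file format shown above (mtime made up):
#
#   (1330700400.0, [["inst1.example.com", "running"],
#                   ["inst2.example.com", "ERROR_down"]])
#
# An unreadable or missing file yields (None, None) instead.

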
def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


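# Merge example (hypothetical files): if group A's status file (mtime 200)
# reports inst1 as "running" while group B's stale file (mtime 150) still
# lists inst1 as "ERROR_down", sorting the (mtime, status) pairs in reverse
# order keeps (200, "running"), so the merged file reports the newest status.

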
def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


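# Each child simply re-runs this program with the group appended, e.g.
# (UUID made up):
#
#   ganeti-watcher --node-group d1a6e180-93d6-4ad8-b812-0c1e332a0a54
#
# which makes Main() below take the per-group (_GroupWatcher) branch.

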
def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


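# Shape of the returned data, with made-up names:
#
#   nodes     = {"node1": Node("node1", "boot-id-1", False, set(["inst1"]))}
#   instances = {"inst1": Instance("inst1", "running", True, ["node1"])}
#
# Both mappings are keyed by name, matching the lookups done in _CheckDisks
# and _VerifyDisks above.

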
def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


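# The ssconf node group list is expected to contain one "<uuid> <name>" pair
# per line (values made up), hence the split on the first whitespace above:
#
#   d1a6e180-93d6-4ad8-b812-0c1e332a0a54 default

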
def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the state file and acquire its lock
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  finally:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
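
# Entry point sketch (assumed wrapper behavior): the "ganeti-watcher" script
# is expected to call Main() and exit with its return value, e.g.
#
#   sys.exit(Main())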