4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
37 from optparse import OptionParser
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
53 import ganeti.rapi.client # pylint: disable=W0611
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
#: Instance states considered unhealthy; the watcher attempts to restart
#: instances found in any of these states (see the restart logic in
#: C{_CheckInstances}).
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])

#: States in which the watcher cannot help: the instance's node is down or
#: offline, so restarting or activating disks would be pointless.
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
#: Number of seconds to wait between starting child processes for node groups
#: (children are spawned one at a time to avoid overloading the system)
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master.

  Callers catch this to exit early without treating the condition as a
  fatal failure (e.g. the global watcher simply returns success on
  non-master nodes).

  """
def ShouldPause():
  """Check whether we should pause.

  @rtype: bool
  @return: Whether a pause has been requested via the watcher pause file

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
99 def RunWatcherHooks():
100 """Run the watcher hooks.
103 hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
104 constants.HOOKS_NAME_WATCHER)
105 if not os.path.isdir(hooks_dir):
109 results = utils.RunParts(hooks_dir)
110 except Exception, err: # pylint: disable=W0703
111 logging.exception("RunParts %s failed: %s", hooks_dir, err)
114 for (relname, status, runresult) in results:
115 if status == constants.RUNPARTS_SKIP:
116 logging.debug("Watcher hook %s: skipped", relname)
117 elif status == constants.RUNPARTS_ERR:
118 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
119 elif status == constants.RUNPARTS_RUN:
121 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
122 relname, runresult.exit_code, runresult.output)
124 logging.debug("Watcher hook %s: success (output: %s)", relname,
127 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    """Initializes this class.

    @type name: string
    @param name: Instance name
    @type status: string
    @param status: Instance status as reported by the cluster
    @type autostart: bool
    @param autostart: Whether the instance should be restarted automatically
    @param snodes: Secondary nodes of the instance; presumably a list of
      node names -- TODO confirm against the producer (C{_GetGroupData})

    """
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    Submits an C{OpInstanceStartup} opcode (without force) via the given
    client and waits for it.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    @type name: string
    @param name: Node name
    @param bootid: Node boot ID (or a false value if unavailable)
    @type offline: bool
    @param offline: Whether the node is marked offline
    @param secondaries: Names of instances having this node as secondary

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries
170 def _CheckInstances(cl, notepad, instances):
171 """Make a pass over the list of instances, restarting downed ones.
174 notepad.MaintainInstanceList(instances.keys())
178 for inst in instances.values():
179 if inst.status in BAD_STATES:
180 n = notepad.NumberOfRestartAttempts(inst.name)
183 logging.warning("Not restarting instance '%s', retries exhausted",
188 notepad.RecordRestartAttempt(inst.name)
189 logging.error("Could not restart instance '%s' after %s attempts,"
190 " giving up", inst.name, MAXTRIES)
194 logging.info("Restarting instance '%s' (attempt #%s)",
197 except Exception: # pylint: disable=W0703
198 logging.exception("Error while restarting instance '%s'", inst.name)
200 started.add(inst.name)
202 notepad.RecordRestartAttempt(inst.name)
205 if notepad.NumberOfRestartAttempts(inst.name):
206 notepad.RemoveInstance(inst.name)
207 if inst.status not in HELPLESS_STATES:
208 logging.info("Restart of instance '%s' succeeded", inst.name)
213 def _CheckDisks(cl, notepad, nodes, instances, started):
214 """Check all nodes for restarted ones.
219 for node in nodes.values():
220 old = notepad.GetNodeBootID(node.name)
222 # Bad node, not returning a boot id
224 logging.debug("Node '%s' missing boot ID, skipping secondary checks",
228 if old != node.bootid:
229 # Node's boot ID has changed, probably through a reboot
230 check_nodes.append(node)
233 # Activate disks for all instances with any of the checked nodes as a
235 for node in check_nodes:
236 for instance_name in node.secondaries:
238 inst = instances[instance_name]
240 logging.info("Can't find instance '%s', maybe it was ignored",
244 if not inst.autostart:
245 logging.info("Skipping disk activation for non-autostart"
246 " instance '%s'", inst.name)
249 if inst.name in started:
250 # we already tried to start the instance, which should have
251 # activated its drives (if they can be at all)
252 logging.debug("Skipping disk activation for instance '%s' as"
253 " it was already started", inst.name)
257 logging.info("Activating disks for instance '%s'", inst.name)
258 inst.ActivateDisks(cl)
259 except Exception: # pylint: disable=W0703
260 logging.exception("Error while activating disks for instance '%s'",
263 # Keep changed boot IDs
264 for node in check_nodes:
265 notepad.SetNodeBootID(node.name, node.bootid)
def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param nodes: Dictionary mapping node name to L{Node} object
  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
278 def _VerifyDisks(cl, uuid, nodes, instances):
279 """Run a per-group "gnt-cluster verify-disks".
282 job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
283 ((_, offline_disk_instances, _), ) = \
284 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
285 cl.ArchiveJob(job_id)
287 if not offline_disk_instances:
289 logging.debug("Verify-disks reported no offline disks, nothing to do")
292 logging.debug("Will activate disks for instance(s) %s",
293 utils.CommaJoin(offline_disk_instances))
295 # We submit only one job, and wait for it. Not optimal, but this puts less
296 # load on the job queue.
298 for name in offline_disk_instances:
300 inst = instances[name]
302 logging.info("Can't find instance '%s', maybe it was ignored", name)
305 if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
306 logging.info("Skipping instance '%s' because it is in a helpless state or"
307 " has offline secondaries", name)
310 job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
313 job_id = cli.SendJob(job, cl=cl)
316 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
317 except Exception: # pylint: disable=W0703
318 logging.exception("Error while activating disks")
321 def IsRapiResponding(hostname):
322 """Connects to RAPI port and does a simple test.
324 Connects to RAPI port of hostname and does a simple test. At this time, the
327 @type hostname: string
328 @param hostname: hostname of the node to connect to.
330 @return: Whether RAPI is working properly
333 curl_config = rapi.client.GenericCurlConfig()
334 rapi_client = rapi.client.GanetiRapiClient(hostname,
335 curl_config_fn=curl_config)
337 master_version = rapi_client.GetVersion()
338 except rapi.client.CertificateError, err:
339 logging.warning("RAPI certificate error: %s", err)
341 except rapi.client.GanetiApiError, err:
342 logging.warning("RAPI error: %s", err)
345 logging.debug("Reported RAPI version %s", master_version)
346 return master_version == constants.RAPI_VERSION
350 """Parse the command line options.
352 @return: (options, args) as from OptionParser.parse_args()
355 parser = OptionParser(description="Ganeti cluster watcher",
357 version="%%prog (ganeti) %s" %
358 constants.RELEASE_VERSION)
360 parser.add_option(cli.DEBUG_OPT)
361 parser.add_option(cli.NODEGROUP_OPT)
362 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
363 help="Autoarchive jobs older than this age (default"
365 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
366 action="store_true", help="Ignore cluster pause setting")
367 parser.add_option("--wait-children", dest="wait_children",
368 action="store_true", help="Wait for child processes")
369 parser.add_option("--no-wait-children", dest="wait_children",
370 action="store_false", help="Don't wait for child processes")
371 # See optparse documentation for why default values are not set by options
372 parser.set_defaults(wait_children=True)
373 options, args = parser.parse_args()
374 options.job_age = cli.ParseTimespec(options.job_age)
377 parser.error("No arguments expected")
379 return (options, args)
382 def _WriteInstanceStatus(filename, data):
383 """Writes the per-group instance status file.
385 The entries are sorted.
387 @type filename: string
388 @param filename: Path to instance status file
389 @type data: list of tuple; (instance name as string, status as string)
390 @param data: Instance name and status
393 logging.debug("Updating instance status file '%s' with %s instances",
396 utils.WriteFile(filename,
397 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instance objects whose name and status are written

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])
413 def _ReadInstanceStatus(filename):
414 """Reads an instance status file.
416 @type filename: string
417 @param filename: Path to status file
418 @rtype: tuple; (None or number, list of lists containing instance name and
420 @return: File's mtime and instance status contained in the file; mtime is
421 C{None} if file can't be read
424 logging.debug("Reading per-group instance status from '%s'", filename)
426 statcb = utils.FileStatHelper()
428 content = utils.ReadFile(filename, preread=statcb)
429 except EnvironmentError, err:
430 if err.errno == errno.ENOENT:
431 logging.error("Can't read '%s', does not exist (yet)", filename)
433 logging.exception("Unable to read '%s', ignoring", filename)
436 return (statcb.st.st_mtime, [line.split(None, 1)
437 for line in content.splitlines()])
440 def _MergeInstanceStatus(filename, pergroup_filename, groups):
441 """Merges all per-group instance status files into a global one.
443 @type filename: string
444 @param filename: Path to global instance status file
445 @type pergroup_filename: string
446 @param pergroup_filename: Path to per-group status files, must contain "%s"
447 to be replaced with group UUID
448 @type groups: sequence
449 @param groups: UUIDs of known groups
452 # Lock global status file in exclusive mode
453 lock = utils.FileLock.Open(filename)
455 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
456 except errors.LockError, err:
457 # All per-group processes will lock and update the file. None of them
458 # should take longer than 10 seconds (the value of
459 # INSTANCE_STATUS_LOCK_TIMEOUT).
460 logging.error("Can't acquire lock on instance status file '%s', not"
461 " updating: %s", filename, err)
464 logging.debug("Acquired exclusive lock on '%s'", filename)
468 # Load instance status from all groups
469 for group_uuid in groups:
470 (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
472 if mtime is not None:
473 for (instance_name, status) in instdata:
474 data.setdefault(instance_name, []).append((mtime, status))
476 # Select last update based on file mtime
477 inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
478 for (instance_name, status) in data.items()]
480 # Write the global status file. Don't touch file after it's been
481 # updated--there is no lock anymore.
482 _WriteInstanceStatus(filename, inststatus)
485 def GetLuxiClient(try_restart):
486 """Tries to connect to the master daemon.
488 @type try_restart: bool
489 @param try_restart: Whether to attempt to restart the master daemon
493 return cli.GetClient()
494 except errors.OpPrereqError, err:
495 # this is, from cli.GetClient, a not-master case
496 raise NotMasterError("Not on master node (%s)" % err)
498 except luxi.NoMasterError, err:
502 logging.warning("Master daemon seems to be down (%s), trying to restart",
505 if not utils.EnsureDaemon(constants.MASTERD):
506 raise errors.GenericError("Can't start the master daemon")
508 # Retry the connection
509 return cli.GetClient()
512 def _StartGroupChildren(cl, wait):
513 """Starts a new instance of the watcher for every node group.
516 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
519 result = cl.QueryGroups([], ["name", "uuid"], False)
523 for (idx, (name, uuid)) in enumerate(result):
524 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
527 # Let's not kill the system
528 time.sleep(CHILD_PROCESS_DELAY)
530 logging.debug("Spawning child for group '%s' (%s), arguments %s",
534 # TODO: Should utils.StartDaemon be used instead?
535 pid = os.spawnv(os.P_NOWAIT, args[0], args)
536 except Exception: # pylint: disable=W0703
537 logging.exception("Failed to start child for group '%s' (%s)",
540 logging.debug("Started with PID %s", pid)
545 logging.debug("Waiting for child PID %s", pid)
547 result = utils.RetryOnSignal(os.waitpid, pid, 0)
548 except EnvironmentError, err:
551 logging.debug("Child PID %s exited with status %s", pid, result)
554 def _ArchiveJobs(cl, age):
555 """Archives old jobs.
558 (arch_count, left_count) = cl.AutoArchiveJobs(age)
559 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
def _CheckMaster(cl):
  """Ensures current host is master node.

  @param cl: Cluster client (as returned by L{GetLuxiClient})
  @raise NotMasterError: If the configured master node differs from this
    host's name

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")
571 @rapi.client.UsesRapiClient
572 def _GlobalWatcher(opts):
573 """Main function for global watcher.
575 At the end child processes are spawned for every node group.
581 # Run node maintenance in all cases, even if master, so that old masters can
582 # be properly cleaned up
583 if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
584 nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
587 client = GetLuxiClient(True)
588 except NotMasterError:
589 # Don't proceed on non-master nodes
590 return constants.EXIT_SUCCESS
592 # we are on master now
593 utils.EnsureDaemon(constants.RAPI)
595 # If RAPI isn't responding to queries, try one restart
596 logging.debug("Attempting to talk to remote API on %s",
597 constants.IP4_ADDRESS_LOCALHOST)
598 if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
599 logging.warning("Couldn't get answer from remote API, restaring daemon")
600 utils.StopDaemon(constants.RAPI)
601 utils.EnsureDaemon(constants.RAPI)
602 logging.debug("Second attempt to talk to remote API")
603 if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
604 logging.fatal("RAPI is not responding")
605 logging.debug("Successfully talked to remote API")
608 _ArchiveJobs(client, opts.job_age)
610 # Spawn child processes for all node groups
611 _StartGroupChildren(client, opts.wait_children)
613 return constants.EXIT_SUCCESS
616 def _GetGroupData(cl, uuid):
617 """Retrieves instances and nodes per node group.
621 # Get all primary instances in group
622 opcodes.OpQuery(what=constants.QR_INSTANCE,
623 fields=["name", "status", "admin_state", "snodes",
624 "pnode.group.uuid", "snodes.group.uuid"],
625 qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
628 # Get all nodes in group
629 opcodes.OpQuery(what=constants.QR_NODE,
630 fields=["name", "bootid", "offline"],
631 qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
635 job_id = cl.SubmitJob(job)
636 results = map(objects.QueryResponse.FromDict,
637 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
638 cl.ArchiveJob(job_id)
640 results_data = map(operator.attrgetter("data"), results)
642 # Ensure results are tuples with two values
643 assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
645 # Extract values ignoring result status
646 (raw_instances, raw_nodes) = [[map(compat.snd, values)
648 for res in results_data]
654 for (name, status, autostart, snodes, pnode_group_uuid,
655 snodes_group_uuid) in raw_instances:
656 if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
657 logging.error("Ignoring split instance '%s', primary group %s, secondary"
658 " groups %s", name, pnode_group_uuid,
659 utils.CommaJoin(snodes_group_uuid))
661 instances.append(Instance(name, status, autostart, snodes))
664 secondaries.setdefault(node, set()).add(name)
667 nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
668 for (name, bootid, offline) in raw_nodes]
670 return (dict((node.name, node) for node in nodes),
671 dict((inst.name, inst) for inst in instances))
def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  @rtype: list
  @return: Node group UUIDs

  @raise errors.GenericError: If ssconf contains an entry that is not a
    valid UUID

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  # Each line holds the UUID as its first whitespace-separated field
  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result
689 def _GroupWatcher(opts):
690 """Main function for per-group watcher process.
693 group_uuid = opts.nodegroup.lower()
695 if not utils.UUID_RE.match(group_uuid):
696 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
698 (cli.NODEGROUP_OPT_NAME, group_uuid))
700 logging.info("Watcher for node group '%s'", group_uuid)
702 known_groups = _LoadKnownGroups()
704 # Check if node group is known
705 if group_uuid not in known_groups:
706 raise errors.GenericError("Node group '%s' is not known by ssconf" %
709 # Group UUID has been verified and should not contain any dangerous characters
710 state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
711 inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
713 logging.debug("Using state file %s", state_path)
716 statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
718 return constants.EXIT_FAILURE
720 notepad = state.WatcherState(statefile) # pylint: disable=E0602
722 # Connect to master daemon
723 client = GetLuxiClient(False)
727 (nodes, instances) = _GetGroupData(client, group_uuid)
729 # Update per-group instance status file
730 _UpdateInstanceStatus(inst_status_path, instances.values())
732 _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
733 constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
736 started = _CheckInstances(client, notepad, instances)
737 _CheckDisks(client, notepad, nodes, instances, started)
738 _VerifyDisks(client, group_uuid, nodes, instances)
739 except Exception, err:
740 logging.info("Not updating status file due to failure: %s", err)
743 # Save changes for next run
744 notepad.Save(state_path)
746 return constants.EXIT_SUCCESS
753 (options, _) = ParseOptions()
755 utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
756 debug=options.debug, stderr_logging=options.debug)
758 if ShouldPause() and not options.ignore_pause:
759 logging.debug("Pause has been set, exiting")
760 return constants.EXIT_SUCCESS
762 # Try to acquire global watcher lock in shared mode
763 lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
765 lock.Shared(blocking=False)
766 except (EnvironmentError, errors.LockError), err:
767 logging.error("Can't acquire lock on %s: %s",
768 constants.WATCHER_LOCK_FILE, err)
769 return constants.EXIT_SUCCESS
771 if options.nodegroup is None:
774 # Per-nodegroup watcher
779 except (SystemExit, KeyboardInterrupt):
781 except NotMasterError:
782 logging.debug("Not master, exiting")
783 return constants.EXIT_NOTMASTER
784 except errors.ResolverError, err:
785 logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
786 return constants.EXIT_NODESETUP_ERROR
787 except errors.JobQueueFull:
788 logging.error("Job queue is full, can't query cluster state")
789 except errors.JobQueueDrainError:
790 logging.error("Job queue is drained, can't maintain cluster state")
791 except Exception, err:
792 logging.exception(str(err))
793 return constants.EXIT_FAILURE
795 return constants.EXIT_SUCCESS