# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5

BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
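
# How these sets are used below: instances in BAD_STATES are candidates for
# a restart by the watcher, while instances in HELPLESS_STATES live on nodes
# that are down or offline, where a restart attempt cannot succeed until the
# node itself recovers.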

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""
82 """Check whether we should pause.
85 return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
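
# The pause flag is normally managed with "gnt-cluster watcher pause
# <duration>" and "gnt-cluster watcher continue"; while a pause is in
# effect, Main() below exits early unless --ignore-pause is given.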


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance."""
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance."""
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance."""
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
156 """Data container representing cluster node.
159 def __init__(self, name, bootid, offline, secondaries):
160 """Initializes this class.
165 self.offline = offline
166 self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones."""
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
      elif n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
      else:
        try:
          logging.info("Restarting instance '%s' (attempt #%s)",
                       inst.name, n + 1)
          inst.Restart(cl)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance '%s'", inst.name)
        else:
          started.add(inst.name)
        notepad.RecordRestartAttempt(inst.name)
    elif notepad.NumberOfRestartAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
      if inst.status not in HELPLESS_STATES:
        logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)
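
# Rationale for the boot-ID comparison above: a node whose current boot ID
# differs from the one in the watcher state has rebooted since the last run,
# so its disks are no longer attached. Disks are therefore reactivated for
# instances that use such a node as secondary, except for instances the
# watcher just restarted, since starting an instance activates its disks.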


def _CheckForOfflineNodes(nodes, instance):
  """Checks if given instance has any secondary in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION
349 """Parse the command line options.
351 @return: (options, args) as from OptionParser.parse_args()
354 parser = OptionParser(description="Ganeti cluster watcher",
356 version="%%prog (ganeti) %s" %
357 constants.RELEASE_VERSION)
359 parser.add_option(cli.DEBUG_OPT)
360 parser.add_option(cli.NODEGROUP_OPT)
361 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
362 help="Autoarchive jobs older than this age (default"
364 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
365 action="store_true", help="Ignore cluster pause setting")
366 parser.add_option("--wait-children", dest="wait_children", default=False,
367 action="store_true", help="Wait for child processes")
368 options, args = parser.parse_args()
369 options.job_age = cli.ParseTimespec(options.job_age)
372 parser.error("No arguments expected")
374 return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instance objects

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])
409 """Helper to store file handle's C{fstat}.
413 """Initializes this class.
418 def __call__(self, fh):
419 """Calls C{fstat} on file handle.
422 self.st = os.fstat(fh.fileno())


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])
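
# Each line is split on the first whitespace only, so an (illustrative) line
# "inst1.example.com ERROR_down" parses into ["inst1.example.com",
# "ERROR_down"]; this is the inverse of the "%s %s\n" format written by
# _WriteInstanceStatus above.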


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)
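
# Merge semantics, sketched with made-up values: if one group's file (mtime
# 100) reports an instance as "running" and another group's file (mtime 200)
# reports it as "ERROR_down", sorting the (mtime, status) pairs in reverse
# order makes the newest entry win, and the merged file records "ERROR_down".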


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group."""
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)
  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)
    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)
      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  # TODO: Implement locking
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}."""
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
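
# A typical deployment runs the watcher from cron every few minutes, along
# the lines of (path illustrative, packaging may differ):
#
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher
#
# Every run is self-contained: it takes the shared watcher lock, performs the
# global or per-group pass, and exits with one of the constants.EXIT_* codes.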