# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.

Run from cron or similar.

"""
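# The watcher is normally run periodically; an illustrative (not
# prescriptive) crontab entry, assuming the usual install path, would be:
#   */5 * * * * root /usr/sbin/ganeti-watcher
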
# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging

from optparse import OptionParser
from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client

import ganeti.rapi.client # pylint: disable-msg=W0611

# Maximum number of restart attempts for a down instance
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
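# For illustration only: the on-disk state file (see WatcherState below) is
# a serialized dictionary using the keys above, roughly of the form
#   {"node": {"node1.example.com": {"bootid": "..."}},
#    "instance": {"inst1.example.com": {"restart_count": 1,
#                                       "restart_when": 1300000000.0}}}
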
# Global client object
client = None

class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""

def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))

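# Note (descriptive, not from the original source): the pause file at
# constants.WATCHER_PAUSEFILE is written by external tooling; when this
# check returns True, main() below logs a message and exits without
# taking any action on the cluster.
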
def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will be in disabled mode
  utils.EnsureDaemon(constants.CONFD)

def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)

class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = utils.HostInfo().name
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")

class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

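  # Illustrative per-instance record produced by the method above (values
  # are examples only): {"restart_when": 1300000000.0, "restart_count": 2}
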
  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]

class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)

def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])
  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}
  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap

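# For illustration, the return values of GetClusterData() look roughly like:
#   instances = {"inst1.example.com": <Instance>, ...}
#   nodes     = {"node1.example.com": ("<boot id>", False), ...}  # (bootid, offline)
#   smap      = {"node2.example.com": ["inst1.example.com"], ...} # secondary node map
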
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs."""
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # retries exhausted for this instance, stay quiet
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")

def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")

def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  ssl_config = rapi.client.CertAuthorityVerify(constants.RAPI_CERT_FILE)
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             config_ssl_verification=ssl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION

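# Illustrative use, as done in main() below:
#   if not IsRapiResponding(constants.LOCALHOST_IP_ADDRESS):
#     ...restart the RAPI daemon and retry the check once...
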
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args

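# Illustrative invocation: "ganeti-watcher -A 12h" would auto-archive jobs
# older than twelve hours, assuming cli.ParseTimespec accepts an "h" suffix
# as its name suggests.
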
def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    sys.exit(constants.EXIT_FAILURE)
  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()
      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.LOCALHOST_IP_ADDRESS):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.LOCALHOST_IP_ADDRESS):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")
      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    sys.exit(constants.EXIT_FAILURE)

if __name__ == '__main__':
  main()