watcher: improve logging a bit
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 # pylint: disable-msg=C0103,W0142
31
32 # C0103: Invalid name ganeti-watcher
33
34 import os
35 import os.path
36 import sys
37 import time
38 import logging
39 from optparse import OptionParser
40
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import serializer
45 from ganeti import errors
46 from ganeti import opcodes
47 from ganeti import cli
48 from ganeti import luxi
49 from ganeti import ssconf
50 from ganeti import bdev
51 from ganeti import hypervisor
52 from ganeti import rapi
53 from ganeti.confd import client as confd_client
54 from ganeti import netutils
55
56 import ganeti.rapi.client # pylint: disable-msg=W0611
57
58
59 MAXTRIES = 5
60 # Delete any record that is older than 8 hours; this value is based on
61 # the fact that the current retry counter is 5, and watcher runs every
62 # 5 minutes, so it takes around half an hour to exceed the retry
63 # counter, so 8 hours (16*1/2h) seems like a reasonable reset time
64 RETRY_EXPIRATION = 8 * 3600
65 BAD_STATES = [constants.INSTST_ERRORDOWN]
66 HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
67 NOTICE = 'NOTICE'
68 ERROR = 'ERROR'
69 KEY_RESTART_COUNT = "restart_count"
70 KEY_RESTART_WHEN = "restart_when"
71 KEY_BOOT_ID = "bootid"
72
73
74 # Global client object
75 client = None
76
77
78 class NotMasterError(errors.GenericError):
79   """Exception raised when this host is not the master."""
80
81
82 def ShouldPause():
83   """Check whether we should pause.
84
85   """
86   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
87
88
89 def StartNodeDaemons():
90   """Start all the daemons that should be running on all nodes.
91
92   """
93   # on master or not, try to start the node daemon
94   utils.EnsureDaemon(constants.NODED)
95   # start confd as well. On non candidates it will be in disabled mode.
96   utils.EnsureDaemon(constants.CONFD)
97
98
99 def RunWatcherHooks():
100   """Run the watcher hooks.
101
102   """
103   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
104                              constants.HOOKS_NAME_WATCHER)
105   if not os.path.isdir(hooks_dir):
106     return
107
108   try:
109     results = utils.RunParts(hooks_dir)
110   except Exception, msg: # pylint: disable-msg=W0703
111     logging.critical("RunParts %s failed: %s", hooks_dir, msg)
112
113   for (relname, status, runresult) in results:
114     if status == constants.RUNPARTS_SKIP:
115       logging.debug("Watcher hook %s: skipped", relname)
116     elif status == constants.RUNPARTS_ERR:
117       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
118     elif status == constants.RUNPARTS_RUN:
119       if runresult.failed:
120         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
121                         relname, runresult.exit_code, runresult.output)
122       else:
123         logging.debug("Watcher hook %s: success (output: %s)", relname,
124                       runresult.output)
125
126
127 class NodeMaintenance(object):
128   """Talks to confd daemons and possible shutdown instances/drbd devices.
129
130   """
131   def __init__(self):
132     self.store_cb = confd_client.StoreResultCallback()
133     self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
134     self.confd_client = confd_client.GetConfdClient(self.filter_cb)
135
136   @staticmethod
137   def ShouldRun():
138     """Checks whether node maintenance should run.
139
140     """
141     try:
142       return ssconf.SimpleStore().GetMaintainNodeHealth()
143     except errors.ConfigurationError, err:
144       logging.error("Configuration error, not activating node maintenance: %s",
145                     err)
146       return False
147
148   @staticmethod
149   def GetRunningInstances():
150     """Compute list of hypervisor/running instances.
151
152     """
153     hyp_list = ssconf.SimpleStore().GetHypervisorList()
154     results = []
155     for hv_name in hyp_list:
156       try:
157         hv = hypervisor.GetHypervisor(hv_name)
158         ilist = hv.ListInstances()
159         results.extend([(iname, hv_name) for iname in ilist])
160       except: # pylint: disable-msg=W0702
161         logging.error("Error while listing instances for hypervisor %s",
162                       hv_name, exc_info=True)
163     return results
164
165   @staticmethod
166   def GetUsedDRBDs():
167     """Get list of used DRBD minors.
168
169     """
170     return bdev.DRBD8.GetUsedDevs().keys()
171
172   @classmethod
173   def DoMaintenance(cls, role):
174     """Maintain the instance list.
175
176     """
177     if role == constants.CONFD_NODE_ROLE_OFFLINE:
178       inst_running = cls.GetRunningInstances()
179       cls.ShutdownInstances(inst_running)
180       drbd_running = cls.GetUsedDRBDs()
181       cls.ShutdownDRBD(drbd_running)
182     else:
183       logging.debug("Not doing anything for role %s", role)
184
185   @staticmethod
186   def ShutdownInstances(inst_running):
187     """Shutdown running instances.
188
189     """
190     names_running = set([i[0] for i in inst_running])
191     if names_running:
192       logging.info("Following instances should not be running,"
193                    " shutting them down: %s", utils.CommaJoin(names_running))
194       # this dictionary will collapse duplicate instance names (only
195       # xen pvm/vhm) into a single key, which is fine
196       i2h = dict(inst_running)
197       for name in names_running:
198         hv_name = i2h[name]
199         hv = hypervisor.GetHypervisor(hv_name)
200         hv.StopInstance(None, force=True, name=name)
201
202   @staticmethod
203   def ShutdownDRBD(drbd_running):
204     """Shutdown active DRBD devices.
205
206     """
207     if drbd_running:
208       logging.info("Following DRBD minors should not be active,"
209                    " shutting them down: %s", utils.CommaJoin(drbd_running))
210       for minor in drbd_running:
211         # pylint: disable-msg=W0212
212         # using the private method as is, pending enhancements to the DRBD
213         # interface
214         bdev.DRBD8._ShutdownAll(minor)
215
216   def Exec(self):
217     """Check node status versus cluster desired state.
218
219     """
220     my_name = netutils.Hostname.GetSysName()
221     req = confd_client.ConfdClientRequest(type=
222                                           constants.CONFD_REQ_NODE_ROLE_BYNAME,
223                                           query=my_name)
224     self.confd_client.SendRequest(req, async=False, coverage=-1)
225     timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
226     if not timed_out:
227       # should have a valid response
228       status, result = self.store_cb.GetResponse(req.rsalt)
229       assert status, "Missing result but received replies"
230       if not self.filter_cb.consistent[req.rsalt]:
231         logging.warning("Inconsistent replies, not doing anything")
232         return
233       self.DoMaintenance(result.server_reply.answer)
234     else:
235       logging.warning("Confd query timed out, cannot do maintenance actions")
236
237
238 class WatcherState(object):
239   """Interface to a state file recording restart attempts.
240
241   """
242   def __init__(self, statefile):
243     """Open, lock, read and parse the file.
244
245     @type statefile: file
246     @param statefile: State file object
247
248     """
249     self.statefile = statefile
250
251     try:
252       state_data = self.statefile.read()
253       if not state_data:
254         self._data = {}
255       else:
256         self._data = serializer.Load(state_data)
257     except Exception, msg: # pylint: disable-msg=W0703
258       # Ignore errors while loading the file and treat it as empty
259       self._data = {}
260       logging.warning(("Invalid state file. Using defaults."
261                        " Error message: %s"), msg)
262
263     if "instance" not in self._data:
264       self._data["instance"] = {}
265     if "node" not in self._data:
266       self._data["node"] = {}
267
268     self._orig_data = serializer.Dump(self._data)
269
270   def Save(self):
271     """Save state to file, then unlock and close it.
272
273     """
274     assert self.statefile
275
276     serialized_form = serializer.Dump(self._data)
277     if self._orig_data == serialized_form:
278       logging.debug("Data didn't change, just touching status file")
279       os.utime(constants.WATCHER_STATEFILE, None)
280       return
281
282     # We need to make sure the file is locked before renaming it, otherwise
283     # starting ganeti-watcher again at the same time will create a conflict.
284     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
285                          data=serialized_form,
286                          prewrite=utils.LockFile, close=False)
287     self.statefile = os.fdopen(fd, 'w+')
288
289   def Close(self):
290     """Unlock configuration file and close it.
291
292     """
293     assert self.statefile
294
295     # Files are automatically unlocked when closing them
296     self.statefile.close()
297     self.statefile = None
298
299   def GetNodeBootID(self, name):
300     """Returns the last boot ID of a node or None.
301
302     """
303     ndata = self._data["node"]
304
305     if name in ndata and KEY_BOOT_ID in ndata[name]:
306       return ndata[name][KEY_BOOT_ID]
307     return None
308
309   def SetNodeBootID(self, name, bootid):
310     """Sets the boot ID of a node.
311
312     """
313     assert bootid
314
315     ndata = self._data["node"]
316
317     if name not in ndata:
318       ndata[name] = {}
319
320     ndata[name][KEY_BOOT_ID] = bootid
321
322   def NumberOfRestartAttempts(self, instance):
323     """Returns number of previous restart attempts.
324
325     @type instance: L{Instance}
326     @param instance: the instance to look up
327
328     """
329     idata = self._data["instance"]
330
331     if instance.name in idata:
332       return idata[instance.name][KEY_RESTART_COUNT]
333
334     return 0
335
336   def MaintainInstanceList(self, instances):
337     """Perform maintenance on the recorded instances.
338
339     @type instances: list of string
340     @param instances: the list of currently existing instances
341
342     """
343     idict = self._data["instance"]
344     # First, delete obsolete instances
345     obsolete_instances = set(idict).difference(instances)
346     for inst in obsolete_instances:
347       logging.debug("Forgetting obsolete instance %s", inst)
348       del idict[inst]
349
350     # Second, delete expired records
351     earliest = time.time() - RETRY_EXPIRATION
352     expired_instances = [i for i in idict
353                          if idict[i][KEY_RESTART_WHEN] < earliest]
354     for inst in expired_instances:
355       logging.debug("Expiring record for instance %s", inst)
356       del idict[inst]
357
358   def RecordRestartAttempt(self, instance):
359     """Record a restart attempt.
360
361     @type instance: L{Instance}
362     @param instance: the instance being restarted
363
364     """
365     idata = self._data["instance"]
366
367     if instance.name not in idata:
368       inst = idata[instance.name] = {}
369     else:
370       inst = idata[instance.name]
371
372     inst[KEY_RESTART_WHEN] = time.time()
373     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
374
375   def RemoveInstance(self, instance):
376     """Update state to reflect that a machine is running.
377
378     This method removes the record for a named instance (as we only
379     track down instances).
380
381     @type instance: L{Instance}
382     @param instance: the instance to remove from books
383
384     """
385     idata = self._data["instance"]
386
387     if instance.name in idata:
388       del idata[instance.name]
389
390
391 class Instance(object):
392   """Abstraction for a Virtual Machine instance.
393
394   """
395   def __init__(self, name, state, autostart, snodes):
396     self.name = name
397     self.state = state
398     self.autostart = autostart
399     self.snodes = snodes
400
401   def Restart(self):
402     """Encapsulates the start of an instance.
403
404     """
405     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
406     cli.SubmitOpCode(op, cl=client)
407
408   def ActivateDisks(self):
409     """Encapsulates the activation of all disks of an instance.
410
411     """
412     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
413     cli.SubmitOpCode(op, cl=client)
414
415
416 def GetClusterData():
417   """Get a list of instances on this cluster.
418
419   """
420   op1_fields = ["name", "status", "admin_state", "snodes"]
421   op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
422                                 use_locking=True)
423   op2_fields = ["name", "bootid", "offline"]
424   op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
425                             use_locking=True)
426
427   job_id = client.SubmitJob([op1, op2])
428
429   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
430
431   logging.debug("Got data from cluster, writing instance status file")
432
433   result = all_results[0]
434   smap = {}
435
436   instances = {}
437
438   # write the upfile
439   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
440   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
441
442   for fields in result:
443     (name, status, autostart, snodes) = fields
444
445     # update the secondary node map
446     for node in snodes:
447       if node not in smap:
448         smap[node] = []
449       smap[node].append(name)
450
451     instances[name] = Instance(name, status, autostart, snodes)
452
453   nodes =  dict([(name, (bootid, offline))
454                  for name, bootid, offline in all_results[1]])
455
456   client.ArchiveJob(job_id)
457
458   return instances, nodes, smap
459
460
461 class Watcher(object):
462   """Encapsulate the logic for restarting erroneously halted virtual machines.
463
464   The calling program should periodically instantiate me and call Run().
465   This will traverse the list of instances, and make up to MAXTRIES attempts
466   to restart machines that are down.
467
468   """
469   def __init__(self, opts, notepad):
470     self.notepad = notepad
471     master = client.QueryConfigValues(["master_node"])[0]
472     if master != netutils.Hostname.GetSysName():
473       raise NotMasterError("This is not the master node")
474     # first archive old jobs
475     self.ArchiveJobs(opts.job_age)
476     # and only then submit new ones
477     self.instances, self.bootids, self.smap = GetClusterData()
478     self.started_instances = set()
479     self.opts = opts
480
481   def Run(self):
482     """Watcher run sequence.
483
484     """
485     notepad = self.notepad
486     self.CheckInstances(notepad)
487     self.CheckDisks(notepad)
488     self.VerifyDisks()
489
490   @staticmethod
491   def ArchiveJobs(age):
492     """Archive old jobs.
493
494     """
495     arch_count, left_count = client.AutoArchiveJobs(age)
496     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
497
498   def CheckDisks(self, notepad):
499     """Check all nodes for restarted ones.
500
501     """
502     check_nodes = []
503     for name, (new_id, offline) in self.bootids.iteritems():
504       old = notepad.GetNodeBootID(name)
505       if new_id is None:
506         # Bad node, not returning a boot id
507         if not offline:
508           logging.debug("Node %s missing boot id, skipping secondary checks",
509                         name)
510         continue
511       if old != new_id:
512         # Node's boot ID has changed, proably through a reboot.
513         check_nodes.append(name)
514
515     if check_nodes:
516       # Activate disks for all instances with any of the checked nodes as a
517       # secondary node.
518       for node in check_nodes:
519         if node not in self.smap:
520           continue
521         for instance_name in self.smap[node]:
522           instance = self.instances[instance_name]
523           if not instance.autostart:
524             logging.info(("Skipping disk activation for non-autostart"
525                           " instance %s"), instance.name)
526             continue
527           if instance.name in self.started_instances:
528             # we already tried to start the instance, which should have
529             # activated its drives (if they can be at all)
530             logging.debug("Skipping disk activation for instance %s, as"
531                           " it was already started", instance.name)
532             continue
533           try:
534             logging.info("Activating disks for instance %s", instance.name)
535             instance.ActivateDisks()
536           except Exception: # pylint: disable-msg=W0703
537             logging.exception("Error while activating disks for instance %s",
538                               instance.name)
539
540       # Keep changed boot IDs
541       for name in check_nodes:
542         notepad.SetNodeBootID(name, self.bootids[name][0])
543
544   def CheckInstances(self, notepad):
545     """Make a pass over the list of instances, restarting downed ones.
546
547     """
548     notepad.MaintainInstanceList(self.instances.keys())
549
550     for instance in self.instances.values():
551       if instance.state in BAD_STATES:
552         n = notepad.NumberOfRestartAttempts(instance)
553
554         if n > MAXTRIES:
555           logging.warning("Not restarting instance %s, retries exhausted",
556                           instance.name)
557           continue
558         elif n < MAXTRIES:
559           last = " (Attempt #%d)" % (n + 1)
560         else:
561           notepad.RecordRestartAttempt(instance)
562           logging.error("Could not restart %s after %d attempts, giving up",
563                         instance.name, MAXTRIES)
564           continue
565         try:
566           logging.info("Restarting %s%s", instance.name, last)
567           instance.Restart()
568           self.started_instances.add(instance.name)
569         except Exception: # pylint: disable-msg=W0703
570           logging.exception("Error while restarting instance %s",
571                             instance.name)
572
573         notepad.RecordRestartAttempt(instance)
574       elif instance.state in HELPLESS_STATES:
575         if notepad.NumberOfRestartAttempts(instance):
576           notepad.RemoveInstance(instance)
577       else:
578         if notepad.NumberOfRestartAttempts(instance):
579           notepad.RemoveInstance(instance)
580           logging.info("Restart of %s succeeded", instance.name)
581
582   def _CheckForOfflineNodes(self, instance):
583     """Checks if given instances has any secondary in offline status.
584
585     @param instance: The instance object
586     @return: True if any of the secondary is offline, False otherwise
587
588     """
589     bootids = []
590     for node in instance.snodes:
591       bootids.append(self.bootids[node])
592
593     return compat.any(offline for (_, offline) in bootids)
594
595   def VerifyDisks(self):
596     """Run gnt-cluster verify-disks.
597
598     """
599     op = opcodes.OpClusterVerifyDisks()
600     job_id = client.SubmitJob([op])
601     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
602     client.ArchiveJob(job_id)
603     if not isinstance(result, (tuple, list)):
604       logging.error("Can't get a valid result from verify-disks")
605       return
606     offline_disk_instances = result[1]
607     if not offline_disk_instances:
608       # nothing to do
609       logging.debug("verify-disks reported no offline disks, nothing to do")
610       return
611     logging.debug("Will activate disks for instance(s) %s",
612                   utils.CommaJoin(offline_disk_instances))
613     # we submit only one job, and wait for it. not optimal, but spams
614     # less the job queue
615     job = []
616     for name in offline_disk_instances:
617       instance = self.instances[name]
618       if (instance.state in HELPLESS_STATES or
619           self._CheckForOfflineNodes(instance)):
620         logging.info("Skip instance %s because it is in helpless state or has"
621                      " one offline secondary", name)
622         continue
623       job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
624
625     if job:
626       job_id = cli.SendJob(job, cl=client)
627
628       try:
629         cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
630       except Exception: # pylint: disable-msg=W0703
631         logging.exception("Error while activating disks")
632
633
634 def OpenStateFile(path):
635   """Opens the state file and acquires a lock on it.
636
637   @type path: string
638   @param path: Path to state file
639
640   """
641   # The two-step dance below is necessary to allow both opening existing
642   # file read/write and creating if not existing. Vanilla open will truncate
643   # an existing file -or- allow creating if not existing.
644   statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
645
646   # Try to acquire lock on state file. If this fails, another watcher instance
647   # might already be running or another program is temporarily blocking the
648   # watcher from running.
649   try:
650     utils.LockFile(statefile_fd)
651   except errors.LockError, err:
652     logging.error("Can't acquire lock on state file %s: %s", path, err)
653     return None
654
655   return os.fdopen(statefile_fd, "w+")
656
657
658 def IsRapiResponding(hostname):
659   """Connects to RAPI port and does a simple test.
660
661   Connects to RAPI port of hostname and does a simple test. At this time, the
662   test is GetVersion.
663
664   @type hostname: string
665   @param hostname: hostname of the node to connect to.
666   @rtype: bool
667   @return: Whether RAPI is working properly
668
669   """
670   curl_config = rapi.client.GenericCurlConfig()
671   rapi_client = rapi.client.GanetiRapiClient(hostname,
672                                              curl_config_fn=curl_config)
673   try:
674     master_version = rapi_client.GetVersion()
675   except rapi.client.CertificateError, err:
676     logging.warning("RAPI Error: CertificateError (%s)", err)
677     return False
678   except rapi.client.GanetiApiError, err:
679     logging.warning("RAPI Error: GanetiApiError (%s)", err)
680     return False
681   logging.debug("RAPI Result: master_version is %s", master_version)
682   return master_version == constants.RAPI_VERSION
683
684
685 def ParseOptions():
686   """Parse the command line options.
687
688   @return: (options, args) as from OptionParser.parse_args()
689
690   """
691   parser = OptionParser(description="Ganeti cluster watcher",
692                         usage="%prog [-d]",
693                         version="%%prog (ganeti) %s" %
694                         constants.RELEASE_VERSION)
695
696   parser.add_option(cli.DEBUG_OPT)
697   parser.add_option("-A", "--job-age", dest="job_age",
698                     help="Autoarchive jobs older than this age (default"
699                     " 6 hours)", default=6*3600)
700   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
701                     action="store_true", help="Ignore cluster pause setting")
702   options, args = parser.parse_args()
703   options.job_age = cli.ParseTimespec(options.job_age)
704   return options, args
705
706
707 @rapi.client.UsesRapiClient
708 def Main():
709   """Main function.
710
711   """
712   global client # pylint: disable-msg=W0603
713
714   options, args = ParseOptions()
715
716   if args: # watcher doesn't take any arguments
717     print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
718     return constants.EXIT_FAILURE
719
720   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
721                      debug=options.debug, stderr_logging=options.debug)
722
723   if ShouldPause() and not options.ignore_pause:
724     logging.debug("Pause has been set, exiting")
725     return constants.EXIT_SUCCESS
726
727   statefile = OpenStateFile(constants.WATCHER_STATEFILE)
728   if not statefile:
729     return constants.EXIT_FAILURE
730
731   update_file = False
732   try:
733     StartNodeDaemons()
734     RunWatcherHooks()
735     # run node maintenance in all cases, even if master, so that old
736     # masters can be properly cleaned up too
737     if NodeMaintenance.ShouldRun():
738       NodeMaintenance().Exec()
739
740     notepad = WatcherState(statefile)
741     try:
742       try:
743         client = cli.GetClient()
744       except errors.OpPrereqError:
745         # this is, from cli.GetClient, a not-master case
746         logging.debug("Not on master, exiting")
747         update_file = True
748         return constants.EXIT_SUCCESS
749       except luxi.NoMasterError, err:
750         logging.warning("Master seems to be down (%s), trying to restart",
751                         str(err))
752         if not utils.EnsureDaemon(constants.MASTERD):
753           logging.critical("Can't start the master, exiting")
754           return constants.EXIT_FAILURE
755         # else retry the connection
756         client = cli.GetClient()
757
758       # we are on master now
759       utils.EnsureDaemon(constants.RAPI)
760
761       # If RAPI isn't responding to queries, try one restart.
762       logging.debug("Attempting to talk with RAPI.")
763       if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
764         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
765                         " Restarting Ganeti RAPI.")
766         utils.StopDaemon(constants.RAPI)
767         utils.EnsureDaemon(constants.RAPI)
768         logging.debug("Second attempt to talk with RAPI")
769         if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
770           logging.fatal("RAPI is not responding. Please investigate.")
771       logging.debug("Successfully talked to RAPI.")
772
773       try:
774         watcher = Watcher(options, notepad)
775       except errors.ConfigurationError:
776         # Just exit if there's no configuration
777         update_file = True
778         return constants.EXIT_SUCCESS
779
780       watcher.Run()
781       update_file = True
782
783     finally:
784       if update_file:
785         notepad.Save()
786       else:
787         logging.debug("Not updating status file due to failure")
788   except SystemExit:
789     raise
790   except NotMasterError:
791     logging.debug("Not master, exiting")
792     return constants.EXIT_NOTMASTER
793   except errors.ResolverError, err:
794     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
795     return constants.EXIT_NODESETUP_ERROR
796   except errors.JobQueueFull:
797     logging.error("Job queue is full, can't query cluster state")
798   except errors.JobQueueDrainError:
799     logging.error("Job queue is drained, can't maintain cluster state")
800   except Exception, err:
801     logging.exception(str(err))
802     return constants.EXIT_FAILURE
803
804   return constants.EXIT_SUCCESS