Change OpClusterVerifyDisks to per-group opcodes
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 # pylint: disable-msg=C0103,W0142
31
32 # C0103: Invalid name ganeti-watcher
33
34 import os
35 import os.path
36 import sys
37 import time
38 import logging
39 from optparse import OptionParser
40
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import serializer
45 from ganeti import errors
46 from ganeti import opcodes
47 from ganeti import cli
48 from ganeti import luxi
49 from ganeti import ssconf
50 from ganeti import bdev
51 from ganeti import hypervisor
52 from ganeti import rapi
53 from ganeti.confd import client as confd_client
54 from ganeti import netutils
55
56 import ganeti.rapi.client # pylint: disable-msg=W0611
57
58
59 MAXTRIES = 5
60 # Delete any record that is older than 8 hours; this value is based on
61 # the fact that the current retry counter is 5, and watcher runs every
62 # 5 minutes, so it takes around half an hour to exceed the retry
63 # counter, so 8 hours (16*1/2h) seems like a reasonable reset time
64 RETRY_EXPIRATION = 8 * 3600
65 BAD_STATES = [constants.INSTST_ERRORDOWN]
66 HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
67 NOTICE = 'NOTICE'
68 ERROR = 'ERROR'
69 KEY_RESTART_COUNT = "restart_count"
70 KEY_RESTART_WHEN = "restart_when"
71 KEY_BOOT_ID = "bootid"
72
73
74 # Global LUXI client object
75 client = None
76
77
78 class NotMasterError(errors.GenericError):
79   """Exception raised when this host is not the master."""
80
81
82 def ShouldPause():
83   """Check whether we should pause.
84
85   """
86   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
87
88
89 def StartNodeDaemons():
90   """Start all the daemons that should be running on all nodes.
91
92   """
93   # on master or not, try to start the node daemon
94   utils.EnsureDaemon(constants.NODED)
95   # start confd as well. On non candidates it will be in disabled mode.
96   utils.EnsureDaemon(constants.CONFD)
97
98
99 def RunWatcherHooks():
100   """Run the watcher hooks.
101
102   """
103   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
104                              constants.HOOKS_NAME_WATCHER)
105   if not os.path.isdir(hooks_dir):
106     return
107
108   try:
109     results = utils.RunParts(hooks_dir)
110   except Exception: # pylint: disable-msg=W0703
111     logging.exception("RunParts %s failed: %s", hooks_dir)
112     return
113
114   for (relname, status, runresult) in results:
115     if status == constants.RUNPARTS_SKIP:
116       logging.debug("Watcher hook %s: skipped", relname)
117     elif status == constants.RUNPARTS_ERR:
118       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
119     elif status == constants.RUNPARTS_RUN:
120       if runresult.failed:
121         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
122                         relname, runresult.exit_code, runresult.output)
123       else:
124         logging.debug("Watcher hook %s: success (output: %s)", relname,
125                       runresult.output)
126
127
128 class NodeMaintenance(object):
129   """Talks to confd daemons and possible shutdown instances/drbd devices.
130
131   """
132   def __init__(self):
133     self.store_cb = confd_client.StoreResultCallback()
134     self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
135     self.confd_client = confd_client.GetConfdClient(self.filter_cb)
136
137   @staticmethod
138   def ShouldRun():
139     """Checks whether node maintenance should run.
140
141     """
142     try:
143       return ssconf.SimpleStore().GetMaintainNodeHealth()
144     except errors.ConfigurationError, err:
145       logging.error("Configuration error, not activating node maintenance: %s",
146                     err)
147       return False
148
149   @staticmethod
150   def GetRunningInstances():
151     """Compute list of hypervisor/running instances.
152
153     """
154     hyp_list = ssconf.SimpleStore().GetHypervisorList()
155     results = []
156     for hv_name in hyp_list:
157       try:
158         hv = hypervisor.GetHypervisor(hv_name)
159         ilist = hv.ListInstances()
160         results.extend([(iname, hv_name) for iname in ilist])
161       except: # pylint: disable-msg=W0702
162         logging.error("Error while listing instances for hypervisor %s",
163                       hv_name, exc_info=True)
164     return results
165
166   @staticmethod
167   def GetUsedDRBDs():
168     """Get list of used DRBD minors.
169
170     """
171     return bdev.DRBD8.GetUsedDevs().keys()
172
173   @classmethod
174   def DoMaintenance(cls, role):
175     """Maintain the instance list.
176
177     """
178     if role == constants.CONFD_NODE_ROLE_OFFLINE:
179       inst_running = cls.GetRunningInstances()
180       cls.ShutdownInstances(inst_running)
181       drbd_running = cls.GetUsedDRBDs()
182       cls.ShutdownDRBD(drbd_running)
183     else:
184       logging.debug("Not doing anything for role %s", role)
185
186   @staticmethod
187   def ShutdownInstances(inst_running):
188     """Shutdown running instances.
189
190     """
191     names_running = set([i[0] for i in inst_running])
192     if names_running:
193       logging.info("Following instances should not be running,"
194                    " shutting them down: %s", utils.CommaJoin(names_running))
195       # this dictionary will collapse duplicate instance names (only
196       # xen pvm/vhm) into a single key, which is fine
197       i2h = dict(inst_running)
198       for name in names_running:
199         hv_name = i2h[name]
200         hv = hypervisor.GetHypervisor(hv_name)
201         hv.StopInstance(None, force=True, name=name)
202
203   @staticmethod
204   def ShutdownDRBD(drbd_running):
205     """Shutdown active DRBD devices.
206
207     """
208     if drbd_running:
209       logging.info("Following DRBD minors should not be active,"
210                    " shutting them down: %s", utils.CommaJoin(drbd_running))
211       for minor in drbd_running:
212         # pylint: disable-msg=W0212
213         # using the private method as is, pending enhancements to the DRBD
214         # interface
215         bdev.DRBD8._ShutdownAll(minor)
216
217   def Exec(self):
218     """Check node status versus cluster desired state.
219
220     """
221     my_name = netutils.Hostname.GetSysName()
222     req = confd_client.ConfdClientRequest(type=
223                                           constants.CONFD_REQ_NODE_ROLE_BYNAME,
224                                           query=my_name)
225     self.confd_client.SendRequest(req, async=False, coverage=-1)
226     timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
227     if not timed_out:
228       # should have a valid response
229       status, result = self.store_cb.GetResponse(req.rsalt)
230       assert status, "Missing result but received replies"
231       if not self.filter_cb.consistent[req.rsalt]:
232         logging.warning("Inconsistent replies, not doing anything")
233         return
234       self.DoMaintenance(result.server_reply.answer)
235     else:
236       logging.warning("Confd query timed out, cannot do maintenance actions")
237
238
239 class WatcherState(object):
240   """Interface to a state file recording restart attempts.
241
242   """
243   def __init__(self, statefile):
244     """Open, lock, read and parse the file.
245
246     @type statefile: file
247     @param statefile: State file object
248
249     """
250     self.statefile = statefile
251
252     try:
253       state_data = self.statefile.read()
254       if not state_data:
255         self._data = {}
256       else:
257         self._data = serializer.Load(state_data)
258     except Exception, msg: # pylint: disable-msg=W0703
259       # Ignore errors while loading the file and treat it as empty
260       self._data = {}
261       logging.warning(("Invalid state file. Using defaults."
262                        " Error message: %s"), msg)
263
264     if "instance" not in self._data:
265       self._data["instance"] = {}
266     if "node" not in self._data:
267       self._data["node"] = {}
268
269     self._orig_data = serializer.Dump(self._data)
270
271   def Save(self):
272     """Save state to file, then unlock and close it.
273
274     """
275     assert self.statefile
276
277     serialized_form = serializer.Dump(self._data)
278     if self._orig_data == serialized_form:
279       logging.debug("Data didn't change, just touching status file")
280       os.utime(constants.WATCHER_STATEFILE, None)
281       return
282
283     # We need to make sure the file is locked before renaming it, otherwise
284     # starting ganeti-watcher again at the same time will create a conflict.
285     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
286                          data=serialized_form,
287                          prewrite=utils.LockFile, close=False)
288     self.statefile = os.fdopen(fd, 'w+')
289
290   def Close(self):
291     """Unlock configuration file and close it.
292
293     """
294     assert self.statefile
295
296     # Files are automatically unlocked when closing them
297     self.statefile.close()
298     self.statefile = None
299
300   def GetNodeBootID(self, name):
301     """Returns the last boot ID of a node or None.
302
303     """
304     ndata = self._data["node"]
305
306     if name in ndata and KEY_BOOT_ID in ndata[name]:
307       return ndata[name][KEY_BOOT_ID]
308     return None
309
310   def SetNodeBootID(self, name, bootid):
311     """Sets the boot ID of a node.
312
313     """
314     assert bootid
315
316     ndata = self._data["node"]
317
318     if name not in ndata:
319       ndata[name] = {}
320
321     ndata[name][KEY_BOOT_ID] = bootid
322
323   def NumberOfRestartAttempts(self, instance):
324     """Returns number of previous restart attempts.
325
326     @type instance: L{Instance}
327     @param instance: the instance to look up
328
329     """
330     idata = self._data["instance"]
331
332     if instance.name in idata:
333       return idata[instance.name][KEY_RESTART_COUNT]
334
335     return 0
336
337   def MaintainInstanceList(self, instances):
338     """Perform maintenance on the recorded instances.
339
340     @type instances: list of string
341     @param instances: the list of currently existing instances
342
343     """
344     idict = self._data["instance"]
345     # First, delete obsolete instances
346     obsolete_instances = set(idict).difference(instances)
347     for inst in obsolete_instances:
348       logging.debug("Forgetting obsolete instance %s", inst)
349       del idict[inst]
350
351     # Second, delete expired records
352     earliest = time.time() - RETRY_EXPIRATION
353     expired_instances = [i for i in idict
354                          if idict[i][KEY_RESTART_WHEN] < earliest]
355     for inst in expired_instances:
356       logging.debug("Expiring record for instance %s", inst)
357       del idict[inst]
358
359   def RecordRestartAttempt(self, instance):
360     """Record a restart attempt.
361
362     @type instance: L{Instance}
363     @param instance: the instance being restarted
364
365     """
366     idata = self._data["instance"]
367
368     if instance.name not in idata:
369       inst = idata[instance.name] = {}
370     else:
371       inst = idata[instance.name]
372
373     inst[KEY_RESTART_WHEN] = time.time()
374     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
375
376   def RemoveInstance(self, instance):
377     """Update state to reflect that a machine is running.
378
379     This method removes the record for a named instance (as we only
380     track down instances).
381
382     @type instance: L{Instance}
383     @param instance: the instance to remove from books
384
385     """
386     idata = self._data["instance"]
387
388     if instance.name in idata:
389       del idata[instance.name]
390
391
392 class Instance(object):
393   """Abstraction for a Virtual Machine instance.
394
395   """
396   def __init__(self, name, state, autostart, snodes):
397     self.name = name
398     self.state = state
399     self.autostart = autostart
400     self.snodes = snodes
401
402   def Restart(self):
403     """Encapsulates the start of an instance.
404
405     """
406     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
407     cli.SubmitOpCode(op, cl=client)
408
409   def ActivateDisks(self):
410     """Encapsulates the activation of all disks of an instance.
411
412     """
413     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
414     cli.SubmitOpCode(op, cl=client)
415
416
417 def GetClusterData():
418   """Get a list of instances on this cluster.
419
420   """
421   op1_fields = ["name", "status", "admin_state", "snodes"]
422   op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
423                                 use_locking=True)
424   op2_fields = ["name", "bootid", "offline"]
425   op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
426                             use_locking=True)
427
428   job_id = client.SubmitJob([op1, op2])
429
430   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
431
432   logging.debug("Got data from cluster, writing instance status file")
433
434   result = all_results[0]
435   smap = {}
436
437   instances = {}
438
439   # write the upfile
440   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
441   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
442
443   for fields in result:
444     (name, status, autostart, snodes) = fields
445
446     # update the secondary node map
447     for node in snodes:
448       if node not in smap:
449         smap[node] = []
450       smap[node].append(name)
451
452     instances[name] = Instance(name, status, autostart, snodes)
453
454   nodes =  dict([(name, (bootid, offline))
455                  for name, bootid, offline in all_results[1]])
456
457   client.ArchiveJob(job_id)
458
459   return instances, nodes, smap
460
461
462 class Watcher(object):
463   """Encapsulate the logic for restarting erroneously halted virtual machines.
464
465   The calling program should periodically instantiate me and call Run().
466   This will traverse the list of instances, and make up to MAXTRIES attempts
467   to restart machines that are down.
468
469   """
470   def __init__(self, opts, notepad):
471     self.notepad = notepad
472     master = client.QueryConfigValues(["master_node"])[0]
473     if master != netutils.Hostname.GetSysName():
474       raise NotMasterError("This is not the master node")
475     # first archive old jobs
476     self.ArchiveJobs(opts.job_age)
477     # and only then submit new ones
478     self.instances, self.bootids, self.smap = GetClusterData()
479     self.started_instances = set()
480     self.opts = opts
481
482   def Run(self):
483     """Watcher run sequence.
484
485     """
486     notepad = self.notepad
487     self.CheckInstances(notepad)
488     self.CheckDisks(notepad)
489     self.VerifyDisks()
490
491   @staticmethod
492   def ArchiveJobs(age):
493     """Archive old jobs.
494
495     """
496     arch_count, left_count = client.AutoArchiveJobs(age)
497     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
498
499   def CheckDisks(self, notepad):
500     """Check all nodes for restarted ones.
501
502     """
503     check_nodes = []
504     for name, (new_id, offline) in self.bootids.iteritems():
505       old = notepad.GetNodeBootID(name)
506       if new_id is None:
507         # Bad node, not returning a boot id
508         if not offline:
509           logging.debug("Node %s missing boot id, skipping secondary checks",
510                         name)
511         continue
512       if old != new_id:
513         # Node's boot ID has changed, proably through a reboot.
514         check_nodes.append(name)
515
516     if check_nodes:
517       # Activate disks for all instances with any of the checked nodes as a
518       # secondary node.
519       for node in check_nodes:
520         if node not in self.smap:
521           continue
522         for instance_name in self.smap[node]:
523           instance = self.instances[instance_name]
524           if not instance.autostart:
525             logging.info(("Skipping disk activation for non-autostart"
526                           " instance %s"), instance.name)
527             continue
528           if instance.name in self.started_instances:
529             # we already tried to start the instance, which should have
530             # activated its drives (if they can be at all)
531             logging.debug("Skipping disk activation for instance %s, as"
532                           " it was already started", instance.name)
533             continue
534           try:
535             logging.info("Activating disks for instance %s", instance.name)
536             instance.ActivateDisks()
537           except Exception: # pylint: disable-msg=W0703
538             logging.exception("Error while activating disks for instance %s",
539                               instance.name)
540
541       # Keep changed boot IDs
542       for name in check_nodes:
543         notepad.SetNodeBootID(name, self.bootids[name][0])
544
545   def CheckInstances(self, notepad):
546     """Make a pass over the list of instances, restarting downed ones.
547
548     """
549     notepad.MaintainInstanceList(self.instances.keys())
550
551     for instance in self.instances.values():
552       if instance.state in BAD_STATES:
553         n = notepad.NumberOfRestartAttempts(instance)
554
555         if n > MAXTRIES:
556           logging.warning("Not restarting instance %s, retries exhausted",
557                           instance.name)
558           continue
559         elif n < MAXTRIES:
560           last = " (Attempt #%d)" % (n + 1)
561         else:
562           notepad.RecordRestartAttempt(instance)
563           logging.error("Could not restart %s after %d attempts, giving up",
564                         instance.name, MAXTRIES)
565           continue
566         try:
567           logging.info("Restarting %s%s", instance.name, last)
568           instance.Restart()
569           self.started_instances.add(instance.name)
570         except Exception: # pylint: disable-msg=W0703
571           logging.exception("Error while restarting instance %s",
572                             instance.name)
573
574         notepad.RecordRestartAttempt(instance)
575       elif instance.state in HELPLESS_STATES:
576         if notepad.NumberOfRestartAttempts(instance):
577           notepad.RemoveInstance(instance)
578       else:
579         if notepad.NumberOfRestartAttempts(instance):
580           notepad.RemoveInstance(instance)
581           logging.info("Restart of %s succeeded", instance.name)
582
583   def _CheckForOfflineNodes(self, instance):
584     """Checks if given instances has any secondary in offline status.
585
586     @param instance: The instance object
587     @return: True if any of the secondary is offline, False otherwise
588
589     """
590     bootids = []
591     for node in instance.snodes:
592       bootids.append(self.bootids[node])
593
594     return compat.any(offline for (_, offline) in bootids)
595
596   def VerifyDisks(self):
597     """Run gnt-cluster verify-disks.
598
599     """
600     job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
601     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
602     client.ArchiveJob(job_id)
603
604     # Keep track of submitted jobs
605     jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
606
607     archive_jobs = set()
608     for (status, job_id) in result[constants.JOB_IDS_KEY]:
609       jex.AddJobId(None, status, job_id)
610       if status:
611         archive_jobs.add(job_id)
612
613     offline_disk_instances = set()
614
615     for (status, result) in jex.GetResults():
616       if not status:
617         logging.error("Verify-disks job failed: %s", result)
618         continue
619
620       ((_, instances, _), ) = result
621
622       offline_disk_instances.update(instances)
623
624     for job_id in archive_jobs:
625       client.ArchiveJob(job_id)
626
627     if not offline_disk_instances:
628       # nothing to do
629       logging.debug("verify-disks reported no offline disks, nothing to do")
630       return
631
632     logging.debug("Will activate disks for instance(s) %s",
633                   utils.CommaJoin(offline_disk_instances))
634
635     # we submit only one job, and wait for it. not optimal, but spams
636     # less the job queue
637     job = []
638     for name in offline_disk_instances:
639       instance = self.instances[name]
640       if (instance.state in HELPLESS_STATES or
641           self._CheckForOfflineNodes(instance)):
642         logging.info("Skip instance %s because it is in helpless state or has"
643                      " one offline secondary", name)
644         continue
645       job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
646
647     if job:
648       job_id = cli.SendJob(job, cl=client)
649
650       try:
651         cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
652       except Exception: # pylint: disable-msg=W0703
653         logging.exception("Error while activating disks")
654
655
656 def OpenStateFile(path):
657   """Opens the state file and acquires a lock on it.
658
659   @type path: string
660   @param path: Path to state file
661
662   """
663   # The two-step dance below is necessary to allow both opening existing
664   # file read/write and creating if not existing. Vanilla open will truncate
665   # an existing file -or- allow creating if not existing.
666   statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
667
668   # Try to acquire lock on state file. If this fails, another watcher instance
669   # might already be running or another program is temporarily blocking the
670   # watcher from running.
671   try:
672     utils.LockFile(statefile_fd)
673   except errors.LockError, err:
674     logging.error("Can't acquire lock on state file %s: %s", path, err)
675     return None
676
677   return os.fdopen(statefile_fd, "w+")
678
679
680 def IsRapiResponding(hostname):
681   """Connects to RAPI port and does a simple test.
682
683   Connects to RAPI port of hostname and does a simple test. At this time, the
684   test is GetVersion.
685
686   @type hostname: string
687   @param hostname: hostname of the node to connect to.
688   @rtype: bool
689   @return: Whether RAPI is working properly
690
691   """
692   curl_config = rapi.client.GenericCurlConfig()
693   rapi_client = rapi.client.GanetiRapiClient(hostname,
694                                              curl_config_fn=curl_config)
695   try:
696     master_version = rapi_client.GetVersion()
697   except rapi.client.CertificateError, err:
698     logging.warning("RAPI Error: CertificateError (%s)", err)
699     return False
700   except rapi.client.GanetiApiError, err:
701     logging.warning("RAPI Error: GanetiApiError (%s)", err)
702     return False
703   logging.debug("RAPI Result: master_version is %s", master_version)
704   return master_version == constants.RAPI_VERSION
705
706
707 def ParseOptions():
708   """Parse the command line options.
709
710   @return: (options, args) as from OptionParser.parse_args()
711
712   """
713   parser = OptionParser(description="Ganeti cluster watcher",
714                         usage="%prog [-d]",
715                         version="%%prog (ganeti) %s" %
716                         constants.RELEASE_VERSION)
717
718   parser.add_option(cli.DEBUG_OPT)
719   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
720                     help="Autoarchive jobs older than this age (default"
721                           " 6 hours)")
722   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
723                     action="store_true", help="Ignore cluster pause setting")
724   options, args = parser.parse_args()
725   options.job_age = cli.ParseTimespec(options.job_age)
726
727   if args:
728     parser.error("No arguments expected")
729
730   return (options, args)
731
732
733 @rapi.client.UsesRapiClient
734 def Main():
735   """Main function.
736
737   """
738   global client # pylint: disable-msg=W0603
739
740   (options, _) = ParseOptions()
741
742   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
743                      debug=options.debug, stderr_logging=options.debug)
744
745   if ShouldPause() and not options.ignore_pause:
746     logging.debug("Pause has been set, exiting")
747     return constants.EXIT_SUCCESS
748
749   statefile = OpenStateFile(constants.WATCHER_STATEFILE)
750   if not statefile:
751     return constants.EXIT_FAILURE
752
753   update_file = False
754   try:
755     StartNodeDaemons()
756     RunWatcherHooks()
757     # run node maintenance in all cases, even if master, so that old
758     # masters can be properly cleaned up too
759     if NodeMaintenance.ShouldRun():
760       NodeMaintenance().Exec()
761
762     notepad = WatcherState(statefile)
763     try:
764       try:
765         client = cli.GetClient()
766       except errors.OpPrereqError:
767         # this is, from cli.GetClient, a not-master case
768         logging.debug("Not on master, exiting")
769         update_file = True
770         return constants.EXIT_SUCCESS
771       except luxi.NoMasterError, err:
772         logging.warning("Master seems to be down (%s), trying to restart",
773                         str(err))
774         if not utils.EnsureDaemon(constants.MASTERD):
775           logging.critical("Can't start the master, exiting")
776           return constants.EXIT_FAILURE
777         # else retry the connection
778         client = cli.GetClient()
779
780       # we are on master now
781       utils.EnsureDaemon(constants.RAPI)
782
783       # If RAPI isn't responding to queries, try one restart.
784       logging.debug("Attempting to talk with RAPI.")
785       if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
786         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
787                         " Restarting Ganeti RAPI.")
788         utils.StopDaemon(constants.RAPI)
789         utils.EnsureDaemon(constants.RAPI)
790         logging.debug("Second attempt to talk with RAPI")
791         if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
792           logging.fatal("RAPI is not responding. Please investigate.")
793       logging.debug("Successfully talked to RAPI.")
794
795       try:
796         watcher = Watcher(options, notepad)
797       except errors.ConfigurationError:
798         # Just exit if there's no configuration
799         update_file = True
800         return constants.EXIT_SUCCESS
801
802       watcher.Run()
803       update_file = True
804
805     finally:
806       if update_file:
807         notepad.Save()
808       else:
809         logging.debug("Not updating status file due to failure")
810   except SystemExit:
811     raise
812   except NotMasterError:
813     logging.debug("Not master, exiting")
814     return constants.EXIT_NOTMASTER
815   except errors.ResolverError, err:
816     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
817     return constants.EXIT_NODESETUP_ERROR
818   except errors.JobQueueFull:
819     logging.error("Job queue is full, can't query cluster state")
820   except errors.JobQueueDrainError:
821     logging.error("Job queue is drained, can't maintain cluster state")
822   except Exception, err:
823     logging.exception(str(err))
824     return constants.EXIT_FAILURE
825
826   return constants.EXIT_SUCCESS