Fixes to errors/warnings raised by pylint 0.24
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client # pylint: disable=W0611
54
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
57
58
59 MAXTRIES = 5
60 BAD_STATES = frozenset([
61   constants.INSTST_ERRORDOWN,
62   ])
63 HELPLESS_STATES = frozenset([
64   constants.INSTST_NODEDOWN,
65   constants.INSTST_NODEOFFLINE,
66   ])
67 NOTICE = "NOTICE"
68 ERROR = "ERROR"
69
70 #: Number of seconds to wait between starting child processes for node groups
71 CHILD_PROCESS_DELAY = 1.0
72
73 #: How many seconds to wait for instance status file lock
74 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
75
76
77 class NotMasterError(errors.GenericError):
78   """Exception raised when this host is not the master."""
79
80
81 def ShouldPause():
82   """Check whether we should pause.
83
84   """
85   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
86
87
88 def StartNodeDaemons():
89   """Start all the daemons that should be running on all nodes.
90
91   """
92   # on master or not, try to start the node daemon
93   utils.EnsureDaemon(constants.NODED)
94   # start confd as well. On non candidates it will be in disabled mode.
95   utils.EnsureDaemon(constants.CONFD)
96
97
98 def RunWatcherHooks():
99   """Run the watcher hooks.
100
101   """
102   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
103                              constants.HOOKS_NAME_WATCHER)
104   if not os.path.isdir(hooks_dir):
105     return
106
107   try:
108     results = utils.RunParts(hooks_dir)
109   except Exception, err: # pylint: disable=W0703
110     logging.exception("RunParts %s failed: %s", hooks_dir, err)
111     return
112
113   for (relname, status, runresult) in results:
114     if status == constants.RUNPARTS_SKIP:
115       logging.debug("Watcher hook %s: skipped", relname)
116     elif status == constants.RUNPARTS_ERR:
117       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
118     elif status == constants.RUNPARTS_RUN:
119       if runresult.failed:
120         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
121                         relname, runresult.exit_code, runresult.output)
122       else:
123         logging.debug("Watcher hook %s: success (output: %s)", relname,
124                       runresult.output)
125     else:
126       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
127                                    status)
128
129
130 class Instance(object):
131   """Abstraction for a Virtual Machine instance.
132
133   """
134   def __init__(self, name, status, autostart, snodes):
135     self.name = name
136     self.status = status
137     self.autostart = autostart
138     self.snodes = snodes
139
140   def Restart(self, cl):
141     """Encapsulates the start of an instance.
142
143     """
144     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
145     cli.SubmitOpCode(op, cl=cl)
146
147   def ActivateDisks(self, cl):
148     """Encapsulates the activation of all disks of an instance.
149
150     """
151     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
152     cli.SubmitOpCode(op, cl=cl)
153
154
155 class Node:
156   """Data container representing cluster node.
157
158   """
159   def __init__(self, name, bootid, offline, secondaries):
160     """Initializes this class.
161
162     """
163     self.name = name
164     self.bootid = bootid
165     self.offline = offline
166     self.secondaries = secondaries
167
168
169 def _CheckInstances(cl, notepad, instances):
170   """Make a pass over the list of instances, restarting downed ones.
171
172   """
173   notepad.MaintainInstanceList(instances.keys())
174
175   started = set()
176
177   for inst in instances.values():
178     if inst.status in BAD_STATES:
179       n = notepad.NumberOfRestartAttempts(inst.name)
180
181       if n > MAXTRIES:
182         logging.warning("Not restarting instance '%s', retries exhausted",
183                         inst.name)
184         continue
185
186       if n == MAXTRIES:
187         notepad.RecordRestartAttempt(inst.name)
188         logging.error("Could not restart instance '%s' after %s attempts,"
189                       " giving up", inst.name, MAXTRIES)
190         continue
191
192       try:
193         logging.info("Restarting instance '%s' (attempt #%s)",
194                      inst.name, n + 1)
195         inst.Restart(cl)
196       except Exception: # pylint: disable=W0703
197         logging.exception("Error while restarting instance '%s'", inst.name)
198       else:
199         started.add(inst.name)
200
201       notepad.RecordRestartAttempt(inst.name)
202
203     else:
204       if notepad.NumberOfRestartAttempts(inst.name):
205         notepad.RemoveInstance(inst.name)
206         if inst.status not in HELPLESS_STATES:
207           logging.info("Restart of instance '%s' succeeded", inst.name)
208
209   return started
210
211
212 def _CheckDisks(cl, notepad, nodes, instances, started):
213   """Check all nodes for restarted ones.
214
215   """
216   check_nodes = []
217
218   for node in nodes.values():
219     old = notepad.GetNodeBootID(node.name)
220     if not node.bootid:
221       # Bad node, not returning a boot id
222       if not node.offline:
223         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
224                       node.name)
225       continue
226
227     if old != node.bootid:
228       # Node's boot ID has changed, probably through a reboot
229       check_nodes.append(node)
230
231   if check_nodes:
232     # Activate disks for all instances with any of the checked nodes as a
233     # secondary node.
234     for node in check_nodes:
235       for instance_name in node.secondaries:
236         try:
237           inst = instances[instance_name]
238         except KeyError:
239           logging.info("Can't find instance '%s', maybe it was ignored",
240                        instance_name)
241           continue
242
243         if not inst.autostart:
244           logging.info("Skipping disk activation for non-autostart"
245                        " instance '%s'", inst.name)
246           continue
247
248         if inst.name in started:
249           # we already tried to start the instance, which should have
250           # activated its drives (if they can be at all)
251           logging.debug("Skipping disk activation for instance '%s' as"
252                         " it was already started", inst.name)
253           continue
254
255         try:
256           logging.info("Activating disks for instance '%s'", inst.name)
257           inst.ActivateDisks(cl)
258         except Exception: # pylint: disable=W0703
259           logging.exception("Error while activating disks for instance '%s'",
260                             inst.name)
261
262     # Keep changed boot IDs
263     for node in check_nodes:
264       notepad.SetNodeBootID(node.name, node.bootid)
265
266
267 def _CheckForOfflineNodes(nodes, instance):
268   """Checks if given instances has any secondary in offline status.
269
270   @param instance: The instance object
271   @return: True if any of the secondary is offline, False otherwise
272
273   """
274   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
275
276
277 def _VerifyDisks(cl, uuid, nodes, instances):
278   """Run a per-group "gnt-cluster verify-disks".
279
280   """
281   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
282   ((_, offline_disk_instances, _), ) = \
283     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
284   cl.ArchiveJob(job_id)
285
286   if not offline_disk_instances:
287     # nothing to do
288     logging.debug("Verify-disks reported no offline disks, nothing to do")
289     return
290
291   logging.debug("Will activate disks for instance(s) %s",
292                 utils.CommaJoin(offline_disk_instances))
293
294   # We submit only one job, and wait for it. Not optimal, but this puts less
295   # load on the job queue.
296   job = []
297   for name in offline_disk_instances:
298     try:
299       inst = instances[name]
300     except KeyError:
301       logging.info("Can't find instance '%s', maybe it was ignored", name)
302       continue
303
304     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
305       logging.info("Skipping instance '%s' because it is in a helpless state or"
306                    " has offline secondaries", name)
307       continue
308
309     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
310
311   if job:
312     job_id = cli.SendJob(job, cl=cl)
313
314     try:
315       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
316     except Exception: # pylint: disable=W0703
317       logging.exception("Error while activating disks")
318
319
320 def IsRapiResponding(hostname):
321   """Connects to RAPI port and does a simple test.
322
323   Connects to RAPI port of hostname and does a simple test. At this time, the
324   test is GetVersion.
325
326   @type hostname: string
327   @param hostname: hostname of the node to connect to.
328   @rtype: bool
329   @return: Whether RAPI is working properly
330
331   """
332   curl_config = rapi.client.GenericCurlConfig()
333   rapi_client = rapi.client.GanetiRapiClient(hostname,
334                                              curl_config_fn=curl_config)
335   try:
336     master_version = rapi_client.GetVersion()
337   except rapi.client.CertificateError, err:
338     logging.warning("RAPI certificate error: %s", err)
339     return False
340   except rapi.client.GanetiApiError, err:
341     logging.warning("RAPI error: %s", err)
342     return False
343   else:
344     logging.debug("Reported RAPI version %s", master_version)
345     return master_version == constants.RAPI_VERSION
346
347
348 def ParseOptions():
349   """Parse the command line options.
350
351   @return: (options, args) as from OptionParser.parse_args()
352
353   """
354   parser = OptionParser(description="Ganeti cluster watcher",
355                         usage="%prog [-d]",
356                         version="%%prog (ganeti) %s" %
357                         constants.RELEASE_VERSION)
358
359   parser.add_option(cli.DEBUG_OPT)
360   parser.add_option(cli.NODEGROUP_OPT)
361   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
362                     help="Autoarchive jobs older than this age (default"
363                           " 6 hours)")
364   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
365                     action="store_true", help="Ignore cluster pause setting")
366   parser.add_option("--wait-children", dest="wait_children",
367                     action="store_true", help="Wait for child processes")
368   parser.add_option("--no-wait-children", dest="wait_children",
369                     action="store_false", help="Don't wait for child processes")
370   # See optparse documentation for why default values are not set by options
371   parser.set_defaults(wait_children=True)
372   options, args = parser.parse_args()
373   options.job_age = cli.ParseTimespec(options.job_age)
374
375   if args:
376     parser.error("No arguments expected")
377
378   return (options, args)
379
380
381 def _WriteInstanceStatus(filename, data):
382   """Writes the per-group instance status file.
383
384   The entries are sorted.
385
386   @type filename: string
387   @param filename: Path to instance status file
388   @type data: list of tuple; (instance name as string, status as string)
389   @param data: Instance name and status
390
391   """
392   logging.debug("Updating instance status file '%s' with %s instances",
393                 filename, len(data))
394
395   utils.WriteFile(filename,
396                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
397                                    sorted(data))))
398
399
400 def _UpdateInstanceStatus(filename, instances):
401   """Writes an instance status file from L{Instance} objects.
402
403   @type filename: string
404   @param filename: Path to status file
405   @type instances: list of L{Instance}
406
407   """
408   _WriteInstanceStatus(filename, [(inst.name, inst.status)
409                                   for inst in instances])
410
411
412 class _StatCb:
413   """Helper to store file handle's C{fstat}.
414
415   """
416   def __init__(self):
417     """Initializes this class.
418
419     """
420     self.st = None
421
422   def __call__(self, fh):
423     """Calls C{fstat} on file handle.
424
425     """
426     self.st = os.fstat(fh.fileno())
427
428
429 def _ReadInstanceStatus(filename):
430   """Reads an instance status file.
431
432   @type filename: string
433   @param filename: Path to status file
434   @rtype: tuple; (None or number, list of lists containing instance name and
435     status)
436   @return: File's mtime and instance status contained in the file; mtime is
437     C{None} if file can't be read
438
439   """
440   logging.debug("Reading per-group instance status from '%s'", filename)
441
442   statcb = _StatCb()
443   try:
444     content = utils.ReadFile(filename, preread=statcb)
445   except EnvironmentError, err:
446     if err.errno == errno.ENOENT:
447       logging.error("Can't read '%s', does not exist (yet)", filename)
448     else:
449       logging.exception("Unable to read '%s', ignoring", filename)
450     return (None, None)
451   else:
452     return (statcb.st.st_mtime, [line.split(None, 1)
453                                  for line in content.splitlines()])
454
455
456 def _MergeInstanceStatus(filename, pergroup_filename, groups):
457   """Merges all per-group instance status files into a global one.
458
459   @type filename: string
460   @param filename: Path to global instance status file
461   @type pergroup_filename: string
462   @param pergroup_filename: Path to per-group status files, must contain "%s"
463     to be replaced with group UUID
464   @type groups: sequence
465   @param groups: UUIDs of known groups
466
467   """
468   # Lock global status file in exclusive mode
469   lock = utils.FileLock.Open(filename)
470   try:
471     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
472   except errors.LockError, err:
473     # All per-group processes will lock and update the file. None of them
474     # should take longer than 10 seconds (the value of
475     # INSTANCE_STATUS_LOCK_TIMEOUT).
476     logging.error("Can't acquire lock on instance status file '%s', not"
477                   " updating: %s", filename, err)
478     return
479
480   logging.debug("Acquired exclusive lock on '%s'", filename)
481
482   data = {}
483
484   # Load instance status from all groups
485   for group_uuid in groups:
486     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
487
488     if mtime is not None:
489       for (instance_name, status) in instdata:
490         data.setdefault(instance_name, []).append((mtime, status))
491
492   # Select last update based on file mtime
493   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
494                 for (instance_name, status) in data.items()]
495
496   # Write the global status file. Don't touch file after it's been
497   # updated--there is no lock anymore.
498   _WriteInstanceStatus(filename, inststatus)
499
500
501 def GetLuxiClient(try_restart):
502   """Tries to connect to the master daemon.
503
504   @type try_restart: bool
505   @param try_restart: Whether to attempt to restart the master daemon
506
507   """
508   try:
509     return cli.GetClient()
510   except errors.OpPrereqError, err:
511     # this is, from cli.GetClient, a not-master case
512     raise NotMasterError("Not on master node (%s)" % err)
513
514   except luxi.NoMasterError, err:
515     if not try_restart:
516       raise
517
518     logging.warning("Master daemon seems to be down (%s), trying to restart",
519                     err)
520
521     if not utils.EnsureDaemon(constants.MASTERD):
522       raise errors.GenericError("Can't start the master daemon")
523
524     # Retry the connection
525     return cli.GetClient()
526
527
528 def _StartGroupChildren(cl, wait):
529   """Starts a new instance of the watcher for every node group.
530
531   """
532   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
533                         for arg in sys.argv)
534
535   result = cl.QueryGroups([], ["name", "uuid"], False)
536
537   children = []
538
539   for (idx, (name, uuid)) in enumerate(result):
540     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
541
542     if idx > 0:
543       # Let's not kill the system
544       time.sleep(CHILD_PROCESS_DELAY)
545
546     logging.debug("Spawning child for group '%s' (%s), arguments %s",
547                   name, uuid, args)
548
549     try:
550       # TODO: Should utils.StartDaemon be used instead?
551       pid = os.spawnv(os.P_NOWAIT, args[0], args)
552     except Exception: # pylint: disable=W0703
553       logging.exception("Failed to start child for group '%s' (%s)",
554                         name, uuid)
555     else:
556       logging.debug("Started with PID %s", pid)
557       children.append(pid)
558
559   if wait:
560     for pid in children:
561       logging.debug("Waiting for child PID %s", pid)
562       try:
563         result = utils.RetryOnSignal(os.waitpid, pid, 0)
564       except EnvironmentError, err:
565         result = str(err)
566
567       logging.debug("Child PID %s exited with status %s", pid, result)
568
569
570 def _ArchiveJobs(cl, age):
571   """Archives old jobs.
572
573   """
574   (arch_count, left_count) = cl.AutoArchiveJobs(age)
575   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
576
577
578 def _CheckMaster(cl):
579   """Ensures current host is master node.
580
581   """
582   (master, ) = cl.QueryConfigValues(["master_node"])
583   if master != netutils.Hostname.GetSysName():
584     raise NotMasterError("This is not the master node")
585
586
587 @rapi.client.UsesRapiClient
588 def _GlobalWatcher(opts):
589   """Main function for global watcher.
590
591   At the end child processes are spawned for every node group.
592
593   """
594   StartNodeDaemons()
595   RunWatcherHooks()
596
597   # Run node maintenance in all cases, even if master, so that old masters can
598   # be properly cleaned up
599   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
600     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
601
602   try:
603     client = GetLuxiClient(True)
604   except NotMasterError:
605     # Don't proceed on non-master nodes
606     return constants.EXIT_SUCCESS
607
608   # we are on master now
609   utils.EnsureDaemon(constants.RAPI)
610
611   # If RAPI isn't responding to queries, try one restart
612   logging.debug("Attempting to talk to remote API on %s",
613                 constants.IP4_ADDRESS_LOCALHOST)
614   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
615     logging.warning("Couldn't get answer from remote API, restaring daemon")
616     utils.StopDaemon(constants.RAPI)
617     utils.EnsureDaemon(constants.RAPI)
618     logging.debug("Second attempt to talk to remote API")
619     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
620       logging.fatal("RAPI is not responding")
621   logging.debug("Successfully talked to remote API")
622
623   _CheckMaster(client)
624   _ArchiveJobs(client, opts.job_age)
625
626   # Spawn child processes for all node groups
627   _StartGroupChildren(client, opts.wait_children)
628
629   return constants.EXIT_SUCCESS
630
631
632 def _GetGroupData(cl, uuid):
633   """Retrieves instances and nodes per node group.
634
635   """
636   job = [
637     # Get all primary instances in group
638     opcodes.OpQuery(what=constants.QR_INSTANCE,
639                     fields=["name", "status", "admin_state", "snodes",
640                             "pnode.group.uuid", "snodes.group.uuid"],
641                     filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
642                     use_locking=True),
643
644     # Get all nodes in group
645     opcodes.OpQuery(what=constants.QR_NODE,
646                     fields=["name", "bootid", "offline"],
647                     filter=[qlang.OP_EQUAL, "group.uuid", uuid],
648                     use_locking=True),
649     ]
650
651   job_id = cl.SubmitJob(job)
652   results = map(objects.QueryResponse.FromDict,
653                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
654   cl.ArchiveJob(job_id)
655
656   results_data = map(operator.attrgetter("data"), results)
657
658   # Ensure results are tuples with two values
659   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
660
661   # Extract values ignoring result status
662   (raw_instances, raw_nodes) = [[map(compat.snd, values)
663                                  for values in res]
664                                 for res in results_data]
665
666   secondaries = {}
667   instances = []
668
669   # Load all instances
670   for (name, status, autostart, snodes, pnode_group_uuid,
671        snodes_group_uuid) in raw_instances:
672     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
673       logging.error("Ignoring split instance '%s', primary group %s, secondary"
674                     " groups %s", name, pnode_group_uuid,
675                     utils.CommaJoin(snodes_group_uuid))
676     else:
677       instances.append(Instance(name, status, autostart, snodes))
678
679       for node in snodes:
680         secondaries.setdefault(node, set()).add(name)
681
682   # Load all nodes
683   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
684            for (name, bootid, offline) in raw_nodes]
685
686   return (dict((node.name, node) for node in nodes),
687           dict((inst.name, inst) for inst in instances))
688
689
690 def _LoadKnownGroups():
691   """Returns a list of all node groups known by L{ssconf}.
692
693   """
694   groups = ssconf.SimpleStore().GetNodegroupList()
695
696   result = list(line.split(None, 1)[0] for line in groups
697                 if line.strip())
698
699   if not compat.all(map(utils.UUID_RE.match, result)):
700     raise errors.GenericError("Ssconf contains invalid group UUID")
701
702   return result
703
704
705 def _GroupWatcher(opts):
706   """Main function for per-group watcher process.
707
708   """
709   group_uuid = opts.nodegroup.lower()
710
711   if not utils.UUID_RE.match(group_uuid):
712     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
713                               " got '%s'" %
714                               (cli.NODEGROUP_OPT_NAME, group_uuid))
715
716   logging.info("Watcher for node group '%s'", group_uuid)
717
718   known_groups = _LoadKnownGroups()
719
720   # Check if node group is known
721   if group_uuid not in known_groups:
722     raise errors.GenericError("Node group '%s' is not known by ssconf" %
723                               group_uuid)
724
725   # Group UUID has been verified and should not contain any dangerous characters
726   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
727   inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
728
729   logging.debug("Using state file %s", state_path)
730
731   # Global watcher
732   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
733   if not statefile:
734     return constants.EXIT_FAILURE
735
736   notepad = state.WatcherState(statefile) # pylint: disable=E0602
737   try:
738     # Connect to master daemon
739     client = GetLuxiClient(False)
740
741     _CheckMaster(client)
742
743     (nodes, instances) = _GetGroupData(client, group_uuid)
744
745     # Update per-group instance status file
746     _UpdateInstanceStatus(inst_status_path, instances.values())
747
748     _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
749                          constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
750                          known_groups)
751
752     started = _CheckInstances(client, notepad, instances)
753     _CheckDisks(client, notepad, nodes, instances, started)
754     _VerifyDisks(client, group_uuid, nodes, instances)
755   except Exception, err:
756     logging.info("Not updating status file due to failure: %s", err)
757     raise
758   else:
759     # Save changes for next run
760     notepad.Save(state_path)
761
762   return constants.EXIT_SUCCESS
763
764
765 def Main():
766   """Main function.
767
768   """
769   (options, _) = ParseOptions()
770
771   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
772                      debug=options.debug, stderr_logging=options.debug)
773
774   if ShouldPause() and not options.ignore_pause:
775     logging.debug("Pause has been set, exiting")
776     return constants.EXIT_SUCCESS
777
778   # Try to acquire global watcher lock in shared mode
779   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
780   try:
781     lock.Shared(blocking=False)
782   except (EnvironmentError, errors.LockError), err:
783     logging.error("Can't acquire lock on %s: %s",
784                   constants.WATCHER_LOCK_FILE, err)
785     return constants.EXIT_SUCCESS
786
787   if options.nodegroup is None:
788     fn = _GlobalWatcher
789   else:
790     # Per-nodegroup watcher
791     fn = _GroupWatcher
792
793   try:
794     return fn(options)
795   except (SystemExit, KeyboardInterrupt):
796     raise
797   except NotMasterError:
798     logging.debug("Not master, exiting")
799     return constants.EXIT_NOTMASTER
800   except errors.ResolverError, err:
801     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
802     return constants.EXIT_NODESETUP_ERROR
803   except errors.JobQueueFull:
804     logging.error("Job queue is full, can't query cluster state")
805   except errors.JobQueueDrainError:
806     logging.error("Job queue is drained, can't maintain cluster state")
807   except Exception, err:
808     logging.exception(str(err))
809     return constants.EXIT_FAILURE
810
811   return constants.EXIT_SUCCESS