Fix decorator uses which crash newer pylint
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client # pylint: disable=W0611
54 from ganeti.rapi.client import UsesRapiClient
55
56 from ganeti.watcher import nodemaint
57 from ganeti.watcher import state
58
59
60 MAXTRIES = 5
61 BAD_STATES = frozenset([
62   constants.INSTST_ERRORDOWN,
63   ])
64 HELPLESS_STATES = frozenset([
65   constants.INSTST_NODEDOWN,
66   constants.INSTST_NODEOFFLINE,
67   ])
68 NOTICE = "NOTICE"
69 ERROR = "ERROR"
70
71 #: Number of seconds to wait between starting child processes for node groups
72 CHILD_PROCESS_DELAY = 1.0
73
74 #: How many seconds to wait for instance status file lock
75 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
76
77
78 class NotMasterError(errors.GenericError):
79   """Exception raised when this host is not the master."""
80
81
82 def ShouldPause():
83   """Check whether we should pause.
84
85   """
86   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
87
88
89 def StartNodeDaemons():
90   """Start all the daemons that should be running on all nodes.
91
92   """
93   # on master or not, try to start the node daemon
94   utils.EnsureDaemon(constants.NODED)
95   # start confd as well. On non candidates it will be in disabled mode.
96   if constants.ENABLE_CONFD:
97     utils.EnsureDaemon(constants.CONFD)
98
99
100 def RunWatcherHooks():
101   """Run the watcher hooks.
102
103   """
104   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
105                              constants.HOOKS_NAME_WATCHER)
106   if not os.path.isdir(hooks_dir):
107     return
108
109   try:
110     results = utils.RunParts(hooks_dir)
111   except Exception, err: # pylint: disable=W0703
112     logging.exception("RunParts %s failed: %s", hooks_dir, err)
113     return
114
115   for (relname, status, runresult) in results:
116     if status == constants.RUNPARTS_SKIP:
117       logging.debug("Watcher hook %s: skipped", relname)
118     elif status == constants.RUNPARTS_ERR:
119       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
120     elif status == constants.RUNPARTS_RUN:
121       if runresult.failed:
122         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
123                         relname, runresult.exit_code, runresult.output)
124       else:
125         logging.debug("Watcher hook %s: success (output: %s)", relname,
126                       runresult.output)
127     else:
128       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
129                                    status)
130
131
132 class Instance(object):
133   """Abstraction for a Virtual Machine instance.
134
135   """
136   def __init__(self, name, status, autostart, snodes):
137     self.name = name
138     self.status = status
139     self.autostart = autostart
140     self.snodes = snodes
141
142   def Restart(self, cl):
143     """Encapsulates the start of an instance.
144
145     """
146     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
147     cli.SubmitOpCode(op, cl=cl)
148
149   def ActivateDisks(self, cl):
150     """Encapsulates the activation of all disks of an instance.
151
152     """
153     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
154     cli.SubmitOpCode(op, cl=cl)
155
156
157 class Node:
158   """Data container representing cluster node.
159
160   """
161   def __init__(self, name, bootid, offline, secondaries):
162     """Initializes this class.
163
164     """
165     self.name = name
166     self.bootid = bootid
167     self.offline = offline
168     self.secondaries = secondaries
169
170
171 def _CheckInstances(cl, notepad, instances):
172   """Make a pass over the list of instances, restarting downed ones.
173
174   """
175   notepad.MaintainInstanceList(instances.keys())
176
177   started = set()
178
179   for inst in instances.values():
180     if inst.status in BAD_STATES:
181       n = notepad.NumberOfRestartAttempts(inst.name)
182
183       if n > MAXTRIES:
184         logging.warning("Not restarting instance '%s', retries exhausted",
185                         inst.name)
186         continue
187
188       if n == MAXTRIES:
189         notepad.RecordRestartAttempt(inst.name)
190         logging.error("Could not restart instance '%s' after %s attempts,"
191                       " giving up", inst.name, MAXTRIES)
192         continue
193
194       try:
195         logging.info("Restarting instance '%s' (attempt #%s)",
196                      inst.name, n + 1)
197         inst.Restart(cl)
198       except Exception: # pylint: disable=W0703
199         logging.exception("Error while restarting instance '%s'", inst.name)
200       else:
201         started.add(inst.name)
202
203       notepad.RecordRestartAttempt(inst.name)
204
205     else:
206       if notepad.NumberOfRestartAttempts(inst.name):
207         notepad.RemoveInstance(inst.name)
208         if inst.status not in HELPLESS_STATES:
209           logging.info("Restart of instance '%s' succeeded", inst.name)
210
211   return started
212
213
214 def _CheckDisks(cl, notepad, nodes, instances, started):
215   """Check all nodes for restarted ones.
216
217   """
218   check_nodes = []
219
220   for node in nodes.values():
221     old = notepad.GetNodeBootID(node.name)
222     if not node.bootid:
223       # Bad node, not returning a boot id
224       if not node.offline:
225         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
226                       node.name)
227       continue
228
229     if old != node.bootid:
230       # Node's boot ID has changed, probably through a reboot
231       check_nodes.append(node)
232
233   if check_nodes:
234     # Activate disks for all instances with any of the checked nodes as a
235     # secondary node.
236     for node in check_nodes:
237       for instance_name in node.secondaries:
238         try:
239           inst = instances[instance_name]
240         except KeyError:
241           logging.info("Can't find instance '%s', maybe it was ignored",
242                        instance_name)
243           continue
244
245         if not inst.autostart:
246           logging.info("Skipping disk activation for non-autostart"
247                        " instance '%s'", inst.name)
248           continue
249
250         if inst.name in started:
251           # we already tried to start the instance, which should have
252           # activated its drives (if they can be at all)
253           logging.debug("Skipping disk activation for instance '%s' as"
254                         " it was already started", inst.name)
255           continue
256
257         try:
258           logging.info("Activating disks for instance '%s'", inst.name)
259           inst.ActivateDisks(cl)
260         except Exception: # pylint: disable=W0703
261           logging.exception("Error while activating disks for instance '%s'",
262                             inst.name)
263
264     # Keep changed boot IDs
265     for node in check_nodes:
266       notepad.SetNodeBootID(node.name, node.bootid)
267
268
269 def _CheckForOfflineNodes(nodes, instance):
270   """Checks if given instances has any secondary in offline status.
271
272   @param instance: The instance object
273   @return: True if any of the secondary is offline, False otherwise
274
275   """
276   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
277
278
279 def _VerifyDisks(cl, uuid, nodes, instances):
280   """Run a per-group "gnt-cluster verify-disks".
281
282   """
283   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
284   ((_, offline_disk_instances, _), ) = \
285     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
286   cl.ArchiveJob(job_id)
287
288   if not offline_disk_instances:
289     # nothing to do
290     logging.debug("Verify-disks reported no offline disks, nothing to do")
291     return
292
293   logging.debug("Will activate disks for instance(s) %s",
294                 utils.CommaJoin(offline_disk_instances))
295
296   # We submit only one job, and wait for it. Not optimal, but this puts less
297   # load on the job queue.
298   job = []
299   for name in offline_disk_instances:
300     try:
301       inst = instances[name]
302     except KeyError:
303       logging.info("Can't find instance '%s', maybe it was ignored", name)
304       continue
305
306     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
307       logging.info("Skipping instance '%s' because it is in a helpless state"
308                    " or has offline secondaries", name)
309       continue
310
311     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
312
313   if job:
314     job_id = cli.SendJob(job, cl=cl)
315
316     try:
317       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
318     except Exception: # pylint: disable=W0703
319       logging.exception("Error while activating disks")
320
321
322 def IsRapiResponding(hostname):
323   """Connects to RAPI port and does a simple test.
324
325   Connects to RAPI port of hostname and does a simple test. At this time, the
326   test is GetVersion.
327
328   @type hostname: string
329   @param hostname: hostname of the node to connect to.
330   @rtype: bool
331   @return: Whether RAPI is working properly
332
333   """
334   curl_config = rapi.client.GenericCurlConfig()
335   rapi_client = rapi.client.GanetiRapiClient(hostname,
336                                              curl_config_fn=curl_config)
337   try:
338     master_version = rapi_client.GetVersion()
339   except rapi.client.CertificateError, err:
340     logging.warning("RAPI certificate error: %s", err)
341     return False
342   except rapi.client.GanetiApiError, err:
343     logging.warning("RAPI error: %s", err)
344     return False
345   else:
346     logging.debug("Reported RAPI version %s", master_version)
347     return master_version == constants.RAPI_VERSION
348
349
350 def ParseOptions():
351   """Parse the command line options.
352
353   @return: (options, args) as from OptionParser.parse_args()
354
355   """
356   parser = OptionParser(description="Ganeti cluster watcher",
357                         usage="%prog [-d]",
358                         version="%%prog (ganeti) %s" %
359                         constants.RELEASE_VERSION)
360
361   parser.add_option(cli.DEBUG_OPT)
362   parser.add_option(cli.NODEGROUP_OPT)
363   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
364                     help="Autoarchive jobs older than this age (default"
365                           " 6 hours)")
366   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
367                     action="store_true", help="Ignore cluster pause setting")
368   parser.add_option("--wait-children", dest="wait_children",
369                     action="store_true", help="Wait for child processes")
370   parser.add_option("--no-wait-children", dest="wait_children",
371                     action="store_false",
372                     help="Don't wait for child processes")
373   # See optparse documentation for why default values are not set by options
374   parser.set_defaults(wait_children=True)
375   options, args = parser.parse_args()
376   options.job_age = cli.ParseTimespec(options.job_age)
377
378   if args:
379     parser.error("No arguments expected")
380
381   return (options, args)
382
383
384 def _WriteInstanceStatus(filename, data):
385   """Writes the per-group instance status file.
386
387   The entries are sorted.
388
389   @type filename: string
390   @param filename: Path to instance status file
391   @type data: list of tuple; (instance name as string, status as string)
392   @param data: Instance name and status
393
394   """
395   logging.debug("Updating instance status file '%s' with %s instances",
396                 filename, len(data))
397
398   utils.WriteFile(filename,
399                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
400                                    sorted(data))))
401
402
403 def _UpdateInstanceStatus(filename, instances):
404   """Writes an instance status file from L{Instance} objects.
405
406   @type filename: string
407   @param filename: Path to status file
408   @type instances: list of L{Instance}
409
410   """
411   _WriteInstanceStatus(filename, [(inst.name, inst.status)
412                                   for inst in instances])
413
414
415 def _ReadInstanceStatus(filename):
416   """Reads an instance status file.
417
418   @type filename: string
419   @param filename: Path to status file
420   @rtype: tuple; (None or number, list of lists containing instance name and
421     status)
422   @return: File's mtime and instance status contained in the file; mtime is
423     C{None} if file can't be read
424
425   """
426   logging.debug("Reading per-group instance status from '%s'", filename)
427
428   statcb = utils.FileStatHelper()
429   try:
430     content = utils.ReadFile(filename, preread=statcb)
431   except EnvironmentError, err:
432     if err.errno == errno.ENOENT:
433       logging.error("Can't read '%s', does not exist (yet)", filename)
434     else:
435       logging.exception("Unable to read '%s', ignoring", filename)
436     return (None, None)
437   else:
438     return (statcb.st.st_mtime, [line.split(None, 1)
439                                  for line in content.splitlines()])
440
441
442 def _MergeInstanceStatus(filename, pergroup_filename, groups):
443   """Merges all per-group instance status files into a global one.
444
445   @type filename: string
446   @param filename: Path to global instance status file
447   @type pergroup_filename: string
448   @param pergroup_filename: Path to per-group status files, must contain "%s"
449     to be replaced with group UUID
450   @type groups: sequence
451   @param groups: UUIDs of known groups
452
453   """
454   # Lock global status file in exclusive mode
455   lock = utils.FileLock.Open(filename)
456   try:
457     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
458   except errors.LockError, err:
459     # All per-group processes will lock and update the file. None of them
460     # should take longer than 10 seconds (the value of
461     # INSTANCE_STATUS_LOCK_TIMEOUT).
462     logging.error("Can't acquire lock on instance status file '%s', not"
463                   " updating: %s", filename, err)
464     return
465
466   logging.debug("Acquired exclusive lock on '%s'", filename)
467
468   data = {}
469
470   # Load instance status from all groups
471   for group_uuid in groups:
472     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
473
474     if mtime is not None:
475       for (instance_name, status) in instdata:
476         data.setdefault(instance_name, []).append((mtime, status))
477
478   # Select last update based on file mtime
479   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
480                 for (instance_name, status) in data.items()]
481
482   # Write the global status file. Don't touch file after it's been
483   # updated--there is no lock anymore.
484   _WriteInstanceStatus(filename, inststatus)
485
486
487 def GetLuxiClient(try_restart):
488   """Tries to connect to the master daemon.
489
490   @type try_restart: bool
491   @param try_restart: Whether to attempt to restart the master daemon
492
493   """
494   try:
495     return cli.GetClient()
496   except errors.OpPrereqError, err:
497     # this is, from cli.GetClient, a not-master case
498     raise NotMasterError("Not on master node (%s)" % err)
499
500   except luxi.NoMasterError, err:
501     if not try_restart:
502       raise
503
504     logging.warning("Master daemon seems to be down (%s), trying to restart",
505                     err)
506
507     if not utils.EnsureDaemon(constants.MASTERD):
508       raise errors.GenericError("Can't start the master daemon")
509
510     # Retry the connection
511     return cli.GetClient()
512
513
514 def _StartGroupChildren(cl, wait):
515   """Starts a new instance of the watcher for every node group.
516
517   """
518   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
519                         for arg in sys.argv)
520
521   result = cl.QueryGroups([], ["name", "uuid"], False)
522
523   children = []
524
525   for (idx, (name, uuid)) in enumerate(result):
526     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
527
528     if idx > 0:
529       # Let's not kill the system
530       time.sleep(CHILD_PROCESS_DELAY)
531
532     logging.debug("Spawning child for group '%s' (%s), arguments %s",
533                   name, uuid, args)
534
535     try:
536       # TODO: Should utils.StartDaemon be used instead?
537       pid = os.spawnv(os.P_NOWAIT, args[0], args)
538     except Exception: # pylint: disable=W0703
539       logging.exception("Failed to start child for group '%s' (%s)",
540                         name, uuid)
541     else:
542       logging.debug("Started with PID %s", pid)
543       children.append(pid)
544
545   if wait:
546     for pid in children:
547       logging.debug("Waiting for child PID %s", pid)
548       try:
549         result = utils.RetryOnSignal(os.waitpid, pid, 0)
550       except EnvironmentError, err:
551         result = str(err)
552
553       logging.debug("Child PID %s exited with status %s", pid, result)
554
555
556 def _ArchiveJobs(cl, age):
557   """Archives old jobs.
558
559   """
560   (arch_count, left_count) = cl.AutoArchiveJobs(age)
561   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
562
563
564 def _CheckMaster(cl):
565   """Ensures current host is master node.
566
567   """
568   (master, ) = cl.QueryConfigValues(["master_node"])
569   if master != netutils.Hostname.GetSysName():
570     raise NotMasterError("This is not the master node")
571
572
573 @UsesRapiClient
574 def _GlobalWatcher(opts):
575   """Main function for global watcher.
576
577   At the end child processes are spawned for every node group.
578
579   """
580   StartNodeDaemons()
581   RunWatcherHooks()
582
583   # Run node maintenance in all cases, even if master, so that old masters can
584   # be properly cleaned up
585   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
586     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
587
588   try:
589     client = GetLuxiClient(True)
590   except NotMasterError:
591     # Don't proceed on non-master nodes
592     return constants.EXIT_SUCCESS
593
594   # we are on master now
595   utils.EnsureDaemon(constants.RAPI)
596
597   # If RAPI isn't responding to queries, try one restart
598   logging.debug("Attempting to talk to remote API on %s",
599                 constants.IP4_ADDRESS_LOCALHOST)
600   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
601     logging.warning("Couldn't get answer from remote API, restaring daemon")
602     utils.StopDaemon(constants.RAPI)
603     utils.EnsureDaemon(constants.RAPI)
604     logging.debug("Second attempt to talk to remote API")
605     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
606       logging.fatal("RAPI is not responding")
607   logging.debug("Successfully talked to remote API")
608
609   _CheckMaster(client)
610   _ArchiveJobs(client, opts.job_age)
611
612   # Spawn child processes for all node groups
613   _StartGroupChildren(client, opts.wait_children)
614
615   return constants.EXIT_SUCCESS
616
617
618 def _GetGroupData(cl, uuid):
619   """Retrieves instances and nodes per node group.
620
621   """
622   job = [
623     # Get all primary instances in group
624     opcodes.OpQuery(what=constants.QR_INSTANCE,
625                     fields=["name", "status", "admin_state", "snodes",
626                             "pnode.group.uuid", "snodes.group.uuid"],
627                     qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
628                     use_locking=True),
629
630     # Get all nodes in group
631     opcodes.OpQuery(what=constants.QR_NODE,
632                     fields=["name", "bootid", "offline"],
633                     qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
634                     use_locking=True),
635     ]
636
637   job_id = cl.SubmitJob(job)
638   results = map(objects.QueryResponse.FromDict,
639                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
640   cl.ArchiveJob(job_id)
641
642   results_data = map(operator.attrgetter("data"), results)
643
644   # Ensure results are tuples with two values
645   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
646
647   # Extract values ignoring result status
648   (raw_instances, raw_nodes) = [[map(compat.snd, values)
649                                  for values in res]
650                                 for res in results_data]
651
652   secondaries = {}
653   instances = []
654
655   # Load all instances
656   for (name, status, autostart, snodes, pnode_group_uuid,
657        snodes_group_uuid) in raw_instances:
658     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
659       logging.error("Ignoring split instance '%s', primary group %s, secondary"
660                     " groups %s", name, pnode_group_uuid,
661                     utils.CommaJoin(snodes_group_uuid))
662     else:
663       instances.append(Instance(name, status, autostart, snodes))
664
665       for node in snodes:
666         secondaries.setdefault(node, set()).add(name)
667
668   # Load all nodes
669   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
670            for (name, bootid, offline) in raw_nodes]
671
672   return (dict((node.name, node) for node in nodes),
673           dict((inst.name, inst) for inst in instances))
674
675
676 def _LoadKnownGroups():
677   """Returns a list of all node groups known by L{ssconf}.
678
679   """
680   groups = ssconf.SimpleStore().GetNodegroupList()
681
682   result = list(line.split(None, 1)[0] for line in groups
683                 if line.strip())
684
685   if not compat.all(map(utils.UUID_RE.match, result)):
686     raise errors.GenericError("Ssconf contains invalid group UUID")
687
688   return result
689
690
691 def _GroupWatcher(opts):
692   """Main function for per-group watcher process.
693
694   """
695   group_uuid = opts.nodegroup.lower()
696
697   if not utils.UUID_RE.match(group_uuid):
698     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
699                               " got '%s'" %
700                               (cli.NODEGROUP_OPT_NAME, group_uuid))
701
702   logging.info("Watcher for node group '%s'", group_uuid)
703
704   known_groups = _LoadKnownGroups()
705
706   # Check if node group is known
707   if group_uuid not in known_groups:
708     raise errors.GenericError("Node group '%s' is not known by ssconf" %
709                               group_uuid)
710
711   # Group UUID has been verified and should not contain any dangerous
712   # characters
713   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
714   inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
715
716   logging.debug("Using state file %s", state_path)
717
718   # Global watcher
719   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
720   if not statefile:
721     return constants.EXIT_FAILURE
722
723   notepad = state.WatcherState(statefile) # pylint: disable=E0602
724   try:
725     # Connect to master daemon
726     client = GetLuxiClient(False)
727
728     _CheckMaster(client)
729
730     (nodes, instances) = _GetGroupData(client, group_uuid)
731
732     # Update per-group instance status file
733     _UpdateInstanceStatus(inst_status_path, instances.values())
734
735     _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
736                          constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
737                          known_groups)
738
739     started = _CheckInstances(client, notepad, instances)
740     _CheckDisks(client, notepad, nodes, instances, started)
741     _VerifyDisks(client, group_uuid, nodes, instances)
742   except Exception, err:
743     logging.info("Not updating status file due to failure: %s", err)
744     raise
745   else:
746     # Save changes for next run
747     notepad.Save(state_path)
748
749   return constants.EXIT_SUCCESS
750
751
752 def Main():
753   """Main function.
754
755   """
756   (options, _) = ParseOptions()
757
758   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
759                      debug=options.debug, stderr_logging=options.debug)
760
761   if ShouldPause() and not options.ignore_pause:
762     logging.debug("Pause has been set, exiting")
763     return constants.EXIT_SUCCESS
764
765   # Try to acquire global watcher lock in shared mode
766   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
767   try:
768     lock.Shared(blocking=False)
769   except (EnvironmentError, errors.LockError), err:
770     logging.error("Can't acquire lock on %s: %s",
771                   constants.WATCHER_LOCK_FILE, err)
772     return constants.EXIT_SUCCESS
773
774   if options.nodegroup is None:
775     fn = _GlobalWatcher
776   else:
777     # Per-nodegroup watcher
778     fn = _GroupWatcher
779
780   try:
781     return fn(options)
782   except (SystemExit, KeyboardInterrupt):
783     raise
784   except NotMasterError:
785     logging.debug("Not master, exiting")
786     return constants.EXIT_NOTMASTER
787   except errors.ResolverError, err:
788     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
789     return constants.EXIT_NODESETUP_ERROR
790   except errors.JobQueueFull:
791     logging.error("Job queue is full, can't query cluster state")
792   except errors.JobQueueDrainError:
793     logging.error("Job queue is drained, can't maintain cluster state")
794   except Exception, err:
795     logging.exception(str(err))
796     return constants.EXIT_FAILURE
797
798   return constants.EXIT_SUCCESS