watcher: Use locks when querying for resource information
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client # pylint: disable-msg=W0611
54
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
57
58
59 MAXTRIES = 5
60 BAD_STATES = frozenset([
61   constants.INSTST_ERRORDOWN,
62   ])
63 HELPLESS_STATES = frozenset([
64   constants.INSTST_NODEDOWN,
65   constants.INSTST_NODEOFFLINE,
66   ])
67 NOTICE = "NOTICE"
68 ERROR = "ERROR"
69
70 #: Number of seconds to wait between starting child processes for node groups
71 CHILD_PROCESS_DELAY = 1.0
72
73 #: How many seconds to wait for instance status file lock
74 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
75
76
77 class NotMasterError(errors.GenericError):
78   """Exception raised when this host is not the master."""
79
80
81 def ShouldPause():
82   """Check whether we should pause.
83
84   """
85   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
86
87
88 def StartNodeDaemons():
89   """Start all the daemons that should be running on all nodes.
90
91   """
92   # on master or not, try to start the node daemon
93   utils.EnsureDaemon(constants.NODED)
94   # start confd as well. On non candidates it will be in disabled mode.
95   utils.EnsureDaemon(constants.CONFD)
96
97
98 def RunWatcherHooks():
99   """Run the watcher hooks.
100
101   """
102   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
103                              constants.HOOKS_NAME_WATCHER)
104   if not os.path.isdir(hooks_dir):
105     return
106
107   try:
108     results = utils.RunParts(hooks_dir)
109   except Exception: # pylint: disable-msg=W0703
110     logging.exception("RunParts %s failed: %s", hooks_dir)
111     return
112
113   for (relname, status, runresult) in results:
114     if status == constants.RUNPARTS_SKIP:
115       logging.debug("Watcher hook %s: skipped", relname)
116     elif status == constants.RUNPARTS_ERR:
117       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
118     elif status == constants.RUNPARTS_RUN:
119       if runresult.failed:
120         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
121                         relname, runresult.exit_code, runresult.output)
122       else:
123         logging.debug("Watcher hook %s: success (output: %s)", relname,
124                       runresult.output)
125     else:
126       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
127                                    status)
128
129
130 class Instance(object):
131   """Abstraction for a Virtual Machine instance.
132
133   """
134   def __init__(self, name, status, autostart, snodes):
135     self.name = name
136     self.status = status
137     self.autostart = autostart
138     self.snodes = snodes
139
140   def Restart(self, cl):
141     """Encapsulates the start of an instance.
142
143     """
144     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
145     cli.SubmitOpCode(op, cl=cl)
146
147   def ActivateDisks(self, cl):
148     """Encapsulates the activation of all disks of an instance.
149
150     """
151     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
152     cli.SubmitOpCode(op, cl=cl)
153
154
155 class Node:
156   """Data container representing cluster node.
157
158   """
159   def __init__(self, name, bootid, offline, secondaries):
160     """Initializes this class.
161
162     """
163     self.name = name
164     self.bootid = bootid
165     self.offline = offline
166     self.secondaries = secondaries
167
168
169 def _CheckInstances(cl, notepad, instances):
170   """Make a pass over the list of instances, restarting downed ones.
171
172   """
173   notepad.MaintainInstanceList(instances.keys())
174
175   started = set()
176
177   for inst in instances.values():
178     if inst.status in BAD_STATES:
179       n = notepad.NumberOfRestartAttempts(inst.name)
180
181       if n > MAXTRIES:
182         logging.warning("Not restarting instance '%s', retries exhausted",
183                         inst.name)
184         continue
185
186       if n == MAXTRIES:
187         notepad.RecordRestartAttempt(inst.name)
188         logging.error("Could not restart instance '%s' after %s attempts,"
189                       " giving up", inst.name, MAXTRIES)
190         continue
191
192       try:
193         logging.info("Restarting instance '%s' (attempt #%s)",
194                      inst.name, n + 1)
195         inst.Restart(cl)
196       except Exception: # pylint: disable-msg=W0703
197         logging.exception("Error while restarting instance '%s'", inst.name)
198       else:
199         started.add(inst.name)
200
201       notepad.RecordRestartAttempt(inst.name)
202
203     else:
204       if notepad.NumberOfRestartAttempts(inst.name):
205         notepad.RemoveInstance(inst.name)
206         if inst.status not in HELPLESS_STATES:
207           logging.info("Restart of instance '%s' succeeded", inst.name)
208
209   return started
210
211
212 def _CheckDisks(cl, notepad, nodes, instances, started):
213   """Check all nodes for restarted ones.
214
215   """
216   check_nodes = []
217
218   for node in nodes.values():
219     old = notepad.GetNodeBootID(node.name)
220     if not node.bootid:
221       # Bad node, not returning a boot id
222       if not node.offline:
223         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
224                       node.name)
225       continue
226
227     if old != node.bootid:
228       # Node's boot ID has changed, probably through a reboot
229       check_nodes.append(node)
230
231   if check_nodes:
232     # Activate disks for all instances with any of the checked nodes as a
233     # secondary node.
234     for node in check_nodes:
235       for instance_name in node.secondaries:
236         try:
237           inst = instances[instance_name]
238         except KeyError:
239           logging.info("Can't find instance '%s', maybe it was ignored",
240                        instance_name)
241           continue
242
243         if not inst.autostart:
244           logging.info("Skipping disk activation for non-autostart"
245                        " instance '%s'", inst.name)
246           continue
247
248         if inst.name in started:
249           # we already tried to start the instance, which should have
250           # activated its drives (if they can be at all)
251           logging.debug("Skipping disk activation for instance '%s' as"
252                         " it was already started", inst.name)
253           continue
254
255         try:
256           logging.info("Activating disks for instance '%s'", inst.name)
257           inst.ActivateDisks(cl)
258         except Exception: # pylint: disable-msg=W0703
259           logging.exception("Error while activating disks for instance '%s'",
260                             inst.name)
261
262     # Keep changed boot IDs
263     for node in check_nodes:
264       notepad.SetNodeBootID(node.name, node.bootid)
265
266
267 def _CheckForOfflineNodes(nodes, instance):
268   """Checks if given instances has any secondary in offline status.
269
270   @param instance: The instance object
271   @return: True if any of the secondary is offline, False otherwise
272
273   """
274   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
275
276
277 def _VerifyDisks(cl, uuid, nodes, instances):
278   """Run a per-group "gnt-cluster verify-disks".
279
280   """
281   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
282   ((_, offline_disk_instances, _), ) = \
283     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
284   cl.ArchiveJob(job_id)
285
286   if not offline_disk_instances:
287     # nothing to do
288     logging.debug("Verify-disks reported no offline disks, nothing to do")
289     return
290
291   logging.debug("Will activate disks for instance(s) %s",
292                 utils.CommaJoin(offline_disk_instances))
293
294   # We submit only one job, and wait for it. Not optimal, but this puts less
295   # load on the job queue.
296   job = []
297   for name in offline_disk_instances:
298     try:
299       inst = instances[name]
300     except KeyError:
301       logging.info("Can't find instance '%s', maybe it was ignored", name)
302       continue
303
304     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
305       logging.info("Skipping instance '%s' because it is in a helpless state or"
306                    " has offline secondaries", name)
307       continue
308
309     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
310
311   if job:
312     job_id = cli.SendJob(job, cl=cl)
313
314     try:
315       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
316     except Exception: # pylint: disable-msg=W0703
317       logging.exception("Error while activating disks")
318
319
320 def IsRapiResponding(hostname):
321   """Connects to RAPI port and does a simple test.
322
323   Connects to RAPI port of hostname and does a simple test. At this time, the
324   test is GetVersion.
325
326   @type hostname: string
327   @param hostname: hostname of the node to connect to.
328   @rtype: bool
329   @return: Whether RAPI is working properly
330
331   """
332   curl_config = rapi.client.GenericCurlConfig()
333   rapi_client = rapi.client.GanetiRapiClient(hostname,
334                                              curl_config_fn=curl_config)
335   try:
336     master_version = rapi_client.GetVersion()
337   except rapi.client.CertificateError, err:
338     logging.warning("RAPI certificate error: %s", err)
339     return False
340   except rapi.client.GanetiApiError, err:
341     logging.warning("RAPI error: %s", err)
342     return False
343   else:
344     logging.debug("Reported RAPI version %s", master_version)
345     return master_version == constants.RAPI_VERSION
346
347
348 def ParseOptions():
349   """Parse the command line options.
350
351   @return: (options, args) as from OptionParser.parse_args()
352
353   """
354   parser = OptionParser(description="Ganeti cluster watcher",
355                         usage="%prog [-d]",
356                         version="%%prog (ganeti) %s" %
357                         constants.RELEASE_VERSION)
358
359   parser.add_option(cli.DEBUG_OPT)
360   parser.add_option(cli.NODEGROUP_OPT)
361   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
362                     help="Autoarchive jobs older than this age (default"
363                           " 6 hours)")
364   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
365                     action="store_true", help="Ignore cluster pause setting")
366   parser.add_option("--wait-children", dest="wait_children", default=False,
367                     action="store_true", help="Wait for child processes")
368   options, args = parser.parse_args()
369   options.job_age = cli.ParseTimespec(options.job_age)
370
371   if args:
372     parser.error("No arguments expected")
373
374   return (options, args)
375
376
377 def _WriteInstanceStatus(filename, data):
378   """Writes the per-group instance status file.
379
380   The entries are sorted.
381
382   @type filename: string
383   @param filename: Path to instance status file
384   @type data: list of tuple; (instance name as string, status as string)
385   @param data: Instance name and status
386
387   """
388   logging.debug("Updating instance status file '%s' with %s instances",
389                 filename, len(data))
390
391   utils.WriteFile(filename,
392                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
393                                    sorted(data))))
394
395
396 def _UpdateInstanceStatus(filename, instances):
397   """Writes an instance status file from L{Instance} objects.
398
399   @type filename: string
400   @param filename: Path to status file
401   @type instances: list of L{Instance}
402
403   """
404   _WriteInstanceStatus(filename, [(inst.name, inst.status)
405                                   for inst in instances])
406
407
408 class _StatCb:
409   """Helper to store file handle's C{fstat}.
410
411   """
412   def __init__(self):
413     """Initializes this class.
414
415     """
416     self.st = None
417
418   def __call__(self, fh):
419     """Calls C{fstat} on file handle.
420
421     """
422     self.st = os.fstat(fh.fileno())
423
424
425 def _ReadInstanceStatus(filename):
426   """Reads an instance status file.
427
428   @type filename: string
429   @param filename: Path to status file
430   @rtype: tuple; (None or number, list of lists containing instance name and
431     status)
432   @return: File's mtime and instance status contained in the file; mtime is
433     C{None} if file can't be read
434
435   """
436   logging.debug("Reading per-group instance status from '%s'", filename)
437
438   statcb = _StatCb()
439   try:
440     content = utils.ReadFile(filename, preread=statcb)
441   except EnvironmentError, err:
442     if err.errno == errno.ENOENT:
443       logging.error("Can't read '%s', does not exist (yet)", filename)
444     else:
445       logging.exception("Unable to read '%s', ignoring", filename)
446     return (None, None)
447   else:
448     return (statcb.st.st_mtime, [line.split(None, 1)
449                                  for line in content.splitlines()])
450
451
452 def _MergeInstanceStatus(filename, pergroup_filename, groups):
453   """Merges all per-group instance status files into a global one.
454
455   @type filename: string
456   @param filename: Path to global instance status file
457   @type pergroup_filename: string
458   @param pergroup_filename: Path to per-group status files, must contain "%s"
459     to be replaced with group UUID
460   @type groups: sequence
461   @param groups: UUIDs of known groups
462
463   """
464   # Lock global status file in exclusive mode
465   lock = utils.FileLock.Open(filename)
466   try:
467     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
468   except errors.LockError, err:
469     # All per-group processes will lock and update the file. None of them
470     # should take longer than 10 seconds (the value of
471     # INSTANCE_STATUS_LOCK_TIMEOUT).
472     logging.error("Can't acquire lock on instance status file '%s', not"
473                   " updating: %s", filename, err)
474     return
475
476   logging.debug("Acquired exclusive lock on '%s'", filename)
477
478   data = {}
479
480   # Load instance status from all groups
481   for group_uuid in groups:
482     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
483
484     if mtime is not None:
485       for (instance_name, status) in instdata:
486         data.setdefault(instance_name, []).append((mtime, status))
487
488   # Select last update based on file mtime
489   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
490                 for (instance_name, status) in data.items()]
491
492   # Write the global status file. Don't touch file after it's been
493   # updated--there is no lock anymore.
494   _WriteInstanceStatus(filename, inststatus)
495
496
497 def GetLuxiClient(try_restart):
498   """Tries to connect to the master daemon.
499
500   @type try_restart: bool
501   @param try_restart: Whether to attempt to restart the master daemon
502
503   """
504   try:
505     return cli.GetClient()
506   except errors.OpPrereqError, err:
507     # this is, from cli.GetClient, a not-master case
508     raise NotMasterError("Not on master node (%s)" % err)
509
510   except luxi.NoMasterError, err:
511     if not try_restart:
512       raise
513
514     logging.warning("Master daemon seems to be down (%s), trying to restart",
515                     err)
516
517     if not utils.EnsureDaemon(constants.MASTERD):
518       raise errors.GenericError("Can't start the master daemon")
519
520     # Retry the connection
521     return cli.GetClient()
522
523
524 def _StartGroupChildren(cl, wait):
525   """Starts a new instance of the watcher for every node group.
526
527   """
528   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
529                         for arg in sys.argv)
530
531   result = cl.QueryGroups([], ["name", "uuid"], False)
532
533   children = []
534
535   for (idx, (name, uuid)) in enumerate(result):
536     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
537
538     if idx > 0:
539       # Let's not kill the system
540       time.sleep(CHILD_PROCESS_DELAY)
541
542     logging.debug("Spawning child for group '%s' (%s), arguments %s",
543                   name, uuid, args)
544
545     try:
546       # TODO: Should utils.StartDaemon be used instead?
547       pid = os.spawnv(os.P_NOWAIT, args[0], args)
548     except Exception: # pylint: disable-msg=W0703
549       logging.exception("Failed to start child for group '%s' (%s)",
550                         name, uuid)
551     else:
552       logging.debug("Started with PID %s", pid)
553       children.append(pid)
554
555   if wait:
556     for pid in children:
557       logging.debug("Waiting for child PID %s", pid)
558       try:
559         result = utils.RetryOnSignal(os.waitpid, pid, 0)
560       except EnvironmentError, err:
561         result = str(err)
562
563       logging.debug("Child PID %s exited with status %s", pid, result)
564
565
566 def _ArchiveJobs(cl, age):
567   """Archives old jobs.
568
569   """
570   (arch_count, left_count) = cl.AutoArchiveJobs(age)
571   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
572
573
574 def _CheckMaster(cl):
575   """Ensures current host is master node.
576
577   """
578   (master, ) = cl.QueryConfigValues(["master_node"])
579   if master != netutils.Hostname.GetSysName():
580     raise NotMasterError("This is not the master node")
581
582
583 @rapi.client.UsesRapiClient
584 def _GlobalWatcher(opts):
585   """Main function for global watcher.
586
587   At the end child processes are spawned for every node group.
588
589   """
590   StartNodeDaemons()
591   RunWatcherHooks()
592
593   # Run node maintenance in all cases, even if master, so that old masters can
594   # be properly cleaned up
595   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
596     nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
597
598   try:
599     client = GetLuxiClient(True)
600   except NotMasterError:
601     # Don't proceed on non-master nodes
602     return constants.EXIT_SUCCESS
603
604   # we are on master now
605   utils.EnsureDaemon(constants.RAPI)
606
607   # If RAPI isn't responding to queries, try one restart
608   logging.debug("Attempting to talk to remote API on %s",
609                 constants.IP4_ADDRESS_LOCALHOST)
610   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
611     logging.warning("Couldn't get answer from remote API, restaring daemon")
612     utils.StopDaemon(constants.RAPI)
613     utils.EnsureDaemon(constants.RAPI)
614     logging.debug("Second attempt to talk to remote API")
615     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
616       logging.fatal("RAPI is not responding")
617   logging.debug("Successfully talked to remote API")
618
619   _CheckMaster(client)
620   _ArchiveJobs(client, opts.job_age)
621
622   # Spawn child processes for all node groups
623   _StartGroupChildren(client, opts.wait_children)
624
625   return constants.EXIT_SUCCESS
626
627
628 def _GetGroupData(cl, uuid):
629   """Retrieves instances and nodes per node group.
630
631   """
632   job = [
633     # Get all primary instances in group
634     opcodes.OpQuery(what=constants.QR_INSTANCE,
635                     fields=["name", "status", "admin_state", "snodes",
636                             "pnode.group.uuid", "snodes.group.uuid"],
637                     filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
638                     use_locking=True),
639
640     # Get all nodes in group
641     opcodes.OpQuery(what=constants.QR_NODE,
642                     fields=["name", "bootid", "offline"],
643                     filter=[qlang.OP_EQUAL, "group.uuid", uuid],
644                     use_locking=True),
645     ]
646
647   job_id = cl.SubmitJob(job)
648   results = map(objects.QueryResponse.FromDict,
649                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
650   cl.ArchiveJob(job_id)
651
652   results_data = map(operator.attrgetter("data"), results)
653
654   # Ensure results are tuples with two values
655   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
656
657   # Extract values ignoring result status
658   (raw_instances, raw_nodes) = [[map(compat.snd, values)
659                                  for values in res]
660                                 for res in results_data]
661
662   secondaries = {}
663   instances = []
664
665   # Load all instances
666   for (name, status, autostart, snodes, pnode_group_uuid,
667        snodes_group_uuid) in raw_instances:
668     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
669       logging.error("Ignoring split instance '%s', primary group %s, secondary"
670                     " groups %s", name, pnode_group_uuid,
671                     utils.CommaJoin(snodes_group_uuid))
672     else:
673       instances.append(Instance(name, status, autostart, snodes))
674
675       for node in snodes:
676         secondaries.setdefault(node, set()).add(name)
677
678   # Load all nodes
679   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
680            for (name, bootid, offline) in raw_nodes]
681
682   return (dict((node.name, node) for node in nodes),
683           dict((inst.name, inst) for inst in instances))
684
685
686 def _LoadKnownGroups():
687   """Returns a list of all node groups known by L{ssconf}.
688
689   """
690   groups = ssconf.SimpleStore().GetNodegroupList()
691
692   result = list(line.split(None, 1)[0] for line in groups
693                 if line.strip())
694
695   if not compat.all(map(utils.UUID_RE.match, result)):
696     raise errors.GenericError("Ssconf contains invalid group UUID")
697
698   return result
699
700
701 def _GroupWatcher(opts):
702   """Main function for per-group watcher process.
703
704   """
705   group_uuid = opts.nodegroup.lower()
706
707   if not utils.UUID_RE.match(group_uuid):
708     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
709                               " got '%s'" %
710                               (cli.NODEGROUP_OPT_NAME, group_uuid))
711
712   logging.info("Watcher for node group '%s'", group_uuid)
713
714   known_groups = _LoadKnownGroups()
715
716   # Check if node group is known
717   if group_uuid not in known_groups:
718     raise errors.GenericError("Node group '%s' is not known by ssconf" %
719                               group_uuid)
720
721   # Group UUID has been verified and should not contain any dangerous characters
722   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
723   inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
724
725   logging.debug("Using state file %s", state_path)
726
727   # Global watcher
728   statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
729   if not statefile:
730     return constants.EXIT_FAILURE
731
732   notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
733   try:
734     # Connect to master daemon
735     client = GetLuxiClient(False)
736
737     _CheckMaster(client)
738
739     (nodes, instances) = _GetGroupData(client, group_uuid)
740
741     # Update per-group instance status file
742     _UpdateInstanceStatus(inst_status_path, instances.values())
743
744     _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
745                          constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
746                          known_groups)
747
748     started = _CheckInstances(client, notepad, instances)
749     _CheckDisks(client, notepad, nodes, instances, started)
750     _VerifyDisks(client, group_uuid, nodes, instances)
751   except Exception, err:
752     logging.info("Not updating status file due to failure: %s", err)
753     raise
754   else:
755     # Save changes for next run
756     notepad.Save(state_path)
757
758   return constants.EXIT_SUCCESS
759
760
761 def Main():
762   """Main function.
763
764   """
765   (options, _) = ParseOptions()
766
767   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
768                      debug=options.debug, stderr_logging=options.debug)
769
770   if ShouldPause() and not options.ignore_pause:
771     logging.debug("Pause has been set, exiting")
772     return constants.EXIT_SUCCESS
773
774   # Try to acquire global watcher lock in shared mode
775   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
776   try:
777     lock.Shared(blocking=False)
778   except (EnvironmentError, errors.LockError), err:
779     logging.error("Can't acquire lock on %s: %s",
780                   constants.WATCHER_LOCK_FILE, err)
781     return constants.EXIT_SUCCESS
782
783   if options.nodegroup is None:
784     fn = _GlobalWatcher
785   else:
786     # Per-nodegroup watcher
787     fn = _GroupWatcher
788
789   try:
790     return fn(options)
791   except (SystemExit, KeyboardInterrupt):
792     raise
793   except NotMasterError:
794     logging.debug("Not master, exiting")
795     return constants.EXIT_NOTMASTER
796   except errors.ResolverError, err:
797     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
798     return constants.EXIT_NODESETUP_ERROR
799   except errors.JobQueueFull:
800     logging.error("Job queue is full, can't query cluster state")
801   except errors.JobQueueDrainError:
802     logging.error("Job queue is drained, can't maintain cluster state")
803   except Exception, err:
804     logging.exception(str(err))
805     return constants.EXIT_FAILURE
806
807   return constants.EXIT_SUCCESS