Remove two hlint overrides
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52 from ganeti import pathutils
53
54 import ganeti.rapi.client # pylint: disable=W0611
55 from ganeti.rapi.client import UsesRapiClient
56
57 from ganeti.watcher import nodemaint
58 from ganeti.watcher import state
59
60
61 MAXTRIES = 5
62 BAD_STATES = frozenset([
63   constants.INSTST_ERRORDOWN,
64   ])
65 HELPLESS_STATES = frozenset([
66   constants.INSTST_NODEDOWN,
67   constants.INSTST_NODEOFFLINE,
68   ])
69 NOTICE = "NOTICE"
70 ERROR = "ERROR"
71
72 #: Number of seconds to wait between starting child processes for node groups
73 CHILD_PROCESS_DELAY = 1.0
74
75 #: How many seconds to wait for instance status file lock
76 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
77
78
79 class NotMasterError(errors.GenericError):
80   """Exception raised when this host is not the master."""
81
82
83 def ShouldPause():
84   """Check whether we should pause.
85
86   """
87   return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
88
89
90 def StartNodeDaemons():
91   """Start all the daemons that should be running on all nodes.
92
93   """
94   # on master or not, try to start the node daemon
95   utils.EnsureDaemon(constants.NODED)
96   # start confd as well. On non candidates it will be in disabled mode.
97   if constants.ENABLE_CONFD:
98     utils.EnsureDaemon(constants.CONFD)
99
100
101 def RunWatcherHooks():
102   """Run the watcher hooks.
103
104   """
105   hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
106                              constants.HOOKS_NAME_WATCHER)
107   if not os.path.isdir(hooks_dir):
108     return
109
110   try:
111     results = utils.RunParts(hooks_dir)
112   except Exception, err: # pylint: disable=W0703
113     logging.exception("RunParts %s failed: %s", hooks_dir, err)
114     return
115
116   for (relname, status, runresult) in results:
117     if status == constants.RUNPARTS_SKIP:
118       logging.debug("Watcher hook %s: skipped", relname)
119     elif status == constants.RUNPARTS_ERR:
120       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
121     elif status == constants.RUNPARTS_RUN:
122       if runresult.failed:
123         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
124                         relname, runresult.exit_code, runresult.output)
125       else:
126         logging.debug("Watcher hook %s: success (output: %s)", relname,
127                       runresult.output)
128     else:
129       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
130                                    status)
131
132
133 class Instance(object):
134   """Abstraction for a Virtual Machine instance.
135
136   """
137   def __init__(self, name, status, autostart, snodes):
138     self.name = name
139     self.status = status
140     self.autostart = autostart
141     self.snodes = snodes
142
143   def Restart(self, cl):
144     """Encapsulates the start of an instance.
145
146     """
147     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
148     cli.SubmitOpCode(op, cl=cl)
149
150   def ActivateDisks(self, cl):
151     """Encapsulates the activation of all disks of an instance.
152
153     """
154     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
155     cli.SubmitOpCode(op, cl=cl)
156
157
158 class Node:
159   """Data container representing cluster node.
160
161   """
162   def __init__(self, name, bootid, offline, secondaries):
163     """Initializes this class.
164
165     """
166     self.name = name
167     self.bootid = bootid
168     self.offline = offline
169     self.secondaries = secondaries
170
171
172 def _CheckInstances(cl, notepad, instances):
173   """Make a pass over the list of instances, restarting downed ones.
174
175   """
176   notepad.MaintainInstanceList(instances.keys())
177
178   started = set()
179
180   for inst in instances.values():
181     if inst.status in BAD_STATES:
182       n = notepad.NumberOfRestartAttempts(inst.name)
183
184       if n > MAXTRIES:
185         logging.warning("Not restarting instance '%s', retries exhausted",
186                         inst.name)
187         continue
188
189       if n == MAXTRIES:
190         notepad.RecordRestartAttempt(inst.name)
191         logging.error("Could not restart instance '%s' after %s attempts,"
192                       " giving up", inst.name, MAXTRIES)
193         continue
194
195       try:
196         logging.info("Restarting instance '%s' (attempt #%s)",
197                      inst.name, n + 1)
198         inst.Restart(cl)
199       except Exception: # pylint: disable=W0703
200         logging.exception("Error while restarting instance '%s'", inst.name)
201       else:
202         started.add(inst.name)
203
204       notepad.RecordRestartAttempt(inst.name)
205
206     else:
207       if notepad.NumberOfRestartAttempts(inst.name):
208         notepad.RemoveInstance(inst.name)
209         if inst.status not in HELPLESS_STATES:
210           logging.info("Restart of instance '%s' succeeded", inst.name)
211
212   return started
213
214
215 def _CheckDisks(cl, notepad, nodes, instances, started):
216   """Check all nodes for restarted ones.
217
218   """
219   check_nodes = []
220
221   for node in nodes.values():
222     old = notepad.GetNodeBootID(node.name)
223     if not node.bootid:
224       # Bad node, not returning a boot id
225       if not node.offline:
226         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
227                       node.name)
228       continue
229
230     if old != node.bootid:
231       # Node's boot ID has changed, probably through a reboot
232       check_nodes.append(node)
233
234   if check_nodes:
235     # Activate disks for all instances with any of the checked nodes as a
236     # secondary node.
237     for node in check_nodes:
238       for instance_name in node.secondaries:
239         try:
240           inst = instances[instance_name]
241         except KeyError:
242           logging.info("Can't find instance '%s', maybe it was ignored",
243                        instance_name)
244           continue
245
246         if not inst.autostart:
247           logging.info("Skipping disk activation for non-autostart"
248                        " instance '%s'", inst.name)
249           continue
250
251         if inst.name in started:
252           # we already tried to start the instance, which should have
253           # activated its drives (if they can be at all)
254           logging.debug("Skipping disk activation for instance '%s' as"
255                         " it was already started", inst.name)
256           continue
257
258         try:
259           logging.info("Activating disks for instance '%s'", inst.name)
260           inst.ActivateDisks(cl)
261         except Exception: # pylint: disable=W0703
262           logging.exception("Error while activating disks for instance '%s'",
263                             inst.name)
264
265     # Keep changed boot IDs
266     for node in check_nodes:
267       notepad.SetNodeBootID(node.name, node.bootid)
268
269
270 def _CheckForOfflineNodes(nodes, instance):
271   """Checks if given instances has any secondary in offline status.
272
273   @param instance: The instance object
274   @return: True if any of the secondary is offline, False otherwise
275
276   """
277   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
278
279
280 def _VerifyDisks(cl, uuid, nodes, instances):
281   """Run a per-group "gnt-cluster verify-disks".
282
283   """
284   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
285   ((_, offline_disk_instances, _), ) = \
286     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
287   cl.ArchiveJob(job_id)
288
289   if not offline_disk_instances:
290     # nothing to do
291     logging.debug("Verify-disks reported no offline disks, nothing to do")
292     return
293
294   logging.debug("Will activate disks for instance(s) %s",
295                 utils.CommaJoin(offline_disk_instances))
296
297   # We submit only one job, and wait for it. Not optimal, but this puts less
298   # load on the job queue.
299   job = []
300   for name in offline_disk_instances:
301     try:
302       inst = instances[name]
303     except KeyError:
304       logging.info("Can't find instance '%s', maybe it was ignored", name)
305       continue
306
307     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
308       logging.info("Skipping instance '%s' because it is in a helpless state"
309                    " or has offline secondaries", name)
310       continue
311
312     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
313
314   if job:
315     job_id = cli.SendJob(job, cl=cl)
316
317     try:
318       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
319     except Exception: # pylint: disable=W0703
320       logging.exception("Error while activating disks")
321
322
323 def IsRapiResponding(hostname):
324   """Connects to RAPI port and does a simple test.
325
326   Connects to RAPI port of hostname and does a simple test. At this time, the
327   test is GetVersion.
328
329   @type hostname: string
330   @param hostname: hostname of the node to connect to.
331   @rtype: bool
332   @return: Whether RAPI is working properly
333
334   """
335   curl_config = rapi.client.GenericCurlConfig()
336   rapi_client = rapi.client.GanetiRapiClient(hostname,
337                                              curl_config_fn=curl_config)
338   try:
339     master_version = rapi_client.GetVersion()
340   except rapi.client.CertificateError, err:
341     logging.warning("RAPI certificate error: %s", err)
342     return False
343   except rapi.client.GanetiApiError, err:
344     logging.warning("RAPI error: %s", err)
345     return False
346   else:
347     logging.debug("Reported RAPI version %s", master_version)
348     return master_version == constants.RAPI_VERSION
349
350
351 def ParseOptions():
352   """Parse the command line options.
353
354   @return: (options, args) as from OptionParser.parse_args()
355
356   """
357   parser = OptionParser(description="Ganeti cluster watcher",
358                         usage="%prog [-d]",
359                         version="%%prog (ganeti) %s" %
360                         constants.RELEASE_VERSION)
361
362   parser.add_option(cli.DEBUG_OPT)
363   parser.add_option(cli.NODEGROUP_OPT)
364   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
365                     help="Autoarchive jobs older than this age (default"
366                           " 6 hours)")
367   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
368                     action="store_true", help="Ignore cluster pause setting")
369   parser.add_option("--wait-children", dest="wait_children",
370                     action="store_true", help="Wait for child processes")
371   parser.add_option("--no-wait-children", dest="wait_children",
372                     action="store_false",
373                     help="Don't wait for child processes")
374   # See optparse documentation for why default values are not set by options
375   parser.set_defaults(wait_children=True)
376   options, args = parser.parse_args()
377   options.job_age = cli.ParseTimespec(options.job_age)
378
379   if args:
380     parser.error("No arguments expected")
381
382   return (options, args)
383
384
385 def _WriteInstanceStatus(filename, data):
386   """Writes the per-group instance status file.
387
388   The entries are sorted.
389
390   @type filename: string
391   @param filename: Path to instance status file
392   @type data: list of tuple; (instance name as string, status as string)
393   @param data: Instance name and status
394
395   """
396   logging.debug("Updating instance status file '%s' with %s instances",
397                 filename, len(data))
398
399   utils.WriteFile(filename,
400                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
401                                    sorted(data))))
402
403
404 def _UpdateInstanceStatus(filename, instances):
405   """Writes an instance status file from L{Instance} objects.
406
407   @type filename: string
408   @param filename: Path to status file
409   @type instances: list of L{Instance}
410
411   """
412   _WriteInstanceStatus(filename, [(inst.name, inst.status)
413                                   for inst in instances])
414
415
416 def _ReadInstanceStatus(filename):
417   """Reads an instance status file.
418
419   @type filename: string
420   @param filename: Path to status file
421   @rtype: tuple; (None or number, list of lists containing instance name and
422     status)
423   @return: File's mtime and instance status contained in the file; mtime is
424     C{None} if file can't be read
425
426   """
427   logging.debug("Reading per-group instance status from '%s'", filename)
428
429   statcb = utils.FileStatHelper()
430   try:
431     content = utils.ReadFile(filename, preread=statcb)
432   except EnvironmentError, err:
433     if err.errno == errno.ENOENT:
434       logging.error("Can't read '%s', does not exist (yet)", filename)
435     else:
436       logging.exception("Unable to read '%s', ignoring", filename)
437     return (None, None)
438   else:
439     return (statcb.st.st_mtime, [line.split(None, 1)
440                                  for line in content.splitlines()])
441
442
443 def _MergeInstanceStatus(filename, pergroup_filename, groups):
444   """Merges all per-group instance status files into a global one.
445
446   @type filename: string
447   @param filename: Path to global instance status file
448   @type pergroup_filename: string
449   @param pergroup_filename: Path to per-group status files, must contain "%s"
450     to be replaced with group UUID
451   @type groups: sequence
452   @param groups: UUIDs of known groups
453
454   """
455   # Lock global status file in exclusive mode
456   lock = utils.FileLock.Open(filename)
457   try:
458     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
459   except errors.LockError, err:
460     # All per-group processes will lock and update the file. None of them
461     # should take longer than 10 seconds (the value of
462     # INSTANCE_STATUS_LOCK_TIMEOUT).
463     logging.error("Can't acquire lock on instance status file '%s', not"
464                   " updating: %s", filename, err)
465     return
466
467   logging.debug("Acquired exclusive lock on '%s'", filename)
468
469   data = {}
470
471   # Load instance status from all groups
472   for group_uuid in groups:
473     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
474
475     if mtime is not None:
476       for (instance_name, status) in instdata:
477         data.setdefault(instance_name, []).append((mtime, status))
478
479   # Select last update based on file mtime
480   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
481                 for (instance_name, status) in data.items()]
482
483   # Write the global status file. Don't touch file after it's been
484   # updated--there is no lock anymore.
485   _WriteInstanceStatus(filename, inststatus)
486
487
488 def GetLuxiClient(try_restart):
489   """Tries to connect to the master daemon.
490
491   @type try_restart: bool
492   @param try_restart: Whether to attempt to restart the master daemon
493
494   """
495   try:
496     return cli.GetClient()
497   except errors.OpPrereqError, err:
498     # this is, from cli.GetClient, a not-master case
499     raise NotMasterError("Not on master node (%s)" % err)
500
501   except luxi.NoMasterError, err:
502     if not try_restart:
503       raise
504
505     logging.warning("Master daemon seems to be down (%s), trying to restart",
506                     err)
507
508     if not utils.EnsureDaemon(constants.MASTERD):
509       raise errors.GenericError("Can't start the master daemon")
510
511     # Retry the connection
512     return cli.GetClient()
513
514
515 def _StartGroupChildren(cl, wait):
516   """Starts a new instance of the watcher for every node group.
517
518   """
519   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
520                         for arg in sys.argv)
521
522   result = cl.QueryGroups([], ["name", "uuid"], False)
523
524   children = []
525
526   for (idx, (name, uuid)) in enumerate(result):
527     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
528
529     if idx > 0:
530       # Let's not kill the system
531       time.sleep(CHILD_PROCESS_DELAY)
532
533     logging.debug("Spawning child for group '%s' (%s), arguments %s",
534                   name, uuid, args)
535
536     try:
537       # TODO: Should utils.StartDaemon be used instead?
538       pid = os.spawnv(os.P_NOWAIT, args[0], args)
539     except Exception: # pylint: disable=W0703
540       logging.exception("Failed to start child for group '%s' (%s)",
541                         name, uuid)
542     else:
543       logging.debug("Started with PID %s", pid)
544       children.append(pid)
545
546   if wait:
547     for pid in children:
548       logging.debug("Waiting for child PID %s", pid)
549       try:
550         result = utils.RetryOnSignal(os.waitpid, pid, 0)
551       except EnvironmentError, err:
552         result = str(err)
553
554       logging.debug("Child PID %s exited with status %s", pid, result)
555
556
557 def _ArchiveJobs(cl, age):
558   """Archives old jobs.
559
560   """
561   (arch_count, left_count) = cl.AutoArchiveJobs(age)
562   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
563
564
565 def _CheckMaster(cl):
566   """Ensures current host is master node.
567
568   """
569   (master, ) = cl.QueryConfigValues(["master_node"])
570   if master != netutils.Hostname.GetSysName():
571     raise NotMasterError("This is not the master node")
572
573
574 @UsesRapiClient
575 def _GlobalWatcher(opts):
576   """Main function for global watcher.
577
578   At the end child processes are spawned for every node group.
579
580   """
581   StartNodeDaemons()
582   RunWatcherHooks()
583
584   # Run node maintenance in all cases, even if master, so that old masters can
585   # be properly cleaned up
586   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
587     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
588
589   try:
590     client = GetLuxiClient(True)
591   except NotMasterError:
592     # Don't proceed on non-master nodes
593     return constants.EXIT_SUCCESS
594
595   # we are on master now
596   utils.EnsureDaemon(constants.RAPI)
597
598   # If RAPI isn't responding to queries, try one restart
599   logging.debug("Attempting to talk to remote API on %s",
600                 constants.IP4_ADDRESS_LOCALHOST)
601   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
602     logging.warning("Couldn't get answer from remote API, restaring daemon")
603     utils.StopDaemon(constants.RAPI)
604     utils.EnsureDaemon(constants.RAPI)
605     logging.debug("Second attempt to talk to remote API")
606     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
607       logging.fatal("RAPI is not responding")
608   logging.debug("Successfully talked to remote API")
609
610   _CheckMaster(client)
611   _ArchiveJobs(client, opts.job_age)
612
613   # Spawn child processes for all node groups
614   _StartGroupChildren(client, opts.wait_children)
615
616   return constants.EXIT_SUCCESS
617
618
619 def _GetGroupData(cl, uuid):
620   """Retrieves instances and nodes per node group.
621
622   """
623   job = [
624     # Get all primary instances in group
625     opcodes.OpQuery(what=constants.QR_INSTANCE,
626                     fields=["name", "status", "admin_state", "snodes",
627                             "pnode.group.uuid", "snodes.group.uuid"],
628                     qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
629                     use_locking=True),
630
631     # Get all nodes in group
632     opcodes.OpQuery(what=constants.QR_NODE,
633                     fields=["name", "bootid", "offline"],
634                     qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
635                     use_locking=True),
636     ]
637
638   job_id = cl.SubmitJob(job)
639   results = map(objects.QueryResponse.FromDict,
640                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
641   cl.ArchiveJob(job_id)
642
643   results_data = map(operator.attrgetter("data"), results)
644
645   # Ensure results are tuples with two values
646   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
647
648   # Extract values ignoring result status
649   (raw_instances, raw_nodes) = [[map(compat.snd, values)
650                                  for values in res]
651                                 for res in results_data]
652
653   secondaries = {}
654   instances = []
655
656   # Load all instances
657   for (name, status, autostart, snodes, pnode_group_uuid,
658        snodes_group_uuid) in raw_instances:
659     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
660       logging.error("Ignoring split instance '%s', primary group %s, secondary"
661                     " groups %s", name, pnode_group_uuid,
662                     utils.CommaJoin(snodes_group_uuid))
663     else:
664       instances.append(Instance(name, status, autostart, snodes))
665
666       for node in snodes:
667         secondaries.setdefault(node, set()).add(name)
668
669   # Load all nodes
670   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
671            for (name, bootid, offline) in raw_nodes]
672
673   return (dict((node.name, node) for node in nodes),
674           dict((inst.name, inst) for inst in instances))
675
676
677 def _LoadKnownGroups():
678   """Returns a list of all node groups known by L{ssconf}.
679
680   """
681   groups = ssconf.SimpleStore().GetNodegroupList()
682
683   result = list(line.split(None, 1)[0] for line in groups
684                 if line.strip())
685
686   if not compat.all(map(utils.UUID_RE.match, result)):
687     raise errors.GenericError("Ssconf contains invalid group UUID")
688
689   return result
690
691
692 def _GroupWatcher(opts):
693   """Main function for per-group watcher process.
694
695   """
696   group_uuid = opts.nodegroup.lower()
697
698   if not utils.UUID_RE.match(group_uuid):
699     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
700                               " got '%s'" %
701                               (cli.NODEGROUP_OPT_NAME, group_uuid))
702
703   logging.info("Watcher for node group '%s'", group_uuid)
704
705   known_groups = _LoadKnownGroups()
706
707   # Check if node group is known
708   if group_uuid not in known_groups:
709     raise errors.GenericError("Node group '%s' is not known by ssconf" %
710                               group_uuid)
711
712   # Group UUID has been verified and should not contain any dangerous
713   # characters
714   state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
715   inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
716
717   logging.debug("Using state file %s", state_path)
718
719   # Global watcher
720   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
721   if not statefile:
722     return constants.EXIT_FAILURE
723
724   notepad = state.WatcherState(statefile) # pylint: disable=E0602
725   try:
726     # Connect to master daemon
727     client = GetLuxiClient(False)
728
729     _CheckMaster(client)
730
731     (nodes, instances) = _GetGroupData(client, group_uuid)
732
733     # Update per-group instance status file
734     _UpdateInstanceStatus(inst_status_path, instances.values())
735
736     _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
737                          pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
738                          known_groups)
739
740     started = _CheckInstances(client, notepad, instances)
741     _CheckDisks(client, notepad, nodes, instances, started)
742     _VerifyDisks(client, group_uuid, nodes, instances)
743   except Exception, err:
744     logging.info("Not updating status file due to failure: %s", err)
745     raise
746   else:
747     # Save changes for next run
748     notepad.Save(state_path)
749
750   return constants.EXIT_SUCCESS
751
752
753 def Main():
754   """Main function.
755
756   """
757   (options, _) = ParseOptions()
758
759   utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
760                      debug=options.debug, stderr_logging=options.debug)
761
762   if ShouldPause() and not options.ignore_pause:
763     logging.debug("Pause has been set, exiting")
764     return constants.EXIT_SUCCESS
765
766   # Try to acquire global watcher lock in shared mode
767   lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
768   try:
769     lock.Shared(blocking=False)
770   except (EnvironmentError, errors.LockError), err:
771     logging.error("Can't acquire lock on %s: %s",
772                   pathutils.WATCHER_LOCK_FILE, err)
773     return constants.EXIT_SUCCESS
774
775   if options.nodegroup is None:
776     fn = _GlobalWatcher
777   else:
778     # Per-nodegroup watcher
779     fn = _GroupWatcher
780
781   try:
782     return fn(options)
783   except (SystemExit, KeyboardInterrupt):
784     raise
785   except NotMasterError:
786     logging.debug("Not master, exiting")
787     return constants.EXIT_NOTMASTER
788   except errors.ResolverError, err:
789     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
790     return constants.EXIT_NODESETUP_ERROR
791   except errors.JobQueueFull:
792     logging.error("Job queue is full, can't query cluster state")
793   except errors.JobQueueDrainError:
794     logging.error("Job queue is drained, can't maintain cluster state")
795   except Exception, err:
796     logging.exception(str(err))
797     return constants.EXIT_FAILURE
798
799   return constants.EXIT_SUCCESS