Move helper class from watcher to utils.io
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client # pylint: disable=W0611
54
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
57
58
59 MAXTRIES = 5
60 BAD_STATES = frozenset([
61   constants.INSTST_ERRORDOWN,
62   ])
63 HELPLESS_STATES = frozenset([
64   constants.INSTST_NODEDOWN,
65   constants.INSTST_NODEOFFLINE,
66   ])
67 NOTICE = "NOTICE"
68 ERROR = "ERROR"
69
70 #: Number of seconds to wait between starting child processes for node groups
71 CHILD_PROCESS_DELAY = 1.0
72
73 #: How many seconds to wait for instance status file lock
74 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
75
76
77 class NotMasterError(errors.GenericError):
78   """Exception raised when this host is not the master."""
79
80
81 def ShouldPause():
82   """Check whether we should pause.
83
84   """
85   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
86
87
88 def StartNodeDaemons():
89   """Start all the daemons that should be running on all nodes.
90
91   """
92   # on master or not, try to start the node daemon
93   utils.EnsureDaemon(constants.NODED)
94   # start confd as well. On non candidates it will be in disabled mode.
95   if constants.ENABLE_CONFD:
96     utils.EnsureDaemon(constants.CONFD)
97
98
99 def RunWatcherHooks():
100   """Run the watcher hooks.
101
102   """
103   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
104                              constants.HOOKS_NAME_WATCHER)
105   if not os.path.isdir(hooks_dir):
106     return
107
108   try:
109     results = utils.RunParts(hooks_dir)
110   except Exception, err: # pylint: disable=W0703
111     logging.exception("RunParts %s failed: %s", hooks_dir, err)
112     return
113
114   for (relname, status, runresult) in results:
115     if status == constants.RUNPARTS_SKIP:
116       logging.debug("Watcher hook %s: skipped", relname)
117     elif status == constants.RUNPARTS_ERR:
118       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
119     elif status == constants.RUNPARTS_RUN:
120       if runresult.failed:
121         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
122                         relname, runresult.exit_code, runresult.output)
123       else:
124         logging.debug("Watcher hook %s: success (output: %s)", relname,
125                       runresult.output)
126     else:
127       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
128                                    status)
129
130
131 class Instance(object):
132   """Abstraction for a Virtual Machine instance.
133
134   """
135   def __init__(self, name, status, autostart, snodes):
136     self.name = name
137     self.status = status
138     self.autostart = autostart
139     self.snodes = snodes
140
141   def Restart(self, cl):
142     """Encapsulates the start of an instance.
143
144     """
145     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
146     cli.SubmitOpCode(op, cl=cl)
147
148   def ActivateDisks(self, cl):
149     """Encapsulates the activation of all disks of an instance.
150
151     """
152     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
153     cli.SubmitOpCode(op, cl=cl)
154
155
156 class Node:
157   """Data container representing cluster node.
158
159   """
160   def __init__(self, name, bootid, offline, secondaries):
161     """Initializes this class.
162
163     """
164     self.name = name
165     self.bootid = bootid
166     self.offline = offline
167     self.secondaries = secondaries
168
169
170 def _CheckInstances(cl, notepad, instances):
171   """Make a pass over the list of instances, restarting downed ones.
172
173   """
174   notepad.MaintainInstanceList(instances.keys())
175
176   started = set()
177
178   for inst in instances.values():
179     if inst.status in BAD_STATES:
180       n = notepad.NumberOfRestartAttempts(inst.name)
181
182       if n > MAXTRIES:
183         logging.warning("Not restarting instance '%s', retries exhausted",
184                         inst.name)
185         continue
186
187       if n == MAXTRIES:
188         notepad.RecordRestartAttempt(inst.name)
189         logging.error("Could not restart instance '%s' after %s attempts,"
190                       " giving up", inst.name, MAXTRIES)
191         continue
192
193       try:
194         logging.info("Restarting instance '%s' (attempt #%s)",
195                      inst.name, n + 1)
196         inst.Restart(cl)
197       except Exception: # pylint: disable=W0703
198         logging.exception("Error while restarting instance '%s'", inst.name)
199       else:
200         started.add(inst.name)
201
202       notepad.RecordRestartAttempt(inst.name)
203
204     else:
205       if notepad.NumberOfRestartAttempts(inst.name):
206         notepad.RemoveInstance(inst.name)
207         if inst.status not in HELPLESS_STATES:
208           logging.info("Restart of instance '%s' succeeded", inst.name)
209
210   return started
211
212
213 def _CheckDisks(cl, notepad, nodes, instances, started):
214   """Check all nodes for restarted ones.
215
216   """
217   check_nodes = []
218
219   for node in nodes.values():
220     old = notepad.GetNodeBootID(node.name)
221     if not node.bootid:
222       # Bad node, not returning a boot id
223       if not node.offline:
224         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
225                       node.name)
226       continue
227
228     if old != node.bootid:
229       # Node's boot ID has changed, probably through a reboot
230       check_nodes.append(node)
231
232   if check_nodes:
233     # Activate disks for all instances with any of the checked nodes as a
234     # secondary node.
235     for node in check_nodes:
236       for instance_name in node.secondaries:
237         try:
238           inst = instances[instance_name]
239         except KeyError:
240           logging.info("Can't find instance '%s', maybe it was ignored",
241                        instance_name)
242           continue
243
244         if not inst.autostart:
245           logging.info("Skipping disk activation for non-autostart"
246                        " instance '%s'", inst.name)
247           continue
248
249         if inst.name in started:
250           # we already tried to start the instance, which should have
251           # activated its drives (if they can be at all)
252           logging.debug("Skipping disk activation for instance '%s' as"
253                         " it was already started", inst.name)
254           continue
255
256         try:
257           logging.info("Activating disks for instance '%s'", inst.name)
258           inst.ActivateDisks(cl)
259         except Exception: # pylint: disable=W0703
260           logging.exception("Error while activating disks for instance '%s'",
261                             inst.name)
262
263     # Keep changed boot IDs
264     for node in check_nodes:
265       notepad.SetNodeBootID(node.name, node.bootid)
266
267
268 def _CheckForOfflineNodes(nodes, instance):
269   """Checks if given instances has any secondary in offline status.
270
271   @param instance: The instance object
272   @return: True if any of the secondary is offline, False otherwise
273
274   """
275   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
276
277
278 def _VerifyDisks(cl, uuid, nodes, instances):
279   """Run a per-group "gnt-cluster verify-disks".
280
281   """
282   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
283   ((_, offline_disk_instances, _), ) = \
284     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
285   cl.ArchiveJob(job_id)
286
287   if not offline_disk_instances:
288     # nothing to do
289     logging.debug("Verify-disks reported no offline disks, nothing to do")
290     return
291
292   logging.debug("Will activate disks for instance(s) %s",
293                 utils.CommaJoin(offline_disk_instances))
294
295   # We submit only one job, and wait for it. Not optimal, but this puts less
296   # load on the job queue.
297   job = []
298   for name in offline_disk_instances:
299     try:
300       inst = instances[name]
301     except KeyError:
302       logging.info("Can't find instance '%s', maybe it was ignored", name)
303       continue
304
305     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
306       logging.info("Skipping instance '%s' because it is in a helpless state or"
307                    " has offline secondaries", name)
308       continue
309
310     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
311
312   if job:
313     job_id = cli.SendJob(job, cl=cl)
314
315     try:
316       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
317     except Exception: # pylint: disable=W0703
318       logging.exception("Error while activating disks")
319
320
321 def IsRapiResponding(hostname):
322   """Connects to RAPI port and does a simple test.
323
324   Connects to RAPI port of hostname and does a simple test. At this time, the
325   test is GetVersion.
326
327   @type hostname: string
328   @param hostname: hostname of the node to connect to.
329   @rtype: bool
330   @return: Whether RAPI is working properly
331
332   """
333   curl_config = rapi.client.GenericCurlConfig()
334   rapi_client = rapi.client.GanetiRapiClient(hostname,
335                                              curl_config_fn=curl_config)
336   try:
337     master_version = rapi_client.GetVersion()
338   except rapi.client.CertificateError, err:
339     logging.warning("RAPI certificate error: %s", err)
340     return False
341   except rapi.client.GanetiApiError, err:
342     logging.warning("RAPI error: %s", err)
343     return False
344   else:
345     logging.debug("Reported RAPI version %s", master_version)
346     return master_version == constants.RAPI_VERSION
347
348
349 def ParseOptions():
350   """Parse the command line options.
351
352   @return: (options, args) as from OptionParser.parse_args()
353
354   """
355   parser = OptionParser(description="Ganeti cluster watcher",
356                         usage="%prog [-d]",
357                         version="%%prog (ganeti) %s" %
358                         constants.RELEASE_VERSION)
359
360   parser.add_option(cli.DEBUG_OPT)
361   parser.add_option(cli.NODEGROUP_OPT)
362   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
363                     help="Autoarchive jobs older than this age (default"
364                           " 6 hours)")
365   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
366                     action="store_true", help="Ignore cluster pause setting")
367   parser.add_option("--wait-children", dest="wait_children",
368                     action="store_true", help="Wait for child processes")
369   parser.add_option("--no-wait-children", dest="wait_children",
370                     action="store_false", help="Don't wait for child processes")
371   # See optparse documentation for why default values are not set by options
372   parser.set_defaults(wait_children=True)
373   options, args = parser.parse_args()
374   options.job_age = cli.ParseTimespec(options.job_age)
375
376   if args:
377     parser.error("No arguments expected")
378
379   return (options, args)
380
381
382 def _WriteInstanceStatus(filename, data):
383   """Writes the per-group instance status file.
384
385   The entries are sorted.
386
387   @type filename: string
388   @param filename: Path to instance status file
389   @type data: list of tuple; (instance name as string, status as string)
390   @param data: Instance name and status
391
392   """
393   logging.debug("Updating instance status file '%s' with %s instances",
394                 filename, len(data))
395
396   utils.WriteFile(filename,
397                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
398                                    sorted(data))))
399
400
401 def _UpdateInstanceStatus(filename, instances):
402   """Writes an instance status file from L{Instance} objects.
403
404   @type filename: string
405   @param filename: Path to status file
406   @type instances: list of L{Instance}
407
408   """
409   _WriteInstanceStatus(filename, [(inst.name, inst.status)
410                                   for inst in instances])
411
412
413 def _ReadInstanceStatus(filename):
414   """Reads an instance status file.
415
416   @type filename: string
417   @param filename: Path to status file
418   @rtype: tuple; (None or number, list of lists containing instance name and
419     status)
420   @return: File's mtime and instance status contained in the file; mtime is
421     C{None} if file can't be read
422
423   """
424   logging.debug("Reading per-group instance status from '%s'", filename)
425
426   statcb = utils.FileStatHelper()
427   try:
428     content = utils.ReadFile(filename, preread=statcb)
429   except EnvironmentError, err:
430     if err.errno == errno.ENOENT:
431       logging.error("Can't read '%s', does not exist (yet)", filename)
432     else:
433       logging.exception("Unable to read '%s', ignoring", filename)
434     return (None, None)
435   else:
436     return (statcb.st.st_mtime, [line.split(None, 1)
437                                  for line in content.splitlines()])
438
439
440 def _MergeInstanceStatus(filename, pergroup_filename, groups):
441   """Merges all per-group instance status files into a global one.
442
443   @type filename: string
444   @param filename: Path to global instance status file
445   @type pergroup_filename: string
446   @param pergroup_filename: Path to per-group status files, must contain "%s"
447     to be replaced with group UUID
448   @type groups: sequence
449   @param groups: UUIDs of known groups
450
451   """
452   # Lock global status file in exclusive mode
453   lock = utils.FileLock.Open(filename)
454   try:
455     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
456   except errors.LockError, err:
457     # All per-group processes will lock and update the file. None of them
458     # should take longer than 10 seconds (the value of
459     # INSTANCE_STATUS_LOCK_TIMEOUT).
460     logging.error("Can't acquire lock on instance status file '%s', not"
461                   " updating: %s", filename, err)
462     return
463
464   logging.debug("Acquired exclusive lock on '%s'", filename)
465
466   data = {}
467
468   # Load instance status from all groups
469   for group_uuid in groups:
470     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
471
472     if mtime is not None:
473       for (instance_name, status) in instdata:
474         data.setdefault(instance_name, []).append((mtime, status))
475
476   # Select last update based on file mtime
477   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
478                 for (instance_name, status) in data.items()]
479
480   # Write the global status file. Don't touch file after it's been
481   # updated--there is no lock anymore.
482   _WriteInstanceStatus(filename, inststatus)
483
484
485 def GetLuxiClient(try_restart):
486   """Tries to connect to the master daemon.
487
488   @type try_restart: bool
489   @param try_restart: Whether to attempt to restart the master daemon
490
491   """
492   try:
493     return cli.GetClient()
494   except errors.OpPrereqError, err:
495     # this is, from cli.GetClient, a not-master case
496     raise NotMasterError("Not on master node (%s)" % err)
497
498   except luxi.NoMasterError, err:
499     if not try_restart:
500       raise
501
502     logging.warning("Master daemon seems to be down (%s), trying to restart",
503                     err)
504
505     if not utils.EnsureDaemon(constants.MASTERD):
506       raise errors.GenericError("Can't start the master daemon")
507
508     # Retry the connection
509     return cli.GetClient()
510
511
512 def _StartGroupChildren(cl, wait):
513   """Starts a new instance of the watcher for every node group.
514
515   """
516   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
517                         for arg in sys.argv)
518
519   result = cl.QueryGroups([], ["name", "uuid"], False)
520
521   children = []
522
523   for (idx, (name, uuid)) in enumerate(result):
524     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
525
526     if idx > 0:
527       # Let's not kill the system
528       time.sleep(CHILD_PROCESS_DELAY)
529
530     logging.debug("Spawning child for group '%s' (%s), arguments %s",
531                   name, uuid, args)
532
533     try:
534       # TODO: Should utils.StartDaemon be used instead?
535       pid = os.spawnv(os.P_NOWAIT, args[0], args)
536     except Exception: # pylint: disable=W0703
537       logging.exception("Failed to start child for group '%s' (%s)",
538                         name, uuid)
539     else:
540       logging.debug("Started with PID %s", pid)
541       children.append(pid)
542
543   if wait:
544     for pid in children:
545       logging.debug("Waiting for child PID %s", pid)
546       try:
547         result = utils.RetryOnSignal(os.waitpid, pid, 0)
548       except EnvironmentError, err:
549         result = str(err)
550
551       logging.debug("Child PID %s exited with status %s", pid, result)
552
553
554 def _ArchiveJobs(cl, age):
555   """Archives old jobs.
556
557   """
558   (arch_count, left_count) = cl.AutoArchiveJobs(age)
559   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
560
561
562 def _CheckMaster(cl):
563   """Ensures current host is master node.
564
565   """
566   (master, ) = cl.QueryConfigValues(["master_node"])
567   if master != netutils.Hostname.GetSysName():
568     raise NotMasterError("This is not the master node")
569
570
571 @rapi.client.UsesRapiClient
572 def _GlobalWatcher(opts):
573   """Main function for global watcher.
574
575   At the end child processes are spawned for every node group.
576
577   """
578   StartNodeDaemons()
579   RunWatcherHooks()
580
581   # Run node maintenance in all cases, even if master, so that old masters can
582   # be properly cleaned up
583   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
584     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
585
586   try:
587     client = GetLuxiClient(True)
588   except NotMasterError:
589     # Don't proceed on non-master nodes
590     return constants.EXIT_SUCCESS
591
592   # we are on master now
593   utils.EnsureDaemon(constants.RAPI)
594
595   # If RAPI isn't responding to queries, try one restart
596   logging.debug("Attempting to talk to remote API on %s",
597                 constants.IP4_ADDRESS_LOCALHOST)
598   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
599     logging.warning("Couldn't get answer from remote API, restaring daemon")
600     utils.StopDaemon(constants.RAPI)
601     utils.EnsureDaemon(constants.RAPI)
602     logging.debug("Second attempt to talk to remote API")
603     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
604       logging.fatal("RAPI is not responding")
605   logging.debug("Successfully talked to remote API")
606
607   _CheckMaster(client)
608   _ArchiveJobs(client, opts.job_age)
609
610   # Spawn child processes for all node groups
611   _StartGroupChildren(client, opts.wait_children)
612
613   return constants.EXIT_SUCCESS
614
615
616 def _GetGroupData(cl, uuid):
617   """Retrieves instances and nodes per node group.
618
619   """
620   job = [
621     # Get all primary instances in group
622     opcodes.OpQuery(what=constants.QR_INSTANCE,
623                     fields=["name", "status", "admin_state", "snodes",
624                             "pnode.group.uuid", "snodes.group.uuid"],
625                     qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
626                     use_locking=True),
627
628     # Get all nodes in group
629     opcodes.OpQuery(what=constants.QR_NODE,
630                     fields=["name", "bootid", "offline"],
631                     qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
632                     use_locking=True),
633     ]
634
635   job_id = cl.SubmitJob(job)
636   results = map(objects.QueryResponse.FromDict,
637                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
638   cl.ArchiveJob(job_id)
639
640   results_data = map(operator.attrgetter("data"), results)
641
642   # Ensure results are tuples with two values
643   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
644
645   # Extract values ignoring result status
646   (raw_instances, raw_nodes) = [[map(compat.snd, values)
647                                  for values in res]
648                                 for res in results_data]
649
650   secondaries = {}
651   instances = []
652
653   # Load all instances
654   for (name, status, autostart, snodes, pnode_group_uuid,
655        snodes_group_uuid) in raw_instances:
656     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
657       logging.error("Ignoring split instance '%s', primary group %s, secondary"
658                     " groups %s", name, pnode_group_uuid,
659                     utils.CommaJoin(snodes_group_uuid))
660     else:
661       instances.append(Instance(name, status, autostart, snodes))
662
663       for node in snodes:
664         secondaries.setdefault(node, set()).add(name)
665
666   # Load all nodes
667   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
668            for (name, bootid, offline) in raw_nodes]
669
670   return (dict((node.name, node) for node in nodes),
671           dict((inst.name, inst) for inst in instances))
672
673
674 def _LoadKnownGroups():
675   """Returns a list of all node groups known by L{ssconf}.
676
677   """
678   groups = ssconf.SimpleStore().GetNodegroupList()
679
680   result = list(line.split(None, 1)[0] for line in groups
681                 if line.strip())
682
683   if not compat.all(map(utils.UUID_RE.match, result)):
684     raise errors.GenericError("Ssconf contains invalid group UUID")
685
686   return result
687
688
689 def _GroupWatcher(opts):
690   """Main function for per-group watcher process.
691
692   """
693   group_uuid = opts.nodegroup.lower()
694
695   if not utils.UUID_RE.match(group_uuid):
696     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
697                               " got '%s'" %
698                               (cli.NODEGROUP_OPT_NAME, group_uuid))
699
700   logging.info("Watcher for node group '%s'", group_uuid)
701
702   known_groups = _LoadKnownGroups()
703
704   # Check if node group is known
705   if group_uuid not in known_groups:
706     raise errors.GenericError("Node group '%s' is not known by ssconf" %
707                               group_uuid)
708
709   # Group UUID has been verified and should not contain any dangerous characters
710   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
711   inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
712
713   logging.debug("Using state file %s", state_path)
714
715   # Global watcher
716   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
717   if not statefile:
718     return constants.EXIT_FAILURE
719
720   notepad = state.WatcherState(statefile) # pylint: disable=E0602
721   try:
722     # Connect to master daemon
723     client = GetLuxiClient(False)
724
725     _CheckMaster(client)
726
727     (nodes, instances) = _GetGroupData(client, group_uuid)
728
729     # Update per-group instance status file
730     _UpdateInstanceStatus(inst_status_path, instances.values())
731
732     _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
733                          constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
734                          known_groups)
735
736     started = _CheckInstances(client, notepad, instances)
737     _CheckDisks(client, notepad, nodes, instances, started)
738     _VerifyDisks(client, group_uuid, nodes, instances)
739   except Exception, err:
740     logging.info("Not updating status file due to failure: %s", err)
741     raise
742   else:
743     # Save changes for next run
744     notepad.Save(state_path)
745
746   return constants.EXIT_SUCCESS
747
748
749 def Main():
750   """Main function.
751
752   """
753   (options, _) = ParseOptions()
754
755   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
756                      debug=options.debug, stderr_logging=options.debug)
757
758   if ShouldPause() and not options.ignore_pause:
759     logging.debug("Pause has been set, exiting")
760     return constants.EXIT_SUCCESS
761
762   # Try to acquire global watcher lock in shared mode
763   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
764   try:
765     lock.Shared(blocking=False)
766   except (EnvironmentError, errors.LockError), err:
767     logging.error("Can't acquire lock on %s: %s",
768                   constants.WATCHER_LOCK_FILE, err)
769     return constants.EXIT_SUCCESS
770
771   if options.nodegroup is None:
772     fn = _GlobalWatcher
773   else:
774     # Per-nodegroup watcher
775     fn = _GroupWatcher
776
777   try:
778     return fn(options)
779   except (SystemExit, KeyboardInterrupt):
780     raise
781   except NotMasterError:
782     logging.debug("Not master, exiting")
783     return constants.EXIT_NOTMASTER
784   except errors.ResolverError, err:
785     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
786     return constants.EXIT_NODESETUP_ERROR
787   except errors.JobQueueFull:
788     logging.error("Job queue is full, can't query cluster state")
789   except errors.JobQueueDrainError:
790     logging.error("Job queue is drained, can't maintain cluster state")
791   except Exception, err:
792     logging.exception(str(err))
793     return constants.EXIT_FAILURE
794
795   return constants.EXIT_SUCCESS