Generalize functions used by both hspace and hcheck
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client # pylint: disable=W0611
54
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
57
58
59 MAXTRIES = 5
60 BAD_STATES = frozenset([
61   constants.INSTST_ERRORDOWN,
62   ])
63 HELPLESS_STATES = frozenset([
64   constants.INSTST_NODEDOWN,
65   constants.INSTST_NODEOFFLINE,
66   ])
67 NOTICE = "NOTICE"
68 ERROR = "ERROR"
69
70 #: Number of seconds to wait between starting child processes for node groups
71 CHILD_PROCESS_DELAY = 1.0
72
73 #: How many seconds to wait for instance status file lock
74 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
75
76
77 class NotMasterError(errors.GenericError):
78   """Exception raised when this host is not the master."""
79
80
81 def ShouldPause():
82   """Check whether we should pause.
83
84   """
85   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
86
87
88 def StartNodeDaemons():
89   """Start all the daemons that should be running on all nodes.
90
91   """
92   # on master or not, try to start the node daemon
93   utils.EnsureDaemon(constants.NODED)
94   # start confd as well. On non candidates it will be in disabled mode.
95   if constants.ENABLE_CONFD:
96     utils.EnsureDaemon(constants.CONFD)
97
98
99 def RunWatcherHooks():
100   """Run the watcher hooks.
101
102   """
103   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
104                              constants.HOOKS_NAME_WATCHER)
105   if not os.path.isdir(hooks_dir):
106     return
107
108   try:
109     results = utils.RunParts(hooks_dir)
110   except Exception, err: # pylint: disable=W0703
111     logging.exception("RunParts %s failed: %s", hooks_dir, err)
112     return
113
114   for (relname, status, runresult) in results:
115     if status == constants.RUNPARTS_SKIP:
116       logging.debug("Watcher hook %s: skipped", relname)
117     elif status == constants.RUNPARTS_ERR:
118       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
119     elif status == constants.RUNPARTS_RUN:
120       if runresult.failed:
121         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
122                         relname, runresult.exit_code, runresult.output)
123       else:
124         logging.debug("Watcher hook %s: success (output: %s)", relname,
125                       runresult.output)
126     else:
127       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
128                                    status)
129
130
131 class Instance(object):
132   """Abstraction for a Virtual Machine instance.
133
134   """
135   def __init__(self, name, status, autostart, snodes):
136     self.name = name
137     self.status = status
138     self.autostart = autostart
139     self.snodes = snodes
140
141   def Restart(self, cl):
142     """Encapsulates the start of an instance.
143
144     """
145     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
146     cli.SubmitOpCode(op, cl=cl)
147
148   def ActivateDisks(self, cl):
149     """Encapsulates the activation of all disks of an instance.
150
151     """
152     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
153     cli.SubmitOpCode(op, cl=cl)
154
155
156 class Node:
157   """Data container representing cluster node.
158
159   """
160   def __init__(self, name, bootid, offline, secondaries):
161     """Initializes this class.
162
163     """
164     self.name = name
165     self.bootid = bootid
166     self.offline = offline
167     self.secondaries = secondaries
168
169
170 def _CheckInstances(cl, notepad, instances):
171   """Make a pass over the list of instances, restarting downed ones.
172
173   """
174   notepad.MaintainInstanceList(instances.keys())
175
176   started = set()
177
178   for inst in instances.values():
179     if inst.status in BAD_STATES:
180       n = notepad.NumberOfRestartAttempts(inst.name)
181
182       if n > MAXTRIES:
183         logging.warning("Not restarting instance '%s', retries exhausted",
184                         inst.name)
185         continue
186
187       if n == MAXTRIES:
188         notepad.RecordRestartAttempt(inst.name)
189         logging.error("Could not restart instance '%s' after %s attempts,"
190                       " giving up", inst.name, MAXTRIES)
191         continue
192
193       try:
194         logging.info("Restarting instance '%s' (attempt #%s)",
195                      inst.name, n + 1)
196         inst.Restart(cl)
197       except Exception: # pylint: disable=W0703
198         logging.exception("Error while restarting instance '%s'", inst.name)
199       else:
200         started.add(inst.name)
201
202       notepad.RecordRestartAttempt(inst.name)
203
204     else:
205       if notepad.NumberOfRestartAttempts(inst.name):
206         notepad.RemoveInstance(inst.name)
207         if inst.status not in HELPLESS_STATES:
208           logging.info("Restart of instance '%s' succeeded", inst.name)
209
210   return started
211
212
213 def _CheckDisks(cl, notepad, nodes, instances, started):
214   """Check all nodes for restarted ones.
215
216   """
217   check_nodes = []
218
219   for node in nodes.values():
220     old = notepad.GetNodeBootID(node.name)
221     if not node.bootid:
222       # Bad node, not returning a boot id
223       if not node.offline:
224         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
225                       node.name)
226       continue
227
228     if old != node.bootid:
229       # Node's boot ID has changed, probably through a reboot
230       check_nodes.append(node)
231
232   if check_nodes:
233     # Activate disks for all instances with any of the checked nodes as a
234     # secondary node.
235     for node in check_nodes:
236       for instance_name in node.secondaries:
237         try:
238           inst = instances[instance_name]
239         except KeyError:
240           logging.info("Can't find instance '%s', maybe it was ignored",
241                        instance_name)
242           continue
243
244         if not inst.autostart:
245           logging.info("Skipping disk activation for non-autostart"
246                        " instance '%s'", inst.name)
247           continue
248
249         if inst.name in started:
250           # we already tried to start the instance, which should have
251           # activated its drives (if they can be at all)
252           logging.debug("Skipping disk activation for instance '%s' as"
253                         " it was already started", inst.name)
254           continue
255
256         try:
257           logging.info("Activating disks for instance '%s'", inst.name)
258           inst.ActivateDisks(cl)
259         except Exception: # pylint: disable=W0703
260           logging.exception("Error while activating disks for instance '%s'",
261                             inst.name)
262
263     # Keep changed boot IDs
264     for node in check_nodes:
265       notepad.SetNodeBootID(node.name, node.bootid)
266
267
268 def _CheckForOfflineNodes(nodes, instance):
269   """Checks if given instances has any secondary in offline status.
270
271   @param instance: The instance object
272   @return: True if any of the secondary is offline, False otherwise
273
274   """
275   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
276
277
278 def _VerifyDisks(cl, uuid, nodes, instances):
279   """Run a per-group "gnt-cluster verify-disks".
280
281   """
282   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
283   ((_, offline_disk_instances, _), ) = \
284     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
285   cl.ArchiveJob(job_id)
286
287   if not offline_disk_instances:
288     # nothing to do
289     logging.debug("Verify-disks reported no offline disks, nothing to do")
290     return
291
292   logging.debug("Will activate disks for instance(s) %s",
293                 utils.CommaJoin(offline_disk_instances))
294
295   # We submit only one job, and wait for it. Not optimal, but this puts less
296   # load on the job queue.
297   job = []
298   for name in offline_disk_instances:
299     try:
300       inst = instances[name]
301     except KeyError:
302       logging.info("Can't find instance '%s', maybe it was ignored", name)
303       continue
304
305     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
306       logging.info("Skipping instance '%s' because it is in a helpless state"
307                    " or has offline secondaries", name)
308       continue
309
310     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
311
312   if job:
313     job_id = cli.SendJob(job, cl=cl)
314
315     try:
316       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
317     except Exception: # pylint: disable=W0703
318       logging.exception("Error while activating disks")
319
320
321 def IsRapiResponding(hostname):
322   """Connects to RAPI port and does a simple test.
323
324   Connects to RAPI port of hostname and does a simple test. At this time, the
325   test is GetVersion.
326
327   @type hostname: string
328   @param hostname: hostname of the node to connect to.
329   @rtype: bool
330   @return: Whether RAPI is working properly
331
332   """
333   curl_config = rapi.client.GenericCurlConfig()
334   rapi_client = rapi.client.GanetiRapiClient(hostname,
335                                              curl_config_fn=curl_config)
336   try:
337     master_version = rapi_client.GetVersion()
338   except rapi.client.CertificateError, err:
339     logging.warning("RAPI certificate error: %s", err)
340     return False
341   except rapi.client.GanetiApiError, err:
342     logging.warning("RAPI error: %s", err)
343     return False
344   else:
345     logging.debug("Reported RAPI version %s", master_version)
346     return master_version == constants.RAPI_VERSION
347
348
349 def ParseOptions():
350   """Parse the command line options.
351
352   @return: (options, args) as from OptionParser.parse_args()
353
354   """
355   parser = OptionParser(description="Ganeti cluster watcher",
356                         usage="%prog [-d]",
357                         version="%%prog (ganeti) %s" %
358                         constants.RELEASE_VERSION)
359
360   parser.add_option(cli.DEBUG_OPT)
361   parser.add_option(cli.NODEGROUP_OPT)
362   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
363                     help="Autoarchive jobs older than this age (default"
364                           " 6 hours)")
365   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
366                     action="store_true", help="Ignore cluster pause setting")
367   parser.add_option("--wait-children", dest="wait_children",
368                     action="store_true", help="Wait for child processes")
369   parser.add_option("--no-wait-children", dest="wait_children",
370                     action="store_false",
371                     help="Don't wait for child processes")
372   # See optparse documentation for why default values are not set by options
373   parser.set_defaults(wait_children=True)
374   options, args = parser.parse_args()
375   options.job_age = cli.ParseTimespec(options.job_age)
376
377   if args:
378     parser.error("No arguments expected")
379
380   return (options, args)
381
382
383 def _WriteInstanceStatus(filename, data):
384   """Writes the per-group instance status file.
385
386   The entries are sorted.
387
388   @type filename: string
389   @param filename: Path to instance status file
390   @type data: list of tuple; (instance name as string, status as string)
391   @param data: Instance name and status
392
393   """
394   logging.debug("Updating instance status file '%s' with %s instances",
395                 filename, len(data))
396
397   utils.WriteFile(filename,
398                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
399                                    sorted(data))))
400
401
402 def _UpdateInstanceStatus(filename, instances):
403   """Writes an instance status file from L{Instance} objects.
404
405   @type filename: string
406   @param filename: Path to status file
407   @type instances: list of L{Instance}
408
409   """
410   _WriteInstanceStatus(filename, [(inst.name, inst.status)
411                                   for inst in instances])
412
413
414 def _ReadInstanceStatus(filename):
415   """Reads an instance status file.
416
417   @type filename: string
418   @param filename: Path to status file
419   @rtype: tuple; (None or number, list of lists containing instance name and
420     status)
421   @return: File's mtime and instance status contained in the file; mtime is
422     C{None} if file can't be read
423
424   """
425   logging.debug("Reading per-group instance status from '%s'", filename)
426
427   statcb = utils.FileStatHelper()
428   try:
429     content = utils.ReadFile(filename, preread=statcb)
430   except EnvironmentError, err:
431     if err.errno == errno.ENOENT:
432       logging.error("Can't read '%s', does not exist (yet)", filename)
433     else:
434       logging.exception("Unable to read '%s', ignoring", filename)
435     return (None, None)
436   else:
437     return (statcb.st.st_mtime, [line.split(None, 1)
438                                  for line in content.splitlines()])
439
440
441 def _MergeInstanceStatus(filename, pergroup_filename, groups):
442   """Merges all per-group instance status files into a global one.
443
444   @type filename: string
445   @param filename: Path to global instance status file
446   @type pergroup_filename: string
447   @param pergroup_filename: Path to per-group status files, must contain "%s"
448     to be replaced with group UUID
449   @type groups: sequence
450   @param groups: UUIDs of known groups
451
452   """
453   # Lock global status file in exclusive mode
454   lock = utils.FileLock.Open(filename)
455   try:
456     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
457   except errors.LockError, err:
458     # All per-group processes will lock and update the file. None of them
459     # should take longer than 10 seconds (the value of
460     # INSTANCE_STATUS_LOCK_TIMEOUT).
461     logging.error("Can't acquire lock on instance status file '%s', not"
462                   " updating: %s", filename, err)
463     return
464
465   logging.debug("Acquired exclusive lock on '%s'", filename)
466
467   data = {}
468
469   # Load instance status from all groups
470   for group_uuid in groups:
471     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
472
473     if mtime is not None:
474       for (instance_name, status) in instdata:
475         data.setdefault(instance_name, []).append((mtime, status))
476
477   # Select last update based on file mtime
478   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
479                 for (instance_name, status) in data.items()]
480
481   # Write the global status file. Don't touch file after it's been
482   # updated--there is no lock anymore.
483   _WriteInstanceStatus(filename, inststatus)
484
485
486 def GetLuxiClient(try_restart):
487   """Tries to connect to the master daemon.
488
489   @type try_restart: bool
490   @param try_restart: Whether to attempt to restart the master daemon
491
492   """
493   try:
494     return cli.GetClient()
495   except errors.OpPrereqError, err:
496     # this is, from cli.GetClient, a not-master case
497     raise NotMasterError("Not on master node (%s)" % err)
498
499   except luxi.NoMasterError, err:
500     if not try_restart:
501       raise
502
503     logging.warning("Master daemon seems to be down (%s), trying to restart",
504                     err)
505
506     if not utils.EnsureDaemon(constants.MASTERD):
507       raise errors.GenericError("Can't start the master daemon")
508
509     # Retry the connection
510     return cli.GetClient()
511
512
513 def _StartGroupChildren(cl, wait):
514   """Starts a new instance of the watcher for every node group.
515
516   """
517   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
518                         for arg in sys.argv)
519
520   result = cl.QueryGroups([], ["name", "uuid"], False)
521
522   children = []
523
524   for (idx, (name, uuid)) in enumerate(result):
525     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
526
527     if idx > 0:
528       # Let's not kill the system
529       time.sleep(CHILD_PROCESS_DELAY)
530
531     logging.debug("Spawning child for group '%s' (%s), arguments %s",
532                   name, uuid, args)
533
534     try:
535       # TODO: Should utils.StartDaemon be used instead?
536       pid = os.spawnv(os.P_NOWAIT, args[0], args)
537     except Exception: # pylint: disable=W0703
538       logging.exception("Failed to start child for group '%s' (%s)",
539                         name, uuid)
540     else:
541       logging.debug("Started with PID %s", pid)
542       children.append(pid)
543
544   if wait:
545     for pid in children:
546       logging.debug("Waiting for child PID %s", pid)
547       try:
548         result = utils.RetryOnSignal(os.waitpid, pid, 0)
549       except EnvironmentError, err:
550         result = str(err)
551
552       logging.debug("Child PID %s exited with status %s", pid, result)
553
554
555 def _ArchiveJobs(cl, age):
556   """Archives old jobs.
557
558   """
559   (arch_count, left_count) = cl.AutoArchiveJobs(age)
560   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
561
562
563 def _CheckMaster(cl):
564   """Ensures current host is master node.
565
566   """
567   (master, ) = cl.QueryConfigValues(["master_node"])
568   if master != netutils.Hostname.GetSysName():
569     raise NotMasterError("This is not the master node")
570
571
572 @rapi.client.UsesRapiClient
573 def _GlobalWatcher(opts):
574   """Main function for global watcher.
575
576   At the end child processes are spawned for every node group.
577
578   """
579   StartNodeDaemons()
580   RunWatcherHooks()
581
582   # Run node maintenance in all cases, even if master, so that old masters can
583   # be properly cleaned up
584   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
585     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
586
587   try:
588     client = GetLuxiClient(True)
589   except NotMasterError:
590     # Don't proceed on non-master nodes
591     return constants.EXIT_SUCCESS
592
593   # we are on master now
594   utils.EnsureDaemon(constants.RAPI)
595
596   # If RAPI isn't responding to queries, try one restart
597   logging.debug("Attempting to talk to remote API on %s",
598                 constants.IP4_ADDRESS_LOCALHOST)
599   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
600     logging.warning("Couldn't get answer from remote API, restaring daemon")
601     utils.StopDaemon(constants.RAPI)
602     utils.EnsureDaemon(constants.RAPI)
603     logging.debug("Second attempt to talk to remote API")
604     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
605       logging.fatal("RAPI is not responding")
606   logging.debug("Successfully talked to remote API")
607
608   _CheckMaster(client)
609   _ArchiveJobs(client, opts.job_age)
610
611   # Spawn child processes for all node groups
612   _StartGroupChildren(client, opts.wait_children)
613
614   return constants.EXIT_SUCCESS
615
616
617 def _GetGroupData(cl, uuid):
618   """Retrieves instances and nodes per node group.
619
620   """
621   job = [
622     # Get all primary instances in group
623     opcodes.OpQuery(what=constants.QR_INSTANCE,
624                     fields=["name", "status", "admin_state", "snodes",
625                             "pnode.group.uuid", "snodes.group.uuid"],
626                     qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
627                     use_locking=True),
628
629     # Get all nodes in group
630     opcodes.OpQuery(what=constants.QR_NODE,
631                     fields=["name", "bootid", "offline"],
632                     qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
633                     use_locking=True),
634     ]
635
636   job_id = cl.SubmitJob(job)
637   results = map(objects.QueryResponse.FromDict,
638                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
639   cl.ArchiveJob(job_id)
640
641   results_data = map(operator.attrgetter("data"), results)
642
643   # Ensure results are tuples with two values
644   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
645
646   # Extract values ignoring result status
647   (raw_instances, raw_nodes) = [[map(compat.snd, values)
648                                  for values in res]
649                                 for res in results_data]
650
651   secondaries = {}
652   instances = []
653
654   # Load all instances
655   for (name, status, autostart, snodes, pnode_group_uuid,
656        snodes_group_uuid) in raw_instances:
657     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
658       logging.error("Ignoring split instance '%s', primary group %s, secondary"
659                     " groups %s", name, pnode_group_uuid,
660                     utils.CommaJoin(snodes_group_uuid))
661     else:
662       instances.append(Instance(name, status, autostart, snodes))
663
664       for node in snodes:
665         secondaries.setdefault(node, set()).add(name)
666
667   # Load all nodes
668   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
669            for (name, bootid, offline) in raw_nodes]
670
671   return (dict((node.name, node) for node in nodes),
672           dict((inst.name, inst) for inst in instances))
673
674
675 def _LoadKnownGroups():
676   """Returns a list of all node groups known by L{ssconf}.
677
678   """
679   groups = ssconf.SimpleStore().GetNodegroupList()
680
681   result = list(line.split(None, 1)[0] for line in groups
682                 if line.strip())
683
684   if not compat.all(map(utils.UUID_RE.match, result)):
685     raise errors.GenericError("Ssconf contains invalid group UUID")
686
687   return result
688
689
690 def _GroupWatcher(opts):
691   """Main function for per-group watcher process.
692
693   """
694   group_uuid = opts.nodegroup.lower()
695
696   if not utils.UUID_RE.match(group_uuid):
697     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
698                               " got '%s'" %
699                               (cli.NODEGROUP_OPT_NAME, group_uuid))
700
701   logging.info("Watcher for node group '%s'", group_uuid)
702
703   known_groups = _LoadKnownGroups()
704
705   # Check if node group is known
706   if group_uuid not in known_groups:
707     raise errors.GenericError("Node group '%s' is not known by ssconf" %
708                               group_uuid)
709
710   # Group UUID has been verified and should not contain any dangerous
711   # characters
712   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
713   inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
714
715   logging.debug("Using state file %s", state_path)
716
717   # Global watcher
718   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
719   if not statefile:
720     return constants.EXIT_FAILURE
721
722   notepad = state.WatcherState(statefile) # pylint: disable=E0602
723   try:
724     # Connect to master daemon
725     client = GetLuxiClient(False)
726
727     _CheckMaster(client)
728
729     (nodes, instances) = _GetGroupData(client, group_uuid)
730
731     # Update per-group instance status file
732     _UpdateInstanceStatus(inst_status_path, instances.values())
733
734     _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
735                          constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
736                          known_groups)
737
738     started = _CheckInstances(client, notepad, instances)
739     _CheckDisks(client, notepad, nodes, instances, started)
740     _VerifyDisks(client, group_uuid, nodes, instances)
741   except Exception, err:
742     logging.info("Not updating status file due to failure: %s", err)
743     raise
744   else:
745     # Save changes for next run
746     notepad.Save(state_path)
747
748   return constants.EXIT_SUCCESS
749
750
751 def Main():
752   """Main function.
753
754   """
755   (options, _) = ParseOptions()
756
757   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
758                      debug=options.debug, stderr_logging=options.debug)
759
760   if ShouldPause() and not options.ignore_pause:
761     logging.debug("Pause has been set, exiting")
762     return constants.EXIT_SUCCESS
763
764   # Try to acquire global watcher lock in shared mode
765   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
766   try:
767     lock.Shared(blocking=False)
768   except (EnvironmentError, errors.LockError), err:
769     logging.error("Can't acquire lock on %s: %s",
770                   constants.WATCHER_LOCK_FILE, err)
771     return constants.EXIT_SUCCESS
772
773   if options.nodegroup is None:
774     fn = _GlobalWatcher
775   else:
776     # Per-nodegroup watcher
777     fn = _GroupWatcher
778
779   try:
780     return fn(options)
781   except (SystemExit, KeyboardInterrupt):
782     raise
783   except NotMasterError:
784     logging.debug("Not master, exiting")
785     return constants.EXIT_NOTMASTER
786   except errors.ResolverError, err:
787     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
788     return constants.EXIT_NODESETUP_ERROR
789   except errors.JobQueueFull:
790     logging.error("Job queue is full, can't query cluster state")
791   except errors.JobQueueDrainError:
792     logging.error("Job queue is drained, can't maintain cluster state")
793   except Exception, err:
794     logging.exception(str(err))
795     return constants.EXIT_FAILURE
796
797   return constants.EXIT_SUCCESS