Re-activate previously active disks in watcher
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52 from ganeti import pathutils
53
54 import ganeti.rapi.client # pylint: disable=W0611
55 from ganeti.rapi.client import UsesRapiClient
56
57 from ganeti.watcher import nodemaint
58 from ganeti.watcher import state
59
60
61 MAXTRIES = 5
62 BAD_STATES = compat.UniqueFrozenset([
63   constants.INSTST_ERRORDOWN,
64   ])
65 HELPLESS_STATES = compat.UniqueFrozenset([
66   constants.INSTST_NODEDOWN,
67   constants.INSTST_NODEOFFLINE,
68   ])
69 NOTICE = "NOTICE"
70 ERROR = "ERROR"
71
72 #: Number of seconds to wait between starting child processes for node groups
73 CHILD_PROCESS_DELAY = 1.0
74
75 #: How many seconds to wait for instance status file lock
76 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
77
78
79 class NotMasterError(errors.GenericError):
80   """Exception raised when this host is not the master."""
81
82
83 def ShouldPause():
84   """Check whether we should pause.
85
86   """
87   return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
88
89
90 def StartNodeDaemons():
91   """Start all the daemons that should be running on all nodes.
92
93   """
94   # on master or not, try to start the node daemon
95   utils.EnsureDaemon(constants.NODED)
96   # start confd as well. On non candidates it will be in disabled mode.
97   if constants.ENABLE_CONFD:
98     utils.EnsureDaemon(constants.CONFD)
99   # start mond as well: all nodes need monitoring
100   if constants.ENABLE_MOND:
101     utils.EnsureDaemon(constants.MOND)
102
103
104 def RunWatcherHooks():
105   """Run the watcher hooks.
106
107   """
108   hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
109                              constants.HOOKS_NAME_WATCHER)
110   if not os.path.isdir(hooks_dir):
111     return
112
113   try:
114     results = utils.RunParts(hooks_dir)
115   except Exception, err: # pylint: disable=W0703
116     logging.exception("RunParts %s failed: %s", hooks_dir, err)
117     return
118
119   for (relname, status, runresult) in results:
120     if status == constants.RUNPARTS_SKIP:
121       logging.debug("Watcher hook %s: skipped", relname)
122     elif status == constants.RUNPARTS_ERR:
123       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
124     elif status == constants.RUNPARTS_RUN:
125       if runresult.failed:
126         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
127                         relname, runresult.exit_code, runresult.output)
128       else:
129         logging.debug("Watcher hook %s: success (output: %s)", relname,
130                       runresult.output)
131     else:
132       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
133                                    status)
134
135
136 class Instance(object):
137   """Abstraction for a Virtual Machine instance.
138
139   """
140   def __init__(self, name, status, disks_active, snodes):
141     self.name = name
142     self.status = status
143     self.disks_active = disks_active
144     self.snodes = snodes
145
146   def Restart(self, cl):
147     """Encapsulates the start of an instance.
148
149     """
150     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
151     cli.SubmitOpCode(op, cl=cl)
152
153   def ActivateDisks(self, cl):
154     """Encapsulates the activation of all disks of an instance.
155
156     """
157     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
158     cli.SubmitOpCode(op, cl=cl)
159
160
161 class Node:
162   """Data container representing cluster node.
163
164   """
165   def __init__(self, name, bootid, offline, secondaries):
166     """Initializes this class.
167
168     """
169     self.name = name
170     self.bootid = bootid
171     self.offline = offline
172     self.secondaries = secondaries
173
174
175 def _CheckInstances(cl, notepad, instances):
176   """Make a pass over the list of instances, restarting downed ones.
177
178   """
179   notepad.MaintainInstanceList(instances.keys())
180
181   started = set()
182
183   for inst in instances.values():
184     if inst.status in BAD_STATES:
185       n = notepad.NumberOfRestartAttempts(inst.name)
186
187       if n > MAXTRIES:
188         logging.warning("Not restarting instance '%s', retries exhausted",
189                         inst.name)
190         continue
191
192       if n == MAXTRIES:
193         notepad.RecordRestartAttempt(inst.name)
194         logging.error("Could not restart instance '%s' after %s attempts,"
195                       " giving up", inst.name, MAXTRIES)
196         continue
197
198       try:
199         logging.info("Restarting instance '%s' (attempt #%s)",
200                      inst.name, n + 1)
201         inst.Restart(cl)
202       except Exception: # pylint: disable=W0703
203         logging.exception("Error while restarting instance '%s'", inst.name)
204       else:
205         started.add(inst.name)
206
207       notepad.RecordRestartAttempt(inst.name)
208
209     else:
210       if notepad.NumberOfRestartAttempts(inst.name):
211         notepad.RemoveInstance(inst.name)
212         if inst.status not in HELPLESS_STATES:
213           logging.info("Restart of instance '%s' succeeded", inst.name)
214
215   return started
216
217
218 def _CheckDisks(cl, notepad, nodes, instances, started):
219   """Check all nodes for restarted ones.
220
221   """
222   check_nodes = []
223
224   for node in nodes.values():
225     old = notepad.GetNodeBootID(node.name)
226     if not node.bootid:
227       # Bad node, not returning a boot id
228       if not node.offline:
229         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
230                       node.name)
231       continue
232
233     if old != node.bootid:
234       # Node's boot ID has changed, probably through a reboot
235       check_nodes.append(node)
236
237   if check_nodes:
238     # Activate disks for all instances with any of the checked nodes as a
239     # secondary node.
240     for node in check_nodes:
241       for instance_name in node.secondaries:
242         try:
243           inst = instances[instance_name]
244         except KeyError:
245           logging.info("Can't find instance '%s', maybe it was ignored",
246                        instance_name)
247           continue
248
249         if not inst.disks_active:
250           logging.info("Skipping disk activation for instance with not"
251                        " activated disks '%s'", inst.name)
252           continue
253
254         if inst.name in started:
255           # we already tried to start the instance, which should have
256           # activated its drives (if they can be at all)
257           logging.debug("Skipping disk activation for instance '%s' as"
258                         " it was already started", inst.name)
259           continue
260
261         try:
262           logging.info("Activating disks for instance '%s'", inst.name)
263           inst.ActivateDisks(cl)
264         except Exception: # pylint: disable=W0703
265           logging.exception("Error while activating disks for instance '%s'",
266                             inst.name)
267
268     # Keep changed boot IDs
269     for node in check_nodes:
270       notepad.SetNodeBootID(node.name, node.bootid)
271
272
273 def _CheckForOfflineNodes(nodes, instance):
274   """Checks if given instances has any secondary in offline status.
275
276   @param instance: The instance object
277   @return: True if any of the secondary is offline, False otherwise
278
279   """
280   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
281
282
283 def _VerifyDisks(cl, uuid, nodes, instances):
284   """Run a per-group "gnt-cluster verify-disks".
285
286   """
287   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
288   ((_, offline_disk_instances, _), ) = \
289     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
290   cl.ArchiveJob(job_id)
291
292   if not offline_disk_instances:
293     # nothing to do
294     logging.debug("Verify-disks reported no offline disks, nothing to do")
295     return
296
297   logging.debug("Will activate disks for instance(s) %s",
298                 utils.CommaJoin(offline_disk_instances))
299
300   # We submit only one job, and wait for it. Not optimal, but this puts less
301   # load on the job queue.
302   job = []
303   for name in offline_disk_instances:
304     try:
305       inst = instances[name]
306     except KeyError:
307       logging.info("Can't find instance '%s', maybe it was ignored", name)
308       continue
309
310     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
311       logging.info("Skipping instance '%s' because it is in a helpless state"
312                    " or has offline secondaries", name)
313       continue
314
315     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
316
317   if job:
318     job_id = cli.SendJob(job, cl=cl)
319
320     try:
321       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
322     except Exception: # pylint: disable=W0703
323       logging.exception("Error while activating disks")
324
325
326 def IsRapiResponding(hostname):
327   """Connects to RAPI port and does a simple test.
328
329   Connects to RAPI port of hostname and does a simple test. At this time, the
330   test is GetVersion.
331
332   @type hostname: string
333   @param hostname: hostname of the node to connect to.
334   @rtype: bool
335   @return: Whether RAPI is working properly
336
337   """
338   curl_config = rapi.client.GenericCurlConfig()
339   rapi_client = rapi.client.GanetiRapiClient(hostname,
340                                              curl_config_fn=curl_config)
341   try:
342     master_version = rapi_client.GetVersion()
343   except rapi.client.CertificateError, err:
344     logging.warning("RAPI certificate error: %s", err)
345     return False
346   except rapi.client.GanetiApiError, err:
347     logging.warning("RAPI error: %s", err)
348     return False
349   else:
350     logging.debug("Reported RAPI version %s", master_version)
351     return master_version == constants.RAPI_VERSION
352
353
354 def ParseOptions():
355   """Parse the command line options.
356
357   @return: (options, args) as from OptionParser.parse_args()
358
359   """
360   parser = OptionParser(description="Ganeti cluster watcher",
361                         usage="%prog [-d]",
362                         version="%%prog (ganeti) %s" %
363                         constants.RELEASE_VERSION)
364
365   parser.add_option(cli.DEBUG_OPT)
366   parser.add_option(cli.NODEGROUP_OPT)
367   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
368                     help="Autoarchive jobs older than this age (default"
369                           " 6 hours)")
370   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
371                     action="store_true", help="Ignore cluster pause setting")
372   parser.add_option("--wait-children", dest="wait_children",
373                     action="store_true", help="Wait for child processes")
374   parser.add_option("--no-wait-children", dest="wait_children",
375                     action="store_false",
376                     help="Don't wait for child processes")
377   # See optparse documentation for why default values are not set by options
378   parser.set_defaults(wait_children=True)
379   options, args = parser.parse_args()
380   options.job_age = cli.ParseTimespec(options.job_age)
381
382   if args:
383     parser.error("No arguments expected")
384
385   return (options, args)
386
387
388 def _WriteInstanceStatus(filename, data):
389   """Writes the per-group instance status file.
390
391   The entries are sorted.
392
393   @type filename: string
394   @param filename: Path to instance status file
395   @type data: list of tuple; (instance name as string, status as string)
396   @param data: Instance name and status
397
398   """
399   logging.debug("Updating instance status file '%s' with %s instances",
400                 filename, len(data))
401
402   utils.WriteFile(filename,
403                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
404                                    sorted(data))))
405
406
407 def _UpdateInstanceStatus(filename, instances):
408   """Writes an instance status file from L{Instance} objects.
409
410   @type filename: string
411   @param filename: Path to status file
412   @type instances: list of L{Instance}
413
414   """
415   _WriteInstanceStatus(filename, [(inst.name, inst.status)
416                                   for inst in instances])
417
418
419 def _ReadInstanceStatus(filename):
420   """Reads an instance status file.
421
422   @type filename: string
423   @param filename: Path to status file
424   @rtype: tuple; (None or number, list of lists containing instance name and
425     status)
426   @return: File's mtime and instance status contained in the file; mtime is
427     C{None} if file can't be read
428
429   """
430   logging.debug("Reading per-group instance status from '%s'", filename)
431
432   statcb = utils.FileStatHelper()
433   try:
434     content = utils.ReadFile(filename, preread=statcb)
435   except EnvironmentError, err:
436     if err.errno == errno.ENOENT:
437       logging.error("Can't read '%s', does not exist (yet)", filename)
438     else:
439       logging.exception("Unable to read '%s', ignoring", filename)
440     return (None, None)
441   else:
442     return (statcb.st.st_mtime, [line.split(None, 1)
443                                  for line in content.splitlines()])
444
445
446 def _MergeInstanceStatus(filename, pergroup_filename, groups):
447   """Merges all per-group instance status files into a global one.
448
449   @type filename: string
450   @param filename: Path to global instance status file
451   @type pergroup_filename: string
452   @param pergroup_filename: Path to per-group status files, must contain "%s"
453     to be replaced with group UUID
454   @type groups: sequence
455   @param groups: UUIDs of known groups
456
457   """
458   # Lock global status file in exclusive mode
459   lock = utils.FileLock.Open(filename)
460   try:
461     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
462   except errors.LockError, err:
463     # All per-group processes will lock and update the file. None of them
464     # should take longer than 10 seconds (the value of
465     # INSTANCE_STATUS_LOCK_TIMEOUT).
466     logging.error("Can't acquire lock on instance status file '%s', not"
467                   " updating: %s", filename, err)
468     return
469
470   logging.debug("Acquired exclusive lock on '%s'", filename)
471
472   data = {}
473
474   # Load instance status from all groups
475   for group_uuid in groups:
476     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
477
478     if mtime is not None:
479       for (instance_name, status) in instdata:
480         data.setdefault(instance_name, []).append((mtime, status))
481
482   # Select last update based on file mtime
483   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
484                 for (instance_name, status) in data.items()]
485
486   # Write the global status file. Don't touch file after it's been
487   # updated--there is no lock anymore.
488   _WriteInstanceStatus(filename, inststatus)
489
490
491 def GetLuxiClient(try_restart):
492   """Tries to connect to the master daemon.
493
494   @type try_restart: bool
495   @param try_restart: Whether to attempt to restart the master daemon
496
497   """
498   try:
499     return cli.GetClient()
500   except errors.OpPrereqError, err:
501     # this is, from cli.GetClient, a not-master case
502     raise NotMasterError("Not on master node (%s)" % err)
503
504   except luxi.NoMasterError, err:
505     if not try_restart:
506       raise
507
508     logging.warning("Master daemon seems to be down (%s), trying to restart",
509                     err)
510
511     if not utils.EnsureDaemon(constants.MASTERD):
512       raise errors.GenericError("Can't start the master daemon")
513
514     # Retry the connection
515     return cli.GetClient()
516
517
518 def _StartGroupChildren(cl, wait):
519   """Starts a new instance of the watcher for every node group.
520
521   """
522   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
523                         for arg in sys.argv)
524
525   result = cl.QueryGroups([], ["name", "uuid"], False)
526
527   children = []
528
529   for (idx, (name, uuid)) in enumerate(result):
530     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
531
532     if idx > 0:
533       # Let's not kill the system
534       time.sleep(CHILD_PROCESS_DELAY)
535
536     logging.debug("Spawning child for group '%s' (%s), arguments %s",
537                   name, uuid, args)
538
539     try:
540       # TODO: Should utils.StartDaemon be used instead?
541       pid = os.spawnv(os.P_NOWAIT, args[0], args)
542     except Exception: # pylint: disable=W0703
543       logging.exception("Failed to start child for group '%s' (%s)",
544                         name, uuid)
545     else:
546       logging.debug("Started with PID %s", pid)
547       children.append(pid)
548
549   if wait:
550     for pid in children:
551       logging.debug("Waiting for child PID %s", pid)
552       try:
553         result = utils.RetryOnSignal(os.waitpid, pid, 0)
554       except EnvironmentError, err:
555         result = str(err)
556
557       logging.debug("Child PID %s exited with status %s", pid, result)
558
559
560 def _ArchiveJobs(cl, age):
561   """Archives old jobs.
562
563   """
564   (arch_count, left_count) = cl.AutoArchiveJobs(age)
565   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
566
567
568 def _CheckMaster(cl):
569   """Ensures current host is master node.
570
571   """
572   (master, ) = cl.QueryConfigValues(["master_node"])
573   if master != netutils.Hostname.GetSysName():
574     raise NotMasterError("This is not the master node")
575
576
577 @UsesRapiClient
578 def _GlobalWatcher(opts):
579   """Main function for global watcher.
580
581   At the end child processes are spawned for every node group.
582
583   """
584   StartNodeDaemons()
585   RunWatcherHooks()
586
587   # Run node maintenance in all cases, even if master, so that old masters can
588   # be properly cleaned up
589   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
590     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
591
592   try:
593     client = GetLuxiClient(True)
594   except NotMasterError:
595     # Don't proceed on non-master nodes
596     return constants.EXIT_SUCCESS
597
598   # we are on master now
599   utils.EnsureDaemon(constants.RAPI)
600
601   # If RAPI isn't responding to queries, try one restart
602   logging.debug("Attempting to talk to remote API on %s",
603                 constants.IP4_ADDRESS_LOCALHOST)
604   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
605     logging.warning("Couldn't get answer from remote API, restaring daemon")
606     utils.StopDaemon(constants.RAPI)
607     utils.EnsureDaemon(constants.RAPI)
608     logging.debug("Second attempt to talk to remote API")
609     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
610       logging.fatal("RAPI is not responding")
611   logging.debug("Successfully talked to remote API")
612
613   _CheckMaster(client)
614   _ArchiveJobs(client, opts.job_age)
615
616   # Spawn child processes for all node groups
617   _StartGroupChildren(client, opts.wait_children)
618
619   return constants.EXIT_SUCCESS
620
621
622 def _GetGroupData(cl, uuid):
623   """Retrieves instances and nodes per node group.
624
625   """
626   job = [
627     # Get all primary instances in group
628     opcodes.OpQuery(what=constants.QR_INSTANCE,
629                     fields=["name", "status", "disks_active", "snodes",
630                             "pnode.group.uuid", "snodes.group.uuid"],
631                     qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
632                     use_locking=True),
633
634     # Get all nodes in group
635     opcodes.OpQuery(what=constants.QR_NODE,
636                     fields=["name", "bootid", "offline"],
637                     qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
638                     use_locking=True),
639     ]
640
641   job_id = cl.SubmitJob(job)
642   results = map(objects.QueryResponse.FromDict,
643                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
644   cl.ArchiveJob(job_id)
645
646   results_data = map(operator.attrgetter("data"), results)
647
648   # Ensure results are tuples with two values
649   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
650
651   # Extract values ignoring result status
652   (raw_instances, raw_nodes) = [[map(compat.snd, values)
653                                  for values in res]
654                                 for res in results_data]
655
656   secondaries = {}
657   instances = []
658
659   # Load all instances
660   for (name, status, disks_active, snodes, pnode_group_uuid,
661        snodes_group_uuid) in raw_instances:
662     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
663       logging.error("Ignoring split instance '%s', primary group %s, secondary"
664                     " groups %s", name, pnode_group_uuid,
665                     utils.CommaJoin(snodes_group_uuid))
666     else:
667       instances.append(Instance(name, status, disks_active, snodes))
668
669       for node in snodes:
670         secondaries.setdefault(node, set()).add(name)
671
672   # Load all nodes
673   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
674            for (name, bootid, offline) in raw_nodes]
675
676   return (dict((node.name, node) for node in nodes),
677           dict((inst.name, inst) for inst in instances))
678
679
680 def _LoadKnownGroups():
681   """Returns a list of all node groups known by L{ssconf}.
682
683   """
684   groups = ssconf.SimpleStore().GetNodegroupList()
685
686   result = list(line.split(None, 1)[0] for line in groups
687                 if line.strip())
688
689   if not compat.all(map(utils.UUID_RE.match, result)):
690     raise errors.GenericError("Ssconf contains invalid group UUID")
691
692   return result
693
694
695 def _GroupWatcher(opts):
696   """Main function for per-group watcher process.
697
698   """
699   group_uuid = opts.nodegroup.lower()
700
701   if not utils.UUID_RE.match(group_uuid):
702     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
703                               " got '%s'" %
704                               (cli.NODEGROUP_OPT_NAME, group_uuid))
705
706   logging.info("Watcher for node group '%s'", group_uuid)
707
708   known_groups = _LoadKnownGroups()
709
710   # Check if node group is known
711   if group_uuid not in known_groups:
712     raise errors.GenericError("Node group '%s' is not known by ssconf" %
713                               group_uuid)
714
715   # Group UUID has been verified and should not contain any dangerous
716   # characters
717   state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
718   inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
719
720   logging.debug("Using state file %s", state_path)
721
722   # Global watcher
723   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
724   if not statefile:
725     return constants.EXIT_FAILURE
726
727   notepad = state.WatcherState(statefile) # pylint: disable=E0602
728   try:
729     # Connect to master daemon
730     client = GetLuxiClient(False)
731
732     _CheckMaster(client)
733
734     (nodes, instances) = _GetGroupData(client, group_uuid)
735
736     # Update per-group instance status file
737     _UpdateInstanceStatus(inst_status_path, instances.values())
738
739     _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
740                          pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
741                          known_groups)
742
743     started = _CheckInstances(client, notepad, instances)
744     _CheckDisks(client, notepad, nodes, instances, started)
745     _VerifyDisks(client, group_uuid, nodes, instances)
746   except Exception, err:
747     logging.info("Not updating status file due to failure: %s", err)
748     raise
749   else:
750     # Save changes for next run
751     notepad.Save(state_path)
752
753   return constants.EXIT_SUCCESS
754
755
756 def Main():
757   """Main function.
758
759   """
760   (options, _) = ParseOptions()
761
762   utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
763                      debug=options.debug, stderr_logging=options.debug)
764
765   if ShouldPause() and not options.ignore_pause:
766     logging.debug("Pause has been set, exiting")
767     return constants.EXIT_SUCCESS
768
769   # Try to acquire global watcher lock in shared mode
770   lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
771   try:
772     lock.Shared(blocking=False)
773   except (EnvironmentError, errors.LockError), err:
774     logging.error("Can't acquire lock on %s: %s",
775                   pathutils.WATCHER_LOCK_FILE, err)
776     return constants.EXIT_SUCCESS
777
778   if options.nodegroup is None:
779     fn = _GlobalWatcher
780   else:
781     # Per-nodegroup watcher
782     fn = _GroupWatcher
783
784   try:
785     return fn(options)
786   except (SystemExit, KeyboardInterrupt):
787     raise
788   except NotMasterError:
789     logging.debug("Not master, exiting")
790     return constants.EXIT_NOTMASTER
791   except errors.ResolverError, err:
792     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
793     return constants.EXIT_NODESETUP_ERROR
794   except errors.JobQueueFull:
795     logging.error("Job queue is full, can't query cluster state")
796   except errors.JobQueueDrainError:
797     logging.error("Job queue is drained, can't maintain cluster state")
798   except Exception, err:
799     logging.exception(str(err))
800     return constants.EXIT_FAILURE
801
802   return constants.EXIT_SUCCESS