mem_count is now mem_size everywhere
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client # pylint: disable=W0611
54
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
57
58
59 MAXTRIES = 5
60 BAD_STATES = frozenset([
61   constants.INSTST_ERRORDOWN,
62   ])
63 HELPLESS_STATES = frozenset([
64   constants.INSTST_NODEDOWN,
65   constants.INSTST_NODEOFFLINE,
66   ])
67 NOTICE = "NOTICE"
68 ERROR = "ERROR"
69
70 #: Number of seconds to wait between starting child processes for node groups
71 CHILD_PROCESS_DELAY = 1.0
72
73 #: How many seconds to wait for instance status file lock
74 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
75
76
77 class NotMasterError(errors.GenericError):
78   """Exception raised when this host is not the master."""
79
80
81 def ShouldPause():
82   """Check whether we should pause.
83
84   """
85   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
86
87
88 def StartNodeDaemons():
89   """Start all the daemons that should be running on all nodes.
90
91   """
92   # on master or not, try to start the node daemon
93   utils.EnsureDaemon(constants.NODED)
94   # start confd as well. On non candidates it will be in disabled mode.
95   if constants.ENABLE_CONFD:
96     utils.EnsureDaemon(constants.CONFD)
97
98
99 def RunWatcherHooks():
100   """Run the watcher hooks.
101
102   """
103   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
104                              constants.HOOKS_NAME_WATCHER)
105   if not os.path.isdir(hooks_dir):
106     return
107
108   try:
109     results = utils.RunParts(hooks_dir)
110   except Exception, err: # pylint: disable=W0703
111     logging.exception("RunParts %s failed: %s", hooks_dir, err)
112     return
113
114   for (relname, status, runresult) in results:
115     if status == constants.RUNPARTS_SKIP:
116       logging.debug("Watcher hook %s: skipped", relname)
117     elif status == constants.RUNPARTS_ERR:
118       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
119     elif status == constants.RUNPARTS_RUN:
120       if runresult.failed:
121         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
122                         relname, runresult.exit_code, runresult.output)
123       else:
124         logging.debug("Watcher hook %s: success (output: %s)", relname,
125                       runresult.output)
126     else:
127       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
128                                    status)
129
130
131 class Instance(object):
132   """Abstraction for a Virtual Machine instance.
133
134   """
135   def __init__(self, name, status, autostart, snodes):
136     self.name = name
137     self.status = status
138     self.autostart = autostart
139     self.snodes = snodes
140
141   def Restart(self, cl):
142     """Encapsulates the start of an instance.
143
144     """
145     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
146     cli.SubmitOpCode(op, cl=cl)
147
148   def ActivateDisks(self, cl):
149     """Encapsulates the activation of all disks of an instance.
150
151     """
152     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
153     cli.SubmitOpCode(op, cl=cl)
154
155
156 class Node:
157   """Data container representing cluster node.
158
159   """
160   def __init__(self, name, bootid, offline, secondaries):
161     """Initializes this class.
162
163     """
164     self.name = name
165     self.bootid = bootid
166     self.offline = offline
167     self.secondaries = secondaries
168
169
170 def _CheckInstances(cl, notepad, instances):
171   """Make a pass over the list of instances, restarting downed ones.
172
173   """
174   notepad.MaintainInstanceList(instances.keys())
175
176   started = set()
177
178   for inst in instances.values():
179     if inst.status in BAD_STATES:
180       n = notepad.NumberOfRestartAttempts(inst.name)
181
182       if n > MAXTRIES:
183         logging.warning("Not restarting instance '%s', retries exhausted",
184                         inst.name)
185         continue
186
187       if n == MAXTRIES:
188         notepad.RecordRestartAttempt(inst.name)
189         logging.error("Could not restart instance '%s' after %s attempts,"
190                       " giving up", inst.name, MAXTRIES)
191         continue
192
193       try:
194         logging.info("Restarting instance '%s' (attempt #%s)",
195                      inst.name, n + 1)
196         inst.Restart(cl)
197       except Exception: # pylint: disable=W0703
198         logging.exception("Error while restarting instance '%s'", inst.name)
199       else:
200         started.add(inst.name)
201
202       notepad.RecordRestartAttempt(inst.name)
203
204     else:
205       if notepad.NumberOfRestartAttempts(inst.name):
206         notepad.RemoveInstance(inst.name)
207         if inst.status not in HELPLESS_STATES:
208           logging.info("Restart of instance '%s' succeeded", inst.name)
209
210   return started
211
212
213 def _CheckDisks(cl, notepad, nodes, instances, started):
214   """Check all nodes for restarted ones.
215
216   """
217   check_nodes = []
218
219   for node in nodes.values():
220     old = notepad.GetNodeBootID(node.name)
221     if not node.bootid:
222       # Bad node, not returning a boot id
223       if not node.offline:
224         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
225                       node.name)
226       continue
227
228     if old != node.bootid:
229       # Node's boot ID has changed, probably through a reboot
230       check_nodes.append(node)
231
232   if check_nodes:
233     # Activate disks for all instances with any of the checked nodes as a
234     # secondary node.
235     for node in check_nodes:
236       for instance_name in node.secondaries:
237         try:
238           inst = instances[instance_name]
239         except KeyError:
240           logging.info("Can't find instance '%s', maybe it was ignored",
241                        instance_name)
242           continue
243
244         if not inst.autostart:
245           logging.info("Skipping disk activation for non-autostart"
246                        " instance '%s'", inst.name)
247           continue
248
249         if inst.name in started:
250           # we already tried to start the instance, which should have
251           # activated its drives (if they can be at all)
252           logging.debug("Skipping disk activation for instance '%s' as"
253                         " it was already started", inst.name)
254           continue
255
256         try:
257           logging.info("Activating disks for instance '%s'", inst.name)
258           inst.ActivateDisks(cl)
259         except Exception: # pylint: disable=W0703
260           logging.exception("Error while activating disks for instance '%s'",
261                             inst.name)
262
263     # Keep changed boot IDs
264     for node in check_nodes:
265       notepad.SetNodeBootID(node.name, node.bootid)
266
267
268 def _CheckForOfflineNodes(nodes, instance):
269   """Checks if given instances has any secondary in offline status.
270
271   @param instance: The instance object
272   @return: True if any of the secondary is offline, False otherwise
273
274   """
275   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
276
277
278 def _VerifyDisks(cl, uuid, nodes, instances):
279   """Run a per-group "gnt-cluster verify-disks".
280
281   """
282   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
283   ((_, offline_disk_instances, _), ) = \
284     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
285   cl.ArchiveJob(job_id)
286
287   if not offline_disk_instances:
288     # nothing to do
289     logging.debug("Verify-disks reported no offline disks, nothing to do")
290     return
291
292   logging.debug("Will activate disks for instance(s) %s",
293                 utils.CommaJoin(offline_disk_instances))
294
295   # We submit only one job, and wait for it. Not optimal, but this puts less
296   # load on the job queue.
297   job = []
298   for name in offline_disk_instances:
299     try:
300       inst = instances[name]
301     except KeyError:
302       logging.info("Can't find instance '%s', maybe it was ignored", name)
303       continue
304
305     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
306       logging.info("Skipping instance '%s' because it is in a helpless state or"
307                    " has offline secondaries", name)
308       continue
309
310     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
311
312   if job:
313     job_id = cli.SendJob(job, cl=cl)
314
315     try:
316       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
317     except Exception: # pylint: disable=W0703
318       logging.exception("Error while activating disks")
319
320
321 def IsRapiResponding(hostname):
322   """Connects to RAPI port and does a simple test.
323
324   Connects to RAPI port of hostname and does a simple test. At this time, the
325   test is GetVersion.
326
327   @type hostname: string
328   @param hostname: hostname of the node to connect to.
329   @rtype: bool
330   @return: Whether RAPI is working properly
331
332   """
333   curl_config = rapi.client.GenericCurlConfig()
334   rapi_client = rapi.client.GanetiRapiClient(hostname,
335                                              curl_config_fn=curl_config)
336   try:
337     master_version = rapi_client.GetVersion()
338   except rapi.client.CertificateError, err:
339     logging.warning("RAPI certificate error: %s", err)
340     return False
341   except rapi.client.GanetiApiError, err:
342     logging.warning("RAPI error: %s", err)
343     return False
344   else:
345     logging.debug("Reported RAPI version %s", master_version)
346     return master_version == constants.RAPI_VERSION
347
348
349 def ParseOptions():
350   """Parse the command line options.
351
352   @return: (options, args) as from OptionParser.parse_args()
353
354   """
355   parser = OptionParser(description="Ganeti cluster watcher",
356                         usage="%prog [-d]",
357                         version="%%prog (ganeti) %s" %
358                         constants.RELEASE_VERSION)
359
360   parser.add_option(cli.DEBUG_OPT)
361   parser.add_option(cli.NODEGROUP_OPT)
362   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
363                     help="Autoarchive jobs older than this age (default"
364                           " 6 hours)")
365   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
366                     action="store_true", help="Ignore cluster pause setting")
367   parser.add_option("--wait-children", dest="wait_children",
368                     action="store_true", help="Wait for child processes")
369   parser.add_option("--no-wait-children", dest="wait_children",
370                     action="store_false", help="Don't wait for child processes")
371   # See optparse documentation for why default values are not set by options
372   parser.set_defaults(wait_children=True)
373   options, args = parser.parse_args()
374   options.job_age = cli.ParseTimespec(options.job_age)
375
376   if args:
377     parser.error("No arguments expected")
378
379   return (options, args)
380
381
382 def _WriteInstanceStatus(filename, data):
383   """Writes the per-group instance status file.
384
385   The entries are sorted.
386
387   @type filename: string
388   @param filename: Path to instance status file
389   @type data: list of tuple; (instance name as string, status as string)
390   @param data: Instance name and status
391
392   """
393   logging.debug("Updating instance status file '%s' with %s instances",
394                 filename, len(data))
395
396   utils.WriteFile(filename,
397                   data="".join(map(compat.partial(operator.mod, "%s %s\n"),
398                                    sorted(data))))
399
400
401 def _UpdateInstanceStatus(filename, instances):
402   """Writes an instance status file from L{Instance} objects.
403
404   @type filename: string
405   @param filename: Path to status file
406   @type instances: list of L{Instance}
407
408   """
409   _WriteInstanceStatus(filename, [(inst.name, inst.status)
410                                   for inst in instances])
411
412
413 class _StatCb:
414   """Helper to store file handle's C{fstat}.
415
416   """
417   def __init__(self):
418     """Initializes this class.
419
420     """
421     self.st = None
422
423   def __call__(self, fh):
424     """Calls C{fstat} on file handle.
425
426     """
427     self.st = os.fstat(fh.fileno())
428
429
430 def _ReadInstanceStatus(filename):
431   """Reads an instance status file.
432
433   @type filename: string
434   @param filename: Path to status file
435   @rtype: tuple; (None or number, list of lists containing instance name and
436     status)
437   @return: File's mtime and instance status contained in the file; mtime is
438     C{None} if file can't be read
439
440   """
441   logging.debug("Reading per-group instance status from '%s'", filename)
442
443   statcb = _StatCb()
444   try:
445     content = utils.ReadFile(filename, preread=statcb)
446   except EnvironmentError, err:
447     if err.errno == errno.ENOENT:
448       logging.error("Can't read '%s', does not exist (yet)", filename)
449     else:
450       logging.exception("Unable to read '%s', ignoring", filename)
451     return (None, None)
452   else:
453     return (statcb.st.st_mtime, [line.split(None, 1)
454                                  for line in content.splitlines()])
455
456
457 def _MergeInstanceStatus(filename, pergroup_filename, groups):
458   """Merges all per-group instance status files into a global one.
459
460   @type filename: string
461   @param filename: Path to global instance status file
462   @type pergroup_filename: string
463   @param pergroup_filename: Path to per-group status files, must contain "%s"
464     to be replaced with group UUID
465   @type groups: sequence
466   @param groups: UUIDs of known groups
467
468   """
469   # Lock global status file in exclusive mode
470   lock = utils.FileLock.Open(filename)
471   try:
472     lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
473   except errors.LockError, err:
474     # All per-group processes will lock and update the file. None of them
475     # should take longer than 10 seconds (the value of
476     # INSTANCE_STATUS_LOCK_TIMEOUT).
477     logging.error("Can't acquire lock on instance status file '%s', not"
478                   " updating: %s", filename, err)
479     return
480
481   logging.debug("Acquired exclusive lock on '%s'", filename)
482
483   data = {}
484
485   # Load instance status from all groups
486   for group_uuid in groups:
487     (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
488
489     if mtime is not None:
490       for (instance_name, status) in instdata:
491         data.setdefault(instance_name, []).append((mtime, status))
492
493   # Select last update based on file mtime
494   inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
495                 for (instance_name, status) in data.items()]
496
497   # Write the global status file. Don't touch file after it's been
498   # updated--there is no lock anymore.
499   _WriteInstanceStatus(filename, inststatus)
500
501
502 def GetLuxiClient(try_restart):
503   """Tries to connect to the master daemon.
504
505   @type try_restart: bool
506   @param try_restart: Whether to attempt to restart the master daemon
507
508   """
509   try:
510     return cli.GetClient()
511   except errors.OpPrereqError, err:
512     # this is, from cli.GetClient, a not-master case
513     raise NotMasterError("Not on master node (%s)" % err)
514
515   except luxi.NoMasterError, err:
516     if not try_restart:
517       raise
518
519     logging.warning("Master daemon seems to be down (%s), trying to restart",
520                     err)
521
522     if not utils.EnsureDaemon(constants.MASTERD):
523       raise errors.GenericError("Can't start the master daemon")
524
525     # Retry the connection
526     return cli.GetClient()
527
528
529 def _StartGroupChildren(cl, wait):
530   """Starts a new instance of the watcher for every node group.
531
532   """
533   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
534                         for arg in sys.argv)
535
536   result = cl.QueryGroups([], ["name", "uuid"], False)
537
538   children = []
539
540   for (idx, (name, uuid)) in enumerate(result):
541     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
542
543     if idx > 0:
544       # Let's not kill the system
545       time.sleep(CHILD_PROCESS_DELAY)
546
547     logging.debug("Spawning child for group '%s' (%s), arguments %s",
548                   name, uuid, args)
549
550     try:
551       # TODO: Should utils.StartDaemon be used instead?
552       pid = os.spawnv(os.P_NOWAIT, args[0], args)
553     except Exception: # pylint: disable=W0703
554       logging.exception("Failed to start child for group '%s' (%s)",
555                         name, uuid)
556     else:
557       logging.debug("Started with PID %s", pid)
558       children.append(pid)
559
560   if wait:
561     for pid in children:
562       logging.debug("Waiting for child PID %s", pid)
563       try:
564         result = utils.RetryOnSignal(os.waitpid, pid, 0)
565       except EnvironmentError, err:
566         result = str(err)
567
568       logging.debug("Child PID %s exited with status %s", pid, result)
569
570
571 def _ArchiveJobs(cl, age):
572   """Archives old jobs.
573
574   """
575   (arch_count, left_count) = cl.AutoArchiveJobs(age)
576   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
577
578
579 def _CheckMaster(cl):
580   """Ensures current host is master node.
581
582   """
583   (master, ) = cl.QueryConfigValues(["master_node"])
584   if master != netutils.Hostname.GetSysName():
585     raise NotMasterError("This is not the master node")
586
587
588 @rapi.client.UsesRapiClient
589 def _GlobalWatcher(opts):
590   """Main function for global watcher.
591
592   At the end child processes are spawned for every node group.
593
594   """
595   StartNodeDaemons()
596   RunWatcherHooks()
597
598   # Run node maintenance in all cases, even if master, so that old masters can
599   # be properly cleaned up
600   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
601     nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
602
603   try:
604     client = GetLuxiClient(True)
605   except NotMasterError:
606     # Don't proceed on non-master nodes
607     return constants.EXIT_SUCCESS
608
609   # we are on master now
610   utils.EnsureDaemon(constants.RAPI)
611
612   # If RAPI isn't responding to queries, try one restart
613   logging.debug("Attempting to talk to remote API on %s",
614                 constants.IP4_ADDRESS_LOCALHOST)
615   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
616     logging.warning("Couldn't get answer from remote API, restaring daemon")
617     utils.StopDaemon(constants.RAPI)
618     utils.EnsureDaemon(constants.RAPI)
619     logging.debug("Second attempt to talk to remote API")
620     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
621       logging.fatal("RAPI is not responding")
622   logging.debug("Successfully talked to remote API")
623
624   _CheckMaster(client)
625   _ArchiveJobs(client, opts.job_age)
626
627   # Spawn child processes for all node groups
628   _StartGroupChildren(client, opts.wait_children)
629
630   return constants.EXIT_SUCCESS
631
632
633 def _GetGroupData(cl, uuid):
634   """Retrieves instances and nodes per node group.
635
636   """
637   job = [
638     # Get all primary instances in group
639     opcodes.OpQuery(what=constants.QR_INSTANCE,
640                     fields=["name", "status", "admin_state", "snodes",
641                             "pnode.group.uuid", "snodes.group.uuid"],
642                     qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
643                     use_locking=True),
644
645     # Get all nodes in group
646     opcodes.OpQuery(what=constants.QR_NODE,
647                     fields=["name", "bootid", "offline"],
648                     qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
649                     use_locking=True),
650     ]
651
652   job_id = cl.SubmitJob(job)
653   results = map(objects.QueryResponse.FromDict,
654                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
655   cl.ArchiveJob(job_id)
656
657   results_data = map(operator.attrgetter("data"), results)
658
659   # Ensure results are tuples with two values
660   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
661
662   # Extract values ignoring result status
663   (raw_instances, raw_nodes) = [[map(compat.snd, values)
664                                  for values in res]
665                                 for res in results_data]
666
667   secondaries = {}
668   instances = []
669
670   # Load all instances
671   for (name, status, autostart, snodes, pnode_group_uuid,
672        snodes_group_uuid) in raw_instances:
673     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
674       logging.error("Ignoring split instance '%s', primary group %s, secondary"
675                     " groups %s", name, pnode_group_uuid,
676                     utils.CommaJoin(snodes_group_uuid))
677     else:
678       instances.append(Instance(name, status, autostart, snodes))
679
680       for node in snodes:
681         secondaries.setdefault(node, set()).add(name)
682
683   # Load all nodes
684   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
685            for (name, bootid, offline) in raw_nodes]
686
687   return (dict((node.name, node) for node in nodes),
688           dict((inst.name, inst) for inst in instances))
689
690
691 def _LoadKnownGroups():
692   """Returns a list of all node groups known by L{ssconf}.
693
694   """
695   groups = ssconf.SimpleStore().GetNodegroupList()
696
697   result = list(line.split(None, 1)[0] for line in groups
698                 if line.strip())
699
700   if not compat.all(map(utils.UUID_RE.match, result)):
701     raise errors.GenericError("Ssconf contains invalid group UUID")
702
703   return result
704
705
706 def _GroupWatcher(opts):
707   """Main function for per-group watcher process.
708
709   """
710   group_uuid = opts.nodegroup.lower()
711
712   if not utils.UUID_RE.match(group_uuid):
713     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
714                               " got '%s'" %
715                               (cli.NODEGROUP_OPT_NAME, group_uuid))
716
717   logging.info("Watcher for node group '%s'", group_uuid)
718
719   known_groups = _LoadKnownGroups()
720
721   # Check if node group is known
722   if group_uuid not in known_groups:
723     raise errors.GenericError("Node group '%s' is not known by ssconf" %
724                               group_uuid)
725
726   # Group UUID has been verified and should not contain any dangerous characters
727   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
728   inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
729
730   logging.debug("Using state file %s", state_path)
731
732   # Global watcher
733   statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
734   if not statefile:
735     return constants.EXIT_FAILURE
736
737   notepad = state.WatcherState(statefile) # pylint: disable=E0602
738   try:
739     # Connect to master daemon
740     client = GetLuxiClient(False)
741
742     _CheckMaster(client)
743
744     (nodes, instances) = _GetGroupData(client, group_uuid)
745
746     # Update per-group instance status file
747     _UpdateInstanceStatus(inst_status_path, instances.values())
748
749     _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
750                          constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
751                          known_groups)
752
753     started = _CheckInstances(client, notepad, instances)
754     _CheckDisks(client, notepad, nodes, instances, started)
755     _VerifyDisks(client, group_uuid, nodes, instances)
756   except Exception, err:
757     logging.info("Not updating status file due to failure: %s", err)
758     raise
759   else:
760     # Save changes for next run
761     notepad.Save(state_path)
762
763   return constants.EXIT_SUCCESS
764
765
766 def Main():
767   """Main function.
768
769   """
770   (options, _) = ParseOptions()
771
772   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
773                      debug=options.debug, stderr_logging=options.debug)
774
775   if ShouldPause() and not options.ignore_pause:
776     logging.debug("Pause has been set, exiting")
777     return constants.EXIT_SUCCESS
778
779   # Try to acquire global watcher lock in shared mode
780   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
781   try:
782     lock.Shared(blocking=False)
783   except (EnvironmentError, errors.LockError), err:
784     logging.error("Can't acquire lock on %s: %s",
785                   constants.WATCHER_LOCK_FILE, err)
786     return constants.EXIT_SUCCESS
787
788   if options.nodegroup is None:
789     fn = _GlobalWatcher
790   else:
791     # Per-nodegroup watcher
792     fn = _GroupWatcher
793
794   try:
795     return fn(options)
796   except (SystemExit, KeyboardInterrupt):
797     raise
798   except NotMasterError:
799     logging.debug("Not master, exiting")
800     return constants.EXIT_NOTMASTER
801   except errors.ResolverError, err:
802     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
803     return constants.EXIT_NODESETUP_ERROR
804   except errors.JobQueueFull:
805     logging.error("Job queue is full, can't query cluster state")
806   except errors.JobQueueDrainError:
807     logging.error("Job queue is drained, can't maintain cluster state")
808   except Exception, err:
809     logging.exception(str(err))
810     return constants.EXIT_FAILURE
811
812   return constants.EXIT_SUCCESS