#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # Start confd as well; on non-master-candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

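  # Restart accounting: the per-instance attempt counter lives in the watcher
  # state file ("notepad"); once MAXTRIES attempts have failed the instance is
  # left alone until it is seen healthy again and its record is cleared below.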
  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
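  # Only the middle element of the verify-disks result is used here: the list
  # of instances with offline disks.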
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false", help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

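  # Each entry is written as "<instance name> <status>" on its own line;
  # sorting keeps the file contents stable between runs.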
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
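    # The child re-runs the same watcher command line with the node group
    # option appended, so it only processes that group.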
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
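    # Instances whose secondary nodes are in a different group than the
    # primary node ("split" instances) are skipped by the per-group watcher.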
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the state file kept for this node group
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS