#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
import ganeti.rpc.node as rpc
import ganeti.rpc.errors as rpcerr
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name)
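
  # Submit the shutdown opcode; on success, drop any cleanup attempts recorded
  # for this instance so the counter starts fresh, while a failure is counted
  # towards the MAXTRIES limit checked above.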
  try:
    cli.SubmitOpCode(op, cl=cl)
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status == constants.INSTST_USERDOWN:
      _CleanupInstance(cl, notepad, inst)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
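
      # The notepad keeps a per-instance restart counter: past MAXTRIES the
      # instance is only skipped with a warning, at exactly MAXTRIES the
      # counter is bumped once more and a final "giving up" error is logged,
      # and below that a restart is attempted and the attempt recorded.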
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)

def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
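  # PollJob returns one result per opcode in the job; the single
  # OpGroupVerifyDisks result is unpacked here and only the list of instances
  # with offline disks is kept.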
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))
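
  # Each (name, status) tuple is rendered as a "name status" line via the
  # "%s %s\n" template; sorting first keeps the file contents deterministic.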
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
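  # Sorting the (mtime, status) pairs in reverse order puts the most recently
  # written per-group entry first; its status is taken as the current one.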
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the luxi daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except rpcerr.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Luxi daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.LUXID):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
547
                        for arg in sys.argv)
548

    
549
  result = cl.QueryGroups([], ["name", "uuid"], False)
550

    
551
  children = []
552

    
553
  for (idx, (name, uuid)) in enumerate(result):
554
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
555

    
556
    if idx > 0:
557
      # Let's not kill the system
558
      time.sleep(CHILD_PROCESS_DELAY)
559

    
560
    logging.debug("Spawning child for group '%s' (%s), arguments %s",
561
                  name, uuid, args)
562

    
563
    try:
564
      # TODO: Should utils.StartDaemon be used instead?
565
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
566
    except Exception: # pylint: disable=W0703
567
      logging.exception("Failed to start child for group '%s' (%s)",
568
                        name, uuid)
569
    else:
570
      logging.debug("Started with PID %s", pid)
571
      children.append(pid)
572

    
573
  if wait:
574
    for pid in children:
575
      logging.debug("Waiting for child PID %s", pid)
576
      try:
577
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
578
      except EnvironmentError, err:
579
        result = str(err)
580

    
581
      logging.debug("Child PID %s exited with status %s", pid, result)
582

    
583

    
584
def _ArchiveJobs(cl, age):
585
  """Archives old jobs.
586

587
  """
588
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
589
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)
590

    
591

    
592
def _CheckMaster(cl):
593
  """Ensures current host is master node.
594

595
  """
596
  (master, ) = cl.QueryConfigValues(["master_node"])
597
  if master != netutils.Hostname.GetSysName():
598
    raise NotMasterError("This is not the master node")
599

    
600

    
601
@UsesRapiClient
602
def _GlobalWatcher(opts):
603
  """Main function for global watcher.
604

605
  At the end child processes are spawned for every node group.
606

607
  """
608
  StartNodeDaemons()
609
  RunWatcherHooks()
610

    
611
  # Run node maintenance in all cases, even if master, so that old masters can
612
  # be properly cleaned up
613
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
614
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
615

    
616
  try:
617
    client = GetLuxiClient(True)
618
  except NotMasterError:
619
    # Don't proceed on non-master nodes
620
    return constants.EXIT_SUCCESS
621

    
622
  # we are on master now
623
  utils.EnsureDaemon(constants.RAPI)
624

    
625
  # If RAPI isn't responding to queries, try one restart
626
  logging.debug("Attempting to talk to remote API on %s",
627
                constants.IP4_ADDRESS_LOCALHOST)
628
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
629
    logging.warning("Couldn't get answer from remote API, restaring daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  queries = [
      (constants.QR_INSTANCE,
       ["name", "status", "disks_active", "snodes",
        "pnode.group.uuid", "snodes.group.uuid"],
       [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
      (constants.QR_NODE,
       ["name", "bootid", "offline"],
       [qlang.OP_EQUAL, "group.uuid", uuid]),
      ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
671
                                 for values in res]
672
                                for res in results_data]
673

    
674
  secondaries = {}
675
  instances = []
676

    
677
  # Load all instances
678
  for (name, status, disks_active, snodes, pnode_group_uuid,
679
       snodes_group_uuid) in raw_instances:
680
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
681
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
682
                    " groups %s", name, pnode_group_uuid,
683
                    utils.CommaJoin(snodes_group_uuid))
684
    else:
685
      instances.append(Instance(name, status, disks_active, snodes))
686

    
687
      for node in snodes:
688
        secondaries.setdefault(node, set()).add(name)
689

    
690
  # Load all nodes
691
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
692
           for (name, bootid, offline) in raw_nodes]
693

    
694
  return (dict((node.name, node) for node in nodes),
695
          dict((inst.name, inst) for inst in instances))
696

    
697

    
698
def _LoadKnownGroups():
699
  """Returns a list of all node groups known by L{ssconf}.
700

701
  """
702
  groups = ssconf.SimpleStore().GetNodegroupList()
703

    
704
  result = list(line.split(None, 1)[0] for line in groups
705
                if line.strip())
706

    
707
  if not compat.all(map(utils.UUID_RE.match, result)):
708
    raise errors.GenericError("Ssconf contains invalid group UUID")
709

    
710
  return result
711

    
712

    
713
def _GroupWatcher(opts):
714
  """Main function for per-group watcher process.
715

716
  """
717
  group_uuid = opts.nodegroup.lower()
718

    
719
  if not utils.UUID_RE.match(group_uuid):
720
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
721
                              " got '%s'" %
722
                              (cli.NODEGROUP_OPT_NAME, group_uuid))
723

    
724
  logging.info("Watcher for node group '%s'", group_uuid)
725

    
726
  known_groups = _LoadKnownGroups()
727

    
728
  # Check if node group is known
729
  if group_uuid not in known_groups:
730
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
731
                              group_uuid)
732

    
733
  # Group UUID has been verified and should not contain any dangerous
734
  # characters
735
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
736
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
737

    
738
  logging.debug("Using state file %s", state_path)
739

    
740
  # Global watcher
741
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
742
  if not statefile:
743
    return constants.EXIT_FAILURE
744

    
745
  notepad = state.WatcherState(statefile) # pylint: disable=E0602
746
  try:
747
    # Connect to master daemon
748
    client = GetLuxiClient(False)
749

    
750
    _CheckMaster(client)
751

    
752
    (nodes, instances) = _GetGroupData(client, group_uuid)
753

    
754
    # Update per-group instance status file
755
    _UpdateInstanceStatus(inst_status_path, instances.values())
756

    
757
    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
758
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
759
                         known_groups)
760

    
761
    started = _CheckInstances(client, notepad, instances)
762
    _CheckDisks(client, notepad, nodes, instances, started)
763
    _VerifyDisks(client, group_uuid, nodes, instances)
764
  except Exception, err:
765
    logging.info("Not updating status file due to failure: %s", err)
766
    raise
767
  else:
768
    # Save changes for next run
769
    notepad.Save(state_path)
770

    
771
  return constants.EXIT_SUCCESS
772

    
773

    
774
def Main():
775
  """Main function.
776

777
  """
778
  (options, _) = ParseOptions()
779

    
780
  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
781
                     debug=options.debug, stderr_logging=options.debug)
782

    
783
  if ShouldPause() and not options.ignore_pause:
784
    logging.debug("Pause has been set, exiting")
785
    return constants.EXIT_SUCCESS
786

    
787
  # Try to acquire global watcher lock in shared mode
788
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
789
  try:
790
    lock.Shared(blocking=False)
791
  except (EnvironmentError, errors.LockError), err:
792
    logging.error("Can't acquire lock on %s: %s",
793
                  pathutils.WATCHER_LOCK_FILE, err)
794
    return constants.EXIT_SUCCESS
795

    
796
  if options.nodegroup is None:
797
    fn = _GlobalWatcher
798
  else:
799
    # Per-nodegroup watcher
800
    fn = _GroupWatcher
801

    
802
  try:
803
    return fn(options)
804
  except (SystemExit, KeyboardInterrupt):
805
    raise
806
  except NotMasterError:
807
    logging.debug("Not master, exiting")
808
    return constants.EXIT_NOTMASTER
809
  except errors.ResolverError, err:
810
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
811
    return constants.EXIT_NODESETUP_ERROR
812
  except errors.JobQueueFull:
813
    logging.error("Job queue is full, can't query cluster state")
814
  except errors.JobQueueDrainError:
815
    logging.error("Job queue is drained, can't maintain cluster state")
816
  except Exception, err:
817
    logging.exception(str(err))
818
    return constants.EXIT_FAILURE
819

    
820
  return constants.EXIT_SUCCESS