Statistics
| Branch: | Tag: | Revision:

root / lib / watcher / __init__.py @ 20fb929a

History | View | Annotate | Download (24.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erroneously downed virtual machines.
23

24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

28
"""
29

    
30
import os
31
import os.path
32
import sys
33
import time
34
import logging
35
import operator
36
import errno
37
from optparse import OptionParser
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import compat
42
from ganeti import errors
43
from ganeti import opcodes
44
from ganeti import cli
45
from ganeti import luxi
46
from ganeti import rapi
47
from ganeti import netutils
48
from ganeti import qlang
49
from ganeti import objects
50
from ganeti import ssconf
51
from ganeti import ht
52
from ganeti import pathutils
53

    
54
import ganeti.rapi.client # pylint: disable=W0611
55
from ganeti.rapi.client import UsesRapiClient
56

    
57
from ganeti.watcher import nodemaint
58
from ganeti.watcher import state
59

    
60

    
61
#: Maximum number of recorded restart/cleanup attempts before giving up on
#: an instance
MAXTRIES = 5

#: Instance states that trigger a restart attempt by the watcher
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])

#: Instance states the watcher cannot fix itself (node-level problems);
#: instances in these states are left alone
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])

#: Message severity labels (not referenced in this module's visible code;
#: presumably used by callers — confirm before removing)
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
77

    
78

    
79
class NotMasterError(errors.GenericError):
  """Raised when the watcher runs on a host that is not the master node."""
81

    
82

    
83
def ShouldPause():
  """Check whether the watcher should pause.

  @rtype: bool
  @return: Whether a pause has been requested via the watcher pause file

  """
  pause_requested = utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
  return bool(pause_requested)
88

    
89

    
90
def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # The node daemon must run everywhere, whether this is the master or not
  utils.EnsureDaemon(constants.NODED)

  # Confd runs on all nodes too; on non-candidates it runs in disabled mode
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)

  # The monitoring daemon is needed on every node as well
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)
102

    
103

    
104
def RunWatcherHooks():
  """Run the watcher hooks.

  Executes every script in the watcher hooks directory via
  L{utils.RunParts} and logs the outcome of each. Errors are logged but
  never propagated, so a broken hook cannot stop the watcher.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    # No hooks directory, nothing to run
    return

  try:
    results = utils.RunParts(hooks_dir)
  # FIX: legacy "except Exception, err" syntax is invalid in Python 3;
  # the "as" form works on Python 2.6+ as well
  except Exception as err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)
134

    
135

    
136
class Instance(object):
  """Data container and job helper for a single virtual machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    """Initializes this class.

    """
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Submits a startup job for this instance (without forcing).

    """
    startup_op = opcodes.OpInstanceStartup(instance_name=self.name,
                                           force=False)
    cli.SubmitOpCode(startup_op, cl=cl)

  def ActivateDisks(self, cl):
    """Submits a job activating all of this instance's disks.

    """
    activate_op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(activate_op, cl=cl)
159

    
160

    
161
class Node(object):
  # FIX: made this a new-style class for consistency with the sibling
  # Instance(object) class; behavior of the container is unchanged
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    @type name: string
    @param name: Node name
    @param bootid: Boot ID reported by the node (may be empty/None when the
      node did not return one)
    @type offline: bool
    @param offline: Whether the node is marked offline
    @type secondaries: set
    @param secondaries: Names of instances using this node as secondary

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries
173

    
174

    
175
def _CleanupInstance(cl, notepad, inst):
  """Submits a shutdown job for an instance stopped by its user.

  Gives up once the number of recorded cleanup attempts exceeds
  MAXTRIES. A successful cleanup removes the instance from the state
  file; a failure records one more attempt.

  """
  attempts = notepad.NumberOfCleanupAttempts(inst.name)

  if attempts > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  shutdown_op = opcodes.OpInstanceShutdown(instance_name=inst.name)

  try:
    cli.SubmitOpCode(shutdown_op, cl=cl)
    # Cleanup worked; forget any previously recorded attempts
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)
194

    
195

    
196
def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  @param cl: Connection to the master daemon
  @param notepad: Watcher state object tracking restart/cleanup attempts
  @type instances: dict; instance name as key, instance object as value
  @param instances: Instances to check
  @rtype: set
  @return: Names of instances for which a restart was submitted

  """
  # Drop state for instances that no longer exist
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status == constants.INSTST_USERDOWN:
      # Shut down by the user: clean it up instead of restarting
      _CleanupInstance(cl, notepad, inst)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        # Already past the limit; the final error was logged earlier
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        # Record one more attempt so the state moves past MAXTRIES and the
        # error below is only logged once
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      # The attempt is recorded whether or not the restart succeeded
      notepad.RecordRestartAttempt(inst.name)

    else:
      # Instance is neither user-down nor in a bad state; clear any restart
      # history (and announce success unless the state is helpless)
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started
239

    
240

    
241
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  @param cl: Connection to the master daemon
  @param notepad: Watcher state object storing per-node boot IDs
  @type nodes: dict; node name as key
  @param nodes: Nodes to check for reboots
  @type instances: dict; instance name as key
  @param instances: Instances of this node group
  @type started: set
  @param started: Names of instances for which a restart was already
    submitted during this run (their disks need no separate activation)

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          # Disks deliberately not active; leave them alone
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs; done only after the activation loop so a crash
    # mid-way retries these nodes on the next run
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)
294

    
295

    
296
def _CheckForOfflineNodes(nodes, instance):
  """Checks if a given instance has any secondary in offline status.

  @type nodes: dict; node name as key
  @param nodes: Cluster nodes
  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  for node_name in instance.snodes:
    if nodes[node_name].offline:
      return True

  return False
304

    
305

    
306
def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  Submits a group disk verification job, then submits one batch job
  activating the disks of all instances reported with offline disks
  (excluding helpless instances and those with offline secondaries).

  @param cl: Connection to the master daemon
  @type uuid: string
  @param uuid: UUID of the node group to verify
  @type nodes: dict; node name as key
  @param nodes: Nodes of this group
  @type instances: dict; instance name as key
  @param instances: Instances of this group

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      # Best effort: log and carry on; disks will be retried next run
      logging.exception("Error while activating disks")
347

    
348

    
349
def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  # FIX: legacy "except X, err" syntax is invalid in Python 3; the "as"
  # form works on Python 2.6+ as well
  except rapi.client.CertificateError as err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError as err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION
375

    
376

    
377
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  opt_parser = OptionParser(description="Ganeti cluster watcher",
                            usage="%prog [-d]",
                            version="%%prog (ganeti) %s" %
                            constants.RELEASE_VERSION)

  opt_parser.add_option(cli.DEBUG_OPT)
  opt_parser.add_option(cli.NODEGROUP_OPT)
  opt_parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                        help="Autoarchive jobs older than this age (default"
                              " 6 hours)")
  opt_parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                        action="store_true",
                        help="Ignore cluster pause setting")
  opt_parser.add_option("--wait-children", dest="wait_children",
                        action="store_true",
                        help="Wait for child processes")
  opt_parser.add_option("--no-wait-children", dest="wait_children",
                        action="store_false",
                        help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  opt_parser.set_defaults(wait_children=True)
  options, args = opt_parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    opt_parser.error("No arguments expected")

  return (options, args)
409

    
410

    
411
def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  # One "<name> <status>" line per instance, sorted by name
  content = "".join("%s %s\n" % entry for entry in sorted(data))
  utils.WriteFile(filename, data=content)
428

    
429

    
430
def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instances whose name and status should be written

  """
  status_pairs = [(inst.name, inst.status) for inst in instances]
  _WriteInstanceStatus(filename, status_pairs)
440

    
441

    
442
def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  # FIX: legacy "except EnvironmentError, err" syntax is invalid in
  # Python 3; the "as" form works on Python 2.6+ as well
  except EnvironmentError as err:
    if err.errno == errno.ENOENT:
      # A missing file is expected before the group's first run
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    # Each line is split into at most two fields: name and status
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])
467

    
468

    
469
def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  # FIX: legacy "except X, err" syntax is invalid in Python 3; the "as"
  # form works on Python 2.6+ as well
  except errors.LockError as err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)
512

    
513

    
514
def GetLuxiClient(try_restart, query=False):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon
  @type query: bool
  @param query: Passed through to L{cli.GetClient}
  @raise NotMasterError: If this node is not the cluster master
  @raise errors.GenericError: If the master daemon could not be started

  """
  try:
    return cli.GetClient(query=query)
  # FIX: legacy "except X, err" syntax is invalid in Python 3; the "as"
  # form works on Python 2.6+ as well
  except errors.OpPrereqError as err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError as err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient(query=query)
539

    
540

    
541
def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  @param cl: Query client used to retrieve the list of node groups
  @type wait: bool
  @param wait: Whether to wait for all spawned children to terminate

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    # Re-run this program with the per-group option appended
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      # FIX: legacy "except X, err" syntax is invalid in Python 3; the
      # "as" form works on Python 2.6+ as well
      except EnvironmentError as err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)
581

    
582

    
583
def _ArchiveJobs(cl, age):
584
  """Archives old jobs.
585

586
  """
587
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
588
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)
589

    
590

    
591
def _CheckMaster(cl):
  """Ensures current host is master node.

  @param cl: Connection to the master daemon
  @raise NotMasterError: If the local host is not the cluster master

  """
  (master_name, ) = cl.QueryConfigValues(["master_node"])
  if netutils.Hostname.GetSysName() != master_name:
    raise NotMasterError("This is not the master node")
598

    
599

    
600
@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  @param opts: Parsed command line options
  @rtype: int
  @return: C{constants.EXIT_SUCCESS}

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
    query_client = GetLuxiClient(True, query=True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    # FIX: corrected log message typo "restaring" -> "restarting"
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(query_client, opts.wait_children)

  return constants.EXIT_SUCCESS
644

    
645

    
646
def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  @param qcl: Query client to use
  @type uuid: string
  @param uuid: UUID of the node group to query for
  @rtype: tuple of two dicts
  @return: (nodes by name, instances by name); "split" instances whose
    secondary nodes are in a different group are excluded

  """
  queries = [
      (constants.QR_INSTANCE,
       ["name", "status", "disks_active", "snodes",
        "pnode.group.uuid", "snodes.group.uuid"],
       [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
      (constants.QR_NODE,
       ["name", "bootid", "offline"],
       [qlang.OP_EQUAL, "group.uuid", uuid]),
      ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  # NOTE: relies on Python 2 map() returning a list
  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      # Instance has secondaries outside the primary node's group; skip it
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      # Build reverse mapping from secondary node name to instance names
      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))
696

    
697

    
698
def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  @raise errors.GenericError: If ssconf contains an invalid group UUID

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  # Keep the first whitespace-separated token (the UUID) of non-empty lines
  result = [line.split(None, 1)[0] for line in groups if line.strip()]

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result
711

    
712

    
713
def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  @param opts: Parsed command line options; C{opts.nodegroup} must carry
    the UUID of the node group to process
  @rtype: int
  @return: Exit code
  @raise errors.GenericError: If the node group is not a UUID or unknown

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # FIX: replaced misleading "# Global watcher" comment; this opens the
  # per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)
    query_client = GetLuxiClient(False, query=True)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(query_client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  # FIX: legacy "except Exception, err" syntax is invalid in Python 3;
  # the "as" form works on Python 2.6+ as well
  except Exception as err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS
773

    
774

    
775
def Main():
  """Main function.

  @rtype: int
  @return: Exit code suitable for passing to C{sys.exit}

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  # FIX: legacy "except (...), err" syntax is invalid in Python 3; the
  # "as" form works on Python 2.6+ as well (also below)
  except (EnvironmentError, errors.LockError) as err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError as err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    # Deliberately falls through to the success exit code below
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception as err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS