lib/watcher/__init__.py @ 876fb142


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
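      # n is the number of restart attempts recorded so far; past MAXTRIES
      # only a warning is logged, and the run that reaches exactly MAXTRIES
      # records the final attempt and gives up on the instance.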
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance '%s' because"
                       " its disks are not activated", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any secondary node that is offline.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
      group_name=uuid, priority=constants.OP_PRIO_LOW)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding, not
  whether it is accessible.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))
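  # Each (name, status) pair is written as a "name status" line, with the
  # entries sorted by instance name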
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
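  # (each value is a list of (mtime, status) pairs; reverse sorting puts the
  # most recently written status first)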
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
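    # Re-run this watcher with the same command line plus the node group
    # option, so the child only handles this group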
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS