#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name)

  try:
    cli.SubmitOpCode(op, cl=cl)
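    # On success, clear any record of earlier failed cleanup attempts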
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status == constants.INSTST_USERDOWN:
      _CleanupInstance(cl, notepad, inst)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

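      # MAXTRIES restarts have already been attempted; log the final error
      # once and bump the counter so later runs only emit the warning above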
      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

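      # Count the attempt whether or not the restart succeeded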
      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

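  # Format each entry as "<instance name> <status>"; sorting keeps the file
  # deterministic between runs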
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
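  # The (mtime, status) tuples sort by mtime first, so the most recently
  # written entry wins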
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart, query=False):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient(query=query)
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient(query=query)


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
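    # Re-run the watcher with the node group option appended, so the child
    # process handles only this group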
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetAllNodesInGroup(qcl, uuid):
  """Get all nodes of a node group.

  This function uses the query client to find out which nodes are in the node
  group.

  @type qcl: C{luxi.Client}
  @param qcl: luxi client for queries
  @type uuid: string
  @param uuid: UUID of the node group

  """
  what = constants.QR_NODE
  fields = ["name", "bootid", "offline"]
  qfilter = [qlang.OP_EQUAL, "group.uuid", uuid]

  result = qcl.Query(what, fields, qfilter)
  return result


def _GetGroupData(cl, qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  # FIXME: This is an intermediate state where some queries are done via
  # the old and some via the new query implementation. This will be beautiful
  # again when the transition is complete for all queries.
  node_result = _GetAllNodesInGroup(qcl, uuid)

  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True)
    ]
  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  results.append(node_result)
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

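  # Map each secondary node name to the set of instances that use it as a
  # secondary node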
  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)
    query_client = GetLuxiClient(False, query=True)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, query_client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

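    # Merge this group's status with the files written by the other groups
    # into the cluster-wide status file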
    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS