Revision 9bb69bb5

b/lib/constants.py
161 161
#: Status file for per-group watcher, locked in exclusive mode by watcher
162 162
WATCHER_GROUP_STATE_FILE = DATA_DIR + "/watcher.%s.data"
163 163

  
164
#: File for per-group instance status, merged into L{INSTANCE_STATUS_FILE} by
165
#: per-group processes
166
WATCHER_GROUP_INSTANCE_STATUS_FILE = DATA_DIR + "/watcher.%s.instance-status"
167

  
164 168
#: File containing Unix timestamp until which watcher should be paused
165 169
WATCHER_PAUSEFILE = DATA_DIR + "/watcher.pause"
166 170

  
b/lib/watcher/__init__.py
33 33
import time
34 34
import logging
35 35
import operator
36
import errno
36 37
from optparse import OptionParser
37 38

  
38 39
from ganeti import utils
......
69 70
#: Number of seconds to wait between starting child processes for node groups
70 71
CHILD_PROCESS_DELAY = 1.0
71 72

  
73
#: How many seconds to wait for instance status file lock
74
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
75

  
72 76

  
73 77
class NotMasterError(errors.GenericError):
74 78
  """Exception raised when this host is not the master."""
......
370 374
  return (options, args)
371 375

  
372 376

  
373
def _UpdateInstanceStatus(cl, filename):
374
  """Get a list of instances on this cluster.
377
def _WriteInstanceStatus(filename, data):
378
  """Writes the per-group instance status file.
379

  
380
  The entries are sorted.
375 381

  
376
  @todo: Think about doing this per nodegroup, too
382
  @type filename: string
383
  @param filename: Path to instance status file
384
  @type data: list of tuple; (instance name as string, status as string)
385
  @param data: Instance name and status
377 386

  
378 387
  """
379
  op = opcodes.OpInstanceQuery(output_fields=["name", "status"], names=[],
380
                               use_locking=True)
381
  job_id = cl.SubmitJob([op])
382
  (result, ) = cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
388
  logging.debug("Updating instance status file '%s' with %s instances",
389
                filename, len(data))
383 390

  
384
  cl.ArchiveJob(job_id)
391
  utils.WriteFile(filename,
392
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
393
                                   sorted(data))))
394

  
395

  
396
def _UpdateInstanceStatus(filename, instances):
  """Writes the status of the given L{Instance} objects to a status file.

  Only the C{name} and C{status} attributes of each instance are used; the
  resulting (name, status) pairs are handed to L{_WriteInstanceStatus}.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instances whose name and status should be recorded

  """
  status = []
  for inst in instances:
    status.append((inst.name, inst.status))

  _WriteInstanceStatus(filename, status)
406

  
407

  
408
class _StatCb:
409
  """Helper to store file handle's C{fstat}.
410

  
411
  """
412
  def __init__(self):
413
    """Initializes this class.
414

  
415
    """
416
    self.st = None
417

  
418
  def __call__(self, fh):
419
    """Calls C{fstat} on file handle.
385 420

  
386
  logging.debug("Got instance data, writing status file %s", filename)
421
    """
422
    self.st = os.fstat(fh.fileno())
387 423

  
388
  utils.WriteFile(filename, data="".join("%s %s\n" % (name, status)
389
                                         for (name, status) in result))
424

  
425
def _ReadInstanceStatus(filename):
426
  """Reads an instance status file.
427

  
428
  @type filename: string
429
  @param filename: Path to status file
430
  @rtype: tuple; (None or number, list of lists containing instance name and
431
    status)
432
  @return: File's mtime and instance status contained in the file; mtime is
433
    C{None} if file can't be read
434

  
435
  """
436
  logging.debug("Reading per-group instance status from '%s'", filename)
437

  
438
  statcb = _StatCb()
439
  try:
440
    content = utils.ReadFile(filename, preread=statcb)
441
  except EnvironmentError, err:
442
    if err.errno == errno.ENOENT:
443
      logging.error("Can't read '%s', does not exist (yet)", filename)
444
    else:
445
      logging.exception("Unable to read '%s', ignoring", filename)
446
    return (None, None)
447
  else:
448
    return (statcb.st.st_mtime, [line.split(1)
449
                                 for line in content.splitlines()])
450

  
451

  
452
def _MergeInstanceStatus(filename, pergroup_filename, groups):
453
  """Merges all per-group instance status files into a global one.
454

  
455
  @type filename: string
456
  @param filename: Path to global instance status file
457
  @type pergroup_filename: string
458
  @param pergroup_filename: Path to per-group status files, must contain "%s"
459
    to be replaced with group UUID
460
  @type groups: sequence
461
  @param groups: UUIDs of known groups
462

  
463
  """
464
  # Lock global status file in exclusive mode
465
  lock = utils.FileLock.Open(filename)
466
  try:
467
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
468
  except errors.LockError, err:
469
    # All per-group processes will lock and update the file. None of them
470
    # should take longer than 10 seconds (the value of
471
    # INSTANCE_STATUS_LOCK_TIMEOUT).
472
    logging.error("Can't acquire lock on instance status file '%s', not"
473
                  " updating: %s", filename, err)
474
    return
475

  
476
  logging.debug("Acquired exclusive lock on '%s'", filename)
477

  
478
  data = {}
479

  
480
  # Load instance status from all groups
481
  for group_uuid in groups:
482
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
483

  
484
    if mtime is not None:
485
      for (instance_name, status) in instdata:
486
        data.setdefault(instance_name, []).append((mtime, status))
487

  
488
  # Select last update based on file mtime
489
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
490
                for (instance_name, status) in data.items()]
491

  
492
  # Write the global status file. Don't touch file after it's been
493
  # updated--there is no lock anymore.
494
  _WriteInstanceStatus(filename, inststatus)
390 495

  
391 496

  
392 497
def GetLuxiClient(try_restart):
......
513 618

  
514 619
  _CheckMaster(client)
515 620
  _ArchiveJobs(client, opts.job_age)
516
  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
517 621

  
518 622
  # Spawn child processes for all node groups
519 623
  _StartGroupChildren(client, opts.wait_children)
......
578 682
          dict((inst.name, inst) for inst in instances))
579 683

  
580 684

  
581
def _KnownGroup(uuid):
582
  """Checks if a group UUID is known by ssconf.
685
def _LoadKnownGroups():
686
  """Returns a list of all node groups known by L{ssconf}.
583 687

  
584 688
  """
585 689
  groups = ssconf.SimpleStore().GetNodegroupList()
586 690

  
587
  return compat.any(line.strip() and line.split()[0] == uuid
588
                    for line in groups)
691
  result = list(line.split(None, 1)[0] for line in groups
692
                if line.strip())
693

  
694
  if not compat.all(map(utils.UUID_RE.match, result)):
695
    raise errors.GenericError("Ssconf contains invalid group UUID")
696

  
697
  return result
589 698

  
590 699

  
591 700
def _GroupWatcher(opts):
......
601 710

  
602 711
  logging.info("Watcher for node group '%s'", group_uuid)
603 712

  
713
  known_groups = _LoadKnownGroups()
714

  
604 715
  # Check if node group is known
605
  if not _KnownGroup(group_uuid):
716
  if group_uuid not in known_groups:
606 717
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
607 718
                              group_uuid)
608 719

  
720
  # Group UUID has been verified and should not contain any dangerous characters
609 721
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
722
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
610 723

  
611 724
  logging.debug("Using state file %s", state_path)
612 725

  
......
624 737

  
625 738
    (nodes, instances) = _GetGroupData(client, group_uuid)
626 739

  
740
    # Update per-group instance status file
741
    _UpdateInstanceStatus(inst_status_path, instances.values())
742

  
743
    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
744
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
745
                         known_groups)
746

  
627 747
    started = _CheckInstances(client, notepad, instances)
628 748
    _CheckDisks(client, notepad, nodes, instances, started)
629 749
    _VerifyDisks(client, group_uuid, nodes, instances)

Also available in: Unified diff