Revision 9bb69bb5 lib/watcher/__init__.py
b/lib/watcher/__init__.py | ||
---|---|---|
33 | 33 |
import time |
34 | 34 |
import logging |
35 | 35 |
import operator |
36 |
import errno |
|
36 | 37 |
from optparse import OptionParser |
37 | 38 |
|
38 | 39 |
from ganeti import utils |
... | ... | |
69 | 70 |
#: Number of seconds to wait between starting child processes for node groups |
70 | 71 |
CHILD_PROCESS_DELAY = 1.0 |
71 | 72 |
|
73 |
#: How many seconds to wait for instance status file lock |
|
74 |
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0 |
|
75 |
|
|
72 | 76 |
|
73 | 77 |
class NotMasterError(errors.GenericError): |
74 | 78 |
"""Exception raised when this host is not the master.""" |
... | ... | |
370 | 374 |
return (options, args) |
371 | 375 |
|
372 | 376 |
|
373 |
def _UpdateInstanceStatus(cl, filename): |
|
374 |
"""Get a list of instances on this cluster. |
|
377 |
def _WriteInstanceStatus(filename, data):
  """Writes an instance status file.

  Every entry is written as one "<name> <status>" line; the entries are
  sorted before being written.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  # One formatted line per (name, status) entry, in sorted order
  lines = ["%s %s\n" % entry for entry in sorted(data)]

  utils.WriteFile(filename, data="".join(lines))
|
394 |
|
|
395 |
|
|
396 |
def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}
  @param instances: Instances whose C{name} and C{status} attributes are
    written to the file

  """
  entries = []
  for inst in instances:
    entries.append((inst.name, inst.status))

  _WriteInstanceStatus(filename, entries)
|
406 |
|
|
407 |
|
|
408 |
class _StatCb: |
|
409 |
"""Helper to store file handle's C{fstat}. |
|
410 |
|
|
411 |
""" |
|
412 |
def __init__(self): |
|
413 |
"""Initializes this class. |
|
414 |
|
|
415 |
""" |
|
416 |
self.st = None |
|
417 |
|
|
418 |
def __call__(self, fh): |
|
419 |
"""Calls C{fstat} on file handle. |
|
385 | 420 |
|
386 |
logging.debug("Got instance data, writing status file %s", filename) |
|
421 |
""" |
|
422 |
self.st = os.fstat(fh.fileno()) |
|
387 | 423 |
|
388 |
utils.WriteFile(filename, data="".join("%s %s\n" % (name, status) |
|
389 |
for (name, status) in result)) |
|
424 |
|
|
425 |
def _ReadInstanceStatus(filename): |
|
426 |
"""Reads an instance status file. |
|
427 |
|
|
428 |
@type filename: string |
|
429 |
@param filename: Path to status file |
|
430 |
@rtype: tuple; (None or number, list of lists containing instance name and |
|
431 |
status) |
|
432 |
@return: File's mtime and instance status contained in the file; mtime is |
|
433 |
C{None} if file can't be read |
|
434 |
|
|
435 |
""" |
|
436 |
logging.debug("Reading per-group instance status from '%s'", filename) |
|
437 |
|
|
438 |
statcb = _StatCb() |
|
439 |
try: |
|
440 |
content = utils.ReadFile(filename, preread=statcb) |
|
441 |
except EnvironmentError, err: |
|
442 |
if err.errno == errno.ENOENT: |
|
443 |
logging.error("Can't read '%s', does not exist (yet)", filename) |
|
444 |
else: |
|
445 |
logging.exception("Unable to read '%s', ignoring", filename) |
|
446 |
return (None, None) |
|
447 |
else: |
|
448 |
return (statcb.st.st_mtime, [line.split(1) |
|
449 |
for line in content.splitlines()]) |
|
450 |
|
|
451 |
|
|
452 |
def _MergeInstanceStatus(filename, pergroup_filename, groups): |
|
453 |
"""Merges all per-group instance status files into a global one. |
|
454 |
|
|
455 |
@type filename: string |
|
456 |
@param filename: Path to global instance status file |
|
457 |
@type pergroup_filename: string |
|
458 |
@param pergroup_filename: Path to per-group status files, must contain "%s" |
|
459 |
to be replaced with group UUID |
|
460 |
@type groups: sequence |
|
461 |
@param groups: UUIDs of known groups |
|
462 |
|
|
463 |
""" |
|
464 |
# Lock global status file in exclusive mode |
|
465 |
lock = utils.FileLock.Open(filename) |
|
466 |
try: |
|
467 |
lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT) |
|
468 |
except errors.LockError, err: |
|
469 |
# All per-group processes will lock and update the file. None of them |
|
470 |
# should take longer than 10 seconds (the value of |
|
471 |
# INSTANCE_STATUS_LOCK_TIMEOUT). |
|
472 |
logging.error("Can't acquire lock on instance status file '%s', not" |
|
473 |
" updating: %s", filename, err) |
|
474 |
return |
|
475 |
|
|
476 |
logging.debug("Acquired exclusive lock on '%s'", filename) |
|
477 |
|
|
478 |
data = {} |
|
479 |
|
|
480 |
# Load instance status from all groups |
|
481 |
for group_uuid in groups: |
|
482 |
(mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid) |
|
483 |
|
|
484 |
if mtime is not None: |
|
485 |
for (instance_name, status) in instdata: |
|
486 |
data.setdefault(instance_name, []).append((mtime, status)) |
|
487 |
|
|
488 |
# Select last update based on file mtime |
|
489 |
inststatus = [(instance_name, sorted(status, reverse=True)[0][1]) |
|
490 |
for (instance_name, status) in data.items()] |
|
491 |
|
|
492 |
# Write the global status file. Don't touch file after it's been |
|
493 |
# updated--there is no lock anymore. |
|
494 |
_WriteInstanceStatus(filename, inststatus) |
|
390 | 495 |
|
391 | 496 |
|
392 | 497 |
def GetLuxiClient(try_restart): |
... | ... | |
513 | 618 |
|
514 | 619 |
_CheckMaster(client) |
515 | 620 |
_ArchiveJobs(client, opts.job_age) |
516 |
_UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE) |
|
517 | 621 |
|
518 | 622 |
# Spawn child processes for all node groups |
519 | 623 |
_StartGroupChildren(client, opts.wait_children) |
... | ... | |
578 | 682 |
dict((inst.name, inst) for inst in instances)) |
579 | 683 |
|
580 | 684 |
|
581 |
def _KnownGroup(uuid):
|
|
582 |
"""Checks if a group UUID is known by ssconf.
|
|
685 |
def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  The first whitespace-separated field of every non-blank line returned by
  ssconf is taken as a group UUID.

  @rtype: list of string
  @return: Group UUIDs
  @raise errors.GenericError: If an entry doesn't match C{utils.UUID_RE}

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = []
  for line in groups:
    if not line.strip():
      # Blank lines carry no group
      continue
    result.append(line.split(None, 1)[0])

  for uuid in result:
    if not utils.UUID_RE.match(uuid):
      raise errors.GenericError("Ssconf contains invalid group UUID")

  return result
|
589 | 698 |
|
590 | 699 |
|
591 | 700 |
def _GroupWatcher(opts): |
... | ... | |
601 | 710 |
|
602 | 711 |
logging.info("Watcher for node group '%s'", group_uuid) |
603 | 712 |
|
713 |
known_groups = _LoadKnownGroups() |
|
714 |
|
|
604 | 715 |
# Check if node group is known |
605 |
if not _KnownGroup(group_uuid):
|
|
716 |
if group_uuid not in known_groups:
|
|
606 | 717 |
raise errors.GenericError("Node group '%s' is not known by ssconf" % |
607 | 718 |
group_uuid) |
608 | 719 |
|
720 |
# Group UUID has been verified and should not contain any dangerous characters |
|
609 | 721 |
state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid |
722 |
inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid |
|
610 | 723 |
|
611 | 724 |
logging.debug("Using state file %s", state_path) |
612 | 725 |
|
... | ... | |
624 | 737 |
|
625 | 738 |
(nodes, instances) = _GetGroupData(client, group_uuid) |
626 | 739 |
|
740 |
# Update per-group instance status file |
|
741 |
_UpdateInstanceStatus(inst_status_path, instances.values()) |
|
742 |
|
|
743 |
_MergeInstanceStatus(constants.INSTANCE_STATUS_FILE, |
|
744 |
constants.WATCHER_GROUP_INSTANCE_STATUS_FILE, |
|
745 |
known_groups) |
|
746 |
|
|
627 | 747 |
started = _CheckInstances(client, notepad, instances) |
628 | 748 |
_CheckDisks(client, notepad, nodes, instances, started) |
629 | 749 |
_VerifyDisks(client, group_uuid, nodes, instances) |
Also available in: Unified diff