Revision 0ccb6461 snf-cyclades-app/synnefo/logic/reconciliation.py
b/snf-cyclades-app/synnefo/logic/reconciliation.py | ||
---|---|---|
66 | 66 |
from django.db import transaction |
67 | 67 |
from synnefo.db.models import (Backend, VirtualMachine, Flavor, |
68 | 68 |
pooled_rapi_client, Network, |
69 |
BackendNetwork) |
|
70 |
from synnefo.db.pools import IPPool |
|
69 |
BackendNetwork, BridgePoolTable, |
|
70 |
MacPrefixPoolTable) |
|
71 |
from synnefo.db import pools |
|
71 | 72 |
from synnefo.logic import utils, backend as backend_mod |
72 | 73 |
|
73 | 74 |
logger = logging.getLogger() |
... | ... | |
457 | 458 |
|
458 | 459 |
|
459 | 460 |
class NetworkReconciler(object): |
460 |
def __init__(self, logger, fix=False, conflicting_ips=False):
|
|
461 |
def __init__(self, logger, fix=False): |
|
461 | 462 |
self.log = logger |
462 |
self.conflicting_ips = conflicting_ips |
|
463 | 463 |
self.fix = fix |
464 | 464 |
|
465 | 465 |
@transaction.commit_on_success |
466 | 466 |
def reconcile_networks(self): |
467 | 467 |
# Get models from DB |
468 |
backends = Backend.objects.exclude(offline=True) |
|
469 |
networks = Network.objects.filter(deleted=False) |
|
468 |
self.backends = Backend.objects.exclude(offline=True)
|
|
469 |
self.networks = Network.objects.filter(deleted=False)
|
|
470 | 470 |
|
471 | 471 |
self.event_time = datetime.now() |
472 | 472 |
|
473 | 473 |
# Get info from all ganeti backends |
474 |
ganeti_networks = {} |
|
475 |
ganeti_hanging_networks = {} |
|
476 |
for b in backends: |
|
474 |
self.ganeti_networks = {}
|
|
475 |
self.ganeti_hanging_networks = {}
|
|
476 |
for b in self.backends:
|
|
477 | 477 |
g_nets = get_networks_from_ganeti(b) |
478 |
ganeti_networks[b] = g_nets |
|
478 |
self.ganeti_networks[b] = g_nets
|
|
479 | 479 |
g_hanging_nets = hanging_networks(b, g_nets) |
480 |
ganeti_hanging_networks[b] = g_hanging_nets |
|
481 |
|
|
482 |
# Perform reconciliation for each network |
|
483 |
for network in networks: |
|
484 |
ip_available_maps = [] |
|
485 |
ip_reserved_maps = [] |
|
486 |
for bend in backends: |
|
487 |
bnet = get_backend_network(network, bend) |
|
488 |
gnet = ganeti_networks[bend].get(network.id) |
|
489 |
if not bnet: |
|
490 |
if network.floating_ip_pool: |
|
491 |
# Network is a floating IP pool and does not exist in |
|
492 |
# backend. We need to create it |
|
493 |
bnet = self.reconcile_parted_network(network, bend) |
|
494 |
elif not gnet: |
|
495 |
# Network does not exist either in Ganeti nor in BD. |
|
496 |
continue |
|
497 |
else: |
|
498 |
# Network exists in Ganeti and not in DB. |
|
499 |
if network.action != "DESTROY" and not network.public: |
|
500 |
bnet = self.reconcile_parted_network(network, bend) |
|
501 |
else: |
|
502 |
continue |
|
503 |
|
|
504 |
if not gnet: |
|
505 |
# Network does not exist in Ganeti. If the network action |
|
506 |
# is DESTROY, we have to mark as deleted in DB, else we |
|
507 |
# have to create it in Ganeti. |
|
508 |
if network.action == "DESTROY": |
|
509 |
if bnet.operstate != "DELETED": |
|
510 |
self.reconcile_stale_network(bnet) |
|
511 |
else: |
|
512 |
self.reconcile_missing_network(network, bend) |
|
513 |
# Skip rest reconciliation! |
|
514 |
continue |
|
515 |
|
|
516 |
try: |
|
517 |
hanging_groups = ganeti_hanging_networks[bend][network.id] |
|
518 |
except KeyError: |
|
519 |
# Network is connected to all nodegroups |
|
520 |
hanging_groups = [] |
|
480 |
self.ganeti_hanging_networks[b] = g_hanging_nets |
|
521 | 481 |
|
522 |
if hanging_groups: |
|
523 |
# CASE-3: Ganeti networks not connected to all nodegroups |
|
524 |
self.reconcile_hanging_groups(network, bend, |
|
525 |
hanging_groups) |
|
526 |
continue |
|
482 |
self._reconcile_orphan_networks() |
|
527 | 483 |
|
528 |
if bnet.operstate != 'ACTIVE': |
|
529 |
# CASE-4: Unsynced network state. At this point the network |
|
530 |
# exists and is connected to all nodes so is must be |
|
531 |
# active! |
|
532 |
self.reconcile_unsynced_network(network, bend, bnet) |
|
484 |
for network in self.networks: |
|
485 |
self._reconcile_network(network) |
|
533 | 486 |
|
534 |
# Get ganeti IP Pools |
|
535 |
available_map, reserved_map = get_network_pool(gnet) |
|
536 |
ip_available_maps.append(available_map) |
|
537 |
ip_reserved_maps.append(reserved_map) |
|
487 |
@transaction.commit_on_success |
|
488 |
def _reconcile_network(self, network): |
|
489 |
"""Reconcile a network with corresponding Ganeti networks. |
|
490 |
|
|
491 |
Reconcile a Network and the associated BackendNetworks with the |
|
492 |
corresponding Ganeti networks in all Ganeti backends. |
|
493 |
|
|
494 |
""" |
|
495 |
network_ip_pool = network.get_pool() # X-Lock on IP Pool |
|
496 |
for bend in self.backends: |
|
497 |
bnet = get_backend_network(network, bend) |
|
498 |
gnet = self.ganeti_networks[bend].get(network.id) |
|
499 |
if not bnet: |
|
500 |
if network.floating_ip_pool: |
|
501 |
# Network is a floating IP pool and does not exist in |
|
502 |
# backend. We need to create it |
|
503 |
bnet = self.reconcile_parted_network(network, bend) |
|
504 |
elif not gnet: |
|
505 |
# Network exists neither in Ganeti nor in the DB. |
|
506 |
continue |
|
507 |
else: |
|
508 |
# Network exists in Ganeti and not in DB. |
|
509 |
if network.action != "DESTROY" and not network.public: |
|
510 |
bnet = self.reconcile_parted_network(network, bend) |
|
511 |
else: |
|
512 |
continue |
|
538 | 513 |
|
539 |
if ip_available_maps or ip_reserved_maps: |
|
540 |
# CASE-5: Unsynced IP Pools |
|
541 |
self.reconcile_ip_pools(network, ip_available_maps, |
|
542 |
ip_reserved_maps) |
|
514 |
if not gnet: |
|
515 |
# Network does not exist in Ganeti. If the network action |
|
516 |
# is DESTROY, we have to mark as deleted in DB, else we |
|
517 |
# have to create it in Ganeti. |
|
518 |
if network.action == "DESTROY": |
|
519 |
if bnet.operstate != "DELETED": |
|
520 |
self.reconcile_stale_network(bnet) |
|
521 |
else: |
|
522 |
self.reconcile_missing_network(network, bend) |
|
523 |
# Skip rest reconciliation! |
|
524 |
continue |
|
543 | 525 |
|
544 |
if self.conflicting_ips: |
|
545 |
self.detect_conflicting_ips() |
|
526 |
try: |
|
527 |
hanging_groups = self.ganeti_hanging_networks[bend][network.id] |
|
528 |
except KeyError: |
|
529 |
# Network is connected to all nodegroups |
|
530 |
hanging_groups = [] |
|
531 |
|
|
532 |
if hanging_groups: |
|
533 |
# CASE-3: Ganeti networks not connected to all nodegroups |
|
534 |
self.reconcile_hanging_groups(network, bend, |
|
535 |
hanging_groups) |
|
536 |
continue |
|
546 | 537 |
|
547 |
# CASE-6: Orphan networks |
|
548 |
self.reconcile_orphan_networks(networks, ganeti_networks) |
|
538 |
if bnet.operstate != 'ACTIVE': |
|
539 |
# CASE-4: Unsynced network state. At this point the network |
|
540 |
# exists and is connected to all nodes so it must be |
|
541 |
# active! |
|
542 |
self.reconcile_unsynced_network(network, bend, bnet) |
|
543 |
|
|
544 |
# Check that externally reserved IPs of the network in Ganeti are |
|
545 |
# also externally reserved to the IP pool |
|
546 |
externally_reserved = gnet['external_reservations'] |
|
547 |
for ip in externally_reserved.split(","): |
|
548 |
ip = ip.strip() |
|
549 |
if not network_ip_pool.is_reserved(ip): |
|
550 |
msg = ("D: IP '%s' is reserved for network '%s' in" |
|
551 |
" backend '%s' but not in DB.") |
|
552 |
self.log.info(msg, ip, network, bend) |
|
553 |
if self.fix: |
|
554 |
network_ip_pool.reserve(ip, external=True) |
|
555 |
network_ip_pool.save() |
|
556 |
self.log.info("F: Reserved IP '%s'", ip) |
|
549 | 557 |
|
550 | 558 |
def reconcile_parted_network(self, network, backend): |
551 | 559 |
self.log.info("D: Missing DB entry for network %s in backend %s", |
... | ... | |
595 | 603 |
"success", |
596 | 604 |
"Reconciliation simulated eventd") |
597 | 605 |
|
598 |
def reconcile_ip_pools(self, network, available_maps, reserved_maps): |
|
599 |
available_map = reduce(lambda x, y: x & y, available_maps) |
|
600 |
reserved_map = reduce(lambda x, y: x & y, reserved_maps) |
|
601 |
|
|
602 |
pool = network.get_pool() |
|
603 |
# Temporary release unused floating IPs |
|
604 |
temp_pool = network.get_pool() |
|
605 |
used_ips = network.nics.values_list("ipv4", flat=True) |
|
606 |
unused_static_ips = network.floating_ips.exclude(ipv4__in=used_ips) |
|
607 |
map(lambda ip: temp_pool.put(ip.ipv4), unused_static_ips) |
|
608 |
if temp_pool.available != available_map: |
|
609 |
self.log.info("D: Unsynced available map of network %s:\n" |
|
610 |
"\tDB: %r\n\tGB: %r", network, |
|
611 |
temp_pool.available.to01(), |
|
612 |
available_map.to01()) |
|
613 |
if self.fix: |
|
614 |
pool.available = available_map |
|
615 |
# Release unsued floating IPs, as they are not included in the |
|
616 |
# available map |
|
617 |
map(lambda ip: pool.reserve(ip.ipv4), unused_static_ips) |
|
618 |
pool.save() |
|
619 |
if pool.reserved != reserved_map: |
|
620 |
self.log.info("D: Unsynced reserved map of network %s:\n" |
|
621 |
"\tDB: %r\n\tGB: %r", network, pool.reserved.to01(), |
|
622 |
reserved_map.to01()) |
|
623 |
if self.fix: |
|
624 |
pool.reserved = reserved_map |
|
625 |
pool.save() |
|
626 |
|
|
627 |
def detect_conflicting_ips(self, network): |
|
628 |
"""Detect NIC's that have the same IP in the same network.""" |
|
629 |
machine_ips = network.nics.all().values_list('ipv4', 'machine') |
|
630 |
ips = map(lambda x: x[0], machine_ips) |
|
631 |
distinct_ips = set(ips) |
|
632 |
if len(distinct_ips) < len(ips): |
|
633 |
for i in distinct_ips: |
|
634 |
ips.remove(i) |
|
635 |
for i in ips: |
|
636 |
machines = [utils.id_to_instance_name(x[1]) |
|
637 |
for x in machine_ips if x[0] == i] |
|
638 |
self.log.info('D: Conflicting IP:%s Machines: %s', |
|
639 |
i, ', '.join(machines)) |
|
640 |
|
|
641 |
def reconcile_orphan_networks(self, db_networks, ganeti_networks): |
|
606 |
def _reconcile_orphan_networks(self): |
|
607 |
db_networks = self.networks |
|
608 |
ganeti_networks = self.ganeti_networks |
|
642 | 609 |
# Detect Orphan Networks in Ganeti |
643 | 610 |
db_network_ids = set([net.id for net in db_networks]) |
644 | 611 |
for back_end, ganeti_networks in ganeti_networks.items(): |
... | ... | |
669 | 636 |
return None |
670 | 637 |
|
671 | 638 |
|
672 |
def get_network_pool(gnet): |
|
673 |
"""Return available and reserved IP maps. |
|
639 |
class PoolReconciler(object): |
|
640 |
def __init__(self, logger, fix=False): |
|
641 |
self.log = logger |
|
642 |
self.fix = fix |
|
643 |
|
|
644 |
def reconcile(self): |
|
645 |
self.reconcile_bridges() |
|
646 |
self.reconcile_mac_prefixes() |
|
647 |
for network in Network.objects.filter(deleted=False): |
|
648 |
self.reconcile_ip_pool(network) |
|
649 |
|
|
650 |
@transaction.commit_on_success |
|
651 |
def reconcile_bridges(self): |
|
652 |
networks = Network.objects.filter(deleted=False, |
|
653 |
flavor="PHYSICAL_VLAN") |
|
654 |
check_unique_values(objects=networks, field='link', logger=self.log) |
|
655 |
try: |
|
656 |
pool = BridgePoolTable.get_pool() |
|
657 |
except pools.EmptyPool: |
|
658 |
self.log.info("There is no available pool for bridges.") |
|
659 |
return |
|
660 |
|
|
661 |
used_bridges = set(networks.values_list('link', flat=True)) |
|
662 |
check_pool_consistent(pool=pool, pool_class=pools.BridgePool, |
|
663 |
used_values=used_bridges, fix=self.fix, |
|
664 |
logger=self.log) |
|
674 | 665 |
|
675 |
Extract the available and reserved IP map from the info return from Ganeti |
|
676 |
for a network. |
|
666 |
@transaction.commit_on_success |
|
667 |
def reconcile_mac_prefixes(self): |
|
668 |
networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED") |
|
669 |
check_unique_values(objects=networks, field='mac_prefix', |
|
670 |
logger=self.log) |
|
671 |
try: |
|
672 |
pool = MacPrefixPoolTable.get_pool() |
|
673 |
except pools.EmptyPool: |
|
674 |
self.log.info("There is no available pool for MAC prefixes.") |
|
675 |
return |
|
676 |
|
|
677 |
used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True)) |
|
678 |
check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool, |
|
679 |
used_values=used_mac_prefixes, fix=self.fix, |
|
680 |
logger=self.log) |
|
677 | 681 |
|
678 |
""" |
|
679 |
converter = IPPool(Foo(gnet['network'])) |
|
680 |
a_map = bitarray_from_map(gnet['map']) |
|
681 |
a_map.invert() |
|
682 |
reserved = gnet['external_reservations'] |
|
683 |
r_map = a_map.copy() |
|
684 |
r_map.setall(True) |
|
685 |
if reserved: |
|
686 |
for address in reserved.split(','): |
|
687 |
index = converter.value_to_index(address.strip()) |
|
688 |
a_map[index] = True |
|
689 |
r_map[index] = False |
|
690 |
return a_map, r_map |
|
691 |
|
|
692 |
|
|
693 |
def bitarray_from_map(bitmap): |
|
694 |
return bitarray.bitarray(bitmap.replace("X", "1").replace(".", "0")) |
|
695 |
|
|
696 |
|
|
697 |
class Foo(): |
|
698 |
def __init__(self, subnet): |
|
699 |
self.available_map = '' |
|
700 |
self.reserved_map = '' |
|
701 |
self.size = 0 |
|
702 |
self.network = Foo.Foo1(subnet) |
|
703 |
|
|
704 |
class Foo1(): |
|
705 |
def __init__(self, subnet): |
|
706 |
self.subnet = subnet |
|
707 |
self.gateway = None |
|
682 |
@transaction.commit_on_success |
|
683 |
def reconcile_ip_pool(self, network): |
|
684 |
# Check that all NICs have unique IPv4 address |
|
685 |
nics = network.nics.filter(ipv4__isnull=False) |
|
686 |
check_unique_values(objects=nics, field='ipv4', logger=self.log) |
|
687 |
|
|
688 |
# Check that all Floating IPs have unique IPv4 address |
|
689 |
floating_ips = network.floating_ips.filter(deleted=False) |
|
690 |
check_unique_values(objects=floating_ips, field='ipv4', |
|
691 |
logger=self.log) |
|
692 |
|
|
693 |
# First get(lock) the IP pool of the network to prevent new NICs |
|
694 |
# from being created. |
|
695 |
network_ip_pool = network.get_pool() |
|
696 |
used_ips = set(list(nics.values_list("ipv4", flat=True)) + |
|
697 |
list(floating_ips.values_list("ipv4", flat=True))) |
|
698 |
|
|
699 |
check_pool_consistent(pool=network_ip_pool, |
|
700 |
pool_class=pools.IPPool, |
|
701 |
used_values=used_ips, |
|
702 |
fix=self.fix, logger=self.log) |
|
703 |
|
|
704 |
|
|
705 |
def check_unique_values(objects, field, logger):
    """Check that no two of the given objects share a value for `field`.

    Every value that is used more than once is reported exactly once with
    `logger.error`, together with the objects that use it.

    :param objects: collection offering ``values_list(field, flat=True)``
        and ``filter(**kwargs)`` (presumably a Django QuerySet -- confirm
        against the callers).
    :param field: name of the attribute whose values must be unique.
    :param logger: object providing ``error`` and ``debug`` methods that
        accept a format string followed by its arguments.
    :returns: True if all values are distinct, False otherwise.
    """
    used_values = list(objects.values_list(field, flat=True))

    # Collect every repeated value only once, in first-seen order, so that
    # a value shared by N objects triggers one lookup and one log record
    # instead of N of each.
    seen = set()
    repeated = set()
    duplicate_values = []
    for value in used_values:
        if value not in seen:
            seen.add(value)
        elif value not in repeated:
            repeated.add(value)
            duplicate_values.append(value)

    if not duplicate_values:
        logger.debug("Values for field '%s' are unique.", field)
        return True

    msg = "Value '%s' is used as %s for more than one objects: %s"
    for value in duplicate_values:
        using_objects = objects.filter(**{field: value})
        logger.error(msg, value, field, ",".join(map(str, using_objects)))
    return False
|
717 |
|
|
718 |
|
|
719 |
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Check a pool's availability map against the values actually in use.

    An empty pool of type `pool_class` is created with `create_empty_pool`
    and every value in `used_values` is reserved in it. Its `available` map
    is then compared with `pool.available`. Each index where the two maps
    differ is logged as an error. If `fix` is true, the recomputed map is
    written to `pool` and the pool is saved.

    :param pool: the pool to verify; must expose `available`,
        `index_to_value`, `is_available`, `save` and `pool_table`.
    :param pool_class: class used to build the reference pool.
    :param used_values: iterable of values that must be marked as reserved.
    :param fix: when true, overwrite `pool.available` with the expected map.
    :param logger: logger receiving the error/info records.
    """
    # NOTE(review): the reference pool is built from the same `pool_table`
    # row as `pool` -- confirm that this sharing is intended.
    dummy_pool = create_empty_pool(pool, pool_class)
    for value in used_values:
        dummy_pool.reserve(value)

    if dummy_pool.available == pool.available:
        return

    msg = "%s is incosistent! Value '%s' is %s but should be %s."
    # XOR leaves a set bit at every index where the two maps disagree.
    pool_diff = dummy_pool.available ^ pool.available
    for index in pool_diff.itersearch(bitarray.bitarray("1")):
        value = pool.index_to_value(int(index))
        current = "available" if pool.is_available(value) else "unavailable"
        expected = ("available" if dummy_pool.is_available(value)
                    else "unavailable")
        logger.error(msg, pool, value, current, expected)

    if fix:
        pool.available = dummy_pool.available
        pool.save()
        logger.info("Fixed available map of pool '%s'", pool)
|
736 |
|
|
737 |
|
|
738 |
def create_empty_pool(pool, pool_class):
    """Return a new `pool_class` pool built on `pool`'s row with blank maps.

    The row object taken from `pool.pool_table` is modified in place: its
    `available_map` and `reserved_map` are set to the empty string before it
    is handed to `pool_class`.
    """
    table_row = pool.pool_table
    for map_attr in ("available_map", "reserved_map"):
        setattr(table_row, map_attr, "")
    return pool_class(table_row)
Also available in: Unified diff