Revision 0ccb6461 snf-cyclades-app/synnefo/logic/reconciliation.py

--- a/snf-cyclades-app/synnefo/logic/reconciliation.py
+++ b/snf-cyclades-app/synnefo/logic/reconciliation.py
@@ -66,8 +66,9 @@
 from django.db import transaction
 from synnefo.db.models import (Backend, VirtualMachine, Flavor,
                                pooled_rapi_client, Network,
-                               BackendNetwork)
-from synnefo.db.pools import IPPool
+                               BackendNetwork, BridgePoolTable,
+                               MacPrefixPoolTable)
+from synnefo.db import pools
 from synnefo.logic import utils, backend as backend_mod
 
 logger = logging.getLogger()
@@ -457,95 +458,102 @@
 
 
 class NetworkReconciler(object):
-    def __init__(self, logger, fix=False, conflicting_ips=False):
+    def __init__(self, logger, fix=False):
         self.log = logger
-        self.conflicting_ips = conflicting_ips
         self.fix = fix
 
     @transaction.commit_on_success
     def reconcile_networks(self):
         # Get models from DB
-        backends = Backend.objects.exclude(offline=True)
-        networks = Network.objects.filter(deleted=False)
+        self.backends = Backend.objects.exclude(offline=True)
+        self.networks = Network.objects.filter(deleted=False)
 
         self.event_time = datetime.now()
 
         # Get info from all ganeti backends
-        ganeti_networks = {}
-        ganeti_hanging_networks = {}
-        for b in backends:
+        self.ganeti_networks = {}
+        self.ganeti_hanging_networks = {}
+        for b in self.backends:
             g_nets = get_networks_from_ganeti(b)
-            ganeti_networks[b] = g_nets
+            self.ganeti_networks[b] = g_nets
             g_hanging_nets = hanging_networks(b, g_nets)
-            ganeti_hanging_networks[b] = g_hanging_nets
-
-        # Perform reconciliation for each network
-        for network in networks:
-            ip_available_maps = []
-            ip_reserved_maps = []
-            for bend in backends:
-                bnet = get_backend_network(network, bend)
-                gnet = ganeti_networks[bend].get(network.id)
-                if not bnet:
-                    if network.floating_ip_pool:
-                        # Network is a floating IP pool and does not exist in
-                        # backend. We need to create it
-                        bnet = self.reconcile_parted_network(network, bend)
-                    elif not gnet:
-                        # Network does not exist either in Ganeti nor in BD.
-                        continue
-                    else:
-                        # Network exists in Ganeti and not in DB.
-                        if network.action != "DESTROY" and not network.public:
-                            bnet = self.reconcile_parted_network(network, bend)
-                        else:
-                            continue
-
-                if not gnet:
-                    # Network does not exist in Ganeti. If the network action
-                    # is DESTROY, we have to mark as deleted in DB, else we
-                    # have to create it in Ganeti.
-                    if network.action == "DESTROY":
-                        if bnet.operstate != "DELETED":
-                            self.reconcile_stale_network(bnet)
-                    else:
-                        self.reconcile_missing_network(network, bend)
-                    # Skip rest reconciliation!
-                    continue
-
-                try:
-                    hanging_groups = ganeti_hanging_networks[bend][network.id]
-                except KeyError:
-                    # Network is connected to all nodegroups
-                    hanging_groups = []
+            self.ganeti_hanging_networks[b] = g_hanging_nets
 
-                if hanging_groups:
-                    # CASE-3: Ganeti networks not connected to all nodegroups
-                    self.reconcile_hanging_groups(network, bend,
-                                                  hanging_groups)
-                    continue
+        self._reconcile_orphan_networks()
 
-                if bnet.operstate != 'ACTIVE':
-                    # CASE-4: Unsynced network state. At this point the network
-                    # exists and is connected to all nodes so is must be
-                    # active!
-                    self.reconcile_unsynced_network(network, bend, bnet)
+        for network in self.networks:
+            self._reconcile_network(network)
 
-                # Get ganeti IP Pools
-                available_map, reserved_map = get_network_pool(gnet)
-                ip_available_maps.append(available_map)
-                ip_reserved_maps.append(reserved_map)
+    @transaction.commit_on_success
+    def _reconcile_network(self, network):
+        """Reconcile a network with corresponging Ganeti networks.
+
+        Reconcile a Network and the associated BackendNetworks with the
+        corresponding Ganeti networks in all Ganeti backends.
+
+        """
+        network_ip_pool = network.get_pool()  # X-Lock on IP Pool
+        for bend in self.backends:
+            bnet = get_backend_network(network, bend)
+            gnet = self.ganeti_networks[bend].get(network.id)
+            if not bnet:
+                if network.floating_ip_pool:
+                    # Network is a floating IP pool and does not exist in
+                    # backend. We need to create it
+                    bnet = self.reconcile_parted_network(network, bend)
+                elif not gnet:
+                    # Network does not exist either in Ganeti nor in BD.
+                    continue
+                else:
+                    # Network exists in Ganeti and not in DB.
+                    if network.action != "DESTROY" and not network.public:
+                        bnet = self.reconcile_parted_network(network, bend)
+                    else:
+                        continue
 
-            if ip_available_maps or ip_reserved_maps:
-                # CASE-5: Unsynced IP Pools
-                self.reconcile_ip_pools(network, ip_available_maps,
-                                        ip_reserved_maps)
+            if not gnet:
+                # Network does not exist in Ganeti. If the network action
+                # is DESTROY, we have to mark as deleted in DB, else we
+                # have to create it in Ganeti.
+                if network.action == "DESTROY":
+                    if bnet.operstate != "DELETED":
+                        self.reconcile_stale_network(bnet)
+                else:
+                    self.reconcile_missing_network(network, bend)
+                # Skip rest reconciliation!
+                continue
 
-            if self.conflicting_ips:
-                self.detect_conflicting_ips()
+            try:
+                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
+            except KeyError:
+                # Network is connected to all nodegroups
+                hanging_groups = []
+
+            if hanging_groups:
+                # CASE-3: Ganeti networks not connected to all nodegroups
+                self.reconcile_hanging_groups(network, bend,
+                                              hanging_groups)
+                continue
 
-        # CASE-6: Orphan networks
-        self.reconcile_orphan_networks(networks, ganeti_networks)
+            if bnet.operstate != 'ACTIVE':
+                # CASE-4: Unsynced network state. At this point the network
+                # exists and is connected to all nodes so is must be
+                # active!
+                self.reconcile_unsynced_network(network, bend, bnet)
+
+            # Check that externally reserved IPs of the network in Ganeti are
+            # also externally reserved to the IP pool
+            externally_reserved = gnet['external_reservations']
+            for ip in externally_reserved.split(","):
+                ip = ip.strip()
+                if not network_ip_pool.is_reserved(ip):
+                    msg = ("D: IP '%s' is reserved for network '%s' in"
+                           " backend '%s' but not in DB.")
+                    self.log.info(msg, ip, network, bend)
+                    if self.fix:
+                        network_ip_pool.reserve(ip, external=True)
+                        network_ip_pool.save()
+                        self.log.info("F: Reserved IP '%s'", ip)
 
     def reconcile_parted_network(self, network, backend):
         self.log.info("D: Missing DB entry for network %s in backend %s",
@@ -595,50 +603,9 @@
                 "success",
                 "Reconciliation simulated eventd")
 
-    def reconcile_ip_pools(self, network, available_maps, reserved_maps):
-        available_map = reduce(lambda x, y: x & y, available_maps)
-        reserved_map = reduce(lambda x, y: x & y, reserved_maps)
-
-        pool = network.get_pool()
-        # Temporary release unused floating IPs
-        temp_pool = network.get_pool()
-        used_ips = network.nics.values_list("ipv4", flat=True)
-        unused_static_ips = network.floating_ips.exclude(ipv4__in=used_ips)
-        map(lambda ip: temp_pool.put(ip.ipv4), unused_static_ips)
-        if temp_pool.available != available_map:
-            self.log.info("D: Unsynced available map of network %s:\n"
-                          "\tDB: %r\n\tGB: %r", network,
-                          temp_pool.available.to01(),
-                          available_map.to01())
-            if self.fix:
-                pool.available = available_map
-                # Release unsued floating IPs, as they are not included in the
-                # available map
-                map(lambda ip: pool.reserve(ip.ipv4), unused_static_ips)
-                pool.save()
-        if pool.reserved != reserved_map:
-            self.log.info("D: Unsynced reserved map of network %s:\n"
-                          "\tDB: %r\n\tGB: %r", network, pool.reserved.to01(),
-                          reserved_map.to01())
-            if self.fix:
-                pool.reserved = reserved_map
-                pool.save()
-
-    def detect_conflicting_ips(self, network):
-        """Detect NIC's that have the same IP in the same network."""
-        machine_ips = network.nics.all().values_list('ipv4', 'machine')
-        ips = map(lambda x: x[0], machine_ips)
-        distinct_ips = set(ips)
-        if len(distinct_ips) < len(ips):
-            for i in distinct_ips:
-                ips.remove(i)
-            for i in ips:
-                machines = [utils.id_to_instance_name(x[1])
-                            for x in machine_ips if x[0] == i]
-                self.log.info('D: Conflicting IP:%s Machines: %s',
-                              i, ', '.join(machines))
-
-    def reconcile_orphan_networks(self, db_networks, ganeti_networks):
+    def _reconcile_orphan_networks(self):
+        db_networks = self.networks
+        ganeti_networks = self.ganeti_networks
         # Detect Orphan Networks in Ganeti
         db_network_ids = set([net.id for net in db_networks])
         for back_end, ganeti_networks in ganeti_networks.items():
@@ -669,39 +636,107 @@
         return None
 
 
-def get_network_pool(gnet):
-    """Return available and reserved IP maps.
+class PoolReconciler(object):
+    def __init__(self, logger, fix=False):
+        self.log = logger
+        self.fix = fix
+
+    def reconcile(self):
+        self.reconcile_bridges()
+        self.reconcile_mac_prefixes()
+        for network in Network.objects.filter(deleted=False):
+            self.reconcile_ip_pool(network)
+
+    @transaction.commit_on_success
+    def reconcile_bridges(self):
+        networks = Network.objects.filter(deleted=False,
+                                          flavor="PHYSICAL_VLAN")
+        check_unique_values(objects=networks, field='link', logger=self.log)
+        try:
+            pool = BridgePoolTable.get_pool()
+        except pools.EmptyPool:
+            self.log.info("There is no available pool for bridges.")
+            return
+
+        used_bridges = set(networks.values_list('link', flat=True))
+        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
+                              used_values=used_bridges, fix=self.fix,
+                              logger=self.log)
 
-    Extract the available and reserved IP map from the info return from Ganeti
-    for a network.
+    @transaction.commit_on_success
+    def reconcile_mac_prefixes(self):
+        networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED")
+        check_unique_values(objects=networks, field='mac_prefix',
+                            logger=self.log)
+        try:
+            pool = MacPrefixPoolTable.get_pool()
+        except pools.EmptyPool:
+            self.log.info("There is no available pool for MAC prefixes.")
+            return
+
+        used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True))
+        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
+                              used_values=used_mac_prefixes, fix=self.fix,
+                              logger=self.log)
 
-    """
-    converter = IPPool(Foo(gnet['network']))
-    a_map = bitarray_from_map(gnet['map'])
-    a_map.invert()
-    reserved = gnet['external_reservations']
-    r_map = a_map.copy()
-    r_map.setall(True)
-    if reserved:
-        for address in reserved.split(','):
-            index = converter.value_to_index(address.strip())
-            a_map[index] = True
-            r_map[index] = False
-    return a_map, r_map
-
-
-def bitarray_from_map(bitmap):
-    return bitarray.bitarray(bitmap.replace("X", "1").replace(".", "0"))
-
-
-class Foo():
-    def __init__(self, subnet):
-        self.available_map = ''
-        self.reserved_map = ''
-        self.size = 0
-        self.network = Foo.Foo1(subnet)
-
-    class Foo1():
-        def __init__(self, subnet):
-            self.subnet = subnet
-            self.gateway = None
+    @transaction.commit_on_success
+    def reconcile_ip_pool(self, network):
+        # Check that all NICs have unique IPv4 address
+        nics = network.nics.filter(ipv4__isnull=False)
+        check_unique_values(objects=nics, field='ipv4', logger=self.log)
+
+        # Check that all Floating IPs have unique IPv4 address
+        floating_ips = network.floating_ips.filter(deleted=False)
+        check_unique_values(objects=floating_ips, field='ipv4',
+                            logger=self.log)
+
+        # First get(lock) the IP pool of the network to prevent new NICs
+        # from being created.
+        network_ip_pool = network.get_pool()
+        used_ips = set(list(nics.values_list("ipv4", flat=True)) +
+                       list(floating_ips.values_list("ipv4", flat=True)))
+
+        check_pool_consistent(pool=network_ip_pool,
+                              pool_class=pools.IPPool,
+                              used_values=used_ips,
+                              fix=self.fix, logger=self.log)
+
+
+def check_unique_values(objects, field, logger):
+    used_values = list(objects.values_list(field, flat=True))
+    if len(used_values) != len(set(used_values)):
+        duplicate_values = [v for v in used_values if used_values.count(v) > 1]
+        for value in duplicate_values:
+            filter_args = {field: value}
+            using_objects = objects.filter(**filter_args)
+            msg = "Value '%s' is used as %s for more than one objects: %s"
+            logger.error(msg, value, field, ",".join(map(str, using_objects)))
+        return False
+    logger.debug("Values for field '%s' are unique.", field)
+    return True
+
+
+def check_pool_consistent(pool, pool_class, used_values, fix, logger):
+    dummy_pool = create_empty_pool(pool, pool_class)
+    [dummy_pool.reserve(value) for value in used_values]
+    if dummy_pool.available != pool.available:
+        msg = "'%s' is not consistent!\nPool: %s\nUsed: %s"
+        pool_diff = dummy_pool.available ^ pool.available
+        for index in pool_diff.itersearch(bitarray.bitarray("1")):
+            value = pool.index_to_value(int(index))
+            msg = "%s is incosistent! Value '%s' is %s but should be %s."
+            value1 = pool.is_available(value) and "available" or "unavailable"
+            value2 = dummy_pool.is_available(value) and "available"\
+                or "unavailable"
+            logger.error(msg, pool, value, value1, value2)
+        if fix:
+            pool.available = dummy_pool.available
+            pool.save()
+            logger.info("Fixed available map of pool '%s'", pool)
+
+
+def create_empty_pool(pool, pool_class):
+    pool_row = pool.pool_table
+    pool_row.available_map = ""
+    pool_row.reserved_map = ""
+    return pool_class(pool_row)
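
Note on the pool-consistency pattern introduced above: check_pool_consistent() rebuilds an empty pool, reserves every value the DB reports as used, and diffs the rebuilt availability map against the stored one, logging and (with fix) repairing any mismatch. The minimal sketch below illustrates the same idea in isolation; ToyPool and check_toy_pool_consistent are illustrative stand-ins for the classes in synnefo.db.pools and are not part of the Synnefo code base.

# Minimal sketch of the consistency-check pattern; all names here are
# illustrative stand-ins, not Synnefo's actual API.
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pool-check")


class ToyPool(object):
    """Tiny pool keeping one 'available' flag per value."""
    def __init__(self, values):
        self.available = dict((v, True) for v in values)

    def reserve(self, value):
        self.available[value] = False

    def is_available(self, value):
        return self.available[value]


def check_toy_pool_consistent(stored_pool, all_values, used_values, fix=False):
    """Rebuild an empty pool from the used values and diff it with the stored one."""
    dummy_pool = ToyPool(all_values)
    for value in used_values:
        dummy_pool.reserve(value)

    for value in all_values:
        stored_state = stored_pool.is_available(value)
        expected_state = dummy_pool.is_available(value)
        if stored_state != expected_state:
            log.error("Value '%s' is %s in the stored pool but should be %s.",
                      value,
                      "available" if stored_state else "unavailable",
                      "available" if expected_state else "unavailable")
            if fix:
                # Copy the rebuilt state over the stored one, as the real
                # helper does with the pool's available map.
                stored_pool.available[value] = expected_state


# Example: bridge 'prv2' is used by a network but still marked as available.
bridges = ["prv1", "prv2", "prv3"]
stored = ToyPool(bridges)
stored.reserve("prv1")  # the stored pool only knows about 'prv1'
check_toy_pool_consistent(stored, bridges, used_values=set(["prv1", "prv2"]),
                          fix=True)

Running the example logs that 'prv2' is still available in the stored pool although it is in use, which is the kind of drift reconcile_bridges() above is meant to detect and, with fix enabled, repair.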
