Revision 89b2b908

b/snf-cyclades-app/synnefo/logic/management/commands/reconcile-networks.py
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2 2
#
3 3
# Redistribution and use in source and binary forms, with or without
4 4
# modification, are permitted provided that the following conditions
......
34 34
logic/reconciliation.py for a description of reconciliation rules.
35 35

  
36 36
"""
37
import datetime
38
import bitarray
39 37
import logging
40

  
41 38
from optparse import make_option
42

  
43 39
from django.core.management.base import BaseCommand
44
from django.db import transaction
45

  
46
from synnefo.db.models import Backend, Network, BackendNetwork
47
from synnefo.db.pools import IPPool
48
from synnefo.logic import reconciliation, utils
49
from synnefo.logic import backend as backend_mod
40
from synnefo.logic import reconciliation
50 41

  
51 42

  
52 43
class Command(BaseCommand):
......
93 84
            logger.setLevel(logging.WARNING)
94 85

  
95 86
        logger.addHandler(log_handler)
96
        reconciler = NetworkReconciler(logger=logger, fix=fix,
97
                                       conflicting_ips=conflicting_ips)
87
        reconciler = reconciliation.NetworkReconciler(
88
            logger=logger,
89
            fix=fix,
90
            conflicting_ips=conflicting_ips)
98 91
        reconciler.reconcile_networks()
99

  
100

  
101
class NetworkReconciler(object):
102
    def __init__(self, logger, fix=False, conflicting_ips=False):
103
        self.log = logger
104
        self.conflicting_ips = conflicting_ips
105
        self.fix = fix
106

  
107
    @transaction.commit_on_success
108
    def reconcile_networks(self):
109
        # Get models from DB
110
        backends = Backend.objects.exclude(offline=True)
111
        networks = Network.objects.filter(deleted=False)
112

  
113
        # Get info from all ganeti backends
114
        ganeti_networks = {}
115
        ganeti_hanging_networks = {}
116
        for b in backends:
117
            g_nets = reconciliation.get_networks_from_ganeti(b)
118
            ganeti_networks[b] = g_nets
119
            g_hanging_nets = reconciliation.hanging_networks(b, g_nets)
120
            ganeti_hanging_networks[b] = g_hanging_nets
121

  
122
        # Perform reconciliation for each network
123
        for network in networks:
124
            ip_available_maps = []
125
            ip_reserved_maps = []
126
            for bend in backends:
127
                bnet = get_backend_network(network, bend)
128
                gnet = ganeti_networks[bend].get(network.id)
129
                if not bnet:
130
                    if network.floating_ip_pool:
131
                        # Network is a floating IP pool and does not exist in
132
                        # backend. We need to create it
133
                        bnet = self.reconcile_parted_network(network, bend)
134
                    elif not gnet:
135
                        # Network does not exist either in Ganeti nor in BD.
136
                        continue
137
                    else:
138
                        # Network exists in Ganeti and not in DB.
139
                        if network.action != "DESTROY" and not network.public:
140
                            bnet = self.reconcile_parted_network(network, bend)
141

  
142
                if not gnet:
143
                    # Network does not exist in Ganeti. If the network action
144
                    # is DESTROY, we have to mark as deleted in DB, else we
145
                    # have to create it in Ganeti.
146
                    if network.action == "DESTROY":
147
                        if bnet.operstate != "DELETED":
148
                            self.reconcile_stale_network(bnet)
149
                    else:
150
                        self.reconcile_missing_network(network, bend)
151
                    # Skip rest reconciliation!
152
                    continue
153

  
154
                try:
155
                    hanging_groups = ganeti_hanging_networks[bend][network.id]
156
                except KeyError:
157
                    # Network is connected to all nodegroups
158
                    hanging_groups = []
159

  
160
                if hanging_groups:
161
                    # CASE-3: Ganeti networks not connected to all nodegroups
162
                    self.reconcile_hanging_groups(network, bend,
163
                                                  hanging_groups)
164
                    continue
165

  
166
                if bnet.operstate != 'ACTIVE':
167
                    # CASE-4: Unsynced network state. At this point the network
168
                    # exists and is connected to all nodes so is must be
169
                    # active!
170
                    self.reconcile_unsynced_network(network, bend, bnet)
171

  
172
                # Get ganeti IP Pools
173
                available_map, reserved_map = get_network_pool(gnet)
174
                ip_available_maps.append(available_map)
175
                ip_reserved_maps.append(reserved_map)
176

  
177
            if ip_available_maps or ip_reserved_maps:
178
                # CASE-5: Unsynced IP Pools
179
                self.reconcile_ip_pools(network, ip_available_maps,
180
                                        ip_reserved_maps)
181

  
182
            if self.conflicting_ips:
183
                self.detect_conflicting_ips()
184

  
185
        # CASE-6: Orphan networks
186
        self.reconcile_orphan_networks(networks, ganeti_networks)
187

  
188
    def reconcile_parted_network(self, network, backend):
189
        self.log.info("D: Missing DB entry for network %s in backend %s",
190
                      network, backend)
191
        if self.fix:
192
            network.create_backend_network(backend)
193
            self.log.info("F: Created DB entry")
194
            bnet = get_backend_network(network, backend)
195
            return bnet
196

  
197
    def reconcile_stale_network(self, backend_network):
198
        self.log.info("D: Stale DB entry for network %s in backend %s",
199
                      backend_network.network, backend_network.backend)
200
        if self.fix:
201
            etime = datetime.datetime.now()
202
            backend_mod.process_network_status(
203
                backend_network, etime, 0,
204
                "OP_NETWORK_REMOVE",
205
                "success",
206
                "Reconciliation simulated event")
207
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")
208

  
209
    def reconcile_missing_network(self, network, backend):
210
        self.log.info("D: Missing Ganeti network %s in backend %s",
211
                      network, backend)
212
        if self.fix:
213
            backend_mod.create_network(network, backend)
214
            self.log.info("F: Issued OP_NETWORK_CONNECT")
215

  
216
    def reconcile_hanging_groups(self, network, backend, hanging_groups):
217
        self.log.info('D: Network %s in backend %s is not connected to '
218
                      'the following groups:', network, backend)
219
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
220
        if self.fix:
221
            for group in hanging_groups:
222
                self.log.info('F: Connecting network %s to nodegroup %s',
223
                              network, group)
224
                backend_mod.connect_network(network, backend, depends=[],
225
                                            group=group)
226

  
227
    def reconcile_unsynced_network(self, network, backend, backend_network):
228
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
229
        if self.fix:
230
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
231
            etime = datetime.datetime.now()
232
            backend_mod.process_network_status(
233
                backend_network, etime, 0,
234
                "OP_NETWORK_CONNECT",
235
                "success",
236
                "Reconciliation simulated eventd")
237

  
238
    def reconcile_ip_pools(self, network, available_maps, reserved_maps):
239
        available_map = reduce(lambda x, y: x & y, available_maps)
240
        reserved_map = reduce(lambda x, y: x & y, reserved_maps)
241

  
242
        pool = network.get_pool()
243
        # Temporary release unused floating IPs
244
        temp_pool = network.get_pool()
245
        used_ips = network.nics.values_list("ipv4", flat=True)
246
        unused_static_ips = network.floating_ips.exclude(ipv4__in=used_ips)
247
        map(lambda ip: temp_pool.put(ip.ipv4), unused_static_ips)
248
        if temp_pool.available != available_map:
249
            self.log.info("D: Unsynced available map of network %s:\n"
250
                          "\tDB: %r\n\tGB: %r", network,
251
                          temp_pool.available.to01(),
252
                          available_map.to01())
253
            if self.fix:
254
                pool.available = available_map
255
                # Release unsued floating IPs, as they are not included in the
256
                # available map
257
                map(lambda ip: pool.reserve(ip.ipv4), unused_static_ips)
258
                pool.save()
259
        if pool.reserved != reserved_map:
260
            self.log.info("D: Unsynced reserved map of network %s:\n"
261
                          "\tDB: %r\n\tGB: %r", network, pool.reserved.to01(),
262
                          reserved_map.to01())
263
            if self.fix:
264
                pool.reserved = reserved_map
265
                pool.save()
266

  
267
    def detect_conflicting_ips(self, network):
268
        """Detect NIC's that have the same IP in the same network."""
269
        machine_ips = network.nics.all().values_list('ipv4', 'machine')
270
        ips = map(lambda x: x[0], machine_ips)
271
        distinct_ips = set(ips)
272
        if len(distinct_ips) < len(ips):
273
            for i in distinct_ips:
274
                ips.remove(i)
275
            for i in ips:
276
                machines = [utils.id_to_instance_name(x[1])
277
                            for x in machine_ips if x[0] == i]
278
                self.log.info('D: Conflicting IP:%s Machines: %s',
279
                              i, ', '.join(machines))
280

  
281
    def reconcile_orphan_networks(self, db_networks, ganeti_networks):
282
        # Detect Orphan Networks in Ganeti
283
        db_network_ids = set([net.id for net in db_networks])
284
        for back_end, ganeti_networks in ganeti_networks.items():
285
            ganeti_network_ids = set(ganeti_networks.keys())
286
            orphans = ganeti_network_ids - db_network_ids
287

  
288
            if len(orphans) > 0:
289
                self.log.info('D: Orphan Networks in backend %s:',
290
                              back_end.clustername)
291
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
292
                if self.fix:
293
                    for net_id in orphans:
294
                        self.log.info('Disconnecting and deleting network %d',
295
                                      net_id)
296
                        try:
297
                            network = Network.objects.get(id=net_id)
298
                            backend_mod.delete_network(network,
299
                                                       backend=back_end)
300
                        except Network.DoesNotExist:
301
                            self.log.info("Not entry for network %s in DB !!",
302
                                          net_id)
303

  
304

  
305
def get_backend_network(network, backend):
306
    try:
307
        return BackendNetwork.objects.get(network=network, backend=backend)
308
    except BackendNetwork.DoesNotExist:
309
        return None
310

  
311

  
312
def get_network_pool(gnet):
313
    """Return available and reserved IP maps.
314

  
315
    Extract the available and reserved IP map from the info return from Ganeti
316
    for a network.
317

  
318
    """
319
    converter = IPPool(Foo(gnet['network']))
320
    a_map = bitarray_from_map(gnet['map'])
321
    a_map.invert()
322
    reserved = gnet['external_reservations']
323
    r_map = a_map.copy()
324
    r_map.setall(True)
325
    for address in reserved.split(','):
326
        index = converter.value_to_index(address.strip())
327
        a_map[index] = True
328
        r_map[index] = False
329
    return a_map, r_map
330

  
331

  
332
def bitarray_from_map(bitmap):
333
    return bitarray.bitarray(bitmap.replace("X", "1").replace(".", "0"))
334

  
335

  
336
class Foo():
337
    def __init__(self, subnet):
338
        self.available_map = ''
339
        self.reserved_map = ''
340
        self.size = 0
341
        self.network = Foo.Foo1(subnet)
342

  
343
    class Foo1():
344
        def __init__(self, subnet):
345
            self.subnet = subnet
346
            self.gateway = None
b/snf-cyclades-app/synnefo/logic/reconciliation.py
67 67

  
68 68
import logging
69 69
import itertools
70
import bitarray
70 71
from datetime import datetime, timedelta
71 72

  
72 73
from django.db import transaction
73 74
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
74
                               pooled_rapi_client)
75
                               pooled_rapi_client, Network,
76
                               BackendNetwork)
77
from synnefo.db.pools import IPPool
75 78
from synnefo.logic import utils, backend as backend_mod
76 79
from synnefo.logic.rapi import GanetiApiError
77 80

  
......
435 438
def disks_from_instance(i):
436 439
    return dict([(index, {"size": size})
437 440
                 for index, size in enumerate(i["disk.sizes"])])
441

  
442

  
443
class NetworkReconciler(object):
444
    def __init__(self, logger, fix=False, conflicting_ips=False):
445
        self.log = logger
446
        self.conflicting_ips = conflicting_ips
447
        self.fix = fix
448

  
449
    @transaction.commit_on_success
450
    def reconcile_networks(self):
451
        # Get models from DB
452
        backends = Backend.objects.exclude(offline=True)
453
        networks = Network.objects.filter(deleted=False)
454

  
455
        self.event_time = datetime.now()
456

  
457
        # Get info from all ganeti backends
458
        ganeti_networks = {}
459
        ganeti_hanging_networks = {}
460
        for b in backends:
461
            g_nets = get_networks_from_ganeti(b)
462
            ganeti_networks[b] = g_nets
463
            g_hanging_nets = hanging_networks(b, g_nets)
464
            ganeti_hanging_networks[b] = g_hanging_nets
465

  
466
        # Perform reconciliation for each network
467
        for network in networks:
468
            ip_available_maps = []
469
            ip_reserved_maps = []
470
            for bend in backends:
471
                bnet = get_backend_network(network, bend)
472
                gnet = ganeti_networks[bend].get(network.id)
473
                if not bnet:
474
                    if network.floating_ip_pool:
475
                        # Network is a floating IP pool and does not exist in
476
                        # backend. We need to create it
477
                        bnet = self.reconcile_parted_network(network, bend)
478
                    elif not gnet:
479
                        # Network does not exist either in Ganeti nor in BD.
480
                        continue
481
                    else:
482
                        # Network exists in Ganeti and not in DB.
483
                        if network.action != "DESTROY" and not network.public:
484
                            bnet = self.reconcile_parted_network(network, bend)
485
                        else:
486
                            continue
487

  
488
                if not gnet:
489
                    # Network does not exist in Ganeti. If the network action
490
                    # is DESTROY, we have to mark as deleted in DB, else we
491
                    # have to create it in Ganeti.
492
                    if network.action == "DESTROY":
493
                        if bnet.operstate != "DELETED":
494
                            self.reconcile_stale_network(bnet)
495
                    else:
496
                        self.reconcile_missing_network(network, bend)
497
                    # Skip rest reconciliation!
498
                    continue
499

  
500
                try:
501
                    hanging_groups = ganeti_hanging_networks[bend][network.id]
502
                except KeyError:
503
                    # Network is connected to all nodegroups
504
                    hanging_groups = []
505

  
506
                if hanging_groups:
507
                    # CASE-3: Ganeti networks not connected to all nodegroups
508
                    self.reconcile_hanging_groups(network, bend,
509
                                                  hanging_groups)
510
                    continue
511

  
512
                if bnet.operstate != 'ACTIVE':
513
                    # CASE-4: Unsynced network state. At this point the network
514
                    # exists and is connected to all nodes so is must be
515
                    # active!
516
                    self.reconcile_unsynced_network(network, bend, bnet)
517

  
518
                # Get ganeti IP Pools
519
                available_map, reserved_map = get_network_pool(gnet)
520
                ip_available_maps.append(available_map)
521
                ip_reserved_maps.append(reserved_map)
522

  
523
            if ip_available_maps or ip_reserved_maps:
524
                # CASE-5: Unsynced IP Pools
525
                self.reconcile_ip_pools(network, ip_available_maps,
526
                                        ip_reserved_maps)
527

  
528
            if self.conflicting_ips:
529
                self.detect_conflicting_ips()
530

  
531
        # CASE-6: Orphan networks
532
        self.reconcile_orphan_networks(networks, ganeti_networks)
533

  
534
    def reconcile_parted_network(self, network, backend):
535
        self.log.info("D: Missing DB entry for network %s in backend %s",
536
                      network, backend)
537
        if self.fix:
538
            network.create_backend_network(backend)
539
            self.log.info("F: Created DB entry")
540
            bnet = get_backend_network(network, backend)
541
            return bnet
542

  
543
    def reconcile_stale_network(self, backend_network):
544
        self.log.info("D: Stale DB entry for network %s in backend %s",
545
                      backend_network.network, backend_network.backend)
546
        if self.fix:
547
            backend_mod.process_network_status(
548
                backend_network, self.event_time, 0,
549
                "OP_NETWORK_REMOVE",
550
                "success",
551
                "Reconciliation simulated event")
552
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")
553

  
554
    def reconcile_missing_network(self, network, backend):
555
        self.log.info("D: Missing Ganeti network %s in backend %s",
556
                      network, backend)
557
        if self.fix:
558
            backend_mod.create_network(network, backend)
559
            self.log.info("F: Issued OP_NETWORK_CONNECT")
560

  
561
    def reconcile_hanging_groups(self, network, backend, hanging_groups):
562
        self.log.info('D: Network %s in backend %s is not connected to '
563
                      'the following groups:', network, backend)
564
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
565
        if self.fix:
566
            for group in hanging_groups:
567
                self.log.info('F: Connecting network %s to nodegroup %s',
568
                              network, group)
569
                backend_mod.connect_network(network, backend, depends=[],
570
                                            group=group)
571

  
572
    def reconcile_unsynced_network(self, network, backend, backend_network):
573
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
574
        if self.fix:
575
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
576
            backend_mod.process_network_status(
577
                backend_network, self.event_time, 0,
578
                "OP_NETWORK_CONNECT",
579
                "success",
580
                "Reconciliation simulated eventd")
581

  
582
    def reconcile_ip_pools(self, network, available_maps, reserved_maps):
583
        available_map = reduce(lambda x, y: x & y, available_maps)
584
        reserved_map = reduce(lambda x, y: x & y, reserved_maps)
585

  
586
        pool = network.get_pool()
587
        # Temporary release unused floating IPs
588
        temp_pool = network.get_pool()
589
        used_ips = network.nics.values_list("ipv4", flat=True)
590
        unused_static_ips = network.floating_ips.exclude(ipv4__in=used_ips)
591
        map(lambda ip: temp_pool.put(ip.ipv4), unused_static_ips)
592
        if temp_pool.available != available_map:
593
            self.log.info("D: Unsynced available map of network %s:\n"
594
                          "\tDB: %r\n\tGB: %r", network,
595
                          temp_pool.available.to01(),
596
                          available_map.to01())
597
            if self.fix:
598
                pool.available = available_map
599
                # Release unsued floating IPs, as they are not included in the
600
                # available map
601
                map(lambda ip: pool.reserve(ip.ipv4), unused_static_ips)
602
                pool.save()
603
        if pool.reserved != reserved_map:
604
            self.log.info("D: Unsynced reserved map of network %s:\n"
605
                          "\tDB: %r\n\tGB: %r", network, pool.reserved.to01(),
606
                          reserved_map.to01())
607
            if self.fix:
608
                pool.reserved = reserved_map
609
                pool.save()
610

  
611
    def detect_conflicting_ips(self, network):
612
        """Detect NIC's that have the same IP in the same network."""
613
        machine_ips = network.nics.all().values_list('ipv4', 'machine')
614
        ips = map(lambda x: x[0], machine_ips)
615
        distinct_ips = set(ips)
616
        if len(distinct_ips) < len(ips):
617
            for i in distinct_ips:
618
                ips.remove(i)
619
            for i in ips:
620
                machines = [utils.id_to_instance_name(x[1])
621
                            for x in machine_ips if x[0] == i]
622
                self.log.info('D: Conflicting IP:%s Machines: %s',
623
                              i, ', '.join(machines))
624

  
625
    def reconcile_orphan_networks(self, db_networks, ganeti_networks):
626
        # Detect Orphan Networks in Ganeti
627
        db_network_ids = set([net.id for net in db_networks])
628
        for back_end, ganeti_networks in ganeti_networks.items():
629
            ganeti_network_ids = set(ganeti_networks.keys())
630
            orphans = ganeti_network_ids - db_network_ids
631

  
632
            if len(orphans) > 0:
633
                self.log.info('D: Orphan Networks in backend %s:',
634
                              back_end.clustername)
635
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
636
                if self.fix:
637
                    for net_id in orphans:
638
                        self.log.info('Disconnecting and deleting network %d',
639
                                      net_id)
640
                        try:
641
                            network = Network.objects.get(id=net_id)
642
                            backend_mod.delete_network(network,
643
                                                       backend=back_end)
644
                        except Network.DoesNotExist:
645
                            self.log.info("Not entry for network %s in DB !!",
646
                                          net_id)
647

  
648

  
649
def get_backend_network(network, backend):
650
    try:
651
        return BackendNetwork.objects.get(network=network, backend=backend)
652
    except BackendNetwork.DoesNotExist:
653
        return None
654

  
655

  
656
def get_network_pool(gnet):
657
    """Return available and reserved IP maps.
658

  
659
    Extract the available and reserved IP map from the info return from Ganeti
660
    for a network.
661

  
662
    """
663
    converter = IPPool(Foo(gnet['network']))
664
    a_map = bitarray_from_map(gnet['map'])
665
    a_map.invert()
666
    reserved = gnet['external_reservations']
667
    r_map = a_map.copy()
668
    r_map.setall(True)
669
    if reserved:
670
        for address in reserved.split(','):
671
            index = converter.value_to_index(address.strip())
672
            a_map[index] = True
673
            r_map[index] = False
674
    return a_map, r_map
675

  
676

  
677
def bitarray_from_map(bitmap):
678
    return bitarray.bitarray(bitmap.replace("X", "1").replace(".", "0"))
679

  
680

  
681
class Foo():
682
    def __init__(self, subnet):
683
        self.available_map = ''
684
        self.reserved_map = ''
685
        self.size = 0
686
        self.network = Foo.Foo1(subnet)
687

  
688
    class Foo1():
689
        def __init__(self, subnet):
690
            self.subnet = subnet
691
            self.gateway = None

Also available in: Unified diff