Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ a9bb2a1a

History | View | Annotate | Download (29.6 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, backend as backend_mod
73

    
74
# NOTE(review): getLogger() with no argument returns the *root* logger;
# getLogger(__name__) is the usual convention -- confirm this is intended.
logger = logging.getLogger()
logging.basicConfig()

# Seconds between reconciliation runs; overridable through the
# RECONCILIATION_CHECK_INTERVAL Django setting.
try:
    CHECK_INTERVAL = settings.RECONCILIATION_CHECK_INTERVAL
except AttributeError:
    CHECK_INTERVAL = 60

# Ganeti job status values as reported by the Ganeti RAPI.
GANETI_JOB_ERROR = "error"
GANETI_JOBS_PENDING = ["queued", "waiting", "running", "canceling"]
GANETI_JOBS_FINALIZED = ["success", "error", "canceled"]
85

    
86

    
87
class BackendReconciler(object):
    """Reconcile the servers of one backend between the DB and Ganeti.

    Detects stale servers in the DB (rule R1), orphan instances in
    Ganeti (rule R2) and servers with unsynced state (rule R3). Detected
    issues are only fixed when the corresponding 'fix_*' key in the
    options dict is true; otherwise they are just reported.
    """

    def __init__(self, backend, logger, options=None):
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        # Return the RAPI client to the client pool.
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Fetch DB and Ganeti state and run all reconciliation checks."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.event_time = datetime.now()

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Return "ERROR", "RUNNING" or "FINALIZED" for a server's build job.

        Jobs that are unknown to Ganeti are treated as errors.
        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status == GANETI_JOB_ERROR:
                return "ERROR"
            elif gnt_job_status not in GANETI_JOBS_FINALIZED:
                return "RUNNING"
            else:
                return "FINALIZED"
        else:
            return "ERROR"

    def reconcile_stale_servers(self):
        """Handle servers existing in the DB but not in Ganeti (rule R1)."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them by simulating a successful removal event. Use .get()
        # so that a missing 'fix_*' option defaults to False instead of
        # raising KeyError (options may legitimately be an empty dict).
        if stale and self.options.get("fix_stale"):
            for server_id in stale:
                db_server = self.db_servers[server_id]
                backend_mod.process_op_status(
                    vm=db_server,
                    etime=self.event_time,
                    jobid=0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Handle instances existing in Ganeti but not in the DB (rule R2)."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options.get("fix_orphans"):
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Reconcile servers that exist both in the DB and Ganeti (rule R3)."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Simulate a failed build event for a server stuck in BUILD."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options.get("fix_unsynced"):
            fix_opcode = "OP_INSTANCE_CREATE"
            backend_mod.process_op_status(
                vm=db_server,
                etime=self.event_time,
                jobid=0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Sync the DB operstate of a server with its state in Ganeti."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options.get("fix_unsynced"):
                # If server is in building state, you will have first to
                # reconcile its creation, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=db_server, etime=self.event_time, jobid=0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Sync the DB flavor of a server with its Ganeti beparams."""
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options.get("fix_unsynced_flavors"):
                old_state = db_server.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=0,
                    opcode=opcode, status='success',
                    beparams=beparams,
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to the old state.
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Sync the DB NICs of a server with the NICs reported by Ganeti."""
        db_nics = db_server.nics.order_by("index")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        if backend_mod.nics_changed(db_nics, gnt_nics_parsed):
            msg = "Found unsynced NICs for server '%s'.\n\t"\
                  "DB: %s\n\tGaneti: %s"
            db_nics_str = ", ".join(map(format_db_nic, db_nics))
            gnt_nics_str = ", ".join(map(format_gnt_nic, gnt_nics_parsed))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options.get("fix_unsynced_nics"):
                backend_mod.process_net_status(vm=db_server,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        # Disk reconciliation is not implemented yet.
        pass

    def reconcile_pending_task(self, server_id, db_server):
        """Clear the pending task of a server whose Ganeti job has finished."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in GANETI_JOBS_FINALIZED:
                pending_task = True

        if pending_task:
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options.get("fix_pending_tasks"):
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                self.log.info("Cleared pending task for server '%s'",
                              server_id)
323

    
324

    
325
def format_db_nic(nic):
    """Render a DB NIC model as a short human-readable string."""
    fields = (nic.index, nic.ipv4, nic.network_id, nic.mac,
              nic.firewall_profile)
    return "Index: %s, IP: %s Network: %s MAC: %s Firewall: %s" % fields
328

    
329

    
330
def format_gnt_nic(nic):
    """Render a parsed Ganeti NIC dict as a short human-readable string."""
    template = "Index: %s IP: %s Network: %s MAC: %s Firewall: %s"
    return template % (nic["index"], nic["ipv4"], nic["network"],
                       nic["mac"], nic["firewall_profile"])
334

    
335

    
336
#
337
# Networks
338
#
339

    
340

    
341
def get_networks_from_ganeti(backend):
    """Return the synnefo networks of a backend as {network_id: info}.

    Only networks whose name carries the synnefo backend prefix are
    considered; all other Ganeti networks are ignored.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                continue
            network_id = utils.id_from_network_name(name)
            networks[network_id] = net
    return networks
352

    
353

    
354
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Returns a dict mapping each partially-connected network id to the
    set of nodegroup names it is still missing from.
    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for network_id, info in GNets.items():
        # Each group_list entry is a (name, mode, link) triple.
        connected = set(name for (name, mode, link) in info['group_list'])
        if connected != all_groups:
            hanging[network_id] = all_groups - connected
    return hanging
373

    
374

    
375
def get_online_backends():
    # Reconciliation only considers backends that are not marked offline.
    return Backend.objects.filter(offline=False)
377

    
378

    
379
def get_database_servers(backend):
    """Return the backend's non-deleted VMs from the DB, keyed by id."""
    servers = backend.virtual_machines.select_related("nics", "flavor")\
                                      .filter(deleted=False)
    return dict((srv.id, srv) for srv in servers)
383

    
384

    
385
def get_ganeti_servers(backend):
    """Return the backend's synnefo instances, parsed and keyed by id."""
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
    # Keep only synnefo-managed instances, then parse each description.
    parsed = [parse_gnt_instance(inst)
              for inst in backend_mod.get_instances(backend)
              if inst["name"].startswith(snf_backend_prefix)]
    # Instances with malformed names parse to an id of None; drop them.
    return dict((entry["id"], entry)
                for entry in parsed if entry["id"] is not None)
393

    
394

    
395
def parse_gnt_instance(instance):
    """Parse a bulk Ganeti instance description into a plain dict.

    Returns a dict with the instance's synnefo id, operating state,
    flavor (vcpus/ram from beparams), NICs, disks, tags and mtime.
    For an instance with a malformed name, a dict with "id" set to None
    is returned so callers filtering on entry["id"] can skip it.
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # Bug fix: previously a (None, None) tuple was returned here,
        # which crashed callers indexing the result with string keys.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,  # FIX
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
419

    
420

    
421
def nics_from_instance(i):
    """Build the list of NIC dicts from a bulk Ganeti instance description.

    Each NIC dict carries the 'ip', 'mac' and 'network' of the NIC at
    that index. Firewall profiles are read from instance tags of the
    form 'synnefo:network:<index>:<profile>'.
    """
    nic_fields = zip(i['nic.ips'], i['nic.macs'], i['nic.networks'])
    nics = [{'ip': ip, 'mac': mac, 'network': net}
            for (ip, mac, net) in nic_fields]
    for tag in i["tags"]:
        parts = tag.split(":")
        if parts[0:2] != ["synnefo", "network"]:
            continue
        if len(parts) != 4:
            # NOTE(review): "synefo" is the original runtime log string,
            # kept byte-for-byte.
            logger.error("Malformed synefo tag %s", tag)
            continue
        try:
            index = int(parts[2])
            nics[index]['firewall'] = parts[3]
        except ValueError:
            logger.error("Malformed synnefo tag %s", tag)
        except IndexError:
            logger.error("Found tag %s for non-existent NIC %d",
                         tag, index)
    return nics
447

    
448

    
449
def get_ganeti_jobs(backend):
    """Return the backend's Ganeti jobs, keyed by integer job id."""
    return dict((int(job["id"]), job)
                for job in backend_mod.get_jobs(backend))
452

    
453

    
454
def disks_from_instance(i):
    """Map each disk index of an instance to a dict with its size."""
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks
457

    
458

    
459
class NetworkReconciler(object):
    """Reconcile DB networks with the networks of all online Ganeti backends.

    Detects orphan networks in Ganeti, missing/stale backend networks,
    networks not connected to all nodegroups, unsynced operstates and
    unsynced external IP reservations. Fixes only when 'fix' is True.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        network_ip_pool = network.get_pool()  # X-Lock on IP Pool
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if not bnet:
                if network.floating_ip_pool:
                    # Network is a floating IP pool and does not exist in
                    # backend. We need to create it
                    bnet = self.reconcile_parted_network(network, bend)
                elif not gnet:
                    # Network does not exist either in Ganeti nor in DB.
                    continue
                else:
                    # Network exists in Ganeti and not in DB.
                    if network.action != "DESTROY" and not network.public:
                        bnet = self.reconcile_parted_network(network, bend)
                    else:
                        continue

            # NOTE(review): reconcile_parted_network returns None when
            # self.fix is False, so bnet may be None below and
            # bnet.operstate would raise AttributeError -- confirm this
            # path is only reached with fix enabled.
            if not gnet:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    if not network_ip_pool.is_reserved(ip):
                        msg = ("D: IP '%s' is reserved for network '%s' in"
                               " backend '%s' but not in DB.")
                        self.log.info(msg, ip, network, bend)
                        if self.fix:
                            network_ip_pool.reserve(ip, external=True)
                            network_ip_pool.save()
                            self.log.info("F: Reserved IP '%s'", ip)

    def reconcile_parted_network(self, network, backend):
        # Create the missing BackendNetwork DB entry for this backend.
        # Returns the new BackendNetwork, or None when fix is disabled.
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        # Simulate a successful OP_NETWORK_REMOVE so the stale DB entry
        # is marked as deleted.
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        # Create the network in Ganeti since the DB says it should exist.
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        # Connect the network to every nodegroup it is missing from.
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        # Simulate a successful OP_NETWORK_CONNECT to activate the entry.
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            # NOTE(review): the trailing "eventd" looks like a typo, but it
            # is a runtime string and is deliberately left unchanged here.
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable below shadows the dict that is
        # being iterated; .items() is evaluated once up-front, so this is
        # safe, but renaming it would be clearer.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
630

    
631

    
632
def get_backend_network(network, backend):
    """Return the BackendNetwork for (network, backend), or None if absent."""
    lookup = {"network": network, "backend": backend}
    try:
        return BackendNetwork.objects.get(**lookup)
    except BackendNetwork.DoesNotExist:
        return None
637

    
638

    
639
class PoolReconciler(object):
    """Reconcile the bridge, MAC-prefix and IP pools with their users.

    For each pool, the values currently used by non-deleted networks
    (and NICs/floating IPs for IP pools) are collected and checked
    against the pool's availability map. Fixes only when 'fix' is True.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        # Run all pool checks: bridges, MAC prefixes, per-network IP pools.
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()
        for network in Network.objects.filter(deleted=False):
            self.reconcile_ip_pool(network)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        # Only PHYSICAL_VLAN networks allocate a bridge from the pool.
        networks = Network.objects.filter(deleted=False,
                                          flavor="PHYSICAL_VLAN")
        check_unique_values(objects=networks, field='link', logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        used_bridges = set(networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=used_bridges, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        # Only MAC_FILTERED networks allocate a MAC prefix from the pool.
        networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED")
        check_unique_values(objects=networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=used_mac_prefixes, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check a single network's IP pool against its NICs/floating IPs."""
        # Check that all NICs have unique IPv4 address
        nics = network.nics.filter(ipv4__isnull=False)
        check_unique_values(objects=nics, field='ipv4', logger=self.log)

        # Check that all Floating IPs have unique IPv4 address
        floating_ips = network.floating_ips.filter(deleted=False)
        check_unique_values(objects=floating_ips, field='ipv4',
                            logger=self.log)

        # First get(lock) the IP pool of the network to prevent new NICs
        # from being created.
        network_ip_pool = network.get_pool()
        used_ips = set(list(nics.values_list("ipv4", flat=True)) +
                       list(floating_ips.values_list("ipv4", flat=True)))

        check_pool_consistent(pool=network_ip_pool,
                              pool_class=pools.IPPool,
                              used_values=used_ips,
                              fix=self.fix, logger=self.log)
703

    
704

    
705
def check_unique_values(objects, field, logger):
    """Check that the given field is unique across a queryset.

    Logs every duplicated value together with the objects using it.
    Returns True when all values are unique, False otherwise.
    """
    values = list(objects.values_list(field, flat=True))
    if len(set(values)) == len(values):
        logger.debug("Values for field '%s' are unique.", field)
        return True
    duplicates = [v for v in values if values.count(v) > 1]
    for dup in duplicates:
        conflicting = objects.filter(**{field: dup})
        msg = "Value '%s' is used as %s for more than one objects: %s"
        logger.error(msg, dup, field, ",".join(map(str, conflicting)))
    return False
717

    
718

    
719
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Check a pool's availability map against the actually used values.

    Rebuilds an empty pool, reserves every value in 'used_values' and
    compares the recomputed availability bitmap with the stored one.
    Every differing entry is logged; when 'fix' is True the stored map
    is replaced with the recomputed one and the pool is saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop instead of a list comprehension used only for side effects.
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available != pool.available:
        # XOR yields a bitmap with a '1' at every inconsistent index.
        pool_diff = dummy_pool.available ^ pool.available
        for index in pool_diff.itersearch(bitarray.bitarray("1")):
            value = pool.index_to_value(int(index))
            # Fixed: typo "incosistent" and removed a dead 'msg' assignment
            # that was immediately shadowed here.
            msg = "%s is inconsistent! Value '%s' is %s but should be %s."
            value1 = pool.is_available(value) and "available" or "unavailable"
            value2 = dummy_pool.is_available(value) and "available"\
                or "unavailable"
            logger.error(msg, pool, value, value1, value2)
        if fix:
            pool.available = dummy_pool.available
            pool.save()
            logger.info("Fixed available map of pool '%s'", pool)
736

    
737

    
738
def create_empty_pool(pool, pool_class):
    """Return a fresh pool_class over the pool's table row with empty maps.

    Note: this mutates the underlying row in place by clearing both the
    available and the reserved bitmap strings.
    """
    row = pool.pool_table
    row.available_map = ""
    row.reserved_map = ""
    return pool_class(row)