Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 7b438672

History | View | Annotate | Download (30.3 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, rapi, backend as backend_mod
73

    
74
logger = logging.getLogger()
75
logging.basicConfig()
76

    
77
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
78

    
79

    
80
class BackendReconciler(object):
    """Reconcile the DB state of a single Ganeti backend with Ganeti.

    Implements rules R1-R3 from the module docstring: stale servers in
    the DB (R1), orphan instances in Ganeti (R2) and unsynced operating
    state / flavor / NICs (R3).  Every fixing action is guarded by a
    boolean in `options` (e.g. "fix_stale", "fix_orphans",
    "fix_unsynced", "fix_unsynced_flavors", "fix_unsynced_nics",
    "fix_pending_tasks").

    NOTE(review): the fix paths index options with options["..."], so a
    missing key raises KeyError.  The default options dict is empty --
    presumably callers always pass the full set of flags; confirm.
    """

    def __init__(self, backend, logger, options=None):
        self.backend = backend
        self.log = logger
        # RAPI client checked out of the pool here; returned in close().
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client to the connection pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run all reconciliation steps for this backend in one commit."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        # D: server id -> VirtualMachine (non-deleted servers in the DB)
        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        # G: server id -> parsed Ganeti instance info
        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        # Ganeti job id -> job info; used to classify BUILD/pending tasks.
        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.event_time = datetime.now()

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Classify the creation job of `db_server`.

        Returns "ERROR", "RUNNING" or "FINALIZED".  A job id that is not
        known to Ganeti any more is treated as "ERROR", since the job can
        no longer be tracked.
        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status == rapi.JOB_STATUS_ERROR:
                return "ERROR"
            elif gnt_job_status not in rapi.JOB_STATUS_FINALIZED:
                return "RUNNING"
            else:
                return "FINALIZED"
        else:
            return "ERROR"

    def reconcile_stale_servers(self):
        """R1: handle servers present in the DB but missing from Ganeti."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them by simulating a successful Ganeti removal event per
        # server; process_op_status marks the VM deleted in the DB.
        if stale and self.options["fix_stale"]:
            for server_id in stale:
                db_server = self.db_servers[server_id]
                backend_mod.process_op_status(
                    vm=db_server,
                    etime=self.event_time,
                    jobid=-0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """R2: destroy Ganeti instances with no live DB counterpart."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options["fix_orphans"]:
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                # Issue the destroy directly through the RAPI client.
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """R3: resync state/flavor/NICs of servers present in both sets."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Simulate a failed OP_INSTANCE_CREATE for a broken BUILD server."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options["fix_unsynced"]:
            fix_opcode = "OP_INSTANCE_CREATE"
            backend_mod.process_op_status(
                vm=db_server,
                etime=self.event_time,
                jobid=-0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Bring the DB operstate in line with the state Ganeti reports."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options["fix_unsynced"]:
                # If the server is in building state, its creation must be
                # reconciled first, to avoid wrong quotas.
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=db_server, etime=self.event_time, jobid=-0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=-0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Match the DB flavor against the RAM/VCPUs reported by Ganeti."""
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            # Look up a DB flavor matching Ganeti's RAM/VCPUs; disk size and
            # template are taken from the current DB flavor.
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options["fix_unsynced_flavors"]:
                old_state = db_server.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=-0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to the old state.
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Compare DB NICs with Ganeti NICs and resync the DB if needed."""
        # NICs that have been in BUILD for less than BUILDING_NIC_TIMEOUT
        # are still being created and are excluded from the comparison.
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        # NOTE(review): the length check uses the raw gnt_nics list while
        # the element comparison uses gnt_nics_parsed -- presumably they
        # always have the same length; confirm in process_ganeti_nics.
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options["fix_unsynced_nics"]:
                backend_mod.process_net_status(vm=db_server,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        # Disk reconciliation is not implemented yet.
        pass

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a server task whose Ganeti job has already finished."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            # The job is gone from Ganeti; the task can never complete.
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in rapi.JOB_STATUS_FINALIZED:
                pending_task = True

        if pending_task:
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options["fix_pending_tasks"]:
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                # NOTE(review): the log message below is missing the
                # closing quote after %s ("...server '%s").
                self.log.info("Cleared pending task for server '%s", server_id)
329

    
330

    
331
NIC_MSG = ": %s\t".join(["ID", "State", "IP", "Network", "MAC", "Index",
332
                         "Firewall"]) + ": %s"
333

    
334

    
335
def format_db_nic(nic):
336
    return NIC_MSG % (nic.id, nic.state, nic.ipv4_address, nic.network_id,
337
                      nic.mac, nic.index, nic.firewall_profile)
338

    
339

    
340
def format_gnt_nic(nic):
341
    nic_name, nic = nic
342
    return NIC_MSG % (nic_name, nic["state"], nic["ipv4_address"],
343
                      nic["network"].id, nic["mac"], nic["index"],
344
                      nic["firewall_profile"])
345

    
346

    
347
#
348
# Networks
349
#
350

    
351

    
352
def get_networks_from_ganeti(backend):
    """Return the Synnefo-managed networks of `backend`.

    Maps each Synnefo network id to the Ganeti network info dict.
    Networks whose name lacks the Synnefo prefix are ignored.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    nets = {}
    with pooled_rapi_client(backend) as client:
        for gnet in client.GetNetworks(bulk=True):
            name = gnet['name']
            if name.startswith(prefix):
                nets[utils.id_from_network_name(name)] = gnet

    return nets
363

    
364

    
365
def hanging_networks(backend, GNets):
    """Return Ganeti networks that are not connected to every nodegroup.

    Maps each network id to the set of nodegroup names it is missing
    from; fully-connected networks are omitted.
    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        # Each group_list entry is a (name, mode, link) triple.
        connected = set(name for (name, mode, link) in info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
384

    
385

    
386
def get_online_backends():
    """Return a queryset of all backends not marked as offline."""
    online = Backend.objects.filter(offline=False)
    return online
388

    
389

    
390
def get_database_servers(backend):
    """Return the backend's non-deleted VMs as a dict keyed by server id.

    Related flavors and NIC/IP/subnet rows are pre-fetched so the
    reconciliation loops do not issue per-server queries.
    """
    queryset = (backend.virtual_machines
                       .select_related("flavor")
                       .prefetch_related("nics__ips__subnet")
                       .filter(deleted=False))
    return dict((server.id, server) for server in queryset)
395

    
396

    
397
def get_ganeti_servers(backend):
    """Return the Synnefo instances of a Ganeti backend.

    Maps each server id to the parsed instance info.  Instances whose
    name lacks the Synnefo backend prefix, or whose name cannot be
    parsed to a server id, are skipped.
    """
    gnt_instances = backend_mod.get_instances(backend)
    # Filter out non-synnefo instances
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
    gnt_instances = filter(lambda i: i["name"].startswith(snf_backend_prefix),
                           gnt_instances)
    gnt_instances = map(parse_gnt_instance, gnt_instances)
    # BUGFIX: parse_gnt_instance used to signal a malformed name with a
    # (None, None) tuple, which made i["id"] raise TypeError here.  Guard
    # against any non-dict return so one bad instance cannot break the
    # whole reconciliation run.
    return dict([(i["id"], i) for i in gnt_instances
                 if isinstance(i, dict) and i.get("id") is not None])
405

    
406

    
407
def parse_gnt_instance(instance):
    """Parse a bulk Ganeti instance dict into reconciliation format.

    Returns a dict with keys id/state/updated/disks/nics/flavor/tags.
    If the instance name cannot be mapped to a Synnefo server id, a dict
    with "id" set to None is returned, so that callers filtering on
    i["id"] drop it safely.
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # BUGFIX: this used to return a (None, None) tuple, which crashed
        # get_ganeti_servers when it evaluated i["id"] (tuple indices must
        # be integers).  Return a caller-compatible dict instead.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # oper_state is a boolean: True means the machine is up.
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
431

    
432

    
433
def nics_from_instance(i):
    """Build a list of NIC dicts from a bulk Ganeti instance dict.

    Zips the parallel "nic.*" lists into one dict per NIC and attaches
    the firewall profile carried by "synnefo:network:<name>:<profile>"
    instance tags (first matching tag wins).
    """
    ips = zip(itertools.repeat('ip'), i['nic.ips'])
    names = zip(itertools.repeat('name'), i['nic.names'])
    macs = zip(itertools.repeat('mac'), i['nic.macs'])
    networks = zip(itertools.repeat('network'), i['nic.networks.names'])
    # BUGFIX: build a real list.  The old code assigned the bare map/zip
    # result, which under Python 3 is a one-shot iterator that the tag
    # loop below would silently exhaust before returning.
    nics = [dict(keyvals) for keyvals in zip(ips, names, macs, networks)]
    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            # Plain loop instead of a side-effecting list comprehension;
            # setdefault keeps the first profile seen for each NIC.
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics
456

    
457

    
458
def get_ganeti_jobs(backend):
    """Return the backend's Ganeti jobs keyed by integer job id."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
461

    
462

    
463
def disks_from_instance(i):
    """Map each disk index to a {"size": size} dict for an instance."""
    sizes = i["disk.sizes"]
    return dict((idx, {"size": size}) for idx, size in enumerate(sizes))
466

    
467

    
468
class NetworkReconciler(object):
    """Reconcile DB Networks/BackendNetworks with the Ganeti networks.

    Gathers the network state of every online backend, removes orphan
    Ganeti networks, and then reconciles each DB network against each
    backend (missing DB entries, stale/missing Ganeti networks, partial
    nodegroup connections, unsynced state, external IP reservations).
    Fixing actions run only when `fix` is True.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Collect state from all backends and reconcile every network."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends:
        # backend -> {network id -> ganeti info} and
        # backend -> {network id -> set of missing nodegroups}
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            # Still None when not fixing: nothing more can be checked.
            if bnet is None:
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        if network.state != "ACTIVE":
            # Re-fetch with a row lock before recomputing the state.
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """Create the missing BackendNetwork DB entry for this backend.

        Returns the new BackendNetwork, or None when not fixing.
        """
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """Mark a DESTROYed network as deleted via a simulated event."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create a network in Ganeti that exists only in the DB."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect the network to every nodegroup it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Simulate a successful connect to activate the backend network."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no live DB counterpart."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable shadows the outer
        # `ganeti_networks` dict; harmless here but confusing.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
639

    
640

    
641
def get_backend_network(network, backend):
    """Return the BackendNetwork linking `network` to `backend`, or None."""
    try:
        bnet = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        return None
    return bnet
646

    
647

    
648
class PoolReconciler(object):
    """Reconcile bridge, MAC-prefix and IP pools with their actual users.

    Each check rebuilds the expected availability bitmap from the
    objects that use a value and compares it with the stored pool.
    Fixing actions run only when `fix` is True.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run all pool checks; each check commits its own transaction."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # Only IPv4 subnets with DHCP enabled have an IP pool to check.
        networks = Network.objects.prefetch_related("subnets")\
                                  .filter(deleted=False)
        for network in networks:
            for subnet in network.subnets.all():
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(network)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check the bridge pool against PHYSICAL_VLAN networks."""
        networks = Network.objects.filter(deleted=False,
                                          flavor="PHYSICAL_VLAN")
        check_unique_values(objects=networks, field='link', logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        # Since pool is locked, no new networks may be created
        used_bridges = set(networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=used_bridges, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check the MAC-prefix pool against MAC_FILTERED networks."""
        networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED")
        check_unique_values(objects=networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        # Since pool is locked, no new network may be created
        used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=used_mac_prefixes, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check every IP pool of `network` against the allocated IPs."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            used_ips = ip_pool.pool_table.subnet\
                              .ips.exclude(address__isnull=True)\
                              .exclude(deleted=True)\
                              .values_list("address", flat=True)
            # Only addresses that fall inside this pool's range count.
            used_ips = filter(lambda x: ip_pool.contains(x), used_ips)
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=used_ips,
                                  fix=self.fix, logger=self.log)
715

    
716

    
717
def check_unique_values(objects, field, logger):
    """Check that every object carries a unique value in `field`.

    Logs an error for each duplicated value and returns False if any
    duplicates exist; returns True otherwise.
    """
    used_values = list(objects.values_list(field, flat=True))
    if len(used_values) != len(set(used_values)):
        # Collect the duplicated values in a single O(n) pass and report
        # each one exactly once.  The old code called list.count inside a
        # comprehension (O(n^2)) and logged every duplicate value once
        # per occurrence.
        seen = set()
        duplicate_values = set()
        for value in used_values:
            if value in seen:
                duplicate_values.add(value)
            seen.add(value)
        for value in duplicate_values:
            filter_args = {field: value}
            using_objects = objects.filter(**filter_args)
            msg = "Value '%s' is used as %s for more than one objects: %s"
            logger.error(msg, value, field, ",".join(map(str, using_objects)))
        return False
    logger.debug("Values for field '%s' are unique.", field)
    return True
729

    
730

    
731
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Check (and optionally fix) a pool against the values in use.

    Rebuilds an empty pool of type `pool_class`, reserves every value in
    `used_values` in it, and compares the resulting availability bitmap
    with the one stored in `pool`.  Every differing entry is logged as
    an error; when `fix` is True the stored bitmap is replaced by the
    recomputed one and the pool is saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop instead of a side-effecting list comprehension.
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available != pool.available:
        # XOR of the two bitmaps yields exactly the differing positions.
        # (The old code also assigned an unused "is not consistent" msg
        # here, which was dead -- it was overwritten before first use.)
        pool_diff = dummy_pool.available ^ pool.available
        for index in pool_diff.itersearch(bitarray.bitarray("1")):
            value = pool.index_to_value(int(index))
            msg = "%s is inconsistent! Value '%s' is %s but should be %s."
            value1 = pool.is_available(value) and "available" or "unavailable"
            value2 = dummy_pool.is_available(value) and "available"\
                or "unavailable"
            logger.error(msg, pool, value, value1, value2)
        if fix:
            pool.available = dummy_pool.available
            pool.save()
            logger.info("Fixed available map of pool '%s'", pool)
748

    
749

    
750
def create_empty_pool(pool, pool_class):
    """Build a fresh, fully-empty pool over the same pool table row.

    NOTE: this clears the bitmaps on the shared row object in place; the
    caller compares against the already-loaded `pool` bitmaps afterwards.
    """
    row = pool.pool_table
    row.available_map = ""
    row.reserved_map = ""
    return pool_class(row)