Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 4cbd934b

History | View | Annotate | Download (30.6 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, backend as backend_mod
73

    
74
# Module-wide logger. NOTE(review): getLogger() with no name returns the
# root logger, and basicConfig() installs a handler at import time —
# presumably intended because reconciliation runs as a standalone
# management command; confirm before changing.
logger = logging.getLogger()
logging.basicConfig()

# Seconds between reconciliation checks; overridable through the
# RECONCILIATION_CHECK_INTERVAL Django setting.
try:
    CHECK_INTERVAL = settings.RECONCILIATION_CHECK_INTERVAL
except AttributeError:
    CHECK_INTERVAL = 60

# Ganeti job status values as reported by the Ganeti RAPI.
GANETI_JOB_ERROR = "error"
GANETI_JOBS_PENDING = ["queued", "waiting", "running", "canceling"]
GANETI_JOBS_FINALIZED = ["success", "error", "canceled"]
85

    
86

    
87
class BackendReconciler(object):
88
    def __init__(self, backend, logger, options=None):
89
        self.backend = backend
90
        self.log = logger
91
        self.client = backend.get_client()
92
        if options is None:
93
            self.options = {}
94
        else:
95
            self.options = options
96

    
97
    def close(self):
98
        self.backend.put_client(self.client)
99

    
100
    @transaction.commit_on_success
101
    def reconcile(self):
102
        log = self.log
103
        backend = self.backend
104
        log.debug("Reconciling backend %s", backend)
105

    
106
        self.db_servers = get_database_servers(backend)
107
        self.db_servers_keys = set(self.db_servers.keys())
108
        log.debug("Got servers info from database.")
109

    
110
        self.gnt_servers = get_ganeti_servers(backend)
111
        self.gnt_servers_keys = set(self.gnt_servers.keys())
112
        log.debug("Got servers info from Ganeti backend.")
113

    
114
        self.gnt_jobs = get_ganeti_jobs(backend)
115
        log.debug("Got jobs from Ganeti backend")
116

    
117
        self.event_time = datetime.now()
118

    
119
        self.stale_servers = self.reconcile_stale_servers()
120
        self.orphan_servers = self.reconcile_orphan_servers()
121
        self.unsynced_servers = self.reconcile_unsynced_servers()
122
        self.close()
123

    
124
    def get_build_status(self, db_server):
125
        job_id = db_server.backendjobid
126
        if job_id in self.gnt_jobs:
127
            gnt_job_status = self.gnt_jobs[job_id]["status"]
128
            if gnt_job_status == GANETI_JOB_ERROR:
129
                return "ERROR"
130
            elif gnt_job_status not in GANETI_JOBS_FINALIZED:
131
                return "RUNNING"
132
            else:
133
                return "FINALIZED"
134
        else:
135
            return "ERROR"
136

    
137
    def reconcile_stale_servers(self):
138
        # Detect stale servers
139
        stale = []
140
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
141
        for server_id in stale_keys:
142
            db_server = self.db_servers[server_id]
143
            if db_server.operstate == "BUILD":
144
                build_status = self.get_build_status(db_server)
145
                if build_status == "ERROR":
146
                    # Special handling of BUILD eerrors
147
                    self.reconcile_building_server(db_server)
148
                elif build_status != "RUNNING":
149
                    stale.append(server_id)
150
            elif (db_server.operstate == "ERROR" and
151
                  db_server.action != "DESTROY"):
152
                # Servers at building ERROR are stale only if the user has
153
                # asked to destroy them.
154
                pass
155
            else:
156
                stale.append(server_id)
157

    
158
        # Report them
159
        if stale:
160
            self.log.info("Found stale servers %s at backend %s",
161
                          ", ".join(map(str, stale)), self.backend)
162
        else:
163
            self.log.debug("No stale servers at backend %s", self.backend)
164

    
165
        # Fix them
166
        if stale and self.options["fix_stale"]:
167
            for server_id in stale:
168
                db_server = self.db_servers[server_id]
169
                backend_mod.process_op_status(
170
                    vm=db_server,
171
                    etime=self.event_time,
172
                    jobid=-0,
173
                    opcode='OP_INSTANCE_REMOVE', status='success',
174
                    logmsg='Reconciliation: simulated Ganeti event')
175
            self.log.debug("Simulated Ganeti removal for stale servers.")
176

    
177
    def reconcile_orphan_servers(self):
178
        orphans = self.gnt_servers_keys - self.db_servers_keys
179
        if orphans:
180
            self.log.info("Found orphan servers %s at backend %s",
181
                          ", ".join(map(str, orphans)), self.backend)
182
        else:
183
            self.log.debug("No orphan servers at backend %s", self.backend)
184

    
185
        if orphans and self.options["fix_orphans"]:
186
            for server_id in orphans:
187
                server_name = utils.id_to_instance_name(server_id)
188
                self.client.DeleteInstance(server_name)
189
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")
190

    
191
    def reconcile_unsynced_servers(self):
192
        #log = self.log
193
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
194
            db_server = self.db_servers[server_id]
195
            gnt_server = self.gnt_servers[server_id]
196
            if db_server.operstate == "BUILD":
197
                build_status = self.get_build_status(db_server)
198
                if build_status == "RUNNING":
199
                    # Do not reconcile building VMs
200
                    continue
201
                elif build_status == "ERROR":
202
                    # Special handling of build errors
203
                    self.reconcile_building_server(db_server)
204
                    continue
205

    
206
            self.reconcile_unsynced_operstate(server_id, db_server,
207
                                              gnt_server)
208
            self.reconcile_unsynced_flavor(server_id, db_server,
209
                                           gnt_server)
210
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
211
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
212
            if db_server.task is not None:
213
                self.reconcile_pending_task(server_id, db_server)
214

    
215
    def reconcile_building_server(self, db_server):
216
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
217
                      db_server.id)
218
        if self.options["fix_unsynced"]:
219
            fix_opcode = "OP_INSTANCE_CREATE"
220
            backend_mod.process_op_status(
221
                vm=db_server,
222
                etime=self.event_time,
223
                jobid=-0,
224
                opcode=fix_opcode, status='error',
225
                logmsg='Reconciliation: simulated Ganeti event')
226
            self.log.debug("Simulated Ganeti error build event for"
227
                           " server '%s'", db_server.id)
228

    
229
    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
230
        if db_server.operstate != gnt_server["state"]:
231
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
232
                          server_id, db_server.operstate, gnt_server["state"])
233
            if self.options["fix_unsynced"]:
234
                # If server is in building state, you will have first to
235
                # reconcile it's creation, to avoid wrong quotas
236
                if db_server.operstate == "BUILD":
237
                    backend_mod.process_op_status(
238
                        vm=db_server, etime=self.event_time, jobid=-0,
239
                        opcode="OP_INSTANCE_CREATE", status='success',
240
                        logmsg='Reconciliation: simulated Ganeti event')
241
                fix_opcode = "OP_INSTANCE_STARTUP"\
242
                    if gnt_server["state"] == "STARTED"\
243
                    else "OP_INSTANCE_SHUTDOWN"
244
                backend_mod.process_op_status(
245
                    vm=db_server, etime=self.event_time, jobid=-0,
246
                    opcode=fix_opcode, status='success',
247
                    logmsg='Reconciliation: simulated Ganeti event')
248
                self.log.debug("Simulated Ganeti state event for server '%s'",
249
                               server_id)
250

    
251
    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
252
        db_flavor = db_server.flavor
253
        gnt_flavor = gnt_server["flavor"]
254
        if (db_flavor.ram != gnt_flavor["ram"] or
255
           db_flavor.cpu != gnt_flavor["vcpus"]):
256
            try:
257
                gnt_flavor = Flavor.objects.get(
258
                    ram=gnt_flavor["ram"],
259
                    cpu=gnt_flavor["vcpus"],
260
                    disk=db_flavor.disk,
261
                    disk_template=db_flavor.disk_template)
262
            except Flavor.DoesNotExist:
263
                self.log.warning("Server '%s' has unknown flavor.", server_id)
264
                return
265

    
266
            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
267
                          " Ganeti", server_id, db_flavor, gnt_flavor)
268
            if self.options["fix_unsynced_flavors"]:
269
                old_state = db_server.operstate
270
                opcode = "OP_INSTANCE_SET_PARAMS"
271
                beparams = {"vcpus": gnt_flavor.cpu,
272
                            "minmem": gnt_flavor.ram,
273
                            "maxmem": gnt_flavor.ram}
274
                backend_mod.process_op_status(
275
                    vm=db_server, etime=self.event_time, jobid=-0,
276
                    opcode=opcode, status='success',
277
                    job_fields={"beparams": beparams},
278
                    logmsg='Reconciliation: simulated Ganeti event')
279
                # process_op_status with beparams will set the vmstate to
280
                # shutdown. Fix this be returning it to old state
281
                vm = VirtualMachine.objects.get(pk=server_id)
282
                vm.operstate = old_state
283
                vm.save()
284
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
285
                               server_id)
286

    
287
    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
288
        building_time = (self.event_time -
289
                         backend_mod.BUILDING_NIC_TIMEOUT)
290
        db_nics = db_server.nics.exclude(state="BUILD",
291
                                         created__lte=building_time) \
292
                                .order_by("id")
293
        gnt_nics = gnt_server["nics"]
294
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
295
        nics_changed = len(db_nics) != len(gnt_nics)
296
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
297
            gnt_nic_id, gnt_nic = gnt_nic
298
            if (db_nic.id == gnt_nic_id) and\
299
               backend_mod.nics_are_equal(db_nic, gnt_nic):
300
                continue
301
            else:
302
                nics_changed = True
303
                break
304
        if nics_changed:
305
            msg = "Found unsynced NICs for server '%s'.\n"\
306
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
307
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
308
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
309
                                         sorted(gnt_nics_parsed.items())))
310
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
311
            if self.options["fix_unsynced_nics"]:
312
                backend_mod.process_net_status(vm=db_server,
313
                                               etime=self.event_time,
314
                                               nics=gnt_nics)
315

    
316
    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
317
        pass
318

    
319
    def reconcile_pending_task(self, server_id, db_server):
320
        job_id = db_server.task_job_id
321
        pending_task = False
322
        if job_id not in self.gnt_jobs:
323
            pending_task = True
324
        else:
325
            gnt_job_status = self.gnt_jobs[job_id]["status"]
326
            if gnt_job_status in GANETI_JOBS_FINALIZED:
327
                pending_task = True
328

    
329
        if pending_task:
330
            self.log.info("Found server '%s' with pending task: '%s'",
331
                          server_id, db_server.task)
332
            if self.options["fix_pending_tasks"]:
333
                db_server.task = None
334
                db_server.task_job_id = None
335
                db_server.save()
336
                self.log.info("Cleared pending task for server '%s", server_id)
337

    
338

    
339
# Template producing "ID: %s\tState: %s\t...\tFirewall: %s" lines for the
# NIC reports emitted during reconciliation.
NIC_MSG = ": %s\t".join(["ID", "State", "IP", "Network", "MAC", "Index",
                         "Firewall"]) + ": %s"
341

    
342

    
343
def format_db_nic(nic):
    """Render a one-line description of a DB NIC using the NIC_MSG template."""
    fields = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % fields
346

    
347

    
348
def format_gnt_nic(nic):
    """Render a one-line description of a parsed Ganeti NIC.

    'nic' is a (name, info-dict) pair, as produced by iterating the
    items of a parsed NIC mapping.
    """
    nic_name, info = nic
    fields = (nic_name, info["state"], info["ipv4_address"],
              info["network"].id, info["mac"], info["index"],
              info["firewall_profile"])
    return NIC_MSG % fields
353

    
354

    
355
#
356
# Networks
357
#
358

    
359

    
360
def get_networks_from_ganeti(backend):
    """Return the backend's synnefo-managed Ganeti networks, keyed by id.

    Only networks whose name carries the synnefo BACKEND_PREFIX_ID are
    considered; everything else is ignored.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                continue
            network_id = utils.id_from_network_name(name)
            networks[network_id] = net

    return networks
371

    
372

    
373
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Returns a dict mapping each partially-connected network id to the set
    of nodegroup names it is missing from.
    """
    def connected_groups(group_list):
        # Each entry of 'group_list' is a (name, mode, link) triple; only
        # the group name matters here.
        return set(entry[0] for entry in group_list)

    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        net_groups = connected_groups(info['group_list'])
        if net_groups != all_groups:
            hanging[net_id] = all_groups - net_groups
    return hanging
392

    
393

    
394
def get_online_backends():
    """Return all Backends that are not marked as offline."""
    return Backend.objects.filter(offline=False)
396

    
397

    
398
def get_database_servers(backend):
    """Return the backend's non-deleted VirtualMachines, keyed by server id.

    Flavors and NIC/IP/subnet relations are prefetched, since the
    reconciliation code walks them for every server.
    """
    servers = backend.virtual_machines.select_related("flavor")
    servers = servers.prefetch_related("nics__ips__subnet")
    servers = servers.filter(deleted=False)
    return dict((server.id, server) for server in servers)
403

    
404

    
405
def get_ganeti_servers(backend):
    """Return the backend's parsed synnefo instances, keyed by server id.

    Instances whose name does not carry the synnefo prefix, or whose name
    cannot be mapped to a server id, are skipped.
    """
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
    servers = {}
    for instance in backend_mod.get_instances(backend):
        # Filter out non-synnefo instances
        if not instance["name"].startswith(snf_backend_prefix):
            continue
        parsed = parse_gnt_instance(instance)
        if parsed["id"] is not None:
            servers[parsed["id"]] = parsed
    return servers
413

    
414

    
415
def parse_gnt_instance(instance):
    """Parse a Ganeti bulk instance description into a plain dict.

    Returns a dict with keys "id", "state", "updated", "disks", "nics",
    "flavor" and "tags". If the instance name cannot be mapped to a
    synnefo server id, a dict with "id" set to None is returned so that
    callers filtering on "id" skip it.
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # Bug fix: this used to return the tuple (None, None), which made
        # the caller's i["id"] lookup raise TypeError instead of filtering
        # the instance out.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # Ganeti reports oper_state as a boolean: True means running.
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
439

    
440

    
441
def nics_from_instance(i):
    """Build a list of NIC dicts from a Ganeti bulk instance description.

    Each NIC dict carries the keys "ip", "name", "mac" and "network",
    taken positionally from the parallel 'nic.*' lists of the instance.
    Firewall profiles are stored by synnefo as instance tags of the form
    'synnefo:network:<nic_name>:<profile>' and, when present, are merged
    into the matching NIC dict under the "firewall" key.
    """
    nics = [{"ip": ip, "name": name, "mac": mac, "network": network}
            for ip, name, mac, network
            in zip(i['nic.ips'], i['nic.names'], i['nic.macs'],
                   i['nic.networks.names'])]

    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            # setdefault: the first matching tag wins, as before.
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics
464

    
465

    
466
def get_ganeti_jobs(backend):
    """Return the backend's Ganeti jobs, keyed by integer job id."""
    jobs = {}
    for job in backend_mod.get_jobs(backend):
        jobs[int(job["id"])] = job
    return jobs
469

    
470

    
471
def disks_from_instance(i):
    """Map disk index -> {"size": size} from a Ganeti instance description."""
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks
474

    
475

    
476
class NetworkReconciler(object):
    """Reconcile DB Networks/BackendNetworks with Ganeti networks.

    Problems are reported with a "D:" (detected) log prefix; repairs,
    applied only when 'fix' is True, are reported with an "F:" (fixed)
    prefix.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Collect network info from all online backends and reconcile."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        # Single timestamp used for every simulated event.
        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if not bnet:
                if network.floating_ip_pool:
                    # Network is a floating IP pool and does not exist in
                    # backend. We need to create it
                    # NOTE(review): reconcile_parted_network returns None
                    # when self.fix is False; bnet.operstate below would
                    # then raise AttributeError — confirm whether the
                    # detect-only path can reach that point.
                    bnet = self.reconcile_parted_network(network, bend)
                elif not gnet:
                    # Network does not exist either in Ganeti nor in DB.
                    continue
                else:
                    # Network exists in Ganeti and not in DB.
                    if network.action != "DESTROY" and not network.public:
                        bnet = self.reconcile_parted_network(network, bend)
                    else:
                        continue

            if not gnet:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)

    def reconcile_parted_network(self, network, backend):
        """Create the missing BackendNetwork DB entry (when fixing).

        Returns the freshly created BackendNetwork, or None when
        self.fix is False.
        """
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """Mark a BackendNetwork deleted by simulating OP_NETWORK_REMOVE."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create a network in Ganeti that exists only in the DB."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect a network to the nodegroups it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Force a BackendNetwork to ACTIVE by simulating OP_NETWORK_CONNECT."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no (live) DB counterpart."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable deliberately rebinds the name
        # 'ganeti_networks' to each backend's mapping; the outer dict is
        # not needed again afterwards.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
652

    
653

    
654
def get_backend_network(network, backend):
    """Return the BackendNetwork of 'network' at 'backend', or None."""
    try:
        return BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        return None
659

    
660

    
661
class PoolReconciler(object):
    """Reconcile the bridge, MAC-prefix and IP pools with their users.

    For each pool, the set of values actually in use by DB objects is
    recomputed and compared against the pool's stored availability map.
    Repairs are applied only when 'fix' is True.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run all pool checks: bridges, MAC prefixes, then IP pools."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # Only networks with an IPv4 subnet doing DHCP own an IP pool.
        networks = Network.objects.prefetch_related("subnets")\
                                  .filter(deleted=False)
        for network in networks:
            for subnet in network.subnets.all():
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(network)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check the bridge pool against PHYSICAL_VLAN networks."""
        networks = Network.objects.filter(deleted=False,
                                          flavor="PHYSICAL_VLAN")
        check_unique_values(objects=networks, field='link', logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        used_bridges = set(networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=used_bridges, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check the MAC-prefix pool against MAC_FILTERED networks."""
        networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED")
        check_unique_values(objects=networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=used_mac_prefixes, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check each IP pool of 'network' against the addresses in use."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # Only addresses falling inside this pool's range count as used.
            used_ips = ip_pool.pool_table.subnet\
                              .ips.exclude(address__isnull=True)\
                              .values_list("address", flat=True)
            used_ips = filter(lambda x: ip_pool.contains(x), used_ips)
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=used_ips,
                                  fix=self.fix, logger=self.log)
724

    
725

    
726
def check_unique_values(objects, field, logger):
    """Check that every object carries a unique value at 'field'.

    Returns True when all values are unique, False otherwise. Each
    duplicated value is reported once through 'logger', together with the
    objects using it. (Previously each duplicate was re-logged per
    occurrence and detected with an O(n^2) count() scan.)
    """
    used_values = list(objects.values_list(field, flat=True))
    seen = set()
    duplicate_values = set()
    for value in used_values:
        if value in seen:
            duplicate_values.add(value)
        seen.add(value)
    if duplicate_values:
        for value in duplicate_values:
            filter_args = {field: value}
            using_objects = objects.filter(**filter_args)
            msg = "Value '%s' is used as %s for more than one objects: %s"
            logger.error(msg, value, field, ",".join(map(str, using_objects)))
        return False
    logger.debug("Values for field '%s' are unique.", field)
    return True
738

    
739

    
740
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Check a pool's availability bitmap against the values in use.

    A fresh, empty pool of 'pool_class' is built, every value in
    'used_values' is reserved in it, and its availability bitmap is
    compared with the one stored in 'pool'. Every position where the two
    disagree is logged; when 'fix' is True the stored bitmap is
    overwritten with the recomputed one and the pool is saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available != pool.available:
        # Find the exact bit positions where the two bitmaps disagree.
        # (A dead, never-logged "is not consistent" message template was
        # removed here.)
        pool_diff = dummy_pool.available ^ pool.available
        for index in pool_diff.itersearch(bitarray.bitarray("1")):
            value = pool.index_to_value(int(index))
            msg = "%s is inconsistent! Value '%s' is %s but should be %s."
            value1 = pool.is_available(value) and "available" or "unavailable"
            value2 = dummy_pool.is_available(value) and "available"\
                or "unavailable"
            logger.error(msg, pool, value, value1, value2)
        if fix:
            pool.available = dummy_pool.available
            pool.save()
            logger.info("Fixed available map of pool '%s'", pool)
757

    
758

    
759
def create_empty_pool(pool, pool_class):
    """Build a 'pool_class' instance backed by an emptied availability map.

    NOTE(review): this reuses and mutates 'pool.pool_table' in place by
    clearing its map fields. The row is not saved here, but the change is
    visible to anyone still holding the same row object — confirm this is
    intended before reusing 'pool' afterwards.
    """
    pool_row = pool.pool_table
    pool_row.available_map = ""
    pool_row.reserved_map = ""
    return pool_class(pool_row)
    return pool_class(pool_row)