Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 1cb7846c

History | View | Annotate | Download (30.5 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, backend as backend_mod
73

    
74
logger = logging.getLogger()
75
logging.basicConfig()
76

    
77
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
78

    
79
GANETI_JOB_ERROR = "error"
80
GANETI_JOBS_PENDING = ["queued", "waiting", "running", "canceling"]
81
GANETI_JOBS_FINALIZED = ["success", "error", "canceled"]
82

    
83

    
84
class BackendReconciler(object):
    """Reconcile the DB state of a single Ganeti backend.

    Compares the servers recorded in the DB with the instances reported
    by the Ganeti backend and, depending on the boolean flags in
    ``options`` ("fix_stale", "fix_orphans", "fix_unsynced",
    "fix_unsynced_flavors", "fix_unsynced_nics", "fix_pending_tasks"),
    simulates the Ganeti events needed to bring the DB back in sync.

    NOTE(review): the "fix_*" keys are read unconditionally with
    ``self.options[...]`` below, so an empty options dict raises
    KeyError once a discrepancy is found — confirm callers always pass
    every flag.
    """

    def __init__(self, backend, logger, options=None):
        # backend: synnefo Backend model instance to reconcile.
        # logger: logger used for all reconciliation reporting.
        # options: dict of "fix_*" booleans (see class docstring).
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client to the backend's client pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run the full reconciliation for this backend in one transaction.

        Gathers DB servers, Ganeti instances and Ganeti jobs, then runs
        the three reconciliation passes (stale, orphan, unsynced).

        NOTE(review): the reconcile_* methods below return None, so the
        self.stale_servers / orphan_servers / unsynced_servers
        attributes are always None — confirm nothing reads them.
        """
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        # Timestamp used as etime for every simulated Ganeti event.
        self.event_time = datetime.now()

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Classify the creation job of a BUILD server.

        Returns "ERROR", "RUNNING" or "FINALIZED". An unknown/expired
        job id is treated as "ERROR" (the job was archived or lost).
        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status == GANETI_JOB_ERROR:
                return "ERROR"
            elif gnt_job_status not in GANETI_JOBS_FINALIZED:
                return "RUNNING"
            else:
                return "FINALIZED"
        else:
            return "ERROR"

    def reconcile_stale_servers(self):
        """Rule R1: servers present in the DB but missing from Ganeti.

        Marks them deleted by simulating a successful
        OP_INSTANCE_REMOVE event (when options["fix_stale"] is set).
        """
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them
        if stale and self.options["fix_stale"]:
            for server_id in stale:
                db_server = self.db_servers[server_id]
                backend_mod.process_op_status(
                    vm=db_server,
                    etime=self.event_time,
                    # NOTE(review): -0 is simply 0; presumably a
                    # sentinel for "no real Ganeti job" — confirm.
                    jobid=-0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Rule R2: instances in Ganeti with no (non-deleted) DB record.

        Issues a real instance deletion in Ganeti (when
        options["fix_orphans"] is set).
        """
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options["fix_orphans"]:
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Rule R3: servers present on both sides; sync state/flavor/NICs.

        Building servers are skipped while their creation job is still
        running, and routed to reconcile_building_server on job error.
        """
        #log = self.log
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Handle a server whose creation job failed in Ganeti.

        Simulates a failed OP_INSTANCE_CREATE event (when
        options["fix_unsynced"] is set) so quotas/state are corrected.
        """
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options["fix_unsynced"]:
            fix_opcode = "OP_INSTANCE_CREATE"
            backend_mod.process_op_status(
                vm=db_server,
                etime=self.event_time,
                jobid=-0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Sync the DB operstate with Ganeti's STARTED/STOPPED state."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options["fix_unsynced"]:
                # If server is in building state, you will have first to
                # reconcile it's creation, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=db_server, etime=self.event_time, jobid=-0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=-0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Sync the DB flavor with the RAM/vCPUs reported by Ganeti.

        Only ram/cpu are compared; disk size and template are taken
        from the current DB flavor when looking up the real one.
        """
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options["fix_unsynced_flavors"]:
                old_state = db_server.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=-0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this be returning it to old state
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Compare DB NICs with Ganeti NICs and resync on any mismatch.

        Recently created NICs (still BUILD and younger than
        BUILDING_NIC_TIMEOUT) are excluded to avoid racing in-flight
        jobs.
        """
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options["fix_unsynced_nics"]:
                backend_mod.process_net_status(vm=db_server,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        # Placeholder: disk reconciliation is not implemented yet.
        pass

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a stuck task whose Ganeti job is gone or finalized."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            # Job no longer known to Ganeti (archived/lost).
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in GANETI_JOBS_FINALIZED:
                pending_task = True

        if pending_task:
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options["fix_pending_tasks"]:
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                # NOTE(review): the log message below is missing the
                # closing quote after %s — cosmetic only.
                self.log.info("Cleared pending task for server '%s", server_id)
333

    
334

    
335
# Log-line template for one NIC. The join produces
# "ID: %s\tState: %s\t...\tIndex: %s\tFirewall" and the trailing
# ": %s" completes the last field — one "%s" per label.
NIC_MSG = ": %s\t".join(["ID", "State", "IP", "Network", "MAC", "Index",
                         "Firewall"]) + ": %s"
337

    
338

    
339
def format_db_nic(nic):
    """Render a DB NIC model instance as a single NIC_MSG log line."""
    fields = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % fields
342

    
343

    
344
def format_gnt_nic(nic):
    """Render a parsed Ganeti NIC as a single NIC_MSG log line.

    `nic` is a (name, attributes-dict) pair as produced by
    sorted(gnt_nics_parsed.items()).
    """
    name, attrs = nic
    fields = (name, attrs["state"], attrs["ipv4_address"],
              attrs["network"].id, attrs["mac"], attrs["index"],
              attrs["firewall_profile"])
    return NIC_MSG % fields
349

    
350

    
351
#
352
# Networks
353
#
354

    
355

    
356
def get_networks_from_ganeti(backend):
    """Return the synnefo-owned networks of a Ganeti backend.

    Only networks whose name carries the synnefo prefix are kept; the
    result maps DB network id -> raw Ganeti network info.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                continue
            network_id = utils.id_from_network_name(name)
            networks[network_id] = net

    return networks
367

    
368

    
369
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Returns a dict mapping network id -> set of nodegroup names the
    network is missing from.
    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        # Names of the nodegroups this network is connected to.
        connected = set(name for (name, mode, link) in info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
388

    
389

    
390
def get_online_backends():
    """Return every Ganeti backend that is not marked offline."""
    online = Backend.objects.filter(offline=False)
    return online
392

    
393

    
394
def get_database_servers(backend):
    """Return the backend's non-deleted VMs from the DB, keyed by id.

    Prefetches flavor and NIC/IP/subnet relations so the reconciler
    does not issue per-server queries later.
    """
    servers = backend.virtual_machines.select_related("flavor")\
                                      .prefetch_related("nics__ips__subnet")\
                                      .filter(deleted=False)
    return dict((srv.id, srv) for srv in servers)
399

    
400

    
401
def get_ganeti_servers(backend):
    """Return the backend's synnefo instances, parsed and keyed by id.

    Instances whose name lacks the synnefo prefix, or whose name cannot
    be parsed into an id, are dropped.
    """
    instances = backend_mod.get_instances(backend)
    prefix = settings.BACKEND_PREFIX_ID
    # Filter out non-synnefo instances, then parse the rest.
    parsed = [parse_gnt_instance(inst) for inst in instances
              if inst["name"].startswith(prefix)]
    return dict((inst["id"], inst) for inst in parsed
                if inst["id"] is not None)
409

    
410

    
411
def parse_gnt_instance(instance):
    """Parse a raw Ganeti instance dict into the reconciler's format.

    Always returns a dict carrying at least an "id" key. When the
    instance name does not follow the synnefo naming scheme, "id" is
    None so that callers (see get_ganeti_servers) can filter the entry
    out with `i["id"] is not None`.
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # BUGFIX: previously returned the tuple (None, None), which made
        # the caller's i["id"] lookup raise TypeError. Return a dict so
        # the existing `i["id"] is not None` filter works.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # oper_state is a boolean: True means the instance is up.
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,  # FIX
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
435

    
436

    
437
def nics_from_instance(i):
    """Build the list of NIC dicts for a bulk Ganeti instance dict.

    Each NIC dict has "ip", "name", "mac" and "network" keys, taken
    positionally from the parallel 'nic.*' lists. A "firewall" key is
    added for NICs matching a 'synnefo:network:<nic_name>:<profile>'
    instance tag.
    """
    # Build concrete dicts up front. The old implementation chained
    # zip/map objects, which under Python 3 are one-shot iterators and
    # would be exhausted by the first tag pass.
    nics = [{"ip": ip, "name": name, "mac": mac, "network": network}
            for ip, name, mac, network
            in zip(i['nic.ips'], i['nic.names'], i['nic.macs'],
                   i['nic.networks.names'])]

    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] != ["synnefo", "network"]:
            continue
        if len(t) != 4:
            # Typo fixed: "synefo" -> "synnefo".
            logger.error("Malformed synnefo tag %s", tag)
            continue
        nic_name = t[2]
        firewall = t[3]
        # Plain loop instead of a side-effect list comprehension.
        for nic in nics:
            if nic["name"] == nic_name:
                nic.setdefault("firewall", firewall)
    return nics
460

    
461

    
462
def get_ganeti_jobs(backend):
    """Return all Ganeti jobs of a backend, keyed by integer job id."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
465

    
466

    
467
def disks_from_instance(i):
    """Map disk index -> {"size": size} from a bulk instance dict."""
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks
470

    
471

    
472
class NetworkReconciler(object):
    """Reconcile DB networks with their Ganeti counterparts.

    For every non-deleted Network, compares the DB record (and the
    per-backend BackendNetwork rows) against what each online Ganeti
    backend reports, and — when `fix` is True — repairs discrepancies
    by simulating or issuing the appropriate network operations.
    """

    def __init__(self, logger, fix=False):
        # logger: destination for "D:" (detect) / "F:" (fix) messages.
        # fix: when False, only report; when True, also repair.
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Entry point: gather state and reconcile every network."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponging Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            if bnet is None:
                # No DB entry and none created (fix disabled): nothing to do.
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        if network.state != "ACTIVE":
            # Re-fetch under row lock before recomputing the state.
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """CASE-1: network exists in Ganeti but has no DB row for backend.

        Creates the missing BackendNetwork row when fixing; returns the
        new row, or None when fix is disabled.
        """
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """CASE-2a: network gone from Ganeti and marked DESTROY in DB.

        Simulates a successful OP_NETWORK_REMOVE to mark it deleted.
        """
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """CASE-2b: network missing from Ganeti; (re)create it there."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """CASE-3: connect the network to nodegroups it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """CASE-4: network fully present in Ganeti but not ACTIVE in DB.

        Simulates a successful OP_NETWORK_CONNECT to activate it.
        """
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no non-deleted DB Network."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable below shadows the outer
        # `ganeti_networks` dict; harmless here since the dict is not
        # used again after the loop starts, but fragile.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
                                          net_id)
643

    
644

    
645
def get_backend_network(network, backend):
    """Return the BackendNetwork row for (network, backend), or None."""
    try:
        bnet = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        bnet = None
    return bnet
650

    
651

    
652
class PoolReconciler(object):
    """Check (and optionally fix) bridge, MAC-prefix and IP pools.

    Each check verifies both that the pooled values in use are unique
    across networks/NICs and that the stored pool bitmap agrees with
    the values actually in use.
    """

    def __init__(self, logger, fix=False):
        # logger: destination for all findings.
        # fix: when True, inconsistent pool bitmaps are overwritten.
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run all pool checks: bridges, MAC prefixes and IPv4 pools."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        networks = Network.objects.prefetch_related("subnets")\
                                  .filter(deleted=False)
        for network in networks:
            for subnet in network.subnets.all():
                # Only IPv4 subnets with DHCP have an IP pool to check.
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(network)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check bridge uniqueness/consistency for PHYSICAL_VLAN networks."""
        networks = Network.objects.filter(deleted=False,
                                          flavor="PHYSICAL_VLAN")
        check_unique_values(objects=networks, field='link', logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        # Since pool is locked, no new networks may be created
        used_bridges = set(networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=used_bridges, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check MAC-prefix uniqueness/consistency for MAC_FILTERED nets."""
        networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED")
        check_unique_values(objects=networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        # Since pool is locked, no new network may be created
        used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=used_mac_prefixes, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check a network's IPv4 pools against the addresses in use."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            used_ips = ip_pool.pool_table.subnet\
                              .ips.exclude(address__isnull=True)\
                              .exclude(deleted=True)\
                              .values_list("address", flat=True)
            used_ips = filter(lambda x: ip_pool.contains(x), used_ips)
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=used_ips,
                                  fix=self.fix, logger=self.log)
719

    
720

    
721
def check_unique_values(objects, field, logger):
    """Check that `field` has a unique value across `objects`.

    `objects` is a Django queryset. Logs one error per duplicated
    value and returns False if any duplicates exist, True otherwise.
    """
    used_values = list(objects.values_list(field, flat=True))
    if len(used_values) == len(set(used_values)):
        logger.debug("Values for field '%s' are unique.", field)
        return True

    # Count occurrences in a single O(n) pass instead of calling
    # list.count() per element (O(n^2)); report each duplicated value
    # only once instead of once per occurrence.
    counts = {}
    for value in used_values:
        counts[value] = counts.get(value, 0) + 1
    for value, count in counts.items():
        if count <= 1:
            continue
        filter_args = {field: value}
        using_objects = objects.filter(**filter_args)
        msg = "Value '%s' is used as %s for more than one objects: %s"
        logger.error(msg, value, field, ",".join(map(str, using_objects)))
    return False
733

    
734

    
735
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Verify a pool's availability bitmap against the values in use.

    Rebuilds an empty pool, reserves every value in `used_values`, and
    compares the recomputed availability bitmap with the stored one.
    Every differing entry is logged; when `fix` is True the stored
    bitmap is replaced by the recomputed one and saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop instead of a side-effect-only list comprehension.
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available == pool.available:
        return

    # Bit positions where the recomputed and stored maps disagree.
    pool_diff = dummy_pool.available ^ pool.available
    for index in pool_diff.itersearch(bitarray.bitarray("1")):
        value = pool.index_to_value(int(index))
        # Typo fixed ("incosistent"); the previous unused multi-line
        # message assignment was dead code and has been removed.
        msg = "%s is inconsistent! Value '%s' is %s but should be %s."
        value1 = pool.is_available(value) and "available" or "unavailable"
        value2 = dummy_pool.is_available(value) and "available"\
            or "unavailable"
        logger.error(msg, pool, value, value1, value2)
    if fix:
        pool.available = dummy_pool.available
        pool.save()
        logger.info("Fixed available map of pool '%s'", pool)
752

    
753

    
754
def create_empty_pool(pool, pool_class):
    """Return a fresh, all-available pool built on `pool`'s table row.

    NOTE(review): this clears the available/reserved maps of the shared
    pool_table row *in place*, so `pool` must already have loaded its
    own bitmap before this is called — confirm for any new caller
    (check_pool_consistent compares pool.available afterwards and
    appears to rely on this).
    """
    pool_row = pool.pool_table
    pool_row.available_map = ""
    pool_row.reserved_map = ""
    return pool_class(pool_row)