Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 7ef05bd4

History | View | Annotate | Download (31.7 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, rapi, backend as backend_mod
73
from synnefo.lib.utils import merge_time
74

    
75
logger = logging.getLogger()
76
logging.basicConfig()
77

    
78
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
79

    
80

    
81
class BackendReconciler(object):
    """Reconcile the servers of one Ganeti backend with the DB.

    Implements rules R1-R3 of the module docstring: stale servers that
    exist only in the DB, orphan instances that exist only in Ganeti, and
    servers whose state (operstate, flavor, NICs, pending task) differs
    between the two.  Every fix is gated by a flag in ``options``
    ("fix_stale", "fix_orphans", "fix_unsynced", "fix_unsynced_flavors",
    "fix_unsynced_nics", "fix_pending_tasks").
    """

    def __init__(self, backend, logger, options=None):
        # backend: the Backend DB object to reconcile.
        # logger: logger used for all detection/fix reporting.
        # options: dict of "fix_*" flags; empty dict means detect-only.
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client acquired in __init__ back to the pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run the full reconciliation of this backend in one transaction."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        # Timestamp taken *before* querying Ganeti: jobs finishing after
        # this moment are not treated as unsynced state (see
        # reconcile_unsynced_servers).
        self.event_time = datetime.now()

        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Return the status of the build job.

        Return whether the job is RUNNING, FINALIZED or ERROR, together
        with the timestamp that the job finished (if any).

        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            job = self.gnt_jobs[job_id]
            gnt_job_status = job["status"]
            end_timestamp = merge_time(job["end_ts"])
            if gnt_job_status == rapi.JOB_STATUS_ERROR:
                return "ERROR", end_timestamp
            elif gnt_job_status not in rapi.JOB_STATUS_FINALIZED:
                return "RUNNING", None
            else:
                return "FINALIZED", end_timestamp
        else:
            # Job unknown to Ganeti: treat as an error with no timestamp.
            return "ERROR", None

    def reconcile_stale_servers(self):
        """Rule R1: handle servers present in the DB but missing in Ganeti."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them by simulating a successful removal event, which marks
        # the VM deleted in the DB and releases its resources.
        if stale and self.options["fix_stale"]:
            for server_id in stale:
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm,
                    etime=self.event_time,
                    jobid=-0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Rule R2: destroy instances present in Ganeti but not in the DB."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options["fix_orphans"]:
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Rule R3: reconcile servers that exist both in DB and Ganeti."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue
                elif end_timestamp >= self.event_time:
                    # Do not continue reconciliation for building server that
                    # the build job completed after querying the state of
                    # Ganeti servers.
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Fail a BUILDing server whose creation job errored in Ganeti."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options["fix_unsynced"]:
            fix_opcode = "OP_INSTANCE_CREATE"
            vm = get_locked_server(db_server.id)
            backend_mod.process_op_status(
                vm=vm,
                etime=self.event_time,
                jobid=-0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Sync the DB operstate with Ganeti's STARTED/STOPPED state."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options["fix_unsynced"]:
                vm = get_locked_server(server_id)
                # If server is in building state, you will have first to
                # reconcile it's creation, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=vm, etime=self.event_time, jobid=-0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=-0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Sync the DB flavor with the RAM/VCPUs reported by Ganeti.

        Only RAM and VCPUs are compared; disk size and template are taken
        from the DB flavor when looking up the matching Flavor row.
        """
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options["fix_unsynced_flavors"]:
                vm = get_locked_server(server_id)
                old_state = vm.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=-0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to old state
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Sync the DB NICs with the NICs reported by Ganeti.

        NICs in state BUILD younger than BUILDING_NIC_TIMEOUT are excluded
        from the comparison, as they may still be in progress.
        """
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options["fix_unsynced_nics"]:
                vm = get_locked_server(server_id)
                backend_mod.process_net_status(vm=vm,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        # Disk reconciliation is intentionally not implemented.
        pass

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a server task whose Ganeti job is finished or unknown."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in rapi.JOB_STATUS_FINALIZED:
                pending_task = True

        if pending_task:
            db_server = get_locked_server(server_id)
            if db_server.task_job_id != job_id:
                # task has changed!
                return
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options["fix_pending_tasks"]:
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                self.log.info("Cleared pending task for server '%s", server_id)
351

    
352

    
353
# printf-style template used by format_db_nic/format_gnt_nic, expanding to
# "ID: %s\tState: %s\tIP: %s\tNetwork: %s\tMAC: %s\tIndex: %s\tFirewall: %s"
NIC_MSG = ": %s\t".join(["ID", "State", "IP", "Network", "MAC", "Index",
                         "Firewall"]) + ": %s"
355

    
356

    
357
def format_db_nic(nic):
    """Render a DB NIC row for log output using the NIC_MSG template."""
    fields = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % fields
360

    
361

    
362
def format_gnt_nic(nic):
    """Render a parsed Ganeti (nic_name, nic_dict) pair via NIC_MSG."""
    nic_name, info = nic
    fields = (nic_name, info["state"], info["ipv4_address"],
              info["network"].id, info["mac"], info["index"],
              info["firewall_profile"])
    return NIC_MSG % fields
367

    
368

    
369
#
370
# Networks
371
#
372

    
373

    
374
def get_networks_from_ganeti(backend):
    """Return the synnefo networks of a Ganeti backend, keyed by network id.

    Networks whose name does not carry the synnefo prefix are ignored.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                continue  # not a synnefo-managed network
            networks[utils.id_from_network_name(name)] = net

    return networks
385

    
386

    
387
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Return a dict mapping each such network id to the set of nodegroup
    names it is missing from.
    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        # info['group_list'] holds (name, mode, link) triples for every
        # nodegroup the network is connected to.
        connected = set(name for (name, mode, link) in info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
406

    
407

    
408
def get_online_backends():
    """Return all Ganeti backends that are not marked offline in the DB."""
    return Backend.objects.filter(offline=False)
410

    
411

    
412
def get_database_servers(backend):
    """Return the backend's non-deleted VMs from the DB, keyed by id.

    Flavors and NIC/IP/subnet relations are fetched eagerly to avoid
    per-server queries during reconciliation.
    """
    servers = (backend.virtual_machines
                      .select_related("flavor")
                      .prefetch_related("nics__ips__subnet")
                      .filter(deleted=False))
    return dict((srv.id, srv) for srv in servers)
417

    
418

    
419
def get_ganeti_servers(backend):
    """Return the synnefo instances of a Ganeti backend, keyed by id.

    Instances without the synnefo name prefix, or whose name cannot be
    mapped to an id, are dropped.
    """
    gnt_instances = backend_mod.get_instances(backend)
    # Filter out non-synnefo instances
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
    parsed = [parse_gnt_instance(inst) for inst in gnt_instances
              if inst["name"].startswith(snf_backend_prefix)]
    return dict((inst["id"], inst) for inst in parsed
                if inst["id"] is not None)
427

    
428

    
429
def parse_gnt_instance(instance):
    """Parse a raw Ganeti instance dict into synnefo's representation.

    Returns a dict that always has an "id" key; "id" is None when the
    instance name cannot be mapped to a synnefo id, so that callers
    (get_ganeti_servers) can filter such entries with ``i["id"] is None``.
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # Bug fix: this used to return the tuple (None, None); the caller
        # then evaluated i["id"], raising TypeError because tuples cannot
        # be indexed by string.  A dict with a None id keeps the caller's
        # filtering contract intact.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # oper_state is a boolean: True when the instance is up.
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,  # "STARTED" or "STOPPED"
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
453

    
454

    
455
def nics_from_instance(i):
    """Build the NIC list of a Ganeti instance from its bulk fields.

    The per-NIC attributes arrive as parallel lists ("nic.ips",
    "nic.names", "nic.macs", "nic.networks.names"); they are zipped into
    one dict per NIC.  The firewall profile is then applied from
    "synnefo:network:<nic_name>:<profile>" instance tags (first matching
    tag wins, via setdefault).
    """
    keys = ("ip", "name", "mac", "network")
    columns = (i['nic.ips'], i['nic.names'], i['nic.macs'],
               i['nic.networks.names'])
    # Build a concrete list: under Python 3, map()/zip() return one-shot
    # iterators and the tag loop below would exhaust the sequence before
    # it could be returned.
    nics = [dict(zip(keys, values)) for values in zip(*columns)]

    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] != ["synnefo", "network"]:
            continue
        if len(t) != 4:
            logger.error("Malformed synnefo tag %s", tag)
            continue
        nic_name, firewall = t[2], t[3]
        for nic in nics:
            if nic["name"] == nic_name:
                nic.setdefault("firewall", firewall)
    return nics
478

    
479

    
480
def get_ganeti_jobs(backend):
    """Fetch all jobs of a Ganeti backend, keyed by integer job id."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
483

    
484

    
485
def disks_from_instance(i):
    """Map disk index -> {"size": size} from the instance's "disk.sizes"."""
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks
488

    
489

    
490
class NetworkReconciler(object):
    """Reconcile DB Networks/BackendNetworks with Ganeti networks.

    Detects (and, when ``fix`` is True, repairs): DB entries missing for
    existing Ganeti networks, stale DB entries, networks missing in
    Ganeti, networks not connected to all nodegroups, unsynced operstate,
    externally reserved IPs missing from the DB IP pools, and orphan
    networks that exist only in Ganeti.
    """

    def __init__(self, logger, fix=False):
        # logger: destination for "D:" (detect) / "F:" (fix) messages.
        # fix: when False, only report; when True, also repair.
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Reconcile all non-deleted networks against all online backends."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            # reconcile_parted_network returns None when fix is disabled;
            # nothing more can be checked for this backend then.
            if bnet is None:
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        if network.state != "ACTIVE":
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """Create the missing DB entry for a network existing in Ganeti.

        Return the created BackendNetwork, or None when fix is disabled.
        """
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """Mark deleted a DB BackendNetwork with no Ganeti counterpart."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            # Re-fetch with a row lock before simulating the removal event.
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create in Ganeti a network that exists only in the DB."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect a network to the nodegroups it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Activate a fully-connected network whose DB state is not ACTIVE."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no DB counterpart."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable below shadows the dict being
        # iterated; .items() is evaluated once, so this is safe but fragile.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
665

    
666

    
667
def get_backend_network(network, backend):
    """Return the BackendNetwork of (network, backend), or None if absent."""
    try:
        bnet = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        bnet = None
    return bnet
672

    
673

    
674
class PoolReconciler(object):
    """Reconcile the value pools (bridges, MAC prefixes, IPv4 addresses)
    with the resources actually recorded in the DB.

    Inconsistencies are logged; with fix=True the stored pool maps are
    rewritten to match the DB state.
    """
    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run every pool reconciliation check."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # Only networks with an IPv4 subnet that hands out addresses
        # via DHCP are backed by an IP pool.
        candidates = Network.objects.prefetch_related("subnets")\
                                    .filter(deleted=False)
        for net in candidates:
            for subnet in net.subnets.all():
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(net)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check the bridge pool against PHYSICAL_VLAN networks."""
        phys_networks = Network.objects.filter(deleted=False,
                                               flavor="PHYSICAL_VLAN")
        check_unique_values(objects=phys_networks, field='link',
                            logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        # Since pool is locked, no new networks may be created
        bridges_in_use = set(phys_networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=bridges_in_use, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check the MAC-prefix pool against MAC_FILTERED networks."""
        mac_networks = Network.objects.filter(deleted=False,
                                              flavor="MAC_FILTERED")
        check_unique_values(objects=mac_networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        # Since pool is locked, no new network may be created
        prefixes_in_use = set(mac_networks.values_list('mac_prefix',
                                                       flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=prefixes_in_use, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check every IP pool of *network* against its allocated NICs."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            allocated = ip_pool.pool_table.subnet\
                               .ips.exclude(address__isnull=True)\
                               .exclude(deleted=True)\
                               .values_list("address", flat=True)
            # Keep only the addresses that belong to this pool's range.
            in_range = [addr for addr in allocated if ip_pool.contains(addr)]
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=in_range,
                                  fix=self.fix, logger=self.log)
741

    
742

    
743
def check_unique_values(objects, field, logger):
    """Check that the value of `field` is unique across `objects`.

    Logs one error per value that is used by more than one object and
    returns False; returns True when every value is unique.
    """
    used_values = list(objects.values_list(field, flat=True))
    # Single O(n) pass instead of calling list.count() per element
    # (the original was O(n^2) and logged each duplicate value once
    # per occurrence instead of once overall).
    seen = set()
    duplicate_values = set()
    for value in used_values:
        if value in seen:
            duplicate_values.add(value)
        seen.add(value)
    if duplicate_values:
        for value in duplicate_values:
            filter_args = {field: value}
            using_objects = objects.filter(**filter_args)
            msg = "Value '%s' is used as %s for more than one objects: %s"
            logger.error(msg, value, field, ",".join(map(str, using_objects)))
        return False
    logger.debug("Values for field '%s' are unique.", field)
    return True
755

    
756

    
757
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Verify the stored availability map of `pool` against `used_values`.

    A fresh pool is rebuilt by reserving every value in `used_values`;
    each bit position where the rebuilt map disagrees with the stored one
    is logged as an inconsistency.  With fix=True, the stored map is
    overwritten with the rebuilt one and saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop: the original used a list comprehension purely for its
    # side effects.
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available != pool.available:
        # XOR marks with '1' every position where the two maps disagree.
        pool_diff = dummy_pool.available ^ pool.available
        for index in pool_diff.itersearch(bitarray.bitarray("1")):
            value = pool.index_to_value(int(index))
            # Typo fixed (was "incosistent"); a dead duplicate 'msg'
            # assignment before this loop was also removed.
            msg = "%s is inconsistent! Value '%s' is %s but should be %s."
            value1 = pool.is_available(value) and "available" or "unavailable"
            value2 = dummy_pool.is_available(value) and "available"\
                or "unavailable"
            logger.error(msg, pool, value, value1, value2)
        if fix:
            pool.available = dummy_pool.available
            pool.save()
            logger.info("Fixed available map of pool '%s'", pool)
774

    
775

    
776
def create_empty_pool(pool, pool_class):
    """Build a fresh `pool_class` instance on *pool*'s table row with
    both the available and reserved bitmaps cleared."""
    table_row = pool.pool_table
    for map_attr in ("available_map", "reserved_map"):
        setattr(table_row, map_attr, "")
    return pool_class(table_row)
781

    
782

    
783
def get_locked_server(server_id):
    """Fetch the VirtualMachine with id *server_id* under a row-level
    lock (SELECT ... FOR UPDATE)."""
    locked_servers = VirtualMachine.objects.select_for_update()
    return locked_servers.get(id=server_id)