Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 18cb3999

History | View | Annotate | Download (31.1 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, rapi, backend as backend_mod
73

    
74
logger = logging.getLogger()
75
logging.basicConfig()
76

    
77
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
78

    
79

    
80
class BackendReconciler(object):
    """Reconcile the servers of one Ganeti backend with the Synnefo DB.

    Detects and (optionally) repairs three kinds of divergence:
      * stale servers    -- exist in the DB but not in Ganeti (rule R1)
      * orphan servers   -- exist in Ganeti but not in the DB (rule R2)
      * unsynced servers -- exist in both with differing state (rule R3)
    Fixes are applied only when the corresponding flag in ``options``
    ("fix_stale", "fix_orphans", "fix_unsynced", "fix_unsynced_flavors",
    "fix_unsynced_nics", "fix_pending_tasks") is set.
    """

    def __init__(self, backend, logger, options=None):
        """Keep the backend, a logger, a RAPI client and the fix options."""
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client to the backend's client pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run the full reconciliation for this backend in one transaction."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        # Snapshot DB state: server id -> VirtualMachine.
        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        # Snapshot Ganeti state: server id -> parsed instance dict.
        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        # Job id -> job info, used to judge BUILD/pending-task servers.
        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.event_time = datetime.now()

        # NOTE(review): the reconcile_* methods below return None, so these
        # attributes are always None -- confirm whether return values were
        # ever intended.
        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Classify a server's creation job as ERROR/RUNNING/FINALIZED.

        An unknown job id is reported as "ERROR".
        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status == rapi.JOB_STATUS_ERROR:
                return "ERROR"
            elif gnt_job_status not in rapi.JOB_STATUS_FINALIZED:
                return "RUNNING"
            else:
                return "FINALIZED"
        else:
            return "ERROR"

    def reconcile_stale_servers(self):
        """Rule R1: handle servers present in the DB but absent from Ganeti."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them by simulating a successful removal event per server.
        if stale and self.options["fix_stale"]:
            for server_id in stale:
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm,
                    etime=self.event_time,
                    # NOTE(review): -0 is just 0; apparently a placeholder
                    # job id for simulated events -- confirm intent.
                    jobid=-0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Rule R2: destroy Ganeti instances that have no DB entry."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options["fix_orphans"]:
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Rule R3: align state/flavor/NICs/disks/tasks of common servers."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Simulate a failed OP_INSTANCE_CREATE for a server whose build
        job ended in error."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options["fix_unsynced"]:
            fix_opcode = "OP_INSTANCE_CREATE"
            vm = get_locked_server(db_server.id)
            backend_mod.process_op_status(
                vm=vm,
                etime=self.event_time,
                jobid=-0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Sync the DB operstate with Ganeti's by simulating start/stop."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options["fix_unsynced"]:
                vm = get_locked_server(server_id)
                # If server is in building state, you will have first to
                # reconcile its creation, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=vm, etime=self.event_time, jobid=-0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=-0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Sync the DB flavor with the RAM/vCPUs Ganeti actually reports."""
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            # Find a DB flavor matching Ganeti's RAM/vCPUs while keeping the
            # server's disk size and template.
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options["fix_unsynced_flavors"]:
                vm = get_locked_server(server_id)
                old_state = vm.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=-0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to old state
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Compare DB NICs with Ganeti NICs and resync them if they differ.

        NICs still in BUILD state for less than BUILDING_NIC_TIMEOUT are
        excluded from the comparison.
        """
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        # NOTE(review): the length check compares against the *raw* gnt_nics
        # list rather than gnt_nics_parsed -- confirm both always have the
        # same length.
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options["fix_unsynced_nics"]:
                vm = get_locked_server(server_id)
                backend_mod.process_net_status(vm=vm,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        # Disk reconciliation is intentionally not implemented.
        pass

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a server's pending task if its Ganeti job already ended
        (or is unknown)."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in rapi.JOB_STATUS_FINALIZED:
                pending_task = True

        if pending_task:
            # Re-read the row under lock; the task may have changed meanwhile.
            db_server = get_locked_server(server_id)
            if db_server.task_job_id != job_id:
                # task has changed!
                return
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options["fix_pending_tasks"]:
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                # NOTE(review): the log message below is missing a closing
                # quote after %s -- cosmetic only.
                self.log.info("Cleared pending task for server '%s", server_id)
337

    
338

    
339
# printf template for one NIC line:
# "ID: %s\tState: %s\tIP: %s\tNetwork: %s\tMAC: %s\tIndex: %s\tFirewall: %s"
NIC_MSG = ": %s\t".join(["ID", "State", "IP", "Network", "MAC", "Index",
                         "Firewall"]) + ": %s"
341

    
342

    
343
def format_db_nic(nic):
    """Render a DB NIC object as a one-line summary using NIC_MSG."""
    fields = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % fields
346

    
347

    
348
def format_gnt_nic(nic):
    """Render a (name, nic_dict) pair from Ganeti as a one-line summary."""
    nic_name, info = nic
    fields = (nic_name, info["state"], info["ipv4_address"],
              info["network"].id, info["mac"], info["index"],
              info["firewall_profile"])
    return NIC_MSG % fields
353

    
354

    
355
#
356
# Networks
357
#
358

    
359

    
360
def get_networks_from_ganeti(backend):
    """Return the Synnefo networks of a Ganeti backend, keyed by network id.

    Only networks whose name carries the Synnefo prefix are returned.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                continue
            networks[utils.id_from_network_name(name)] = net

    return networks
371

    
372

    
373
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Returns a dict mapping network id -> set of nodegroup names the
    network is missing from.
    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        # info['group_list'] holds (name, mode, link) triples.
        connected = set(name for (name, mode, link) in info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
392

    
393

    
394
def get_online_backends():
    """Return a queryset of all Ganeti backends not marked as offline."""
    return Backend.objects.filter(offline=False)
396

    
397

    
398
def get_database_servers(backend):
    """Return the non-deleted VMs of `backend`, indexed by server id.

    Eagerly loads each VM's flavor and NIC/IP/subnet relations so the
    reconciler does not issue per-server queries later.
    """
    queryset = backend.virtual_machines\
                      .select_related("flavor")\
                      .prefetch_related("nics__ips__subnet")\
                      .filter(deleted=False)
    return dict((server.id, server) for server in queryset)
403

    
404

    
405
def get_ganeti_servers(backend):
    """Return the parsed Synnefo instances of a backend, keyed by server id.

    Non-Synnefo instances (name without the backend prefix) and instances
    whose name could not be parsed are skipped.
    """
    prefix = settings.BACKEND_PREFIX_ID
    parsed = [parse_gnt_instance(inst)
              for inst in backend_mod.get_instances(backend)
              if inst["name"].startswith(prefix)]
    return dict((inst["id"], inst) for inst in parsed
                if inst["id"] is not None)
413

    
414

    
415
def parse_gnt_instance(instance):
    """Parse a raw Ganeti instance dict into the reconciler's format.

    Returns a dict that always contains an "id" key; "id" is None when the
    instance name cannot be mapped to a Synnefo server id (callers filter
    such entries out via ``i["id"] is not None``).
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # Bug fix: the original returned the tuple (None, None); callers
        # index the result with ["id"], which would raise TypeError on a
        # tuple. Return a dict with a None id instead.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # oper_state is truthy when the instance is running.
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
439

    
440

    
441
def nics_from_instance(i):
    """Build the list of NIC dicts of a Ganeti instance.

    Zips the parallel ``nic.*`` attribute lists into one dict per NIC and
    attaches the firewall profile encoded in the instance's
    ``synnefo:network:<nic_name>:<profile>`` tags.
    """
    nics = [{"ip": ip, "name": name, "mac": mac, "network": net}
            for ip, name, mac, net in zip(i['nic.ips'], i['nic.names'],
                                          i['nic.macs'],
                                          i['nic.networks.names'])]
    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] != ["synnefo", "network"]:
            continue
        if len(t) != 4:
            # Typo fix in the log message ("synefo" -> "synnefo").
            logger.error("Malformed synnefo tag %s", tag)
            continue
        nic_name, firewall = t[2], t[3]
        # setdefault: an earlier tag for the same NIC wins.
        for nic in nics:
            if nic["name"] == nic_name:
                nic.setdefault("firewall", firewall)
    return nics
464

    
465

    
466
def get_ganeti_jobs(backend):
    """Fetch all Ganeti jobs of a backend, indexed by integer job id."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
469

    
470

    
471
def disks_from_instance(i):
    """Map disk index -> {"size": size} for a raw Ganeti instance dict."""
    return dict(enumerate({"size": size} for size in i["disk.sizes"]))
474

    
475

    
476
class NetworkReconciler(object):
    """Reconcile Synnefo Networks/BackendNetworks with Ganeti networks.

    Detects orphan networks in Ganeti, stale/missing/unsynced backend
    networks, networks not connected to all nodegroups, and unsynced
    externally-reserved IPs. Fixes are applied only when ``fix`` is True.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Snapshot DB and all online Ganeti backends, then reconcile."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends:
        # backend -> {network id -> ganeti network dict}, and
        # backend -> {network id -> missing nodegroup names}.
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            if bnet is None:
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        if network.state != "ACTIVE":
            # Re-read under lock before recomputing the aggregate state.
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """Create the missing BackendNetwork DB row for `network`/`backend`.

        Returns the new row when fixing, otherwise None.
        """
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """Mark a BackendNetwork deleted by simulating OP_NETWORK_REMOVE."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create in Ganeti a network that exists only in the DB."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect `network` to each nodegroup it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Force a non-ACTIVE BackendNetwork to ACTIVE via a simulated
        successful OP_NETWORK_CONNECT."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                # NOTE(review): "eventd" looks like a typo for "event";
                # left unchanged since it is a runtime string.
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no (non-deleted) DB Network."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            # NOTE(review): "Not entry" reads like a typo for
                            # "No entry"; left unchanged (runtime string).
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
651

    
652

    
653
def get_backend_network(network, backend):
    """Look up the BackendNetwork row linking `network` to `backend`.

    Returns None when no such row exists.
    """
    try:
        bnet = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        bnet = None
    return bnet
658

    
659

    
660
class PoolReconciler(object):
    """Reconcile the bridge, MAC-prefix and IP pools with actual usage.

    For each pool the values actually used by networks/NICs are collected
    and compared against the pool's availability map; inconsistencies are
    logged and, when ``fix`` is True, repaired.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run all pool reconciliations (bridges, MAC prefixes, IP pools)."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # Only IPv4 subnets with DHCP enabled have an IP pool to check.
        networks = Network.objects.prefetch_related("subnets")\
                                  .filter(deleted=False)
        for network in networks:
            for subnet in network.subnets.all():
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(network)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check the bridge pool against PHYSICAL_VLAN networks' links."""
        networks = Network.objects.filter(deleted=False,
                                          flavor="PHYSICAL_VLAN")
        check_unique_values(objects=networks, field='link', logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        # Since pool is locked, no new networks may be created
        used_bridges = set(networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=used_bridges, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check the MAC-prefix pool against MAC_FILTERED networks."""
        networks = Network.objects.filter(deleted=False, flavor="MAC_FILTERED")
        check_unique_values(objects=networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        # Since pool is locked, no new network may be created
        used_mac_prefixes = set(networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=used_mac_prefixes, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check each IP pool of `network` against the addresses in use."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            used_ips = ip_pool.pool_table.subnet\
                              .ips.exclude(address__isnull=True)\
                              .exclude(deleted=True)\
                              .values_list("address", flat=True)
            # Keep only addresses that belong to this pool's range.
            used_ips = filter(lambda x: ip_pool.contains(x), used_ips)
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=used_ips,
                                  fix=self.fix, logger=self.log)
727

    
728

    
729
def check_unique_values(objects, field, logger):
    """Check that the value of `field` is unique across `objects`.

    Logs an error for each duplicated value (once per value, with all the
    objects that use it) and returns False; returns True when all values
    are distinct.
    """
    used_values = list(objects.values_list(field, flat=True))
    if len(used_values) == len(set(used_values)):
        logger.debug("Values for field '%s' are unique.", field)
        return True

    # Set-based duplicate detection: O(n) instead of the original
    # O(n^2) list.count() scan, and each duplicate is logged only once.
    seen = set()
    duplicates = set()
    for value in used_values:
        if value in seen:
            duplicates.add(value)
        seen.add(value)
    for value in duplicates:
        filter_args = {field: value}
        using_objects = objects.filter(**filter_args)
        msg = "Value '%s' is used as %s for more than one objects: %s"
        logger.error(msg, value, field, ",".join(map(str, using_objects)))
    return False
741

    
742

    
743
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Check that `pool`'s availability map matches the values in use.

    Rebuilds a pool from scratch, reserves every value in `used_values`,
    and compares the resulting availability bitmap with the stored one.
    Each differing entry is logged; when `fix` is True the stored map is
    overwritten with the recomputed one and saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop instead of the original side-effect list comprehension.
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available == pool.available:
        return

    # XOR of the two bitmaps: set bits mark the inconsistent entries.
    pool_diff = dummy_pool.available ^ pool.available
    for index in pool_diff.itersearch(bitarray.bitarray("1")):
        value = pool.index_to_value(int(index))
        # Typo fix in the log message ("incosistent" -> "inconsistent");
        # also dropped an unused earlier msg assignment.
        msg = "%s is inconsistent! Value '%s' is %s but should be %s."
        value1 = pool.is_available(value) and "available" or "unavailable"
        value2 = dummy_pool.is_available(value) and "available"\
            or "unavailable"
        logger.error(msg, pool, value, value1, value2)
    if fix:
        pool.available = dummy_pool.available
        pool.save()
        logger.info("Fixed available map of pool '%s'", pool)
760

    
761

    
762
def create_empty_pool(pool, pool_class):
    """Build a fresh `pool_class` instance backed by `pool`'s table row,
    with both the available and reserved maps cleared."""
    row = pool.pool_table
    row.available_map = ""
    row.reserved_map = ""
    return pool_class(row)
767

    
768

    
769
def get_locked_server(server_id):
    """Fetch the VirtualMachine row with SELECT ... FOR UPDATE, so the
    caller's transaction holds an exclusive row lock."""
    locked = VirtualMachine.objects.select_for_update()
    return locked.get(id=server_id)