Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 5b50c3b4

History | View | Annotate | Download (31.8 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, rapi, backend as backend_mod
73
from synnefo.lib.utils import merge_time
74

    
75
logger = logging.getLogger()
76
logging.basicConfig()
77

    
78
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
79

    
80

    
81
class BackendReconciler(object):
    """Reconcile the servers of one Ganeti backend with the DB.

    Detects stale servers (in DB but not in Ganeti), orphan instances
    (in Ganeti but not in DB) and servers whose operstate, flavor or
    NICs are unsynced between DB and Ganeti. Inconsistencies are only
    repaired when the corresponding 'fix_*' option is enabled in
    `options`; otherwise they are just logged.

    """
    def __init__(self, backend, logger, options=None):
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        # Default to an empty options dict; option lookups below use
        # .get(), so a missing key simply disables that fix instead of
        # raising KeyError.
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client to the backend's client pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Fetch server state from the DB and Ganeti and reconcile."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        # Remember when the snapshot was taken; jobs that finished after
        # this point must not be reconciled against the older snapshot.
        self.event_time = datetime.now()

        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Return the status of the build job.

        Return whether the job is RUNNING, FINALIZED or ERROR, together
        with the timestamp that the job finished (if any).

        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            job = self.gnt_jobs[job_id]
            gnt_job_status = job["status"]
            end_timestamp = job["end_ts"]
            if end_timestamp is not None:
                end_timestamp = merge_time(end_timestamp)
            if gnt_job_status == rapi.JOB_STATUS_ERROR:
                return "ERROR", end_timestamp
            elif gnt_job_status not in rapi.JOB_STATUS_FINALIZED:
                return "RUNNING", None
            else:
                return "FINALIZED", end_timestamp
        else:
            # Job unknown to Ganeti: the build can never complete.
            return "ERROR", None

    def reconcile_stale_servers(self):
        """R1: handle servers that exist in the DB but not in Ganeti."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them by simulating the OP_INSTANCE_REMOVE event that was
        # missed. .get() keeps this a no-op when the option is absent.
        if stale and self.options.get("fix_stale"):
            for server_id in stale:
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm,
                    etime=self.event_time,
                    jobid=0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """R2: destroy Ganeti instances that have no DB counterpart."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options.get("fix_orphans"):
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """R3: sync operstate/flavor/NICs of servers present in both."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue
                elif (end_timestamp is not None and
                      end_timestamp >= self.event_time):
                    # Do not continue reconciliation for a building server
                    # whose build job completed after querying the state
                    # of the Ganeti servers.
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Simulate a failed OP_INSTANCE_CREATE for a dead build."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options.get("fix_unsynced"):
            fix_opcode = "OP_INSTANCE_CREATE"
            vm = get_locked_server(db_server.id)
            backend_mod.process_op_status(
                vm=vm,
                etime=self.event_time,
                jobid=0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Bring the DB operstate in sync with the Ganeti state."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options.get("fix_unsynced"):
                vm = get_locked_server(server_id)
                # If the server is in building state, its creation must be
                # reconciled first, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=vm, etime=self.event_time, jobid=0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Bring the DB flavor (RAM/CPU) in sync with Ganeti beparams."""
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options.get("fix_unsynced_flavors"):
                vm = get_locked_server(server_id)
                old_state = vm.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to the old state.
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Bring the DB NICs in sync with the Ganeti NICs."""
        # NICs created within BUILDING_NIC_TIMEOUT may legitimately still
        # be in BUILD state; exclude them from the comparison.
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options.get("fix_unsynced_nics"):
                vm = get_locked_server(server_id)
                backend_mod.process_net_status(vm=vm,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        # Disk reconciliation is not implemented yet.
        pass

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a server task whose Ganeti job has already finished."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in rapi.JOB_STATUS_FINALIZED:
                pending_task = True

        if pending_task:
            db_server = get_locked_server(server_id)
            if db_server.task_job_id != job_id:
                # Task has changed behind our back; leave it alone.
                return
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options.get("fix_pending_tasks"):
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                self.log.info("Cleared pending task for server '%s'",
                              server_id)
353

    
354

    
355
# Single-line template describing a NIC:
# "ID: %s\tState: %s\tIP: %s\tNetwork: %s\tMAC: %s\tIndex: %s\tFirewall: %s"
_NIC_FIELDS = ["ID", "State", "IP", "Network", "MAC", "Index", "Firewall"]
NIC_MSG = ": %s\t".join(_NIC_FIELDS) + ": %s"


def format_db_nic(nic):
    """Render a DB NIC object as one human-readable line."""
    fields = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % fields


def format_gnt_nic(nic):
    """Render a (name, info-dict) Ganeti NIC pair as one line."""
    name, info = nic
    fields = (name, info["state"], info["ipv4_address"],
              info["network"].id, info["mac"], info["index"],
              info["firewall_profile"])
    return NIC_MSG % fields
369

    
370

    
371
#
372
# Networks
373
#
374

    
375

    
376
def get_networks_from_ganeti(backend):
    """Return the synnefo networks of a backend, indexed by network id."""
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                # Not a synnefo-managed network; skip it.
                continue
            networks[utils.id_from_network_name(name)] = net
    return networks
387

    
388

    
389
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Return a dict mapping network id -> set of nodegroup names the
    network is missing from.

    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        # Each entry of 'group_list' is a (name, mode, link) triple;
        # only the nodegroup name matters here.
        connected = set(name for (name, _mode, _link) in info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
408

    
409

    
410
def get_online_backends():
    """Return the queryset of Ganeti backends not flagged as offline."""
    return Backend.objects.filter(offline=False)
412

    
413

    
414
def get_database_servers(backend):
    """Return the non-deleted servers of a backend, indexed by server id.

    Flavors and NIC/IP/subnet relations are pre-fetched, since the
    reconciler inspects them for every server.

    """
    servers = (backend.virtual_machines
                      .select_related("flavor")
                      .prefetch_related("nics__ips__subnet")
                      .filter(deleted=False))
    return dict((server.id, server) for server in servers)
419

    
420

    
421
def get_ganeti_servers(backend):
    """Return the synnefo instances of a backend, indexed by server id."""
    prefix = settings.BACKEND_PREFIX_ID
    parsed = []
    for instance in backend_mod.get_instances(backend):
        # Filter out non-synnefo instances
        if instance["name"].startswith(prefix):
            parsed.append(parse_gnt_instance(instance))
    # Instances whose name could not be parsed carry a None id.
    return dict((i["id"], i) for i in parsed if i["id"] is not None)
429

    
430

    
431
def parse_gnt_instance(instance):
    """Parse a Ganeti instance dict into synnefo's server representation.

    Return a dict with the server id, operating state, flavor, NICs,
    disks and tags. If the instance name cannot be mapped to a synnefo
    server id, a dict with "id" set to None is returned so that callers
    can filter it out.

    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # Return a dict, not a tuple: callers index the result with
        # i["id"], which would raise TypeError on a tuple.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # Map Ganeti's boolean oper_state to synnefo's state names.
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"],
    }
455

    
456

    
457
def nics_from_instance(i):
    """Build the list of NICs of a Ganeti instance.

    Combine the parallel 'nic.*' lists of the instance into one dict per
    NIC, then attach the firewall profile encoded in the instance tags
    ("synnefo:network:<nic_name>:<firewall>"). The first matching tag
    per NIC wins.

    """
    keys = ("ip", "name", "mac", "network")
    values = zip(i['nic.ips'], i['nic.names'], i['nic.macs'],
                 i['nic.networks.names'])
    nics = [dict(zip(keys, vals)) for vals in values]

    for tag in i["tags"]:
        parts = tag.split(":")
        if parts[0:2] != ["synnefo", "network"]:
            continue
        if len(parts) != 4:
            logger.error("Malformed synefo tag %s", tag)
            continue
        nic_name = parts[2]
        firewall = parts[3]
        for nic in nics:
            if nic["name"] == nic_name:
                # setdefault: keep the firewall from the first tag seen
                nic.setdefault("firewall", firewall)
    return nics
480

    
481

    
482
def get_ganeti_jobs(backend):
    """Return the jobs of a backend, indexed by integer job id."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
485

    
486

    
487
def disks_from_instance(i):
    """Map disk index -> {"size": size} for a Ganeti instance dict."""
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks
490

    
491

    
492
class NetworkReconciler(object):
    """Reconcile the DB Networks/BackendNetworks with Ganeti networks.

    Inconsistencies are logged with a "D:" (detect) prefix; when `fix`
    is True they are repaired and the repair is logged with "F:".

    """
    def __init__(self, logger, fix=False):
        self.log = logger
        # When False, only report inconsistencies; never modify state.
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Entry point: snapshot DB and Ganeti state, then reconcile."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            # NOTE(review): when fix=False, reconcile_parted_network
            # returns None, so reporting stops here for this backend.
            if bnet is None:
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so is must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        if network.state != "ACTIVE":
            # Re-fetch under a row lock before recomputing the state.
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """Create the missing DB entry for a backend network.

        Returns the new BackendNetwork, or None when fix is disabled.

        """
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """Mark a DB backend-network as removed via a simulated event."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create in Ganeti a network that exists only in the DB."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect a network to the nodegroups it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Activate a backend network via a simulated CONNECT event."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no (non-deleted) DB network."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable rebinds 'ganeti_networks' to the
        # per-backend dict. This is safe because .items() is evaluated
        # once before the loop starts, but it shadows the outer dict.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
667

    
668

    
669
def get_backend_network(network, backend):
    """Return the BackendNetwork for (network, backend), or None."""
    try:
        return BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        return None
674

    
675

    
676
class PoolReconciler(object):
    """Check, and optionally repair, consistency of resource pools.

    Compares the bridge, MAC-prefix and IPv4 address pools with the
    values actually in use by non-deleted networks.  With fix=True,
    inconsistent pool bitmaps are rewritten to match the DB state.
    """
    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run every pool check: bridges, MAC prefixes, IPv4 pools."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # Only IPv4 subnets with DHCP enabled are backed by an IP pool.
        live_networks = Network.objects.prefetch_related("subnets")\
                                       .filter(deleted=False)
        for net in live_networks:
            for sub in net.subnets.all():
                if sub.ipversion == 4 and sub.dhcp:
                    self.reconcile_ip_pool(net)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Verify the bridge pool against PHYSICAL_VLAN networks."""
        vlan_networks = Network.objects.filter(deleted=False,
                                               flavor="PHYSICAL_VLAN")
        check_unique_values(objects=vlan_networks, field='link',
                            logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        # Since pool is locked, no new networks may be created
        links_in_use = set(vlan_networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=links_in_use, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Verify the MAC-prefix pool against MAC_FILTERED networks."""
        filtered_networks = Network.objects.filter(deleted=False,
                                                   flavor="MAC_FILTERED")
        check_unique_values(objects=filtered_networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        # Since pool is locked, no new network may be created
        prefixes_in_use = set(
            filtered_networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=prefixes_in_use, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Verify every IP pool of 'network' against its allocated IPs."""
        # Check that all NICs have unique IPv4 address
        assigned = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=assigned, field="address",
                            logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            addresses = ip_pool.pool_table.subnet\
                               .ips.exclude(address__isnull=True)\
                               .exclude(deleted=True)\
                               .values_list("address", flat=True)
            # Only addresses that fall inside this pool's range count.
            in_range = [addr for addr in addresses if ip_pool.contains(addr)]
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=in_range,
                                  fix=self.fix, logger=self.log)
def check_unique_values(objects, field, logger):
    """Check that 'field' holds a distinct value for every object.

    Logs an error for each duplicated occurrence, listing the objects
    that share the value, and returns False; returns True when every
    value is unique.
    """
    values = list(objects.values_list(field, flat=True))
    if len(set(values)) == len(values):
        logger.debug("Values for field '%s' are unique.", field)
        return True
    # A value occurring k times is reported k times, once per
    # occurrence in the original list.
    for value in (v for v in values if values.count(v) > 1):
        owners = objects.filter(**{field: value})
        logger.error("Value '%s' is used as %s for more than one objects: %s",
                     value, field, ",".join(map(str, owners)))
    return False
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Compare a pool's availability bitmap with the values in use.

    Builds a fresh pool of the same class, reserves every entry of
    'used_values' in it, and diffs its availability bitmap against the
    stored 'pool'.  Every differing index is logged as an error; with
    fix=True the stored bitmap is overwritten with the recomputed one
    and saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop: the reserve() calls are executed for their side
    # effect only (the old code abused a list comprehension for this).
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available == pool.available:
        return
    # XOR yields a bitmap with a set bit at every inconsistent index.
    pool_diff = dummy_pool.available ^ pool.available
    for index in pool_diff.itersearch(bitarray.bitarray("1")):
        value = pool.index_to_value(int(index))
        # Fixed typo in the diagnostic ("incosistent"); the first,
        # never-used msg assignment of the old code is dropped.
        msg = "%s is inconsistent! Value '%s' is %s but should be %s."
        value1 = "available" if pool.is_available(value) else "unavailable"
        value2 = ("available" if dummy_pool.is_available(value)
                  else "unavailable")
        logger.error(msg, pool, value, value1, value2)
    if fix:
        pool.available = dummy_pool.available
        pool.save()
        logger.info("Fixed available map of pool '%s'", pool)
def create_empty_pool(pool, pool_class):
    """Return a fresh, fully-available pool over 'pool's table row.

    Blanks the row's bitmap fields so that 'pool_class' rebuilds them
    from scratch.  Note: this mutates the shared row object in place.
    """
    row = pool.pool_table
    row.available_map = ""
    row.reserved_map = ""
    return pool_class(row)
def get_locked_server(server_id):
    """Fetch the VirtualMachine row for 'server_id' with a row lock.

    Uses SELECT ... FOR UPDATE, so the row stays locked until the
    surrounding transaction ends.
    """
    locked = VirtualMachine.objects.select_for_update()
    return locked.get(id=server_id)