Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 1040b85b

History | View | Annotate | Download (33.9 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, rapi, backend as backend_mod
73
from synnefo.lib.utils import merge_time
74

    
75
logger = logging.getLogger()
76
logging.basicConfig()
77

    
78
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
79

    
80

    
81
class BackendReconciler(object):
    """Reconcile the servers of one Ganeti backend with the Synnefo DB.

    Detects (and, depending on ``options``, fixes):
      * stale servers: present in the DB but missing from Ganeti
      * orphan servers: present in Ganeti but deleted in the DB
      * unsynced servers: operstate/flavor/NICs/disks differ between the two

    """
    def __init__(self, backend, logger, options=None):
        """
        backend -- the Backend DB object to reconcile
        logger  -- logger used for reporting findings and fixes
        options -- dict of "fix_*" booleans; missing keys mean detect-only

        """
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client to the backend's client pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run all reconciliation steps for this backend in one transaction."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        # Snapshot the time before querying, so later job completions can be
        # detected and skipped (see reconcile_unsynced_servers).
        self.event_time = datetime.now()

        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Return the status of the build job.

        Return whether the job is RUNNING, FINALIZED or ERROR, together
        with the timestamp that the job finished (if any).

        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            job = self.gnt_jobs[job_id]
            gnt_job_status = job["status"]
            end_timestamp = job["end_ts"]
            if end_timestamp is not None:
                end_timestamp = merge_time(end_timestamp)
            if gnt_job_status == rapi.JOB_STATUS_ERROR:
                return "ERROR", end_timestamp
            elif gnt_job_status not in rapi.JOB_STATUS_FINALIZED:
                return "RUNNING", None
            else:
                return "FINALIZED", end_timestamp
        else:
            # Unknown job: treat as an error so the server gets reconciled.
            return "ERROR", None

    def reconcile_stale_servers(self):
        """Find servers in the DB but not in Ganeti and simulate removal."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them.  Use .get() so that a missing option (or options=None in
        # the constructor) means "do not fix" instead of raising KeyError.
        if stale and self.options.get("fix_stale"):
            for server_id in stale:
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm,
                    etime=self.event_time,
                    jobid=0,  # fake job id; this is a simulated event
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Find servers in Ganeti but not in the DB and destroy them."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options.get("fix_orphans"):
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Reconcile state/flavor/NICs/disks/tasks of servers in both sets."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue
                elif (end_timestamp is not None and
                      end_timestamp >= self.event_time):
                    # Do not continue reconciliation for building server whose
                    # build job completed after querying the state of the
                    # Ganeti servers.  (The None-check also keeps this safe
                    # for finalized jobs that report no end timestamp.)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Simulate a failed OP_INSTANCE_CREATE for a server stuck in BUILD."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options.get("fix_unsynced"):
            fix_opcode = "OP_INSTANCE_CREATE"
            vm = get_locked_server(db_server.id)
            backend_mod.process_op_status(
                vm=vm,
                etime=self.event_time,
                jobid=0,  # fake job id; this is a simulated event
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Bring the DB operstate in line with the state reported by Ganeti."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options.get("fix_unsynced"):
                vm = get_locked_server(server_id)
                # If server is in building state, you will have first to
                # reconcile its creation, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=vm, etime=self.event_time, jobid=0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Sync the DB flavor (RAM/CPU) with the values reported by Ganeti."""
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options.get("fix_unsynced_flavors"):
                vm = get_locked_server(server_id)
                old_state = vm.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to the old state.
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Compare DB NICs with Ganeti NICs and replay the Ganeti view."""
        # Ignore NICs that are still being built (within the grace period).
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options.get("fix_unsynced_nics"):
                vm = get_locked_server(server_id)
                backend_mod.process_net_status(vm=vm,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        """Compare DB volumes with Ganeti disks and replay the Ganeti view."""
        # Ignore volumes that are still being created (within grace period).
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_disks = db_server.volumes.exclude(status="CREATING",
                                             created__lte=building_time) \
                                    .filter(deleted=False)\
                                    .order_by("id")
        gnt_disks = gnt_server["disks"]
        gnt_disks_parsed = backend_mod.process_ganeti_disks(gnt_disks)
        disks_changed = len(db_disks) != len(gnt_disks)
        for db_disk, gnt_disk in zip(db_disks,
                                     sorted(gnt_disks_parsed.items())):
            gnt_disk_id, gnt_disk = gnt_disk
            if (db_disk.id == gnt_disk_id) and\
               backend_mod.disks_are_equal(db_disk, gnt_disk):
                continue
            else:
                disks_changed = True
                break
        if disks_changed:
            msg = "Found unsynced disks for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_disks_str = "\n\t\t".join(map(format_db_disk, db_disks))
            gnt_disks_str = "\n\t\t".join(map(format_gnt_disk,
                                          sorted(gnt_disks_parsed.items())))
            self.log.info(msg, server_id, db_disks_str, gnt_disks_str)
            if self.options.get("fix_unsynced_disks"):
                vm = get_locked_server(server_id)
                backend_mod.process_disks_status(vm=vm,
                                                 etime=self.event_time,
                                                 disks=gnt_disks)

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a server task whose Ganeti job has already finished."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in rapi.JOB_STATUS_FINALIZED:
                pending_task = True

        if pending_task:
            # Re-read under lock; the task may have changed in the meantime.
            db_server = get_locked_server(server_id)
            if db_server.task_job_id != job_id:
                # task has changed!
                return
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options.get("fix_pending_tasks"):
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                self.log.info("Cleared pending task for server '%s'",
                              server_id)
381

    
382

    
383
# One-line NIC summary template:
# "ID: %s\tState: %s\tIP: %s\tNetwork: %s\tMAC: %s\tIndex: %s\tFirewall: %s"
NIC_MSG = "\t".join(field + ": %s" for field in
                    ("ID", "State", "IP", "Network", "MAC", "Index",
                     "Firewall"))


def format_db_nic(nic):
    """Render a DB NIC object as a one-line, tab-separated summary."""
    values = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % values
390

    
391

    
392
def format_gnt_nic(nic):
    """Render one (name, info) pair of parsed Ganeti NICs as one line."""
    name, info = nic
    values = (name, info["state"], info["ipv4_address"],
              info["network"].id, info["mac"], info["index"],
              info["firewall_profile"])
    return NIC_MSG % values
397

    
398
# One-line disk summary template:
# "ID: %s\tState: %s\tSize: %s\tIndex: %s"
DISK_MSG = "\t".join(field + ": %s" for field in
                     ("ID", "State", "Size", "Index"))


def format_db_disk(disk):
    """Render a DB volume as a one-line, tab-separated summary."""
    values = (disk.id, disk.status, disk.size, disk.index)
    return DISK_MSG % values
403

    
404

    
405
def format_gnt_disk(disk):
    """Render one (name, info) pair of parsed Ganeti disks as one line."""
    name, info = disk
    return DISK_MSG % (name, info["status"], info["size"], info["index"])
408

    
409

    
410
#
411
# Networks
412
#
413

    
414

    
415
def get_networks_from_ganeti(backend):
    """Return {network_id: info} for all synnefo networks of `backend`.

    Networks whose name does not carry the synnefo prefix are ignored.

    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'
    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            # Skip networks that were not created by synnefo.
            if not name.startswith(prefix):
                continue
            networks[utils.id_from_network_name(name)] = net
    return networks
426

    
427

    
428
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Return {network_id: set of nodegroup names the network is missing
    from} for every network in `GNets` that is not connected to every
    nodegroup of `backend`.

    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        # Each group_list entry is a (name, mode, link) triple.
        connected = set(name for (name, mode, link) in info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
447

    
448

    
449
def get_online_backends():
    """Return a queryset of all Ganeti backends not marked offline."""
    online = Backend.objects.filter(offline=False)
    return online
451

    
452

    
453
def get_database_servers(backend):
    """Return {id: VirtualMachine} for the live servers of `backend`.

    Flavors and NIC/IP/subnet relations are prefetched, since the
    reconciler inspects them for every server.

    """
    qs = backend.virtual_machines.filter(deleted=False)
    qs = qs.select_related("flavor").prefetch_related("nics__ips__subnet")
    return dict((vm.id, vm) for vm in qs)
458

    
459

    
460
def get_ganeti_servers(backend):
    """Return {id: parsed instance} for synnefo instances of `backend`.

    Instances whose name lacks the synnefo prefix, or whose id cannot be
    parsed, are left out.

    """
    instances = backend_mod.get_instances(backend)
    # Filter out non-synnefo instances
    prefix = settings.BACKEND_PREFIX_ID
    parsed = [parse_gnt_instance(i) for i in instances
              if i["name"].startswith(prefix)]
    return dict((i["id"], i) for i in parsed if i["id"] is not None)
468

    
469

    
470
def parse_gnt_instance(instance):
    """Parse a Ganeti instance dict into the reconciler's format.

    Returns a dict with keys: id, state, updated, disks, nics, flavor,
    tags.  For an instance with a malformed (non-synnefo) name, a dict
    with "id" set to None is returned so callers can filter it out.

    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # Bug fix: this used to return the tuple (None, None), which made
        # callers crash with TypeError on i["id"].  Returning a mapping
        # keeps the standard `i["id"] is not None` filter working.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    # Ganeti reports oper_state as a boolean: True means the VM is up.
    state = "STARTED" if instance["oper_state"] else "STOPPED"

    return {
        "id": instance_id,
        "state": state,
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
494

    
495

    
496
def nics_from_instance(i):
    """Build the list of NIC dicts from a Ganeti instance's bulk fields.

    Each NIC gets the keys: ip, name, mac, network, index.  Tags of the
    form "synnefo:network:<nic_name>:<profile>" add a "firewall" key to
    the matching NICs.

    """
    # Materialize with list() so this also works under Python 3, where
    # zip() returns an iterator (len() below requires a sequence).  Under
    # Python 2 this is a no-op copy.
    ips = list(zip(itertools.repeat('ip'), i['nic.ips']))
    names = list(zip(itertools.repeat('name'), i['nic.names']))
    macs = list(zip(itertools.repeat('mac'), i['nic.macs']))
    networks = list(zip(itertools.repeat('network'),
                        i['nic.networks.names']))
    indexes = list(zip(itertools.repeat('index'), range(len(ips))))
    nics = [dict(nic) for nic in zip(ips, names, macs, networks, indexes)]

    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            # Plain loop instead of a side-effect list comprehension.
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics
520

    
521

    
522
def disks_from_instance(i):
    """Build the list of disk dicts (size/name/uuid/index) from a Ganeti
    instance's bulk fields."""
    # Materialize with list() so this also works under Python 3, where
    # zip() returns an iterator (len() below requires a sequence).  Under
    # Python 2 this is a no-op copy.
    sizes = list(zip(itertools.repeat('size'), i['disk.sizes']))
    names = list(zip(itertools.repeat('name'), i['disk.names']))
    uuids = list(zip(itertools.repeat('uuid'), i['disk.uuids']))
    indexes = list(zip(itertools.repeat('index'), range(len(sizes))))
    return [dict(disk) for disk in zip(sizes, names, uuids, indexes)]
531

    
532

    
533
def get_ganeti_jobs(backend):
    """Return {job_id (int): job info} for all jobs of `backend`."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
536

    
537

    
538
class NetworkReconciler(object):
    """Reconcile DB Networks/BackendNetworks with the Ganeti networks of
    all online backends.

    Detection always runs; fixes are applied only when ``fix`` is True.
    Log messages are prefixed "D:" for detections and "F:" for fixes.

    """
    def __init__(self, logger, fix=False):
        # logger -- logger used for D:/F: reporting
        # fix    -- when True, apply fixes; otherwise detect-only
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Query DB and all Ganeti backends, then reconcile every network."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            # NOTE(review): when fix=False, reconcile_parted_network returns
            # None, so detect-only runs skip the rest for parted networks.
            if bnet is None:
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so it must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti are
            # also externally reserved to the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        # Recompute the aggregate network state after per-backend fixes.
        if network.state != "ACTIVE":
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """Create the missing BackendNetwork DB entry (CASE: in Ganeti,
        missing in DB); returns the new entry, or None when fix=False."""
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet

    def reconcile_stale_network(self, backend_network):
        """Mark a DB BackendNetwork deleted by simulating OP_NETWORK_REMOVE
        (CASE: pending DESTROY, already gone from Ganeti)."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            # Re-fetch under row lock before mutating state.
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create the network in Ganeti (CASE: in DB, missing in Ganeti)."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect the network to every nodegroup it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Force the BackendNetwork to ACTIVE by simulating a successful
        OP_NETWORK_CONNECT event."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            # Re-fetch under row lock before mutating state.
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                # NOTE(review): "eventd" is a typo for "event" in this
                # message; kept as-is to preserve behavior.
                "Reconciliation simulated eventd")

    def _reconcile_orphan_networks(self):
        """Delete Ganeti networks that have no (live) DB counterpart."""
        db_networks = self.networks
        ganeti_networks = self.ganeti_networks
        # Detect Orphan Networks in Ganeti
        db_network_ids = set([net.id for net in db_networks])
        # NOTE(review): the loop variable shadows the `ganeti_networks`
        # dict; safe because .items() is evaluated once, but confusing.
        for back_end, ganeti_networks in ganeti_networks.items():
            ganeti_network_ids = set(ganeti_networks.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            # NOTE(review): "Not entry" reads like a typo for
                            # "No entry"; message kept as-is.
                            self.log.info("Not entry for network %s in DB !!",
                                          net_id)
713

    
714

    
715
def get_backend_network(network, backend):
    """Return the BackendNetwork row for (network, backend), or None if
    the network has no entry for that backend."""
    try:
        bnet = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        bnet = None
    return bnet
720

    
721

    
722
class PoolReconciler(object):
    """Reconcile the DB value pools against their actual users.

    Covers the bridge pool (PHYSICAL_VLAN networks), the MAC-prefix pool
    (MAC_FILTERED networks) and the per-subnet IPv4 address pools.  When
    *fix* is set, inconsistent pool maps are overwritten with the
    recomputed ones.
    """
    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run all pool reconciliations."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # Only IPv4 subnets with DHCP enabled maintain an IP pool.
        networks = Network.objects.prefetch_related("subnets")\
                                  .filter(deleted=False)
        for network in networks:
            for subnet in network.subnets.all():
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(network)

    def _reconcile_value_pool(self, networks, field, pool_table, pool_class,
                              pool_name):
        """Shared body of bridge/MAC-prefix reconciliation.

        Checks that *field* is unique across *networks*, locks the pool
        via *pool_table* and verifies its map against the values in use.
        """
        check_unique_values(objects=networks, field=field, logger=self.log)
        try:
            pool = pool_table.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for %s.", pool_name)
            return

        # Since pool is locked, no new networks may be created
        used_values = set(networks.values_list(field, flat=True))
        check_pool_consistent(pool=pool, pool_class=pool_class,
                              used_values=used_values, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check the bridge pool against PHYSICAL_VLAN networks."""
        networks = Network.objects.filter(deleted=False,
                                          flavor="PHYSICAL_VLAN")
        self._reconcile_value_pool(networks, "link", BridgePoolTable,
                                   pools.BridgePool, "bridges")

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check the MAC-prefix pool against MAC_FILTERED networks."""
        networks = Network.objects.filter(deleted=False,
                                          flavor="MAC_FILTERED")
        self._reconcile_value_pool(networks, "mac_prefix",
                                   MacPrefixPoolTable, pools.MacPrefixPool,
                                   "MAC prefixes")

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check each IP pool of *network* against the addresses in use."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            used_ips = ip_pool.pool_table.subnet\
                              .ips.exclude(address__isnull=True)\
                              .exclude(deleted=True)\
                              .values_list("address", flat=True)
            # Addresses outside this pool's range (e.g. from another pool
            # of the same subnet) must not be counted against it.
            used_ips = [ip for ip in used_ips if ip_pool.contains(ip)]
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=used_ips,
                                  fix=self.fix, logger=self.log)
789

    
790

    
791
def check_unique_values(objects, field, logger):
    """Check that the value of *field* is unique across *objects*.

    Logs one error per duplicated value, listing every object that uses
    it, and returns False.  Returns True when all values are distinct.

    The original implementation used ``list.count`` inside a loop (O(n^2))
    and logged each duplicated value once per occurrence; this version
    detects duplicates in a single pass and reports each value once.
    """
    used_values = list(objects.values_list(field, flat=True))
    seen = set()
    duplicate_values = set()
    for value in used_values:
        if value in seen:
            duplicate_values.add(value)
        seen.add(value)
    if duplicate_values:
        for value in duplicate_values:
            using_objects = objects.filter(**{field: value})
            msg = "Value '%s' is used as %s for more than one object: %s"
            logger.error(msg, value, field, ",".join(map(str, using_objects)))
        return False
    logger.debug("Values for field '%s' are unique.", field)
    return True
803

    
804

    
805
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Check that *pool*'s available map matches the values actually used.

    Rebuilds the expected map by reserving every value of *used_values*
    in an empty copy of the pool and compares it with the stored map.
    Each mismatching entry is logged; when *fix* is set the stored map is
    replaced with the recomputed one and saved.

    Fixes over the original: removed a dead first assignment to ``msg``
    (it was unconditionally overwritten inside the loop), corrected the
    "incosistent" typo in the error message, and replaced a side-effect
    list comprehension with a plain loop.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available == pool.available:
        return

    # XOR of the two bitmaps has a '1' wherever they disagree; report
    # every such position with its DB vs. recomputed state.
    pool_diff = dummy_pool.available ^ pool.available
    for index in pool_diff.itersearch(bitarray.bitarray("1")):
        value = pool.index_to_value(int(index))
        msg = "%s is inconsistent! Value '%s' is %s but should be %s."
        state_db = pool.is_available(value) and "available" or "unavailable"
        state_real = dummy_pool.is_available(value) and "available"\
            or "unavailable"
        logger.error(msg, pool, value, state_db, state_real)
    if fix:
        pool.available = dummy_pool.available
        pool.save()
        logger.info("Fixed available map of pool '%s'", pool)
822

    
823

    
824
def create_empty_pool(pool, pool_class):
    """Return a fresh, all-free *pool_class* built on *pool*'s table row.

    NOTE(review): the maps are cleared on the shared row in place, so the
    caller must not rely on *pool*'s previous maps after this call.
    """
    row = pool.pool_table
    row.available_map = ""
    row.reserved_map = ""
    return pool_class(row)
829

    
830

    
831
def get_locked_server(server_id):
    """Fetch the VM row with id *server_id* under SELECT ... FOR UPDATE."""
    locked_servers = VirtualMachine.objects.select_for_update()
    return locked_servers.get(id=server_id)