Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ d05e5324

History | View | Annotate | Download (34.1 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.conf import settings
60

    
61
import logging
62
import itertools
63
import bitarray
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client, Network,
69
                               BackendNetwork, BridgePoolTable,
70
                               MacPrefixPoolTable)
71
from synnefo.db import pools
72
from synnefo.logic import utils, rapi, backend as backend_mod
73
from synnefo.lib.utils import merge_time
74

    
75
# Use a named, module-level logger (PEP 282 best practice) instead of the
# root logger, so messages from this module can be filtered/configured
# independently; they still propagate to the root handlers.
logger = logging.getLogger(__name__)
# Keep basicConfig() so the module logs something when run stand-alone;
# it is a no-op if the root logger is already configured.
logging.basicConfig()

# Grace period: NICs in state BUILD (and volumes in state CREATING) newer
# than this are still considered "in progress" and are not reconciled.
BUILDING_NIC_TIMEOUT = timedelta(seconds=120)
79

    
80

    
81
class BackendReconciler(object):
    """Reconcile the servers of one Ganeti backend with the Synnefo DB.

    Detects — and, depending on the 'fix_*' entries of `options`,
    repairs — stale DB servers (R1), orphan Ganeti instances (R2) and
    servers whose operating state, flavor, NICs, disks or pending tasks
    are out of sync between the DB and the Ganeti backend (R3).

    """
    def __init__(self, backend, logger, options=None):
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Return the RAPI client to the client pool."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run all reconciliation steps for this backend."""
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        self.event_time = datetime.now()

        self.db_servers = get_database_servers(backend)
        self.db_servers_keys = set(self.db_servers.keys())
        log.debug("Got servers info from database.")

        self.gnt_servers = get_ganeti_servers(backend)
        self.gnt_servers_keys = set(self.gnt_servers.keys())
        log.debug("Got servers info from Ganeti backend.")

        self.gnt_jobs = get_ganeti_jobs(backend)
        log.debug("Got jobs from Ganeti backend")

        self.stale_servers = self.reconcile_stale_servers()
        self.orphan_servers = self.reconcile_orphan_servers()
        self.unsynced_servers = self.reconcile_unsynced_servers()
        self.close()

    def get_build_status(self, db_server):
        """Return the status of the build job.

        Return whether the job is RUNNING, FINALIZED or ERROR, together
        with the timestamp that the job finished (if any).

        """
        job_id = db_server.backendjobid
        if job_id in self.gnt_jobs:
            job = self.gnt_jobs[job_id]
            gnt_job_status = job["status"]
            end_timestamp = job["end_ts"]
            if end_timestamp is not None:
                end_timestamp = merge_time(end_timestamp)
            if gnt_job_status == rapi.JOB_STATUS_ERROR:
                return "ERROR", end_timestamp
            elif gnt_job_status not in rapi.JOB_STATUS_FINALIZED:
                return "RUNNING", None
            else:
                return "FINALIZED", end_timestamp
        else:
            # The build job is unknown to Ganeti; treat as an error.
            return "ERROR", None

    def reconcile_stale_servers(self):
        """Handle servers that exist in the DB but not in Ganeti (R1)."""
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            elif (db_server.operstate == "ERROR" and
                  db_server.action != "DESTROY"):
                # Servers at building ERROR are stale only if the user has
                # asked to destroy them.
                pass
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them. NOTE: use .get() because `options` may legitimately be
        # the empty dict (see __init__); a plain [] lookup raised KeyError.
        if stale and self.options.get("fix_stale"):
            for server_id in stale:
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm,
                    etime=self.event_time,
                    jobid=0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Handle instances that exist in Ganeti but not in the DB (R2)."""
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options.get("fix_orphans"):
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Reconcile servers that exist both in the DB and in Ganeti (R3)."""
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status, end_timestamp = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue
                elif (end_timestamp is not None and
                      end_timestamp >= self.event_time):
                    # Do not continue reconciliation for a building server
                    # whose build job completed after querying the state
                    # of Ganeti servers. (The explicit None check keeps
                    # the comparison valid on both Python 2 and 3.)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
            if db_server.task is not None:
                self.reconcile_pending_task(server_id, db_server)

    def reconcile_building_server(self, db_server):
        """Simulate a failed OP_INSTANCE_CREATE for a server stuck in BUILD."""
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options.get("fix_unsynced"):
            fix_opcode = "OP_INSTANCE_CREATE"
            vm = get_locked_server(db_server.id)
            backend_mod.process_op_status(
                vm=vm,
                etime=self.event_time,
                jobid=0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Sync the DB operstate with the Ganeti state of a server."""
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self.options.get("fix_unsynced"):
                vm = get_locked_server(server_id)
                # If server is in building state, you will have first to
                # reconcile its creation, to avoid wrong quotas
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=vm, etime=self.event_time, jobid=0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                fix_opcode = "OP_INSTANCE_STARTUP"\
                    if gnt_server["state"] == "STARTED"\
                    else "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Sync the DB flavor with the Ganeti beparams of a server."""
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
           db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self.options.get("fix_unsynced_flavors"):
                vm = get_locked_server(server_id)
                old_state = vm.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode=opcode, status='success',
                    job_fields={"beparams": beparams},
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to the old state.
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Sync the DB NICs with the Ganeti NICs of a server."""
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_nics = db_server.nics.exclude(state="BUILD",
                                         created__lte=building_time) \
                                .order_by("id")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.parse_instance_nics(gnt_nics)
        nics_changed = len(db_nics) != len(gnt_nics)
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
            gnt_nic_id, gnt_nic = gnt_nic
            if (db_nic.id == gnt_nic_id) and\
               backend_mod.nics_are_equal(db_nic, gnt_nic):
                continue
            else:
                nics_changed = True
                break
        if nics_changed:
            msg = "Found unsynced NICs for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_nics_str = "\n\t\t".join(map(format_db_nic, db_nics))
            gnt_nics_str = "\n\t\t".join(map(format_gnt_nic,
                                         sorted(gnt_nics_parsed.items())))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self.options.get("fix_unsynced_nics"):
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode="OP_INSTANCE_SET_PARAMS", status='success',
                    logmsg="Reconciliation: simulated Ganeti event",
                    nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        """Sync the DB volumes with the Ganeti disks of a server."""
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
        db_disks = db_server.volumes.exclude(status="CREATING",
                                             created__lte=building_time) \
                                    .filter(deleted=False)\
                                    .order_by("id")
        gnt_disks = gnt_server["disks"]
        gnt_disks_parsed = backend_mod.parse_instance_disks(gnt_disks)
        disks_changed = len(db_disks) != len(gnt_disks)
        for db_disk, gnt_disk in zip(db_disks,
                                     sorted(gnt_disks_parsed.items())):
            gnt_disk_id, gnt_disk = gnt_disk
            if (db_disk.id == gnt_disk_id) and\
               backend_mod.disks_are_equal(db_disk, gnt_disk):
                continue
            else:
                disks_changed = True
                break
        if disks_changed:
            msg = "Found unsynced disks for server '%s'.\n"\
                  "\tDB:\n\t\t%s\n\tGaneti:\n\t\t%s"
            db_disks_str = "\n\t\t".join(map(format_db_disk, db_disks))
            gnt_disks_str = "\n\t\t".join(map(format_gnt_disk,
                                          sorted(gnt_disks_parsed.items())))
            self.log.info(msg, server_id, db_disks_str, gnt_disks_str)
            if self.options.get("fix_unsynced_disks"):
                vm = get_locked_server(server_id)
                backend_mod.process_op_status(
                    vm=vm, etime=self.event_time, jobid=0,
                    opcode="OP_INSTANCE_SET_PARAMS", status='success',
                    logmsg="Reconciliation: simulated Ganeti event",
                    disks=gnt_disks)

    def reconcile_pending_task(self, server_id, db_server):
        """Clear a server task whose Ganeti job has already finished."""
        job_id = db_server.task_job_id
        pending_task = False
        if job_id not in self.gnt_jobs:
            pending_task = True
        else:
            gnt_job_status = self.gnt_jobs[job_id]["status"]
            if gnt_job_status in rapi.JOB_STATUS_FINALIZED:
                pending_task = True

        if pending_task:
            db_server = get_locked_server(server_id)
            if db_server.task_job_id != job_id:
                # task has changed!
                return
            self.log.info("Found server '%s' with pending task: '%s'",
                          server_id, db_server.task)
            if self.options.get("fix_pending_tasks"):
                db_server.task = None
                db_server.task_job_id = None
                db_server.save()
                self.log.info("Cleared pending task for server '%s'",
                              server_id)
385

    
386

    
387
NIC_MSG = ": %s\t".join(["ID", "State", "IP", "Network", "MAC", "Index",
                         "Firewall"]) + ": %s"


def format_db_nic(nic):
    """Render a DB NIC object as a single tab-separated summary line."""
    fields = (nic.id, nic.state, nic.ipv4_address, nic.network_id,
              nic.mac, nic.index, nic.firewall_profile)
    return NIC_MSG % fields
394

    
395

    
396
def format_gnt_nic(nic):
    """Render one (name, info) Ganeti NIC pair as a summary line."""
    nic_name, info = nic
    fields = (nic_name, info["state"], info["ipv4_address"],
              info["network"].id, info["mac"], info["index"],
              info["firewall_profile"])
    return NIC_MSG % fields
401

    
402
DISK_MSG = ": %s\t".join(["ID", "State", "Size", "Index"]) + ": %s"


def format_db_disk(disk):
    """Render a DB volume object as a single tab-separated summary line."""
    fields = (disk.id, disk.status, disk.size, disk.index)
    return DISK_MSG % fields
407

    
408

    
409
def format_gnt_disk(disk):
    """Render one (name, info) Ganeti disk pair as a summary line."""
    disk_name, info = disk
    return DISK_MSG % (disk_name, info["status"], info["size"], info["index"])
412

    
413

    
414
#
415
# Networks
416
#
417

    
418

    
419
def get_networks_from_ganeti(backend):
    """Return the Synnefo networks of a Ganeti backend, indexed by ID.

    Networks whose name does not carry the Synnefo prefix are ignored.

    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'
    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if name.startswith(prefix):
                networks[utils.id_from_network_name(name)] = net
    return networks
430

    
431

    
432
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    Return a dict mapping each partially-connected network's id to the
    set of nodegroup names it is missing from.

    """
    def connected_groups(group_list):
        # Each entry of group_list is a (name, mode, link) triple.
        return set(name for (name, mode, link) in group_list)

    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        connected = connected_groups(info['group_list'])
        if connected != all_groups:
            hanging[net_id] = all_groups - connected
    return hanging
451

    
452

    
453
def get_online_backends():
    """Return the queryset of all Ganeti backends that are not offline."""
    online = Backend.objects.filter(offline=False)
    return online
455

    
456

    
457
def get_database_servers(backend):
    """Return the backend's non-deleted DB servers, indexed by server id.

    Flavors and NIC/IP/subnet relations are pre-fetched to avoid N+1
    queries during reconciliation.

    """
    servers = backend.virtual_machines.select_related("flavor")\
                                      .prefetch_related("nics__ips__subnet")\
                                      .filter(deleted=False)
    return dict((server.id, server) for server in servers)
462

    
463

    
464
def get_ganeti_servers(backend):
    """Return the parsed Synnefo instances of a backend, indexed by id.

    Non-Synnefo instances (names without the backend prefix) and
    instances whose name cannot be parsed are filtered out.

    """
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
    parsed = [parse_gnt_instance(instance)
              for instance in backend_mod.get_instances(backend)
              if instance["name"].startswith(snf_backend_prefix)]
    return dict((i["id"], i) for i in parsed if i["id"] is not None)
472

    
473

    
474
def parse_gnt_instance(instance):
    """Parse a Ganeti instance dict into the reconciler's server format.

    Always return a dict with at least an "id" key; "id" is None when
    the instance name is not a well-formed Synnefo instance name, so
    callers can filter such instances out with `i["id"] is not None`.

    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        # BUGFIX: returning the old (None, None) tuple broke callers
        # that index the result with i["id"] (TypeError on a tuple);
        # a dict with a None id keeps their filtering working.
        return {"id": None}

    beparams = instance["beparams"]

    vcpus = beparams["vcpus"]
    ram = beparams["maxmem"]
    state = instance["oper_state"] and "STARTED" or "STOPPED"

    return {
        "id": instance_id,
        "state": state,  # FIX
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": {"vcpus": vcpus,
                   "ram": ram},
        "tags": instance["tags"]
    }
498

    
499

    
500
def nics_from_instance(i):
    """Build the list of NIC dicts of a Ganeti instance.

    Each NIC dict carries "ip", "name", "mac", "network" and "index".
    A "firewall" key is added to the NICs named by well-formed
    "synnefo:network:<nic-name>:<firewall-profile>" instance tags.

    """
    # BUGFIX: the old code called len() on the result of zip(), which is
    # an iterator (not a list) on Python 3; enumerate() works on both.
    fields = zip(i['nic.ips'], i['nic.names'], i['nic.macs'],
                 i['nic.networks.names'])
    nics = [{'ip': ip, 'name': name, 'mac': mac, 'network': network,
             'index': index}
            for index, (ip, name, mac, network) in enumerate(fields)]

    # Attach firewall profiles carried as instance tags.
    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            for nic in nics:
                # setdefault: the first matching tag wins, as before.
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics
524

    
525

    
526
def disks_from_instance(i):
    """Build the list of disk dicts of a Ganeti instance.

    Each disk dict carries "size", "name", "uuid" and "index".

    """
    # BUGFIX: the old code called len() on the result of zip(), which is
    # an iterator (not a list) on Python 3; enumerate() works on both.
    fields = zip(i['disk.sizes'], i['disk.names'], i['disk.uuids'])
    return [{'size': size, 'name': name, 'uuid': uuid, 'index': index}
            for index, (size, name, uuid) in enumerate(fields)]
535

    
536

    
537
def get_ganeti_jobs(backend):
    """Return the Ganeti jobs of a backend, indexed by integer job id."""
    jobs = backend_mod.get_jobs(backend)
    return dict((int(job["id"]), job) for job in jobs)
540

    
541

    
542
class NetworkReconciler(object):
    """Reconcile the DB networks with the Ganeti networks of all backends."""

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    @transaction.commit_on_success
    def reconcile_networks(self):
        """Reconcile every non-deleted network with all online backends."""
        # Get models from DB
        self.backends = Backend.objects.exclude(offline=True)
        self.networks = Network.objects.filter(deleted=False)

        self.event_time = datetime.now()

        # Get info from all ganeti backends
        self.ganeti_networks = {}
        self.ganeti_hanging_networks = {}
        for b in self.backends:
            g_nets = get_networks_from_ganeti(b)
            self.ganeti_networks[b] = g_nets
            g_hanging_nets = hanging_networks(b, g_nets)
            self.ganeti_hanging_networks[b] = g_hanging_nets

        self._reconcile_orphan_networks()

        for network in self.networks:
            self._reconcile_network(network)

    @transaction.commit_on_success
    def _reconcile_network(self, network):
        """Reconcile a network with corresponding Ganeti networks.

        Reconcile a Network and the associated BackendNetworks with the
        corresponding Ganeti networks in all Ganeti backends.

        """
        if network.subnets.filter(ipversion=4, dhcp=True).exists():
            ip_pools = network.get_ip_pools()  # X-Lock on IP pools
        else:
            ip_pools = None
        for bend in self.backends:
            bnet = get_backend_network(network, bend)
            gnet = self.ganeti_networks[bend].get(network.id)
            if bnet is None and gnet is not None:
                # Network exists in backend but not in DB for this backend
                bnet = self.reconcile_parted_network(network, bend)

            if bnet is None:
                continue

            if gnet is None:
                # Network does not exist in Ganeti. If the network action
                # is DESTROY, we have to mark as deleted in DB, else we
                # have to create it in Ganeti.
                if network.action == "DESTROY":
                    if bnet.operstate != "DELETED":
                        self.reconcile_stale_network(bnet)
                else:
                    self.reconcile_missing_network(network, bend)
                # Skip rest reconciliation!
                continue

            try:
                hanging_groups = self.ganeti_hanging_networks[bend][network.id]
            except KeyError:
                # Network is connected to all nodegroups
                hanging_groups = []

            if hanging_groups:
                # CASE-3: Ganeti networks not connected to all nodegroups
                self.reconcile_hanging_groups(network, bend,
                                              hanging_groups)
                continue

            if bnet.operstate != 'ACTIVE':
                # CASE-4: Unsynced network state. At this point the network
                # exists and is connected to all nodes so it must be
                # active!
                self.reconcile_unsynced_network(network, bend, bnet)

            # Check that externally reserved IPs of the network in Ganeti
            # are also externally reserved in the IP pool
            externally_reserved = gnet['external_reservations']
            if externally_reserved and ip_pools is not None:
                for ip in externally_reserved.split(","):
                    ip = ip.strip()
                    for ip_pool in ip_pools:
                        if ip_pool.contains(ip):
                            if not ip_pool.is_reserved(ip):
                                msg = ("D: IP '%s' is reserved for network"
                                       " '%s' in backend '%s' but not in DB.")
                                self.log.info(msg, ip, network, bend)
                                if self.fix:
                                    ip_pool.reserve(ip, external=True)
                                    ip_pool.save()
                                    self.log.info("F: Reserved IP '%s'", ip)
        if network.state != "ACTIVE":
            network = Network.objects.select_for_update().get(id=network.id)
            backend_mod.update_network_state(network)

    def reconcile_parted_network(self, network, backend):
        """Create the missing DB entry for a network/backend pair."""
        self.log.info("D: Missing DB entry for network %s in backend %s",
                      network, backend)
        if self.fix:
            network.create_backend_network(backend)
            self.log.info("F: Created DB entry")
            bnet = get_backend_network(network, backend)
            return bnet
        # Without --fix there is nothing to return; the caller skips
        # this backend when it gets None.
        return None

    def reconcile_stale_network(self, backend_network):
        """Mark as deleted a BackendNetwork that is missing from Ganeti."""
        self.log.info("D: Stale DB entry for network %s in backend %s",
                      backend_network.network, backend_network.backend)
        if self.fix:
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_REMOVE",
                "success",
                "Reconciliation simulated event")
            self.log.info("F: Reconciled event: OP_NETWORK_REMOVE")

    def reconcile_missing_network(self, network, backend):
        """Create in Ganeti a network that exists only in the DB."""
        self.log.info("D: Missing Ganeti network %s in backend %s",
                      network, backend)
        if self.fix:
            backend_mod.create_network(network, backend)
            self.log.info("F: Issued OP_NETWORK_CONNECT")

    def reconcile_hanging_groups(self, network, backend, hanging_groups):
        """Connect a network to the nodegroups it is missing from."""
        self.log.info('D: Network %s in backend %s is not connected to '
                      'the following groups:', network, backend)
        self.log.info('-  ' + '\n-  '.join(hanging_groups))
        if self.fix:
            for group in hanging_groups:
                self.log.info('F: Connecting network %s to nodegroup %s',
                              network, group)
                backend_mod.connect_network(network, backend, depends=[],
                                            group=group)

    def reconcile_unsynced_network(self, network, backend, backend_network):
        """Simulate a successful OP_NETWORK_CONNECT to sync the DB state."""
        self.log.info("D: Unsynced network %s in backend %s", network, backend)
        if self.fix:
            self.log.info("F: Issuing OP_NETWORK_CONNECT")
            backend_network = BackendNetwork.objects.select_for_update()\
                                                    .get(id=backend_network.id)
            backend_mod.process_network_status(
                backend_network, self.event_time, 0,
                "OP_NETWORK_CONNECT",
                "success",
                "Reconciliation simulated event")

    def _reconcile_orphan_networks(self):
        """Disconnect and delete Ganeti networks that have no DB entry."""
        # Detect Orphan Networks in Ganeti
        db_network_ids = set(net.id for net in self.networks)
        # NOTE: the loop variable used to shadow self.ganeti_networks'
        # local alias; use a distinct name for the per-backend dict.
        for back_end, backend_nets in self.ganeti_networks.items():
            ganeti_network_ids = set(backend_nets.keys())
            orphans = ganeti_network_ids - db_network_ids

            if len(orphans) > 0:
                self.log.info('D: Orphan Networks in backend %s:',
                              back_end.clustername)
                self.log.info('-  ' + '\n-  '.join([str(o) for o in orphans]))
                if self.fix:
                    for net_id in orphans:
                        self.log.info('Disconnecting and deleting network %d',
                                      net_id)
                        try:
                            network = Network.objects.get(id=net_id)
                            backend_mod.delete_network(network,
                                                       backend=back_end)
                        except Network.DoesNotExist:
                            self.log.info("No entry for network %s in DB !!",
                                          net_id)
717

    
718

    
719
def get_backend_network(network, backend):
    """Return the BackendNetwork linking `network` to `backend`, or None.

    A missing row is an expected condition, so it is handled EAFP-style
    rather than reported as an error.
    """
    lookup = dict(network=network, backend=backend)
    try:
        return BackendNetwork.objects.get(**lookup)
    except BackendNetwork.DoesNotExist:
        return None
class PoolReconciler(object):
    """Reconcile value pools: bridges, MAC prefixes and IPv4 addresses.

    For every pool type the checks are the same: the values must be
    unique across live networks, and the pool's availability bitmap must
    agree with the values actually in use.  With fix=True inconsistent
    bitmaps are rewritten to match reality.
    """

    def __init__(self, logger, fix=False):
        self.log = logger
        self.fix = fix

    def reconcile(self):
        """Run every pool check."""
        self.reconcile_bridges()
        self.reconcile_mac_prefixes()

        # IPv4 pools only exist for IPv4 subnets that run DHCP.
        live_networks = Network.objects.prefetch_related("subnets")\
                                       .filter(deleted=False)
        for net in live_networks:
            for subnet in net.subnets.all():
                if subnet.ipversion == 4 and subnet.dhcp:
                    self.reconcile_ip_pool(net)

    @transaction.commit_on_success
    def reconcile_bridges(self):
        """Check the bridge pool against live PHYSICAL_VLAN networks."""
        vlan_networks = Network.objects.filter(deleted=False,
                                               flavor="PHYSICAL_VLAN")
        check_unique_values(objects=vlan_networks, field='link',
                            logger=self.log)
        try:
            pool = BridgePoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for bridges.")
            return

        # Since pool is locked, no new networks may be created
        in_use = set(vlan_networks.values_list('link', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.BridgePool,
                              used_values=in_use, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_mac_prefixes(self):
        """Check the MAC-prefix pool against live MAC_FILTERED networks."""
        mac_networks = Network.objects.filter(deleted=False,
                                              flavor="MAC_FILTERED")
        check_unique_values(objects=mac_networks, field='mac_prefix',
                            logger=self.log)
        try:
            pool = MacPrefixPoolTable.get_pool()
        except pools.EmptyPool:
            self.log.info("There is no available pool for MAC prefixes.")
            return

        # Since pool is locked, no new network may be created
        in_use = set(mac_networks.values_list('mac_prefix', flat=True))
        check_pool_consistent(pool=pool, pool_class=pools.MacPrefixPool,
                              used_values=in_use, fix=self.fix,
                              logger=self.log)

    @transaction.commit_on_success
    def reconcile_ip_pool(self, network):
        """Check each IPv4 pool of `network` against allocated NIC IPs."""
        # Check that all NICs have unique IPv4 address
        nics = network.ips.exclude(address__isnull=True).all()
        check_unique_values(objects=nics, field="address", logger=self.log)

        for ip_pool in network.get_ip_pools():
            # IP pool is now locked, so no new IPs may be created
            allocated = ip_pool.pool_table.subnet\
                               .ips.exclude(address__isnull=True)\
                               .exclude(deleted=True)\
                               .values_list("address", flat=True)
            # Only addresses falling inside this pool's range matter.
            allocated = [a for a in allocated if ip_pool.contains(a)]
            check_pool_consistent(pool=ip_pool,
                                  pool_class=pools.IPPool,
                                  used_values=allocated,
                                  fix=self.fix, logger=self.log)
def check_unique_values(objects, field, logger):
    """Check that the values of `field` are unique across `objects`.

    `objects` is a queryset; for every duplicated value an error is
    logged together with the objects sharing it.

    Returns True when all values are unique, False otherwise.
    """
    used_values = list(objects.values_list(field, flat=True))
    # Single O(n) pass (was O(n**2) via list.count per element), and each
    # duplicated value is logged exactly once instead of once per extra
    # occurrence.  First-duplication order is preserved.
    seen = set()
    duplicates = []
    for value in used_values:
        if value in seen and value not in duplicates:
            duplicates.append(value)
        seen.add(value)
    if duplicates:
        for value in duplicates:
            filter_args = {field: value}
            using_objects = objects.filter(**filter_args)
            msg = "Value '%s' is used as %s for more than one objects: %s"
            logger.error(msg, value, field, ",".join(map(str, using_objects)))
        return False
    logger.debug("Values for field '%s' are unique.", field)
    return True
def check_pool_consistent(pool, pool_class, used_values, fix, logger):
    """Verify that `pool`'s availability map matches `used_values`.

    Builds a pristine pool of the same class, reserves exactly the
    values known to be in use, and diffs the resulting bitmap against
    the stored one.  Every mismatching value is logged; with fix=True
    the stored map is overwritten with the recomputed one and saved.
    """
    dummy_pool = create_empty_pool(pool, pool_class)
    # Plain loop instead of a side-effect list comprehension.
    for value in used_values:
        dummy_pool.reserve(value)
    if dummy_pool.available != pool.available:
        # XOR yields the bit positions where the two bitmaps disagree.
        pool_diff = dummy_pool.available ^ pool.available
        for index in pool_diff.itersearch(bitarray.bitarray("1")):
            value = pool.index_to_value(int(index))
            # Fixed typo ("incosistent") and removed a dead earlier
            # assignment to msg that was never used.
            msg = "%s is inconsistent! Value '%s' is %s but should be %s."
            value1 = "available" if pool.is_available(value) \
                else "unavailable"
            value2 = "available" if dummy_pool.is_available(value) \
                else "unavailable"
            logger.error(msg, pool, value, value1, value2)
        if fix:
            pool.available = dummy_pool.available
            pool.save()
            logger.info("Fixed available map of pool '%s'", pool)
def create_empty_pool(pool, pool_class):
    """Return a fresh `pool_class` instance over `pool`'s table row.

    Both the available and reserved bitmaps of the row are cleared, so
    the resulting pool starts from a blank state.  NOTE: this mutates
    (but does not save) the row shared with `pool`.
    """
    table_row = pool.pool_table
    table_row.available_map = ""
    table_row.reserved_map = ""
    return pool_class(table_row)
def get_locked_server(server_id):
    """Fetch the VirtualMachine row for `server_id` under a row lock
    (SELECT ... FOR UPDATE); must run inside a transaction."""
    locked_servers = VirtualMachine.objects.select_for_update()
    return locked_servers.get(id=server_id)