Revision 75dc539e snf-cyclades-app/synnefo/logic/reconciliation.py

b/snf-cyclades-app/synnefo/logic/reconciliation.py
1
#!/usr/bin/env python
2 1
# -*- coding: utf-8 -*-
3 2
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 4
#
6 5
# Redistribution and use in source and binary forms, with or
7 6
# without modification, are permitted provided that the following
......
56 55

  
57 56
"""
58 57

  
59
import logging
60
import sys
61
import itertools
62 58

  
63 59
from django.core.management import setup_environ
64 60
try:
......
69 65
setup_environ(settings)
70 66

  
71 67

  
68
import logging
69
import itertools
72 70
from datetime import datetime, timedelta
73
from collections import namedtuple
74 71

  
75
from synnefo.db.models import (VirtualMachine, NetworkInterface, Flavor,
72
from django.db import transaction
73
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
76 74
                               pooled_rapi_client)
75
from synnefo.logic import utils, backend as backend_mod
77 76
from synnefo.logic.rapi import GanetiApiError
78
from synnefo.logic.backend import get_instances
79
from synnefo.logic import utils
80

  
81 77

  
82
log = logging.getLogger()
78
logger = logging.getLogger()
79
logging.basicConfig()
83 80

  
84 81
try:
85 82
    CHECK_INTERVAL = settings.RECONCILIATION_CHECK_INTERVAL
......
87 84
    CHECK_INTERVAL = 60
88 85

  
89 86

  
90
def needs_reconciliation(vm):
91
    now = datetime.now()
92
    return (now > vm.updated + timedelta(seconds=CHECK_INTERVAL)) or\
93
           (now > vm.backendtime + timedelta(seconds=2*CHECK_INTERVAL))
94

  
95
VMState = namedtuple("VMState", ["state", "cpu", "ram", "nics"])
96

  
97

  
98
def stale_servers_in_db(D, G):
99
    idD = set(D.keys())
100
    idG = set(G.keys())
101

  
102
    stale = set()
103
    for i in idD - idG:
104
        if D[i] == 'BUILD':
105
            vm = VirtualMachine.objects.get(id=i)
106
            if needs_reconciliation(vm):
107
                with pooled_rapi_client(vm) as c:
108
                    try:
109
                        job_status = c.GetJobStatus(vm.backendjobid)['status']
110
                        if job_status in ('queued', 'waiting', 'running'):
111
                            # Server is still building in Ganeti
112
                            continue
113
                        else:
114
                            c.GetInstance(utils.id_to_instance_name(i))
115
                            # Server has just been created in Ganeti
116
                            continue
117
                    except GanetiApiError:
118
                        stale.add(i)
87
class BackendReconciler(object):
88
    def __init__(self, backend, logger, options=None):
89
        self.backend = backend
90
        self.log = logger
91
        self.client = backend.get_client()
92
        if options is None:
93
            self.options = {}
119 94
        else:
120
            stale.add(i)
121

  
122
    return stale
123

  
124

  
125
def orphan_instances_in_ganeti(D, G):
126
    idD = set(D.keys())
127
    idG = set(G.keys())
128

  
129
    return idG - idD
130

  
131

  
132
def unsynced_operstate(D, G):
133
    unsynced = set()
134
    idD = set(D.keys())
135
    idG = set(G.keys())
136

  
137
    for i in idD & idG:
138
        dbstate = D[i].state
139
        gntstate = G[i].state
140
        vm_unsynced = (gntstate and dbstate != "STARTED") or\
141
            (not gntstate and dbstate not in ('BUILD', 'ERROR', 'STOPPED'))
142
        if vm_unsynced:
143
            unsynced.add((i, dbstate, gntstate))
144
        if not gntstate and dbstate == 'BUILD':
145
            vm = VirtualMachine.objects.get(id=i)
146
            if needs_reconciliation(vm):
147
                with pooled_rapi_client(vm) as c:
148
                    try:
149
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
150
                        if job_info['status'] == 'success':
151
                            unsynced.add((i, dbstate, gntstate))
152
                    except GanetiApiError:
153
                        pass
154

  
155
    return unsynced
156

  
157

  
158
def unsynced_flavors(D, G):
159
    unsynced = set()
160
    idD = set(D.keys())
161
    idG = set(G.keys())
162

  
163
    for i in idD & idG:
164
        if D[i].ram != G[i].ram or D[i].cpu != G[i].cpu:
165
            db_flavor = VirtualMachine.objects.get(id=i).flavor
95
            self.options = options
96

  
97
    def close(self):
98
        self.backend.put_client(self.client)
99

  
100
    @transaction.commit_on_success
101
    def reconcile(self):
102
        log = self.log
103
        backend = self.backend
104
        log.debug("Reconciling backend %s", backend)
105

  
106
        self.db_servers = get_database_servers(backend)
107
        self.db_servers_keys = set(self.db_servers.keys())
108
        log.debug("Got servers info from database.")
109

  
110
        self.gnt_servers = get_ganeti_servers(backend)
111
        self.gnt_servers_keys = set(self.gnt_servers.keys())
112
        log.debug("Got servers info from Ganeti backend.")
113

  
114
        self.event_time = datetime.now()
115

  
116
        self.stale_servers = self.reconcile_stale_servers()
117
        self.orphan_servers = self.reconcile_orphan_servers()
118
        self.unsynced_servers = self.reconcile_unsynced_servers()
119
        self.close()
120

  
121
    def get_build_status(self, db_server):
122
        job = db_server.backendjobid
123
        if job is None:
124
            created = db_server.created
125
            # Job has not yet been enqueued.
126
            if self.event_time < created + timedelta(seconds=60):
127
                return "RUNNING"
128
            else:
129
                return "ERROR"
130
        else:
131
            updated = db_server.backendtime
132
            if self.event_time >= updated + timedelta(seconds=60):
133
                try:
134
                    job_info = self.client.GetJobStatus(job_id=job)
135
                    finalized = ["success", "error", "cancelled"]
136
                    if job_info["status"] == "error":
137
                        return "ERROR"
138
                    elif job_info["status"] not in finalized:
139
                        return "RUNNING"
140
                    else:
141
                        return "FINALIZED"
142
                except GanetiApiError:
143
                    return "ERROR"
144
            else:
145
                self.log.debug("Pending build for server '%s'", db_server.id)
146
                return "RUNNING"
147

  
148
    def reconcile_stale_servers(self):
149
        # Detect stale servers
150
        stale = []
151
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
152
        for server_id in stale_keys:
153
            db_server = self.db_servers[server_id]
154
            if db_server.operstate == "BUILD":
155
                build_status = self.get_build_status(db_server)
156
                if build_status == "ERROR":
157
                    # Special handling of BUILD eerrors
158
                    self.reconcile_building_server(db_server)
159
                elif build_status != "RUNNING":
160
                    stale.append(server_id)
161
            else:
162
                stale.append(server_id)
163

  
164
        # Report them
165
        if stale:
166
            self.log.info("Found stale servers %s at backend %s",
167
                          ", ".join(map(str, stale)), self.backend)
168
        else:
169
            self.log.debug("No stale servers at backend %s", self.backend)
170

  
171
        # Fix them
172
        if stale and self.options["fix_stale"]:
173
            for server_id in stale:
174
                db_server = self.db_servers[server_id]
175
                backend_mod.process_op_status(
176
                    vm=db_server,
177
                    etime=self.event_time,
178
                    jobid=-0,
179
                    opcode='OP_INSTANCE_REMOVE', status='success',
180
                    logmsg='Reconciliation: simulated Ganeti event')
181
            self.log.debug("Simulated Ganeti removal for stale servers.")
182

  
183
    def reconcile_orphan_servers(self):
184
        orphans = self.gnt_servers_keys - self.db_servers_keys
185
        if orphans:
186
            self.log.info("Found orphan servers %s at backend %s",
187
                          ", ".join(map(str, orphans)), self.backend)
188
        else:
189
            self.log.debug("No orphan servers at backend %s", self.backend)
190

  
191
        if orphans and self.options["fix_orphans"]:
192
            for server_id in orphans:
193
                server_name = utils.id_to_instance_name(server_id)
194
                self.client.DeleteInstance(server_name)
195
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")
196

  
197
    def reconcile_unsynced_servers(self):
198
        #log = self.log
199
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
200
            db_server = self.db_servers[server_id]
201
            gnt_server = self.gnt_servers[server_id]
202
            if db_server.operstate == "BUILD":
203
                build_status = self.get_build_status(db_server)
204
                if build_status == "RUNNING":
205
                    # Do not reconcile building VMs
206
                    continue
207
                elif build_status == "ERROR":
208
                    # Special handling of build errors
209
                    self.reconcile_building_server(db_server)
210
                    continue
211

  
212
            self.reconcile_unsynced_operstate(server_id, db_server,
213
                                              gnt_server)
214
            self.reconcile_unsynced_flavor(server_id, db_server,
215
                                           gnt_server)
216
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
217
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
218

  
219
    def reconcile_building_server(self, db_server):
220
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
221
                      db_server.id)
222
        if self.options["fix_unsynced"]:
223
            fix_opcode = "OP_INSTANCE_CREATE"
224
            backend_mod.process_op_status(
225
                vm=db_server,
226
                etime=self.event_time,
227
                jobid=-0,
228
                opcode=fix_opcode, status='error',
229
                logmsg='Reconciliation: simulated Ganeti event')
230
            self.log.debug("Simulated Ganeti error build event for"
231
                           " server '%s'", db_server.id)
232

  
233
    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
234
        if db_server.operstate != gnt_server["state"]:
235
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
236
                          server_id, db_server.operstate, gnt_server["state"])
237
            if self.options["fix_unsynced"]:
238
                fix_opcode = \
239
                    "OP_INSTANCE_STARTUP" if gnt_server["state"] == "STARTED"\
240
                    else "OP_INSTANCE_SHUTDOWN"
241
                backend_mod.process_op_status(
242
                    vm=db_server,
243
                    etime=self.event_time,
244
                    jobid=-0,
245
                    opcode=fix_opcode, status='success',
246
                    logmsg='Reconciliation: simulated Ganeti event')
247
                self.log.debug("Simulated Ganeti state event for server '%s'",
248
                               server_id)
249

  
250
    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
251
        db_flavor = db_server.flavor
252
        gnt_flavor = gnt_server["flavor"]
253
        if (db_flavor.ram != gnt_flavor["ram"] or
254
            db_flavor.cpu != gnt_flavor["vcpus"]):
166 255
            try:
167 256
                gnt_flavor = Flavor.objects.get(
168
                                    ram=G[i].ram, cpu=G[i].cpu,
169
                                    disk=db_flavor.disk,
170
                                    disk_template=db_flavor.disk_template)
257
                    ram=gnt_flavor["ram"],
258
                    cpu=gnt_flavor["vcpus"],
259
                    disk=db_flavor.disk,
260
                    disk_template=db_flavor.disk_template)
171 261
            except Flavor.DoesNotExist:
172
                gnt_flavor = None
173
            unsynced.add((i, db_flavor, gnt_flavor))
174
    return unsynced
175

  
176

  
177
def instances_with_build_errors(D, G):
178
    failed = set()
179
    idD = set(D.keys())
180
    idG = set(G.keys())
181

  
182
    for i in idD & idG:
183
        if not G[i] and D[i] == 'BUILD':
184
            vm = VirtualMachine.objects.get(id=i)
185
            if not vm.backendjobid:  # VM has not been enqueued in the backend
186
                if datetime.now() > vm.created + timedelta(seconds=120):
187
                    # If a job has not been enqueued after 2 minutues, then
188
                    # it must be a stale entry..
189
                    failed.add(i)
190
            elif needs_reconciliation(vm):
191
                # Check time to avoid many rapi calls
192
                with pooled_rapi_client(vm) as c:
193
                    try:
194
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
195
                        if job_info['status'] == 'error':
196
                            failed.add(i)
197
                    except GanetiApiError:
198
                        failed.add(i)
199

  
200
    return failed
201

  
202

  
203
def get_servers_from_db(backends, with_nics=True):
204
    vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends)
205
    vm_info = vms.values_list("id", "operstate", "flavor__cpu", "flavor__ram")
206
    if with_nics:
207
        nics = NetworkInterface.objects.filter(machine__in=vms)\
208
                               .order_by("machine")\
209
                               .values_list("machine", "index", "mac", "ipv4",
210
                                            "network")
211
        vm_nics = {}
212
        for machine, machine_nics in itertools.groupby(nics,
213
                                                       lambda nic: nic[0]):
214
            vm_nics[machine] = {}
215
            for machine, index, mac, ipv4, network in machine_nics:
216
                nic = {'mac':      mac,
217
                       'network':  utils.id_to_network_name(network),
218
                       'ipv4':     ipv4 if ipv4 != '' else None
219
                       }
220
                vm_nics[machine][index] = nic
221
    servers = dict([(vm_id, VMState(state=state,
222
                                    cpu=cpu,
223
                                    ram=ram,
224
                                    nics=vm_nics.get(vm_id, [])))
225
                    for vm_id, state, cpu, ram in vm_info])
226
    return servers
227

  
228

  
229
def get_instances_from_ganeti(backends):
230
    instances = []
231
    for backend in backends:
232
        instances.append(get_instances(backend))
233
    ganeti_instances = reduce(list.__add__, instances, [])
234
    snf_instances = {}
235

  
236
    prefix = settings.BACKEND_PREFIX_ID
237
    for i in ganeti_instances:
238
        if i['name'].startswith(prefix):
239
            try:
240
                id = utils.id_from_instance_name(i['name'])
241
            except Exception:
242
                log.error("Ignoring instance with malformed name %s",
243
                          i['name'])
244
                continue
245

  
246
            if id in snf_instances:
247
                log.error("Ignoring instance with duplicate Synnefo id %s",
248
                          i['name'])
249
                continue
250

  
251
            nics = get_nics_from_instance(i)
252
            beparams = i["beparams"]
253
            vcpus = beparams["vcpus"]
254
            ram = beparams["maxmem"]
255
            snf_instances[id] = VMState(state=i["oper_state"],
256
                                        cpu=vcpus,
257
                                        ram=ram,
258
                                        nics=nics)
259

  
260
    return snf_instances
262
                self.log.warning("Server '%s' has unknown flavor.", server_id)
263
                return
264

  
265
            self.log.info("Server '%s' has flavor '%' in DB and '%s' in"
266
                          " Ganeti", server_id, db_flavor, gnt_flavor)
267
            if self.options["fix_unsynced_flavors"]:
268
                old_state = db_server.operstate
269
                opcode = "OP_INSTANCE_SET_PARAMS"
270
                beparams = {"vcpus": gnt_flavor.cpu,
271
                            "minmem": gnt_flavor.ram,
272
                            "maxmem": gnt_flavor.ram}
273
                backend_mod.process_op_status(
274
                    vm=db_server, etime=self.event_time, jobid=-0,
275
                    opcode=opcode, status='success',
276
                    beparams=beparams,
277
                    logmsg='Reconciliation: simulated Ganeti event')
278
                # process_op_status with beparams will set the vmstate to
279
                # shutdown. Fix this be returning it to old state
280
                vm = VirtualMachine.objects.get(pk=server_id)
281
                vm.operstate = old_state
282
                vm.save()
283
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
284
                               server_id)
285

  
286
    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
287
        db_nics = db_server.nics.order_by("index")
288
        gnt_nics = gnt_server["nics"]
289
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
290
        if backend_mod.nics_changed(db_nics, gnt_nics_parsed):
291
            msg = "Found unsynced NICs for server '%s'.\n\t"\
292
                  "DB: %s\n\tGaneti: %s"
293
            db_nics_str = ", ".join(map(format_db_nic, db_nics))
294
            gnt_nics_str = ", ".join(map(format_gnt_nic, gnt_nics_parsed))
295
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
296
            if self.options["fix_unsynced_nics"]:
297
                backend_mod.process_net_status(vm=db_server,
298
                                               etime=self.event_time,
299
                                               nics=gnt_nics)
300

  
301
    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
302
        pass
303

  
304

  
305
def format_db_nic(nic):
306
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % (nic.index,
307
           nic.ipv4, nic.network_id, nic.mac, nic.firewall_profile)
308

  
309

  
310
def format_gnt_nic(nic):
311
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" %\
312
           (nic["index"], nic["ipv4"], nic["network"], nic["mac"],
313
            nic["firewall_profile"])
261 314

  
262 315

  
263 316
#
264
# Nics
265
#
266
def get_nics_from_ganeti(backends):
267
    """Get network interfaces for each ganeti instance.
268

  
269
    """
270
    instances = []
271
    for backend in backends:
272
        instances.append(get_instances(backend))
273
    instances = reduce(list.__add__, instances, [])
274
    prefix = settings.BACKEND_PREFIX_ID
275

  
276
    snf_instances_nics = {}
277
    for i in instances:
278
        if i['name'].startswith(prefix):
279
            try:
280
                id = utils.id_from_instance_name(i['name'])
281
            except Exception:
282
                log.error("Ignoring instance with malformed name %s",
283
                          i['name'])
284
                continue
285
            if id in snf_instances_nics:
286
                log.error("Ignoring instance with duplicate Synnefo id %s",
287
                          i['name'])
288
                continue
289

  
290
            snf_instances_nics[id] = get_nics_from_instance(i)
291

  
292
    return snf_instances_nics
293

  
294

  
295
def get_nics_from_instance(i):
296
    ips = zip(itertools.repeat('ipv4'), i['nic.ips'])
297
    macs = zip(itertools.repeat('mac'), i['nic.macs'])
298
    networks = zip(itertools.repeat('network'), i['nic.networks'])
299
    # modes = zip(itertools.repeat('mode'), i['nic.modes'])
300
    # links = zip(itertools.repeat('link'), i['nic.links'])
301
    # nics = zip(ips,macs,modes,networks,links)
302
    nics = zip(ips, macs, networks)
303
    nics = map(lambda x: dict(x), nics)
304
    nics = dict(enumerate(nics))
305
    return nics
306

  
307

  
308
def unsynced_nics(DBVMs, GanetiVMs):
309
    """Find unsynced network interfaces between DB and Ganeti.
310

  
311
    @ rtype: dict; {instance_id: ganeti_nics}
312
    @ return Dictionary containing the instances ids that have unsynced network
313
    interfaces between DB and Ganeti and the network interfaces in Ganeti.
314

  
315
    """
316
    idD = set(DBVMs.keys())
317
    idG = set(GanetiVMs.keys())
318

  
319
    unsynced = {}
320
    for i in idD & idG:
321
        nicsD = DBVMs[i].nics
322
        nicsG = GanetiVMs[i].nics
323
        if len(nicsD) != len(nicsG):
324
            unsynced[i] = (nicsD, nicsG)
325
            continue
326
        for index in nicsG.keys():
327
            nicD = nicsD[index]
328
            nicG = nicsG[index]
329
            diff = (nicD['ipv4'] != nicG['ipv4'] or
330
                    nicD['mac'] != nicG['mac'] or
331
                    nicD['network'] != nicG['network'])
332
            if diff:
333
                    unsynced[i] = (nicsD, nicsG)
334
                    break
335

  
336
    return unsynced
337

  
338
#
339 317
# Networks
340 318
#
341 319

  
......
375 353
    return hanging
376 354

  
377 355

  
378
# Only for testing this module individually
379
def main():
380
    print get_instances_from_ganeti()
356
def get_online_backends():
357
    return Backend.objects.filter(offline=False)
358

  
359

  
360
def get_database_servers(backend):
361
    servers = backend.virtual_machines.select_related("nics", "flavor")\
362
                                      .filter(deleted=False)
363
    return dict([(s.id, s) for s in servers])
364

  
365

  
366
def get_ganeti_servers(backend):
367
    gnt_instances = backend_mod.get_instances(backend)
368
    # Filter out non-synnefo instances
369
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
370
    gnt_instances = filter(lambda i: i["name"].startswith(snf_backend_prefix),
371
                           gnt_instances)
372
    gnt_instances = map(parse_gnt_instance, gnt_instances)
373
    return dict([(i["id"], i) for i in gnt_instances if i["id"] is not None])
374

  
375

  
376
def parse_gnt_instance(instance):
377
    try:
378
        instance_id = utils.id_from_instance_name(instance['name'])
379
    except Exception:
380
        logger.error("Ignoring instance with malformed name %s",
381
                     instance['name'])
382
        return (None, None)
383

  
384
    beparams = instance["beparams"]
385

  
386
    vcpus = beparams["vcpus"]
387
    ram = beparams["maxmem"]
388
    state = instance["oper_state"] and "STARTED" or "STOPPED"
389

  
390
    return {
391
        "id": instance_id,
392
        "state": state,  # FIX
393
        "updated": datetime.fromtimestamp(instance["mtime"]),
394
        "disks": disks_from_instance(instance),
395
        "nics": nics_from_instance(instance),
396
        "flavor": {"vcpus": vcpus,
397
                   "ram": ram},
398
        "tags": instance["tags"]
399
    }
400

  
401

  
402
def nics_from_instance(i):
403
    ips = zip(itertools.repeat('ip'), i['nic.ips'])
404
    macs = zip(itertools.repeat('mac'), i['nic.macs'])
405
    networks = zip(itertools.repeat('network'), i['nic.networks'])
406
    # modes = zip(itertools.repeat('mode'), i['nic.modes'])
407
    # links = zip(itertools.repeat('link'), i['nic.links'])
408
    # nics = zip(ips,macs,modes,networks,links)
409
    nics = zip(ips, macs, networks)
410
    nics = map(lambda x: dict(x), nics)
411
    #nics = dict(enumerate(nics))
412
    tags = i["tags"]
413
    for tag in tags:
414
        t = tag.split(":")
415
        if t[0:2] == ["synnefo", "network"]:
416
            if len(t) != 4:
417
                logger.error("Malformed synefo tag %s", tag)
418
                continue
419
            try:
420
                index = int(t[2])
421
                nics[index]['firewall'] = t[3]
422
            except ValueError:
423
                logger.error("Malformed synnefo tag %s", tag)
424
            except IndexError:
425
                logger.error("Found tag %s for non-existent NIC %d",
426
                             tag, index)
427
    return nics
381 428

  
382 429

  
383
if __name__ == "__main__":
384
    sys.exit(main())
430
def disks_from_instance(i):
431
    return dict([(index, {"size": size})
432
                 for index, size in enumerate(i["disk.sizes"])])

Also available in: Unified diff