Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 75dc539e

History | View | Annotate | Download (16.4 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.core.management import setup_environ
60
try:
61
    from synnefo import settings
62
except ImportError:
63
    raise Exception("Cannot import settings, make sure PYTHONPATH contains "
64
                    "the parent directory of the Synnefo Django project.")
65
setup_environ(settings)
66

    
67

    
68
import logging
69
import itertools
70
from datetime import datetime, timedelta
71

    
72
from django.db import transaction
73
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
74
                               pooled_rapi_client)
75
from synnefo.logic import utils, backend as backend_mod
76
from synnefo.logic.rapi import GanetiApiError
77

    
78
# Module-wide logger: the root logger, configured with logging's defaults.
logger = logging.getLogger()
logging.basicConfig()

# Taken from the RECONCILIATION_CHECK_INTERVAL setting; falls back to 60
# when the setting is not defined.
CHECK_INTERVAL = getattr(settings, "RECONCILIATION_CHECK_INTERVAL", 60)
85

    
86

    
87
class BackendReconciler(object):
    """Reconcile the servers of one Ganeti backend with the database.

    A client is obtained from ``backend`` on construction and handed back by
    :meth:`close`, which :meth:`reconcile` always calls when it is done.

    Every inconsistency is logged.  It is only fixed when the matching entry
    of ``options`` is true; the keys read by this class are ``fix_stale``,
    ``fix_orphans``, ``fix_unsynced``, ``fix_unsynced_flavors`` and
    ``fix_unsynced_nics``.  A missing key is treated as false.
    """

    # Time a BUILD server is left alone after its creation / last backend
    # update before its build is inspected (see get_build_status).
    BUILD_GRACE_PERIOD = timedelta(seconds=60)

    def __init__(self, backend, logger, options=None):
        """Bind the reconciler to ``backend`` and acquire a client from it.

        :param backend: backend whose servers are reconciled; must provide
            ``get_client()`` and ``put_client()``.
        :param logger: logger that receives all reports.
        :param options: optional mapping of ``fix_*`` flags (default: none
            set, i.e. report only).
        """
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            options = {}
        self.options = options

    def close(self):
        """Hand the client acquired in ``__init__`` back to the backend."""
        self.backend.put_client(self.client)

    @transaction.commit_on_success
    def reconcile(self):
        """Run the stale, orphan and unsynced passes for this backend.

        The client is returned to the backend even when a pass raises, so a
        failed run does not leak it.
        """
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        try:
            self.db_servers = get_database_servers(backend)
            self.db_servers_keys = set(self.db_servers.keys())
            log.debug("Got servers info from database.")

            self.gnt_servers = get_ganeti_servers(backend)
            self.gnt_servers_keys = set(self.gnt_servers.keys())
            log.debug("Got servers info from Ganeti backend.")

            # Timestamp used for every simulated Ganeti event of this run.
            self.event_time = datetime.now()

            self.stale_servers = self.reconcile_stale_servers()
            self.orphan_servers = self.reconcile_orphan_servers()
            self.unsynced_servers = self.reconcile_unsynced_servers()
        finally:
            self.close()

    def get_build_status(self, db_server):
        """Classify the build of a server whose DB state is BUILD.

        :param db_server: database server object; ``backendjobid``,
            ``created``, ``backendtime`` and ``id`` are read.
        :returns: ``"RUNNING"`` while the build is within the grace period
            or its Ganeti job is not finalized, ``"ERROR"`` when no job was
            enqueued in time, the job failed or its status cannot be
            fetched, and ``"FINALIZED"`` when the job ended as success or
            cancelled.
        """
        grace = self.BUILD_GRACE_PERIOD
        job = db_server.backendjobid
        if job is None:
            # Job has not yet been enqueued.
            if self.event_time < db_server.created + grace:
                return "RUNNING"
            return "ERROR"

        if self.event_time < db_server.backendtime + grace:
            self.log.debug("Pending build for server '%s'", db_server.id)
            return "RUNNING"

        # Only the RAPI call is guarded; a failure to reach the job is
        # reported as a build error.
        try:
            job_info = self.client.GetJobStatus(job_id=job)
        except GanetiApiError:
            return "ERROR"

        status = job_info["status"]
        if status == "error":
            return "ERROR"
        if status not in ("success", "error", "cancelled"):
            return "RUNNING"
        return "FINALIZED"

    def reconcile_stale_servers(self):
        """Report (and optionally fix) servers in the DB but not in Ganeti.

        Servers in BUILD state are only considered stale when their build is
        neither running nor failed; failed builds are passed to
        :meth:`reconcile_building_server` instead.  With the ``fix_stale``
        option a successful OP_INSTANCE_REMOVE event is simulated for each
        stale server.

        :returns: list with the ids of the stale servers.
        """
        # Detect stale servers
        stale = []
        for server_id in self.db_servers_keys - self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them
        if stale and self.options.get("fix_stale"):
            for server_id in stale:
                db_server = self.db_servers[server_id]
                backend_mod.process_op_status(
                    vm=db_server,
                    etime=self.event_time,
                    jobid=0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

        return stale

    def reconcile_orphan_servers(self):
        """Report (and optionally delete) instances in Ganeti but not in DB.

        With the ``fix_orphans`` option each orphan instance is deleted
        through the backend client.

        :returns: set with the ids of the orphan servers.
        """
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self.options.get("fix_orphans"):
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

        return orphans

    def reconcile_unsynced_servers(self):
        """Check every server present in both the DB and Ganeti.

        Servers that are still building are skipped, failed builds go to
        :meth:`reconcile_building_server`, and all others have their
        operating state, flavor, NICs and disks compared.  Findings are
        only logged; nothing is returned.
        """
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)

    def reconcile_building_server(self, db_server):
        """Report a failed build; with ``fix_unsynced`` simulate its failure.

        The fix feeds a failed OP_INSTANCE_CREATE event for ``db_server`` to
        ``backend_mod.process_op_status``.
        """
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self.options.get("fix_unsynced"):
            fix_opcode = "OP_INSTANCE_CREATE"
            backend_mod.process_op_status(
                vm=db_server,
                etime=self.event_time,
                jobid=0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Report a DB/Ganeti operating state mismatch and optionally fix it.

        With ``fix_unsynced`` a successful startup or shutdown event is
        simulated, chosen from the state reported by Ganeti.
        """
        gnt_state = gnt_server["state"]
        if db_server.operstate == gnt_state:
            return

        self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                      server_id, db_server.operstate, gnt_state)
        if self.options.get("fix_unsynced"):
            if gnt_state == "STARTED":
                fix_opcode = "OP_INSTANCE_STARTUP"
            else:
                fix_opcode = "OP_INSTANCE_SHUTDOWN"
            backend_mod.process_op_status(
                vm=db_server,
                etime=self.event_time,
                jobid=0,
                opcode=fix_opcode, status='success',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti state event for server '%s'",
                           server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Report a RAM/CPU mismatch between DB and Ganeti, optionally fix.

        The Ganeti values are mapped to an existing ``Flavor`` with the same
        disk and disk template as the DB flavor; if there is none, a warning
        is logged and nothing else happens.  With ``fix_unsynced_flavors`` a
        successful OP_INSTANCE_SET_PARAMS event is simulated.
        """
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram == gnt_flavor["ram"] and
                db_flavor.cpu == gnt_flavor["vcpus"]):
            return

        try:
            gnt_flavor = Flavor.objects.get(
                ram=gnt_flavor["ram"],
                cpu=gnt_flavor["vcpus"],
                disk=db_flavor.disk,
                disk_template=db_flavor.disk_template)
        except Flavor.DoesNotExist:
            self.log.warning("Server '%s' has unknown flavor.", server_id)
            return

        self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                      " Ganeti", server_id, db_flavor, gnt_flavor)
        if self.options.get("fix_unsynced_flavors"):
            old_state = db_server.operstate
            opcode = "OP_INSTANCE_SET_PARAMS"
            beparams = {"vcpus": gnt_flavor.cpu,
                        "minmem": gnt_flavor.ram,
                        "maxmem": gnt_flavor.ram}
            backend_mod.process_op_status(
                vm=db_server, etime=self.event_time, jobid=0,
                opcode=opcode, status='success',
                beparams=beparams,
                logmsg='Reconciliation: simulated Ganeti event')
            # process_op_status with beparams will set the vmstate to
            # shutdown. Fix this by returning it to the old state.
            vm = VirtualMachine.objects.get(pk=server_id)
            vm.operstate = old_state
            vm.save()
            self.log.debug("Simulated Ganeti flavor event for server '%s'",
                           server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Report NIC differences between DB and Ganeti, optionally fix.

        With ``fix_unsynced_nics`` the raw Ganeti NICs are handed to
        ``backend_mod.process_net_status`` for the server.
        """
        db_nics = db_server.nics.order_by("index")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        if not backend_mod.nics_changed(db_nics, gnt_nics_parsed):
            return

        msg = "Found unsynced NICs for server '%s'.\n\t"\
              "DB: %s\n\tGaneti: %s"
        db_nics_str = ", ".join(map(format_db_nic, db_nics))
        gnt_nics_str = ", ".join(map(format_gnt_nic, gnt_nics_parsed))
        self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
        if self.options.get("fix_unsynced_nics"):
            backend_mod.process_net_status(vm=db_server,
                                           etime=self.event_time,
                                           nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        """Placeholder: disk reconciliation is not implemented."""
        pass
303

    
304

    
305
def format_db_nic(nic):
    """Render a database NIC object as a one-line summary string."""
    fields = (nic.index, nic.ipv4, nic.network_id, nic.mac,
              nic.firewall_profile)
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % fields
308

    
309

    
310
def format_gnt_nic(nic):
    """Render a parsed Ganeti NIC (a dict) as a one-line summary string."""
    keys = ("index", "ipv4", "network", "mac", "firewall_profile")
    values = tuple([nic[key] for key in keys])
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % values
314

    
315

    
316
#
317
# Networks
318
#
319

    
320

    
321
def get_networks_from_ganeti(backend):
    """Return the Synnefo networks of a Ganeti backend, keyed by network id.

    Only networks whose name starts with ``settings.BACKEND_PREFIX_ID``
    followed by ``'net-'`` are kept; the id is derived from the name with
    ``utils.id_from_network_name``.
    """
    net_prefix = settings.BACKEND_PREFIX_ID + 'net-'

    with pooled_rapi_client(backend) as client:
        found = dict(
            (utils.id_from_network_name(net['name']), net)
            for net in client.GetNetworks(bulk=True)
            if net['name'].startswith(net_prefix))

    return found
332

    
333

    
334
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    :param backend: backend whose node groups are fetched via its client.
    :param GNets: dict mapping network id to network info; each info must
        have a ``'group_list'`` entry.
    :returns: dict mapping the id of every hanging network to the set of
        group names it is missing from.
    """
    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    missing_by_net = {}
    for net_id, info in GNets.items():
        # Keep only the text before the first '(' of every entry.
        connected = set([entry.split('(')[0]
                         for entry in info['group_list']])
        if connected != all_groups:
            missing_by_net[net_id] = all_groups - connected
    return missing_by_net
354

    
355

    
356
def get_online_backends():
    """Return the queryset of backends whose ``offline`` flag is False."""
    online = Backend.objects.filter(offline=False)
    return online
358

    
359

    
360
def get_database_servers(backend):
    """Return the non-deleted servers of ``backend`` as ``{id: server}``.

    The related ``nics`` and ``flavor`` are requested together with the
    servers through ``select_related``.
    """
    queryset = backend.virtual_machines.select_related("nics", "flavor")
    live_servers = queryset.filter(deleted=False)
    return dict((server.id, server) for server in live_servers)
364

    
365

    
366
def get_ganeti_servers(backend):
    """Return the Synnefo instances of a Ganeti backend as ``{id: info}``.

    Instances whose name does not start with ``settings.BACKEND_PREFIX_ID``
    are ignored.  The remaining ones are parsed with ``parse_gnt_instance``;
    instances it cannot parse are skipped.
    """
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
    servers = {}
    for instance in backend_mod.get_instances(backend):
        # Filter out non-synnefo instances
        if not instance["name"].startswith(snf_backend_prefix):
            continue
        parsed = parse_gnt_instance(instance)
        # parse_gnt_instance reports a malformed name by returning the tuple
        # (None, None) instead of a dict; subscripting that with "id" would
        # raise TypeError, so such results are skipped explicitly.
        if not isinstance(parsed, dict) or parsed["id"] is None:
            continue
        servers[parsed["id"]] = parsed
    return servers
374

    
375

    
376
def parse_gnt_instance(instance):
    """Convert a raw Ganeti instance dict into the form used for reconciling.

    :param instance: dict as returned by Ganeti; the keys ``name``,
        ``beparams``, ``oper_state``, ``mtime`` and ``tags`` are read here,
        plus whatever ``disks_from_instance`` and ``nics_from_instance``
        need.
    :returns: dict with the keys ``id``, ``state`` (``"STARTED"`` or
        ``"STOPPED"``), ``updated``, ``disks``, ``nics``, ``flavor`` and
        ``tags``.  NOTE: when the id cannot be derived from the instance
        name, the tuple ``(None, None)`` is returned instead of a dict.
    """
    try:
        instance_id = utils.id_from_instance_name(instance['name'])
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        return (None, None)

    beparams = instance["beparams"]
    flavor = {"vcpus": beparams["vcpus"], "ram": beparams["maxmem"]}
    state = "STARTED" if instance["oper_state"] else "STOPPED"

    parsed = {"id": instance_id, "state": state}  # FIX
    parsed["updated"] = datetime.fromtimestamp(instance["mtime"])
    parsed["disks"] = disks_from_instance(instance)
    parsed["nics"] = nics_from_instance(instance)
    parsed["flavor"] = flavor
    parsed["tags"] = instance["tags"]
    return parsed
400

    
401

    
402
def nics_from_instance(i):
    """Build the list of NIC dicts of a raw Ganeti instance.

    One dict with the keys ``ip``, ``mac`` and ``network`` is created per
    NIC from the parallel lists ``i['nic.ips']``, ``i['nic.macs']`` and
    ``i['nic.networks']``.  Tags of the form
    ``synnefo:network:<index>:<profile>`` additionally set the ``firewall``
    key of the NIC at ``<index>``.  Malformed tags and tags that point to a
    NIC that does not exist are logged and ignored.

    :param i: raw instance dict; must also contain ``tags``.
    :returns: list of NIC dicts, ordered by NIC index.
    """
    # A real list is required because NICs are addressed by index below.
    nics = [{"ip": ip, "mac": mac, "network": network}
            for ip, mac, network in zip(i['nic.ips'], i['nic.macs'],
                                        i['nic.networks'])]

    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] != ["synnefo", "network"]:
            continue
        if len(t) != 4:
            logger.error("Malformed synefo tag %s", tag)
            continue
        try:
            index = int(t[2])
        except ValueError:
            logger.error("Malformed synnefo tag %s", tag)
            continue
        # Reject negative indexes as well: they would otherwise wrap around
        # and silently modify a NIC counted from the end of the list.
        if not 0 <= index < len(nics):
            logger.error("Found tag %s for non-existent NIC %d",
                         tag, index)
            continue
        nics[index]['firewall'] = t[3]
    return nics
428

    
429

    
430
def disks_from_instance(i):
    """Map every disk index of a raw Ganeti instance to ``{"size": size}``.

    Sizes are read in order from ``i["disk.sizes"]``.
    """
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks