Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ d4eae5ce

History | View | Annotate | Download (16.6 kB)

1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36
"""Business logic for reconciliation
37

38
Reconcile the contents of the DB with the actual state of the
39
Ganeti backend.
40

41
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
42
RULES:
43
    R1. Stale servers in DB:
44
            For any v in D but not in G:
45
            Set deleted=True.
46
    R2. Orphan instances in Ganeti:
47
            For any v in G with deleted=True in D:
48
            Issue OP_INSTANCE_DESTROY.
49
    R3. Unsynced operstate:
50
            For any v whose operating state differs between G and D:
51
            Set the operating state in D based on the state in G.
52
In the code, D, G are Python dicts mapping instance ids to operating state.
53
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
54
For G, the operating state is True if the machine is up, False otherwise.
55

56
"""
57

    
58

    
59
from django.core.management import setup_environ
60
from django.conf import settings
61

    
62
import logging
63
import itertools
64
from datetime import datetime, timedelta
65

    
66
from django.db import transaction
67
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
68
                               pooled_rapi_client)
69
from synnefo.logic import utils, backend as backend_mod
70
from synnefo.logic.rapi import GanetiApiError
71

    
72
# Module-wide logger: this is the root logger, configured with the
# default basicConfig() handler.
logger = logging.getLogger()
logging.basicConfig()

# Use the RECONCILIATION_CHECK_INTERVAL setting when it is defined,
# otherwise fall back to 60.
CHECK_INTERVAL = getattr(settings, "RECONCILIATION_CHECK_INTERVAL", 60)
79

    
80

    
81
class BackendReconciler(object):
    """Reconcile the servers of a single backend between the DB and Ganeti.

    The non-deleted servers recorded in the DB for ``backend`` are compared
    with the instances that Ganeti reports for it.  Every discrepancy is
    logged; it is fixed only when the corresponding entry of ``options``
    is true:

        fix_stale            -- servers in the DB that are missing in Ganeti
        fix_orphans          -- instances in Ganeti that are missing in the DB
        fix_unsynced         -- operating state and failed builds
        fix_unsynced_flavors -- CPU / RAM differences
        fix_unsynced_nics    -- NIC differences

    An option that is missing from ``options`` counts as disabled.

    DB fixes are applied by feeding simulated Ganeti events to
    ``backend_mod.process_op_status`` / ``backend_mod.process_net_status``.
    """

    def __init__(self, backend, logger, options=None):
        """Store the collaborators and acquire a client from ``backend``.

        The client obtained with ``backend.get_client()`` is handed back by
        ``close()``, which ``reconcile()`` calls when it finishes.
        """
        self.backend = backend
        self.log = logger
        self.client = backend.get_client()
        if options is None:
            self.options = {}
        else:
            self.options = options

    def close(self):
        """Give the client back to the backend."""
        self.backend.put_client(self.client)

    def _fix_enabled(self, option):
        """Return True when ``option`` is present in the options and true."""
        try:
            return bool(self.options[option])
        except KeyError:
            # Report-only mode for anything that was not explicitly enabled.
            return False

    @transaction.commit_on_success
    def reconcile(self):
        """Run all the reconciliation checks for the backend.

        The client is released even if one of the checks raises.

        NOTE(review): the reconcile_*_servers() methods do not return a
        value, so ``stale_servers``, ``orphan_servers`` and
        ``unsynced_servers`` are always None after this call.
        """
        log = self.log
        backend = self.backend
        log.debug("Reconciling backend %s", backend)

        try:
            self.db_servers = get_database_servers(backend)
            self.db_servers_keys = set(self.db_servers.keys())
            log.debug("Got servers info from database.")

            self.gnt_servers = get_ganeti_servers(backend)
            self.gnt_servers_keys = set(self.gnt_servers.keys())
            log.debug("Got servers info from Ganeti backend.")

            # Timestamp used for every simulated Ganeti event of this run.
            self.event_time = datetime.now()

            self.stale_servers = self.reconcile_stale_servers()
            self.orphan_servers = self.reconcile_orphan_servers()
            self.unsynced_servers = self.reconcile_unsynced_servers()
        finally:
            self.close()

    def get_build_status(self, db_server):
        """Classify the creation of a server that is in BUILD state.

        Returns one of:
            "RUNNING"   -- the build is less than 60 seconds old (measured
                           from creation when no job has been recorded,
                           otherwise from the last backend update), or the
                           Ganeti job has not reached a final status
            "ERROR"     -- no job was recorded within 60 seconds of
                           creation, the Ganeti job failed, or its status
                           could not be retrieved
            "FINALIZED" -- the Ganeti job finished with "success" or
                           "cancelled"
        """
        grace_period = timedelta(seconds=60)
        job = db_server.backendjobid
        if job is None:
            # Job has not yet been enqueued.
            if self.event_time < db_server.created + grace_period:
                return "RUNNING"
            return "ERROR"

        if self.event_time < db_server.backendtime + grace_period:
            self.log.debug("Pending build for server '%s'", db_server.id)
            return "RUNNING"

        try:
            job_info = self.client.GetJobStatus(job_id=job)
        except GanetiApiError:
            return "ERROR"

        status = job_info["status"]
        if status == "error":
            return "ERROR"
        if status not in ("success", "error", "cancelled"):
            return "RUNNING"
        return "FINALIZED"

    def reconcile_stale_servers(self):
        """Report, and optionally remove, DB servers missing from Ganeti.

        Servers in BUILD state are handled specially: running builds are
        skipped and failed builds go through reconcile_building_server().
        With the "fix_stale" option, a successful OP_INSTANCE_REMOVE event
        is simulated for every stale server.
        """
        # Detect stale servers
        stale = []
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
        for server_id in stale_keys:
            db_server = self.db_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "ERROR":
                    # Special handling of BUILD errors
                    self.reconcile_building_server(db_server)
                elif build_status != "RUNNING":
                    stale.append(server_id)
            else:
                stale.append(server_id)

        # Report them
        if stale:
            self.log.info("Found stale servers %s at backend %s",
                          ", ".join(map(str, stale)), self.backend)
        else:
            self.log.debug("No stale servers at backend %s", self.backend)

        # Fix them
        if stale and self._fix_enabled("fix_stale"):
            for server_id in stale:
                db_server = self.db_servers[server_id]
                backend_mod.process_op_status(
                    vm=db_server,
                    etime=self.event_time,
                    jobid=0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti removal for stale servers.")

    def reconcile_orphan_servers(self):
        """Report, and optionally delete, Ganeti instances missing from DB.

        With the "fix_orphans" option, DeleteInstance is issued through the
        client for every orphan instance.
        """
        orphans = self.gnt_servers_keys - self.db_servers_keys
        if orphans:
            self.log.info("Found orphan servers %s at backend %s",
                          ", ".join(map(str, orphans)), self.backend)
        else:
            self.log.debug("No orphan servers at backend %s", self.backend)

        if orphans and self._fix_enabled("fix_orphans"):
            for server_id in orphans:
                server_name = utils.id_to_instance_name(server_id)
                self.client.DeleteInstance(server_name)
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")

    def reconcile_unsynced_servers(self):
        """Check every server that exists both in the DB and in Ganeti.

        Servers whose build is still running are skipped, failed builds go
        through reconcile_building_server(), and all the others have their
        operating state, flavor, NICs and disks compared.
        """
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
            db_server = self.db_servers[server_id]
            gnt_server = self.gnt_servers[server_id]
            if db_server.operstate == "BUILD":
                build_status = self.get_build_status(db_server)
                if build_status == "RUNNING":
                    # Do not reconcile building VMs
                    continue
                elif build_status == "ERROR":
                    # Special handling of build errors
                    self.reconcile_building_server(db_server)
                    continue

            self.reconcile_unsynced_operstate(server_id, db_server,
                                              gnt_server)
            self.reconcile_unsynced_flavor(server_id, db_server,
                                           gnt_server)
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)

    def reconcile_building_server(self, db_server):
        """Report a failed build; with "fix_unsynced", simulate its failure.

        The fix is a simulated OP_INSTANCE_CREATE event with status 'error'.
        """
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
                      db_server.id)
        if self._fix_enabled("fix_unsynced"):
            fix_opcode = "OP_INSTANCE_CREATE"
            backend_mod.process_op_status(
                vm=db_server,
                etime=self.event_time,
                jobid=0,
                opcode=fix_opcode, status='error',
                logmsg='Reconciliation: simulated Ganeti event')
            self.log.debug("Simulated Ganeti error build event for"
                           " server '%s'", db_server.id)

    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
        """Report, and optionally fix, a differing operating state.

        With "fix_unsynced", a successful OP_INSTANCE_STARTUP or
        OP_INSTANCE_SHUTDOWN event is simulated, depending on whether the
        Ganeti state is "STARTED".
        """
        if db_server.operstate != gnt_server["state"]:
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
                          server_id, db_server.operstate, gnt_server["state"])
            if self._fix_enabled("fix_unsynced"):
                # If the server is in building state, its creation has to
                # be reconciled first, to avoid wrong quotas.
                if db_server.operstate == "BUILD":
                    backend_mod.process_op_status(
                        vm=db_server, etime=self.event_time, jobid=0,
                        opcode="OP_INSTANCE_CREATE", status='success',
                        logmsg='Reconciliation: simulated Ganeti event')
                if gnt_server["state"] == "STARTED":
                    fix_opcode = "OP_INSTANCE_STARTUP"
                else:
                    fix_opcode = "OP_INSTANCE_SHUTDOWN"
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=0,
                    opcode=fix_opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
                self.log.debug("Simulated Ganeti state event for server '%s'",
                               server_id)

    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
        """Report, and optionally fix, differing CPU or RAM.

        The Ganeti values are mapped to a Flavor that has the same disk and
        disk template as the DB flavor; if no such Flavor exists a warning
        is logged and nothing else happens.  With "fix_unsynced_flavors",
        a successful OP_INSTANCE_SET_PARAMS event is simulated and the
        previous operating state of the server is restored afterwards.
        """
        db_flavor = db_server.flavor
        gnt_flavor = gnt_server["flavor"]
        if (db_flavor.ram != gnt_flavor["ram"] or
                db_flavor.cpu != gnt_flavor["vcpus"]):
            try:
                gnt_flavor = Flavor.objects.get(
                    ram=gnt_flavor["ram"],
                    cpu=gnt_flavor["vcpus"],
                    disk=db_flavor.disk,
                    disk_template=db_flavor.disk_template)
            except Flavor.DoesNotExist:
                self.log.warning("Server '%s' has unknown flavor.", server_id)
                return

            self.log.info("Server '%s' has flavor '%s' in DB and '%s' in"
                          " Ganeti", server_id, db_flavor, gnt_flavor)
            if self._fix_enabled("fix_unsynced_flavors"):
                old_state = db_server.operstate
                opcode = "OP_INSTANCE_SET_PARAMS"
                beparams = {"vcpus": gnt_flavor.cpu,
                            "minmem": gnt_flavor.ram,
                            "maxmem": gnt_flavor.ram}
                backend_mod.process_op_status(
                    vm=db_server, etime=self.event_time, jobid=0,
                    opcode=opcode, status='success',
                    beparams=beparams,
                    logmsg='Reconciliation: simulated Ganeti event')
                # process_op_status with beparams will set the vmstate to
                # shutdown. Fix this by returning it to the old state.
                vm = VirtualMachine.objects.get(pk=server_id)
                vm.operstate = old_state
                vm.save()
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
                               server_id)

    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
        """Report, and optionally fix, NICs that differ from Ganeti.

        With "fix_unsynced_nics", the raw Ganeti NICs are passed to
        backend_mod.process_net_status.
        """
        db_nics = db_server.nics.order_by("index")
        gnt_nics = gnt_server["nics"]
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
        if backend_mod.nics_changed(db_nics, gnt_nics_parsed):
            msg = "Found unsynced NICs for server '%s'.\n\t"\
                  "DB: %s\n\tGaneti: %s"
            db_nics_str = ", ".join(map(format_db_nic, db_nics))
            gnt_nics_str = ", ".join(map(format_gnt_nic, gnt_nics_parsed))
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
            if self._fix_enabled("fix_unsynced_nics"):
                backend_mod.process_net_status(vm=db_server,
                                               etime=self.event_time,
                                               nics=gnt_nics)

    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
        """Placeholder: disks are not reconciled."""
        pass
302

    
303

    
304
def format_db_nic(nic):
    """Return a one-line description of a NIC object from the DB.

    The index, ipv4, network_id, mac and firewall_profile attributes of
    ``nic`` are included.
    """
    fields = (nic.index, nic.ipv4, nic.network_id, nic.mac,
              nic.firewall_profile)
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % fields
307

    
308

    
309
def format_gnt_nic(nic):
    """Return a one-line description of a NIC given as a dict.

    The "index", "ipv4", "network", "mac" and "firewall_profile" keys of
    ``nic`` are included.
    """
    keys = ("index", "ipv4", "network", "mac", "firewall_profile")
    fields = tuple([nic[key] for key in keys])
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % fields
313

    
314

    
315
#
316
# Networks
317
#
318

    
319

    
320
def get_networks_from_ganeti(backend):
    """Return the networks of ``backend`` that carry the synnefo prefix.

    Only Ganeti networks whose name starts with
    ``settings.BACKEND_PREFIX_ID + 'net-'`` are kept.  The result maps the
    id extracted from each name (utils.id_from_network_name) to the network
    info returned by Ganeti.
    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    networks = {}
    with pooled_rapi_client(backend) as client:
        for net in client.GetNetworks(bulk=True):
            name = net['name']
            if not name.startswith(prefix):
                continue
            networks[utils.id_from_network_name(name)] = net

    return networks
331

    
332

    
333
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    ``GNets`` maps network ids to Ganeti network info.  The result maps the
    id of every network whose set of groups differs from the backend's
    groups to the set of backend groups it is missing.
    """
    def group_names(group_list):
        # Keep only the part of each entry that precedes the first '('.
        return set(entry.split('(')[0] for entry in group_list)

    with pooled_rapi_client(backend) as client:
        all_groups = set(client.GetGroups())

    hanging = {}
    for net_id, info in GNets.items():
        connected = group_names(info['group_list'])
        if connected == all_groups:
            continue
        hanging[net_id] = all_groups - connected
    return hanging
353

    
354

    
355
def get_online_backends():
    """Return the Backend objects that are not marked offline."""
    online_backends = Backend.objects.filter(offline=False)
    return online_backends
357

    
358

    
359
def get_database_servers(backend):
    """Return the non-deleted servers of ``backend``, keyed by server id."""
    queryset = backend.virtual_machines.select_related("nics", "flavor")
    queryset = queryset.filter(deleted=False)
    servers = {}
    for server in queryset:
        servers[server.id] = server
    return servers
363

    
364

    
365
def get_ganeti_servers(backend):
    """Return the synnefo instances of ``backend``, keyed by server id.

    Instances whose name does not start with settings.BACKEND_PREFIX_ID are
    ignored, and so are instances that parse_gnt_instance() could not parse.
    """
    gnt_instances = backend_mod.get_instances(backend)
    snf_backend_prefix = settings.BACKEND_PREFIX_ID

    servers = {}
    for instance in gnt_instances:
        # Filter out non-synnefo instances
        if not instance["name"].startswith(snf_backend_prefix):
            continue
        parsed = parse_gnt_instance(instance)
        # parse_gnt_instance() reports a malformed name by returning the
        # tuple (None, None) instead of a dict; indexing that result with
        # "id" would raise TypeError, so skip anything that is not a dict.
        if not isinstance(parsed, dict) or parsed["id"] is None:
            continue
        servers[parsed["id"]] = parsed
    return servers
373

    
374

    
375
def parse_gnt_instance(instance):
    """Convert a Ganeti instance description to the dict used here.

    Returns a dict with the keys "id", "state" ("STARTED" or "STOPPED"),
    "updated", "disks", "nics", "flavor" ("vcpus" and "ram", the latter
    taken from the "maxmem" backend parameter) and "tags".

    If the id cannot be extracted from the instance name, an error is
    logged and the tuple (None, None) is returned instead.
    NOTE(review): callers have to cope with that tuple; it is not a dict.
    """
    name = instance['name']
    try:
        instance_id = utils.id_from_instance_name(name)
    except Exception:
        logger.error("Ignoring instance with malformed name %s",
                     instance['name'])
        return (None, None)

    beparams = instance["beparams"]
    flavor = {"vcpus": beparams["vcpus"],
              "ram": beparams["maxmem"]}

    if instance["oper_state"]:
        state = "STARTED"
    else:
        state = "STOPPED"

    parsed = {
        "id": instance_id,
        "state": state,  # FIX
        "updated": datetime.fromtimestamp(instance["mtime"]),
        "disks": disks_from_instance(instance),
        "nics": nics_from_instance(instance),
        "flavor": flavor,
        "tags": instance["tags"]
    }
    return parsed
399

    
400

    
401
def nics_from_instance(i):
    """Return the NICs of a Ganeti instance as a list of dicts.

    One dict with the keys 'ip', 'mac' and 'network' is built per NIC from
    the parallel 'nic.ips', 'nic.macs' and 'nic.networks' lists of ``i``;
    the position in the returned list is the index of the NIC.

    Tags of the form "synnefo:network:<index>:<profile>" additionally set
    the 'firewall' key of the NIC with that index.  Tags with a wrong
    number of fields, a non-integer index or an index that does not match
    an existing NIC are logged and ignored.
    """
    # Build a real list: the NICs are addressed by position below, which
    # does not work on the lazy result that map() gives on Python 3.
    nics = [{'ip': ip, 'mac': mac, 'network': network}
            for ip, mac, network in zip(i['nic.ips'], i['nic.macs'],
                                        i['nic.networks'])]

    for tag in i["tags"]:
        t = tag.split(":")
        if t[0:2] != ["synnefo", "network"]:
            continue
        if len(t) != 4:
            logger.error("Malformed synefo tag %s", tag)
            continue
        try:
            index = int(t[2])
        except ValueError:
            logger.error("Malformed synnefo tag %s", tag)
            continue
        # A negative index would silently address a NIC from the end of
        # the list, so it is treated as out of range too.
        if not 0 <= index < len(nics):
            logger.error("Found tag %s for non-existent NIC %d",
                         tag, index)
            continue
        nics[index]['firewall'] = t[3]
    return nics
427

    
428

    
429
def disks_from_instance(i):
    """Map each disk index of a Ganeti instance to a {"size": size} dict.

    The sizes are read, in order, from the "disk.sizes" list of ``i``.
    """
    disks = {}
    for index, size in enumerate(i["disk.sizes"]):
        disks[index] = {"size": size}
    return disks