Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ a67419d8

History | View | Annotate | Download (12.6 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37
"""Business logic for reconciliation
38

39
Reconcile the contents of the DB with the actual state of the
40
Ganeti backend.
41

42
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
43
RULES:
44
    R1. Stale servers in DB:
45
            For any v in D but not in G:
46
            Set deleted=True.
47
    R2. Orphan instances in Ganeti:
48
            For any v in G with deleted=True in D:
49
            Issue OP_INSTANCE_DESTROY.
50
    R3. Unsynced operstate:
51
            For any v whose operating state differs between G and D:
52
            Set the operating state in D based on the state in G.
53
In the code, D, G are Python dicts mapping instance ids to VMState tuples.
54
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
55
For G, the operating state is True if the machine is up, False otherwise.
56

57
"""
58

    
59
import logging
60
import sys
61
import itertools
62

    
63
from django.core.management import setup_environ
64
try:
65
    from synnefo import settings
66
except ImportError:
67
    raise Exception("Cannot import settings, make sure PYTHONPATH contains "
68
                    "the parent directory of the Synnefo Django project.")
69
setup_environ(settings)
70

    
71

    
72
from datetime import datetime, timedelta
73
from collections import namedtuple
74

    
75
from synnefo.db.models import (VirtualMachine, NetworkInterface, Flavor,
76
                               pooled_rapi_client)
77
from synnefo.logic.rapi import GanetiApiError
78
from synnefo.logic.backend import get_instances
79
from synnefo.logic import utils
80

    
81

    
82
log = logging.getLogger()

# Number of seconds used by needs_reconciliation() as the base time window;
# taken from settings.RECONCILIATION_CHECK_INTERVAL, or 60 when that setting
# is not defined.
CHECK_INTERVAL = getattr(settings, "RECONCILIATION_CHECK_INTERVAL", 60)
88

    
89

    
90
def needs_reconciliation(vm):
    """Tell whether `vm` has gone long enough without changes to be checked.

    Returns True when the current local time is more than CHECK_INTERVAL
    seconds after `vm.updated`, or more than 2 * CHECK_INTERVAL seconds
    after `vm.backendtime`; False otherwise.

    """
    current_time = datetime.now()
    update_deadline = vm.updated + timedelta(seconds=CHECK_INTERVAL)
    if current_time > update_deadline:
        return True
    backend_deadline = vm.backendtime + timedelta(seconds=2 * CHECK_INTERVAL)
    return current_time > backend_deadline
94

    
95
# Per-VM record used for both the DB view and the Ganeti view of a server:
# operating state, cpu, ram and network interfaces.
VMState = namedtuple("VMState", ["state", "cpu", "ram", "nics"])
96

    
97

    
98
def stale_servers_in_db(D, G):
    """Find servers that exist in the DB but not in Ganeti (rule R1).

    @ param D: dict mapping server ids to their DB view; values are VMState
               tuples (a bare operating-state string is accepted as well)
    @ param G: dict keyed by the ids of the instances found in Ganeti
    @ rtype: set
    @ return Ids that are in D but not in G. A server whose DB state is
    'BUILD' is reported only when it needs reconciliation and either its
    job status cannot be retrieved, or the job is no longer pending and the
    instance lookup fails (GanetiApiError in both cases).

    """
    stale = set()
    for i in set(D.keys()) - set(G.keys()):
        # D holds VMState tuples, so the operating state lives in `.state`.
        # Comparing the whole tuple against 'BUILD' never matched, which
        # made every server that was still building look stale.
        dbstate = getattr(D[i], 'state', D[i])
        if dbstate != 'BUILD':
            stale.add(i)
            continue

        vm = VirtualMachine.objects.get(id=i)
        if not needs_reconciliation(vm):
            # Not due for a check yet; leave it alone for this round.
            continue

        with pooled_rapi_client(vm) as c:
            try:
                job_status = c.GetJobStatus(vm.backendjobid)['status']
                if job_status not in ('queued', 'waiting', 'running'):
                    # The job is no longer pending, so the instance should
                    # exist by now; the lookup raises if it does not.
                    c.GetInstance(utils.id_to_instance_name(i))
            except GanetiApiError:
                stale.add(i)

    return stale
123

    
124

    
125
def orphan_instances_in_ganeti(D, G):
    """Return the set of ids that are keys of G but not keys of D."""
    db_ids = set(D.keys())
    return set(gnt_id for gnt_id in G.keys() if gnt_id not in db_ids)
130

    
131

    
132
def unsynced_operstate(D, G):
    """Find servers whose operating state differs between DB and Ganeti.

    Only ids present in both D and G are examined; `.state` is read from
    the values of both dicts.

    @ rtype: set of (id, db_state, ganeti_state) tuples
    @ return Servers that are up in Ganeti but not 'STARTED' in the DB,
    servers that are down in Ganeti but neither 'BUILD', 'ERROR' nor
    'STOPPED' in the DB, and servers that are down in Ganeti and 'BUILD' in
    the DB although their backend job reports status 'success'.

    """
    result = set()
    for vm_id in set(D.keys()).intersection(G.keys()):
        db_state = D[vm_id].state
        ganeti_up = G[vm_id].state
        entry = (vm_id, db_state, ganeti_up)

        if ganeti_up:
            mismatch = db_state != "STARTED"
        else:
            mismatch = db_state not in ('BUILD', 'ERROR', 'STOPPED')
        if mismatch:
            result.add(entry)

        # The remaining check only concerns servers that are down in Ganeti
        # while the DB still considers them to be building.
        if ganeti_up or db_state != 'BUILD':
            continue

        vm = VirtualMachine.objects.get(id=vm_id)
        if not needs_reconciliation(vm):
            continue

        with pooled_rapi_client(vm) as c:
            try:
                job_info = c.GetJobStatus(job_id=vm.backendjobid)
                if job_info['status'] == 'success':
                    result.add(entry)
            except GanetiApiError:
                # The job status could not be fetched; nothing is reported
                # for this server here.
                pass

    return result
156

    
157

    
158
def unsynced_flavors(D, G):
    """Find servers whose cpu or ram differs between DB and Ganeti.

    Only ids present in both D and G are examined.

    @ rtype: set of (id, db_flavor, ganeti_flavor) tuples
    @ return For each mismatching server, the flavor of the DB server and
    the Flavor having Ganeti's ram/cpu together with the DB flavor's disk
    and disk_template, or None if no such Flavor exists.

    """
    result = set()
    for vm_id in set(D.keys()).intersection(G.keys()):
        db_vm = D[vm_id]
        gnt_vm = G[vm_id]
        flavor_differs = (db_vm.ram != gnt_vm.ram or db_vm.cpu != gnt_vm.cpu)
        if not flavor_differs:
            continue

        db_flavor = VirtualMachine.objects.get(id=vm_id).flavor
        try:
            gnt_flavor = Flavor.objects.get(
                ram=G[vm_id].ram, cpu=G[vm_id].cpu,
                disk=db_flavor.disk,
                disk_template=db_flavor.disk_template)
        except Flavor.DoesNotExist:
            gnt_flavor = None
        result.add((vm_id, db_flavor, gnt_flavor))
    return result
175

    
176

    
177
def instances_with_build_errors(D, G):
    """Find servers stuck in 'BUILD' in the DB whose creation has failed.

    Only ids present in both D and G are examined, and only those that are
    'BUILD' in the DB while not running in Ganeti.

    @ param D: dict mapping server ids to VMState tuples (a bare
               operating-state string is accepted as well)
    @ param G: dict mapping server ids to VMState tuples (a bare boolean
               operating state is accepted as well)
    @ rtype: set
    @ return Ids of servers that either have no backend job id more than
    120 seconds after their creation, or need reconciliation and have a
    backend job whose status is 'error' or cannot be retrieved.

    """
    failed = set()
    for i in set(D.keys()) & set(G.keys()):
        # Both dicts hold VMState tuples; testing the tuples themselves
        # (always truthy, never equal to 'BUILD') made this check a no-op.
        dbstate = getattr(D[i], 'state', D[i])
        gntstate = getattr(G[i], 'state', G[i])
        if gntstate or dbstate != 'BUILD':
            continue

        vm = VirtualMachine.objects.get(id=i)
        if not vm.backendjobid:  # VM has not been enqueued in the backend
            if datetime.now() > vm.created + timedelta(seconds=120):
                # If a job has not been enqueued after 2 minutes, then
                # it must be a stale entry.
                failed.add(i)
        elif needs_reconciliation(vm):
            # Check time to avoid many rapi calls
            with pooled_rapi_client(vm) as c:
                try:
                    job_info = c.GetJobStatus(job_id=vm.backendjobid)
                    if job_info['status'] == 'error':
                        failed.add(i)
                except GanetiApiError:
                    failed.add(i)

    return failed
201

    
202

    
203
def get_servers_from_db(backends, with_nics=True):
    """Build the DB view of all non-deleted servers of the given backends.

    @ param backends: backends whose servers are looked up
    @ param with_nics: when False, network interfaces are not queried and
                       every server gets an empty `nics` dict
    @ rtype: dict; {server_id: VMState}
    @ return For each server its operstate, flavor cpu, flavor ram and a
    dict mapping nic index to {'mac', 'network', 'ipv4'}; an empty-string
    ipv4 is reported as None.

    """
    vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends)
    vm_info = vms.values_list("id", "operstate", "flavor__cpu", "flavor__ram")

    # Defined unconditionally: it is read below even when with_nics is
    # False (it used to be created only inside the branch -> NameError).
    vm_nics = {}
    if with_nics:
        nics = NetworkInterface.objects.filter(machine__in=vms)\
                               .order_by("machine")\
                               .values_list("machine", "index", "mac", "ipv4",
                                            "network")
        # groupby() only merges adjacent rows, hence order_by("machine").
        for machine, machine_nics in itertools.groupby(nics,
                                                       lambda nic: nic[0]):
            nics_by_index = {}
            for _, index, mac, ipv4, network in machine_nics:
                nics_by_index[index] = {
                    'mac':      mac,
                    'network':  utils.id_to_network_name(network),
                    'ipv4':     ipv4 if ipv4 != '' else None
                }
            vm_nics[machine] = nics_by_index

    servers = {}
    for vm_id, state, cpu, ram in vm_info:
        # Servers without nics get an empty dict, the same type as the
        # per-server nic mappings built above.
        servers[vm_id] = VMState(state=state,
                                 cpu=cpu,
                                 ram=ram,
                                 nics=vm_nics.get(vm_id, {}))
    return servers
227

    
228

    
229
def get_instances_from_ganeti(backends):
    """Build the Ganeti view of the instances of the given backends.

    Instances whose name does not start with settings.BACKEND_PREFIX_ID are
    skipped silently; instances whose name cannot be converted to an id, or
    whose id has already been seen, are skipped with an error log.

    @ rtype: dict; {instance_id: VMState}
    @ return For each instance its 'oper_state', beparams 'vcpus', beparams
    'maxmem' and its network interfaces (see get_nics_from_instance).

    """
    # Extend a single list instead of reduce(list.__add__, ...): no
    # repeated list copying, and no reliance on the builtin `reduce`.
    ganeti_instances = []
    for backend in backends:
        ganeti_instances.extend(get_instances(backend))

    snf_instances = {}
    prefix = settings.BACKEND_PREFIX_ID
    for instance in ganeti_instances:
        name = instance['name']
        if not name.startswith(prefix):
            continue

        try:
            vm_id = utils.id_from_instance_name(name)
        except Exception:
            log.error("Ignoring instance with malformed name %s", name)
            continue

        if vm_id in snf_instances:
            log.error("Ignoring instance with duplicate Synnefo id %s", name)
            continue

        nics = get_nics_from_instance(instance)
        beparams = instance["beparams"]
        snf_instances[vm_id] = VMState(state=instance["oper_state"],
                                       cpu=beparams["vcpus"],
                                       ram=beparams["maxmem"],
                                       nics=nics)

    return snf_instances
261

    
262

    
263
#
264
# Nics
265
#
266
def get_nics_from_ganeti(backends):
    """Get network interfaces for each ganeti instance.

    Instances whose name does not start with settings.BACKEND_PREFIX_ID are
    skipped silently; instances whose name cannot be converted to an id, or
    whose id has already been seen, are skipped with an error log.

    @ rtype: dict; {instance_id: nics}
    @ return For each instance, its network interfaces as returned by
    get_nics_from_instance.

    """
    # Extend a single list instead of reduce(list.__add__, ...): no
    # repeated list copying, and no reliance on the builtin `reduce`.
    instances = []
    for backend in backends:
        instances.extend(get_instances(backend))
    prefix = settings.BACKEND_PREFIX_ID

    snf_instances_nics = {}
    for instance in instances:
        name = instance['name']
        if not name.startswith(prefix):
            continue

        try:
            vm_id = utils.id_from_instance_name(name)
        except Exception:
            log.error("Ignoring instance with malformed name %s", name)
            continue

        if vm_id in snf_instances_nics:
            log.error("Ignoring instance with duplicate Synnefo id %s", name)
            continue

        snf_instances_nics[vm_id] = get_nics_from_instance(instance)

    return snf_instances_nics
293

    
294

    
295
def get_nics_from_instance(i):
    """Convert the per-field nic lists of a Ganeti instance to nic dicts.

    Reads the parallel lists i['nic.ips'], i['nic.macs'] and
    i['nic.networks'] and combines the entries at the same position.

    @ rtype: dict; {position: {'ipv4': ..., 'mac': ..., 'network': ...}}

    """
    columns = zip(i['nic.ips'], i['nic.macs'], i['nic.networks'])
    nics = {}
    for position, (ipv4, mac, network) in enumerate(columns):
        nics[position] = {'ipv4': ipv4, 'mac': mac, 'network': network}
    return nics
306

    
307

    
308
def unsynced_nics(DBVMs, GanetiVMs):
    """Find unsynced network interfaces between DB and Ganeti.

    Only ids present in both dicts are examined; `.nics` is read from the
    values of both. Two servers are unsynced when they have a different
    number of nics, when a nic index found in Ganeti is missing from the
    DB, or when the nics at the same index differ in 'ipv4', 'mac' or
    'network'.

    @ rtype: dict; {instance_id: (db_nics, ganeti_nics)}
    @ return Dictionary containing the instances ids that have unsynced
    network interfaces between DB and Ganeti, mapped to the network
    interfaces in the DB and in Ganeti.

    """
    unsynced = {}
    for i in set(DBVMs.keys()) & set(GanetiVMs.keys()):
        nicsD = DBVMs[i].nics
        nicsG = GanetiVMs[i].nics
        if len(nicsD) != len(nicsG):
            unsynced[i] = (nicsD, nicsG)
            continue
        for index in nicsG.keys():
            try:
                nicD = nicsD[index]
            except KeyError:
                # Same number of nics but under different indices: this is
                # a mismatch to report, not an error to crash on.
                unsynced[i] = (nicsD, nicsG)
                break
            nicG = nicsG[index]
            diff = (nicD['ipv4'] != nicG['ipv4'] or
                    nicD['mac'] != nicG['mac'] or
                    nicD['network'] != nicG['network'])
            if diff:
                unsynced[i] = (nicsD, nicsG)
                break

    return unsynced
337

    
338
#
339
# Networks
340
#
341

    
342

    
343
def get_networks_from_ganeti(backend):
    """Fetch the networks of a backend, keyed by network id.

    Only networks whose name starts with settings.BACKEND_PREFIX_ID + 'net-'
    are kept; the id is derived from the name with
    utils.id_from_network_name.

    @ rtype: dict; {network_id: network info as returned by GetNetworks}

    """
    prefix = settings.BACKEND_PREFIX_ID + 'net-'

    with pooled_rapi_client(backend) as c:
        return dict((utils.id_from_network_name(net['name']), net)
                    for net in c.GetNetworks(bulk=True)
                    if net['name'].startswith(prefix))
354

    
355

    
356
def hanging_networks(backend, GNets):
    """Get networks that are not connected to all Nodegroups.

    For every network in GNets, the group name of each 'group_list' entry
    (the text before the first '(') is compared against the groups
    reported by the backend.

    @ rtype: dict; {network_id: set of group names}
    @ return For each network whose groups differ from the backend's
    groups, the backend groups it is not connected to.

    """
    with pooled_rapi_client(backend) as c:
        all_groups = set(c.GetGroups())

    result = {}
    for net_id, info in GNets.items():
        connected = set(entry.split('(')[0] for entry in info['group_list'])
        if connected != all_groups:
            result[net_id] = all_groups - connected
    return result
376

    
377

    
378
# Only for testing this module individually
379
def main():
    """Print the Ganeti view of the instances of all backends.

    Manual testing aid only. get_instances_from_ganeti() requires the
    backends to query; calling it without arguments always raised
    TypeError.

    """
    # Local import: only this manual entry point needs the model.
    # NOTE(review): assumes synnefo.db.models exposes a `Backend` model --
    # confirm against the models module.
    from synnefo.db.models import Backend

    # Single-argument print(...) behaves the same as the print statement.
    print(get_instances_from_ganeti(Backend.objects.all()))


if __name__ == "__main__":
    sys.exit(main())