Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / reconciliation.py @ 7ab30015

History | View | Annotate | Download (11.6 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37
"""Business logic for reconciliation
38

39
Reconcile the contents of the DB with the actual state of the
40
Ganeti backend.
41

42
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
43
RULES:
44
    R1. Stale servers in DB:
45
            For any v in D but not in G:
46
            Set deleted=True.
47
    R2. Orphan instances in Ganet:
48
            For any v in G with deleted=True in D:
49
            Issue OP_INSTANCE_DESTROY.
50
    R3. Unsynced operstate:
51
            For any v whose operating state differs between G and V:
52
            Set the operating state in D based on the state in G.
53
In the code, D, G are Python dicts mapping instance ids to operating state.
54
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
55
For G, the operating state is True if the machine is up, False otherwise.
56

57
"""
58

    
59
import logging
60
import sys
61
import itertools
62

    
63
from django.core.management import setup_environ
64
try:
65
    from synnefo import settings
66
except ImportError:
67
    raise Exception("Cannot import settings, make sure PYTHONPATH contains "
68
                    "the parent directory of the Synnefo Django project.")
69
setup_environ(settings)
70

    
71

    
72
from datetime import datetime, timedelta
73
from collections import namedtuple
74

    
75
from synnefo.db.models import (VirtualMachine, NetworkInterface,
76
                               pooled_rapi_client)
77
from synnefo.logic.rapi import GanetiApiError
78
from synnefo.logic.backend import get_instances
79
from synnefo.logic import utils
80

    
81

    
82
log = logging.getLogger()
83

    
84
try:
85
    CHECK_INTERVAL = settings.RECONCILIATION_CHECK_INTERVAL
86
except AttributeError:
87
    CHECK_INTERVAL = 60
88

    
89

    
90
def needs_reconciliation(vm):
91
    now = datetime.now()
92
    return (now > vm.updated + timedelta(seconds=CHECK_INTERVAL)) or\
93
           (now > vm.backendtime + timedelta(seconds=2*CHECK_INTERVAL))
94

    
95
VMState = namedtuple("VMState", ["state", "nics"])
96

    
97

    
98
def stale_servers_in_db(D, G):
99
    idD = set(D.keys())
100
    idG = set(G.keys())
101

    
102
    stale = set()
103
    for i in idD - idG:
104
        if D[i] == 'BUILD':
105
            vm = VirtualMachine.objects.get(id=i)
106
            if needs_reconciliation(vm):
107
                with pooled_rapi_client(vm) as c:
108
                    try:
109
                        job_status = c.GetJobStatus(vm.backendjobid)['status']
110
                        if job_status in ('queued', 'waiting', 'running'):
111
                            # Server is still building in Ganeti
112
                            continue
113
                        else:
114
                            c.GetInstance(utils.id_to_instance_name(i))
115
                            # Server has just been created in Ganeti
116
                            continue
117
                    except GanetiApiError:
118
                        stale.add(i)
119
        else:
120
            stale.add(i)
121

    
122
    return stale
123

    
124

    
125
def orphan_instances_in_ganeti(D, G):
126
    idD = set(D.keys())
127
    idG = set(G.keys())
128

    
129
    return idG - idD
130

    
131

    
132
def unsynced_operstate(D, G):
133
    unsynced = set()
134
    idD = set(D.keys())
135
    idG = set(G.keys())
136

    
137
    for i in idD & idG:
138
        dbstate = D[i].state
139
        gntstate = G[i].state
140
        vm_unsynced = (gntstate and dbstate != "STARTED") or\
141
            (not gntstate and dbstate not in ('BUILD', 'ERROR', 'STOPPED'))
142
        if vm_unsynced:
143
            unsynced.add((i, dbstate, gntstate))
144
        if not gntstate and dbstate == 'BUILD':
145
            vm = VirtualMachine.objects.get(id=i)
146
            if needs_reconciliation(vm):
147
                with pooled_rapi_client(vm) as c:
148
                    try:
149
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
150
                        if job_info['status'] == 'success':
151
                            unsynced.add((i, dbstate, gntstate))
152
                    except GanetiApiError:
153
                        pass
154

    
155
    return unsynced
156

    
157

    
158
def instances_with_build_errors(D, G):
159
    failed = set()
160
    idD = set(D.keys())
161
    idG = set(G.keys())
162

    
163
    for i in idD & idG:
164
        if not G[i] and D[i] == 'BUILD':
165
            vm = VirtualMachine.objects.get(id=i)
166
            if not vm.backendjobid:  # VM has not been enqueued in the backend
167
                if datetime.now() > vm.created + timedelta(seconds=120):
168
                    # If a job has not been enqueued after 2 minutues, then
169
                    # it must be a stale entry..
170
                    failed.add(i)
171
            elif needs_reconciliation(vm):
172
                # Check time to avoid many rapi calls
173
                with pooled_rapi_client(vm) as c:
174
                    try:
175
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
176
                        if job_info['status'] == 'error':
177
                            failed.add(i)
178
                    except GanetiApiError:
179
                        failed.add(i)
180

    
181
    return failed
182

    
183

    
184
def get_servers_from_db(backends, with_nics=True):
185
    vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends)
186
    vm_info = vms.values_list("id", "operstate")
187
    if with_nics:
188
        nics = NetworkInterface.objects.filter(machine__in=vms)\
189
                               .order_by("machine")\
190
                               .values_list("machine", "index", "mac", "ipv4",
191
                                            "network")
192
        vm_nics = {}
193
        for machine, machine_nics in itertools.groupby(nics,
194
                                                       lambda nic: nic[0]):
195
            vm_nics[machine] = {}
196
            for machine, index, mac, ipv4, network in machine_nics:
197
                nic = {'mac':      mac,
198
                       'network':  utils.id_to_network_name(network),
199
                       'ipv4':     ipv4 if ipv4 != '' else None
200
                       }
201
                vm_nics[machine][index] = nic
202
    servers = dict([(vm_id, VMState(state=state, nics=vm_nics.get(vm_id, [])))
203
                    for vm_id, state in vm_info])
204
    return servers
205

    
206

    
207
def get_instances_from_ganeti(backends):
208
    instances = []
209
    for backend in backends:
210
        instances.append(get_instances(backend))
211
    ganeti_instances = reduce(list.__add__, instances, [])
212
    snf_instances = {}
213

    
214
    prefix = settings.BACKEND_PREFIX_ID
215
    for i in ganeti_instances:
216
        if i['name'].startswith(prefix):
217
            try:
218
                id = utils.id_from_instance_name(i['name'])
219
            except Exception:
220
                log.error("Ignoring instance with malformed name %s",
221
                          i['name'])
222
                continue
223

    
224
            if id in snf_instances:
225
                log.error("Ignoring instance with duplicate Synnefo id %s",
226
                          i['name'])
227
                continue
228

    
229
            nics = get_nics_from_instance(i)
230
            snf_instances[id] = VMState(state=i["oper_state"],
231
                                        nics=nics)
232

    
233
    return snf_instances
234

    
235

    
236
#
237
# Nics
238
#
239
def get_nics_from_ganeti(backends):
240
    """Get network interfaces for each ganeti instance.
241

242
    """
243
    instances = []
244
    for backend in backends:
245
        instances.append(get_instances(backend))
246
    instances = reduce(list.__add__, instances, [])
247
    prefix = settings.BACKEND_PREFIX_ID
248

    
249
    snf_instances_nics = {}
250
    for i in instances:
251
        if i['name'].startswith(prefix):
252
            try:
253
                id = utils.id_from_instance_name(i['name'])
254
            except Exception:
255
                log.error("Ignoring instance with malformed name %s",
256
                          i['name'])
257
                continue
258
            if id in snf_instances_nics:
259
                log.error("Ignoring instance with duplicate Synnefo id %s",
260
                          i['name'])
261
                continue
262

    
263
            snf_instances_nics[id] = get_nics_from_instance(i)
264

    
265
    return snf_instances_nics
266

    
267

    
268
def get_nics_from_instance(i):
269
    ips = zip(itertools.repeat('ipv4'), i['nic.ips'])
270
    macs = zip(itertools.repeat('mac'), i['nic.macs'])
271
    networks = zip(itertools.repeat('network'), i['nic.networks'])
272
    # modes = zip(itertools.repeat('mode'), i['nic.modes'])
273
    # links = zip(itertools.repeat('link'), i['nic.links'])
274
    # nics = zip(ips,macs,modes,networks,links)
275
    nics = zip(ips, macs, networks)
276
    nics = map(lambda x: dict(x), nics)
277
    nics = dict(enumerate(nics))
278
    return nics
279

    
280

    
281
def unsynced_nics(DBVMs, GanetiVMs):
282
    """Find unsynced network interfaces between DB and Ganeti.
283

284
    @ rtype: dict; {instance_id: ganeti_nics}
285
    @ return Dictionary containing the instances ids that have unsynced network
286
    interfaces between DB and Ganeti and the network interfaces in Ganeti.
287

288
    """
289
    idD = set(DBVMs.keys())
290
    idG = set(GanetiVMs.keys())
291

    
292
    unsynced = {}
293
    for i in idD & idG:
294
        nicsD = DBVMs[i].nics
295
        nicsG = GanetiVMs[i].nics
296
        if len(nicsD) != len(nicsG):
297
            unsynced[i] = (nicsD, nicsG)
298
            continue
299
        for index in nicsG.keys():
300
            nicD = nicsD[index]
301
            nicG = nicsG[index]
302
            diff = (nicD['ipv4'] != nicG['ipv4'] or
303
                    nicD['mac'] != nicG['mac'] or
304
                    nicD['network'] != nicG['network'])
305
            if diff:
306
                    unsynced[i] = (nicsD, nicsG)
307
                    break
308

    
309
    return unsynced
310

    
311
#
312
# Networks
313
#
314

    
315

    
316
def get_networks_from_ganeti(backend):
317
    prefix = settings.BACKEND_PREFIX_ID + 'net-'
318

    
319
    networks = {}
320
    with pooled_rapi_client(backend) as c:
321
        for net in c.GetNetworks(bulk=True):
322
            if net['name'].startswith(prefix):
323
                id = utils.id_from_network_name(net['name'])
324
                networks[id] = net
325

    
326
    return networks
327

    
328

    
329
def hanging_networks(backend, GNets):
330
    """Get networks that are not connected to all Nodegroups.
331

332
    """
333
    def get_network_groups(group_list):
334
        groups = set()
335
        for g in group_list:
336
            g_name = g.split('(')[0]
337
            groups.add(g_name)
338
        return groups
339

    
340
    with pooled_rapi_client(backend) as c:
341
        groups = set(c.GetGroups())
342

    
343
    hanging = {}
344
    for id, info in GNets.items():
345
        group_list = get_network_groups(info['group_list'])
346
        if group_list != groups:
347
            hanging[id] = groups - group_list
348
    return hanging
349

    
350

    
351
# Only for testing this module individually
352
def main():
353
    print get_instances_from_ganeti()
354

    
355

    
356
if __name__ == "__main__":
357
    sys.exit(main())