Revision 75dc539e

b/snf-cyclades-app/synnefo/logic/management/commands/reconcile-servers.py
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2 2
#
3 3
# Redistribution and use in source and binary forms, with or without
4 4
# modification, are permitted provided that the following conditions
......
35 35

  
36 36
"""
37 37
import sys
38
import datetime
39

  
38
import logging
39
import subprocess
40 40
from optparse import make_option
41

  
42
from django.core.management.base import BaseCommand, CommandError
43

  
44
from synnefo.db.models import (Backend, VirtualMachine, Network,
45
                               pooled_rapi_client)
46
from synnefo.logic import reconciliation, utils
47
from synnefo.logic import backend as backend_mod
48
from synnefo.util.mac2eui64 import mac2eui64
41
from django.core.management.base import BaseCommand
49 42
from synnefo.management.common import get_backend
43
from synnefo.logic import reconciliation
44
from synnefo.webproject.management.utils import parse_bool
50 45

  
51 46

  
52 47
class Command(BaseCommand):
53 48
    can_import_settings = True
54 49

  
55 50
    help = 'Reconcile contents of Synnefo DB with state of Ganeti backend'
56
    output_transaction = True  # The management command runs inside
57
                               # an SQL transaction
58 51
    option_list = BaseCommand.option_list + (
59
        make_option('--detect-stale', action='store_true', dest='detect_stale',
60
                    default=False, help='Detect stale VM entries in DB'),
61
        make_option('--detect-orphans', action='store_true',
62
                    dest='detect_orphans',
63
                    default=False, help='Detect orphan instances in Ganeti'),
64
        make_option('--detect-unsynced', action='store_true',
65
                    dest='detect_unsynced',
66
                    default=False, help='Detect unsynced operstate between ' +
67
                                        'DB and Ganeti'),
68
        make_option('--detect-build-errors', action='store_true',
69
                    dest='detect_build_errors', default=False,
70
                    help='Detect instances with build error'),
71
        make_option('--detect-unsynced-nics', action='store_true',
72
                    dest='detect_unsynced_nics', default=False,
73
                    help='Detect unsynced nics between DB and Ganeti'),
74
        make_option('--detect-unsynced-flavors', action='store_true',
75
                    dest='detect_unsynced_flavors', default=False,
76
                    help='Detect unsynced flavors between DB and Ganeti'),
77
        make_option('--detect-all', action='store_true',
78
                    dest='detect_all',
79
                    default=False, help='Enable all --detect-* arguments'),
52
        make_option('--backend-id', default=None, dest='backend-id',
53
                    help='Reconcilie VMs only for this backend'),
54
        make_option("--parallel",
55
                    dest="parallel",
56
                    default="True",
57
                    choices=["True", "False"],
58
                    metavar="True|False",
59
                    help="Perform server reconciliation for each backend"
60
                         " parallel."),
80 61
        make_option('--fix-stale', action='store_true', dest='fix_stale',
81 62
                    default=False, help='Fix (remove) stale DB entries in DB'),
82 63
        make_option('--fix-orphans', action='store_true', dest='fix_orphans',
......
84 65
        make_option('--fix-unsynced', action='store_true', dest='fix_unsynced',
85 66
                    default=False, help='Fix server operstate in DB, set ' +
86 67
                                        'from Ganeti'),
87
        make_option('--fix-build-errors', action='store_true',
88
                    dest='fix_build_errors', default=False,
89
                    help='Fix (remove) instances with build errors'),
90 68
        make_option('--fix-unsynced-nics', action='store_true',
91 69
                    dest='fix_unsynced_nics', default=False,
92 70
                    help='Fix unsynced nics between DB and Ganeti'),
......
95 73
                    help='Fix unsynced flavors between DB and Ganeti'),
96 74
        make_option('--fix-all', action='store_true', dest='fix_all',
97 75
                    default=False, help='Enable all --fix-* arguments'),
98
        make_option('--backend-id', default=None, dest='backend-id',
99
                    help='Reconcilie VMs only for this backend'),
100 76
    )
101 77

  
102 78
    def _process_args(self, options):
103
        keys_detect = [k for k in options.keys() if k.startswith('detect_')]
104 79
        keys_fix = [k for k in options.keys() if k.startswith('fix_')]
105

  
106
        if not reduce(lambda x, y: x or y,
107
                      map(lambda x: options[x], keys_detect)):
108
            options['detect_all'] = True
109

  
110
        if options['detect_all']:
111
            for kd in keys_detect:
112
                options[kd] = True
113 80
        if options['fix_all']:
114 81
            for kf in keys_fix:
115 82
                options[kf] = True
116 83

  
117
        for kf in keys_fix:
118
            kd = kf.replace('fix_', 'detect_', 1)
119
            if (options[kf] and not options[kd]):
120
                raise CommandError("Cannot use --%s without corresponding "
121
                                   "--%s argument" % (kf, kd))
122

  
123 84
    def handle(self, **options):
124
        verbosity = int(options['verbosity'])
125
        self._process_args(options)
126 85
        backend_id = options['backend-id']
127 86
        if backend_id:
128 87
            backends = [get_backend(backend_id)]
129 88
        else:
130
            backends = Backend.objects.filter(offline=False)
131

  
132
        with_nics = options["detect_unsynced_nics"]
133

  
134
        DBVMs = reconciliation.get_servers_from_db(backends, with_nics)
135
        GanetiVMs = reconciliation.get_instances_from_ganeti(backends)
136

  
137
        #
138
        # Detect problems
139
        #
140
        if options['detect_stale']:
141
            stale = reconciliation.stale_servers_in_db(DBVMs, GanetiVMs)
142
            if len(stale) > 0:
143
                print >> sys.stderr, "Found the following stale server IDs: "
144
                print "    " + "\n    ".join(
145
                    [str(x) for x in stale])
146
            elif verbosity == 2:
147
                print >> sys.stderr, "Found no stale server IDs in DB."
148

  
149
        if options['detect_orphans']:
150
            orphans = reconciliation.orphan_instances_in_ganeti(DBVMs,
151
                                                                GanetiVMs)
152
            if len(orphans) > 0:
153
                print >> sys.stderr, "Found orphan Ganeti instances with IDs: "
154
                print "    " + "\n    ".join(
155
                    [str(x) for x in orphans])
156
            elif verbosity == 2:
157
                print >> sys.stderr, "Found no orphan Ganeti instances."
158

  
159
        if options['detect_unsynced']:
160
            unsynced = reconciliation.unsynced_operstate(DBVMs, GanetiVMs)
161
            if len(unsynced) > 0:
162
                print >> sys.stderr, "The operstate of the following server" \
163
                                     " IDs is out-of-sync:"
164
                print "    " + "\n    ".join(
165
                    ["%d is %s in DB, %s in Ganeti" %
166
                     (x[0], x[1], ('UP' if x[2] else 'DOWN'))
167
                     for x in unsynced])
168
            elif verbosity == 2:
169
                print >> sys.stderr, "The operstate of all servers is in sync."
170

  
171
        if options['detect_build_errors']:
172
            build_errors = reconciliation.\
173
                instances_with_build_errors(DBVMs, GanetiVMs)
174
            if len(build_errors) > 0:
175
                msg = "The os for the following server IDs was not build"\
176
                      " successfully:"
177
                print >> sys.stderr, msg
178
                print "    " + "\n    ".join(
179
                    ["%d" % x for x in build_errors])
180
            elif verbosity == 2:
181
                print >> sys.stderr, "Found no instances with build errors."
182

  
183
        if options['detect_unsynced_nics']:
184
            def pretty_print_nics(nics):
185
                if not nics:
186
                    print ''.ljust(18) + 'None'
187
                for index, info in nics.items():
188
                    print ''.ljust(18) + 'nic/' + str(index) +\
189
                          ': MAC: %s, IP: %s, Network: %s' % \
190
                          (info['mac'], info['ipv4'], info['network'])
191

  
192
            unsynced_nics = reconciliation.unsynced_nics(DBVMs, GanetiVMs)
193
            if len(unsynced_nics) > 0:
194
                msg = "The NICs of the servers with the following IDs are"\
195
                      " unsynced:"
196
                print >> sys.stderr, msg
197
                for id, nics in unsynced_nics.items():
198
                    print ''.ljust(2) + '%6d:' % id
199
                    print ''.ljust(8) + '%8s:' % 'DB'
200
                    pretty_print_nics(nics[0])
201
                    print ''.ljust(8) + '%8s:' % 'Ganeti'
202
                    pretty_print_nics(nics[1])
203
            elif verbosity == 2:
204
                print >> sys.stderr, "All instance nics are synced."
205

  
206
        if options["detect_unsynced_flavors"]:
207
            unsynced_flavors = reconciliation.unsynced_flavors(DBVMs,
208
                                                               GanetiVMs)
209
            if len(unsynced_flavors) > 0:
210
                print >> sys.stderr, "The flavor of the following server" \
211
                                     " IDs is out-of-sync:"
212
                print "    " + "\n    ".join(
213
                    ["%d is %s in DB, %s in Ganeti" %
214
                     (x[0], x[1], x[2])
215
                     for x in unsynced_flavors])
216
            elif verbosity == 2:
217
                print >> sys.stderr, "All instance flavors are synced."
218

  
219
        #
220
        # Then fix them
221
        #
222
        if options['fix_stale'] and len(stale) > 0:
223
            print >> sys.stderr, \
224
                "Simulating successful Ganeti removal for %d " \
225
                "servers in the DB:" % len(stale)
226
            for vm in VirtualMachine.objects.filter(pk__in=stale):
227
                event_time = datetime.datetime.now()
228
                backend_mod.process_op_status(
229
                    vm=vm,
230
                    etime=event_time,
231
                    jobid=-0,
232
                    opcode='OP_INSTANCE_REMOVE', status='success',
233
                    logmsg='Reconciliation: simulated Ganeti event')
234
            print >> sys.stderr, "    ...done"
235

  
236
        if options['fix_orphans'] and len(orphans) > 0:
237
            print >> sys.stderr, \
238
                "Issuing OP_INSTANCE_REMOVE for %d Ganeti instances:" % \
239
                len(orphans)
240
            for id in orphans:
241
                try:
242
                    vm = VirtualMachine.objects.get(pk=id)
243
                    with pooled_rapi_client(vm) as client:
244
                        client.DeleteInstance(utils.id_to_instance_name(id))
245
                except VirtualMachine.DoesNotExist:
246
                    print >> sys.stderr, "No entry for VM %d in DB !!" % id
247
            print >> sys.stderr, "    ...done"
89
            backends = reconciliation.get_online_backends()
90

  
91
        parallel = parse_bool(options["parallel"])
92
        if parallel and len(backends) > 1:
93
            cmd = sys.argv
94
            processes = []
95
            for backend in backends:
96
                p = subprocess.Popen(cmd + ["--backend-id=%s" % backend.id])
97
                processes.append(p)
98
            for p in processes:
99
                p.wait()
100
            return
101

  
102
        verbosity = int(options["verbosity"])
103

  
104
        logger = logging.getLogger("reconcile-severs")
105
        logger.propagate = 0
106

  
107
        formatter = logging.Formatter("%(message)s")
108
        log_handler = logging.StreamHandler()
109
        log_handler.setFormatter(formatter)
110
        if verbosity == 2:
111
            formatter = logging.Formatter("%(asctime)s: %(message)s")
112
            log_handler.setFormatter(formatter)
113
            logger.setLevel(logging.DEBUG)
114
        elif verbosity == 1:
115
            logger.setLevel(logging.INFO)
116
        else:
117
            logger.setLevel(logging.WARNING)
248 118

  
249
        if options['fix_unsynced'] and len(unsynced) > 0:
250
            print >> sys.stderr, "Setting the state of %d out-of-sync VMs:" % \
251
                len(unsynced)
252
            for id, db_state, ganeti_up in unsynced:
253
                vm = VirtualMachine.objects.get(pk=id)
254
                opcode = "OP_INSTANCE_REBOOT" if ganeti_up \
255
                         else "OP_INSTANCE_SHUTDOWN"
256
                event_time = datetime.datetime.now()
257
                backend_mod.process_op_status(
258
                    vm=vm, etime=event_time, jobid=-0,
259
                    opcode=opcode, status='success',
260
                    logmsg='Reconciliation: simulated Ganeti event')
261
            print >> sys.stderr, "    ...done"
119
        logger.addHandler(log_handler)
262 120

  
263
        if options['fix_build_errors'] and len(build_errors) > 0:
264
            print >> sys.stderr, "Setting the state of %d build-errors VMs:" %\
265
                                 len(build_errors)
266
            for id in build_errors:
267
                vm = VirtualMachine.objects.get(pk=id)
268
                event_time = datetime.datetime.now()
269
                backend_mod.process_op_status(
270
                    vm=vm, etime=event_time, jobid=-0,
271
                    opcode="OP_INSTANCE_CREATE", status='error',
272
                    logmsg='Reconciliation: simulated Ganeti event')
273
            print >> sys.stderr, "    ...done"
121
        self._process_args(options)
274 122

  
275
        if options['fix_unsynced_nics'] and len(unsynced_nics) > 0:
276
            print >> sys.stderr, "Setting the nics of %d out-of-sync VMs:" % \
277
                                 len(unsynced_nics)
278
            for id, nics in unsynced_nics.items():
279
                vm = VirtualMachine.objects.get(pk=id)
280
                nics = nics[1]  # Ganeti nics
281
                if nics == {}:  # No nics
282
                    vm.nics.all.delete()
283
                    continue
284
                for index, nic in nics.items():
285
                    net_id = utils.id_from_network_name(nic['network'])
286
                    subnet6 = Network.objects.get(id=net_id).subnet6
287
                    # Produce ipv6
288
                    ipv6 = subnet6 and mac2eui64(nic['mac'], subnet6) or None
289
                    nic['ipv6'] = ipv6
290
                    # Rename ipv4 to ip
291
                    nic['ip'] = nic['ipv4']
292
                # Dict to sorted list
293
                final_nics = []
294
                nics_keys = nics.keys()
295
                nics_keys.sort()
296
                for i in nics_keys:
297
                    if nics[i]['network']:
298
                        final_nics.append(nics[i])
299
                    else:
300
                        print 'Network of nic %d of vm %s is None. ' \
301
                              'Can not reconcile' % (i, vm.backend_vm_id)
302
                event_time = datetime.datetime.now()
303
                backend_mod.process_net_status(vm=vm, etime=event_time,
304
                                               nics=final_nics)
305
            print >> sys.stderr, "    ...done"
306
        if options["fix_unsynced_flavors"] and len(unsynced_flavors) > 0:
307
            print >> sys.stderr, "Setting the flavor of %d unsynced VMs:" % \
308
                len(unsynced_flavors)
309
            for id, db_flavor, gnt_flavor in unsynced_flavors:
310
                vm = VirtualMachine.objects.get(pk=id)
311
                old_state = vm.operstate
312
                opcode = "OP_INSTANCE_SET_PARAMS"
313
                beparams = {"vcpus": gnt_flavor.cpu,
314
                            "minmem": gnt_flavor.ram,
315
                            "maxmem": gnt_flavor.ram}
316
                event_time = datetime.datetime.now()
317
                backend_mod.process_op_status(
318
                    vm=vm, etime=event_time, jobid=-0,
319
                    opcode=opcode, status='success',
320
                    beparams=beparams,
321
                    logmsg='Reconciliation: simulated Ganeti event')
322
                # process_op_status with beparams will set the vmstate to
323
                # shutdown. Fix this be returning it to old state
324
                vm = VirtualMachine.objects.get(pk=id)
325
                vm.operstate = old_state
326
                vm.save()
327
            print >> sys.stderr, "    ...done"
123
        for backend in backends:
124
            r = reconciliation.BackendReconciler(backend=backend,
125
                                                 logger=logger,
126
                                                 options=options)
127
            r.reconcile()
b/snf-cyclades-app/synnefo/logic/reconciliation.py
1
#!/usr/bin/env python
2 1
# -*- coding: utf-8 -*-
3 2
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
3
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 4
#
6 5
# Redistribution and use in source and binary forms, with or
7 6
# without modification, are permitted provided that the following
......
56 55

  
57 56
"""
58 57

  
59
import logging
60
import sys
61
import itertools
62 58

  
63 59
from django.core.management import setup_environ
64 60
try:
......
69 65
setup_environ(settings)
70 66

  
71 67

  
68
import logging
69
import itertools
72 70
from datetime import datetime, timedelta
73
from collections import namedtuple
74 71

  
75
from synnefo.db.models import (VirtualMachine, NetworkInterface, Flavor,
72
from django.db import transaction
73
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
76 74
                               pooled_rapi_client)
75
from synnefo.logic import utils, backend as backend_mod
77 76
from synnefo.logic.rapi import GanetiApiError
78
from synnefo.logic.backend import get_instances
79
from synnefo.logic import utils
80

  
81 77

  
82
log = logging.getLogger()
78
logger = logging.getLogger()
79
logging.basicConfig()
83 80

  
84 81
try:
85 82
    CHECK_INTERVAL = settings.RECONCILIATION_CHECK_INTERVAL
......
87 84
    CHECK_INTERVAL = 60
88 85

  
89 86

  
90
def needs_reconciliation(vm):
91
    now = datetime.now()
92
    return (now > vm.updated + timedelta(seconds=CHECK_INTERVAL)) or\
93
           (now > vm.backendtime + timedelta(seconds=2*CHECK_INTERVAL))
94

  
95
VMState = namedtuple("VMState", ["state", "cpu", "ram", "nics"])
96

  
97

  
98
def stale_servers_in_db(D, G):
99
    idD = set(D.keys())
100
    idG = set(G.keys())
101

  
102
    stale = set()
103
    for i in idD - idG:
104
        if D[i] == 'BUILD':
105
            vm = VirtualMachine.objects.get(id=i)
106
            if needs_reconciliation(vm):
107
                with pooled_rapi_client(vm) as c:
108
                    try:
109
                        job_status = c.GetJobStatus(vm.backendjobid)['status']
110
                        if job_status in ('queued', 'waiting', 'running'):
111
                            # Server is still building in Ganeti
112
                            continue
113
                        else:
114
                            c.GetInstance(utils.id_to_instance_name(i))
115
                            # Server has just been created in Ganeti
116
                            continue
117
                    except GanetiApiError:
118
                        stale.add(i)
87
class BackendReconciler(object):
88
    def __init__(self, backend, logger, options=None):
89
        self.backend = backend
90
        self.log = logger
91
        self.client = backend.get_client()
92
        if options is None:
93
            self.options = {}
119 94
        else:
120
            stale.add(i)
121

  
122
    return stale
123

  
124

  
125
def orphan_instances_in_ganeti(D, G):
126
    idD = set(D.keys())
127
    idG = set(G.keys())
128

  
129
    return idG - idD
130

  
131

  
132
def unsynced_operstate(D, G):
133
    unsynced = set()
134
    idD = set(D.keys())
135
    idG = set(G.keys())
136

  
137
    for i in idD & idG:
138
        dbstate = D[i].state
139
        gntstate = G[i].state
140
        vm_unsynced = (gntstate and dbstate != "STARTED") or\
141
            (not gntstate and dbstate not in ('BUILD', 'ERROR', 'STOPPED'))
142
        if vm_unsynced:
143
            unsynced.add((i, dbstate, gntstate))
144
        if not gntstate and dbstate == 'BUILD':
145
            vm = VirtualMachine.objects.get(id=i)
146
            if needs_reconciliation(vm):
147
                with pooled_rapi_client(vm) as c:
148
                    try:
149
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
150
                        if job_info['status'] == 'success':
151
                            unsynced.add((i, dbstate, gntstate))
152
                    except GanetiApiError:
153
                        pass
154

  
155
    return unsynced
156

  
157

  
158
def unsynced_flavors(D, G):
159
    unsynced = set()
160
    idD = set(D.keys())
161
    idG = set(G.keys())
162

  
163
    for i in idD & idG:
164
        if D[i].ram != G[i].ram or D[i].cpu != G[i].cpu:
165
            db_flavor = VirtualMachine.objects.get(id=i).flavor
95
            self.options = options
96

  
97
    def close(self):
98
        self.backend.put_client(self.client)
99

  
100
    @transaction.commit_on_success
101
    def reconcile(self):
102
        log = self.log
103
        backend = self.backend
104
        log.debug("Reconciling backend %s", backend)
105

  
106
        self.db_servers = get_database_servers(backend)
107
        self.db_servers_keys = set(self.db_servers.keys())
108
        log.debug("Got servers info from database.")
109

  
110
        self.gnt_servers = get_ganeti_servers(backend)
111
        self.gnt_servers_keys = set(self.gnt_servers.keys())
112
        log.debug("Got servers info from Ganeti backend.")
113

  
114
        self.event_time = datetime.now()
115

  
116
        self.stale_servers = self.reconcile_stale_servers()
117
        self.orphan_servers = self.reconcile_orphan_servers()
118
        self.unsynced_servers = self.reconcile_unsynced_servers()
119
        self.close()
120

  
121
    def get_build_status(self, db_server):
122
        job = db_server.backendjobid
123
        if job is None:
124
            created = db_server.created
125
            # Job has not yet been enqueued.
126
            if self.event_time < created + timedelta(seconds=60):
127
                return "RUNNING"
128
            else:
129
                return "ERROR"
130
        else:
131
            updated = db_server.backendtime
132
            if self.event_time >= updated + timedelta(seconds=60):
133
                try:
134
                    job_info = self.client.GetJobStatus(job_id=job)
135
                    finalized = ["success", "error", "cancelled"]
136
                    if job_info["status"] == "error":
137
                        return "ERROR"
138
                    elif job_info["status"] not in finalized:
139
                        return "RUNNING"
140
                    else:
141
                        return "FINALIZED"
142
                except GanetiApiError:
143
                    return "ERROR"
144
            else:
145
                self.log.debug("Pending build for server '%s'", db_server.id)
146
                return "RUNNING"
147

  
148
    def reconcile_stale_servers(self):
149
        # Detect stale servers
150
        stale = []
151
        stale_keys = self.db_servers_keys - self.gnt_servers_keys
152
        for server_id in stale_keys:
153
            db_server = self.db_servers[server_id]
154
            if db_server.operstate == "BUILD":
155
                build_status = self.get_build_status(db_server)
156
                if build_status == "ERROR":
157
                    # Special handling of BUILD eerrors
158
                    self.reconcile_building_server(db_server)
159
                elif build_status != "RUNNING":
160
                    stale.append(server_id)
161
            else:
162
                stale.append(server_id)
163

  
164
        # Report them
165
        if stale:
166
            self.log.info("Found stale servers %s at backend %s",
167
                          ", ".join(map(str, stale)), self.backend)
168
        else:
169
            self.log.debug("No stale servers at backend %s", self.backend)
170

  
171
        # Fix them
172
        if stale and self.options["fix_stale"]:
173
            for server_id in stale:
174
                db_server = self.db_servers[server_id]
175
                backend_mod.process_op_status(
176
                    vm=db_server,
177
                    etime=self.event_time,
178
                    jobid=-0,
179
                    opcode='OP_INSTANCE_REMOVE', status='success',
180
                    logmsg='Reconciliation: simulated Ganeti event')
181
            self.log.debug("Simulated Ganeti removal for stale servers.")
182

  
183
    def reconcile_orphan_servers(self):
184
        orphans = self.gnt_servers_keys - self.db_servers_keys
185
        if orphans:
186
            self.log.info("Found orphan servers %s at backend %s",
187
                          ", ".join(map(str, orphans)), self.backend)
188
        else:
189
            self.log.debug("No orphan servers at backend %s", self.backend)
190

  
191
        if orphans and self.options["fix_orphans"]:
192
            for server_id in orphans:
193
                server_name = utils.id_to_instance_name(server_id)
194
                self.client.DeleteInstance(server_name)
195
            self.log.debug("Issued OP_INSTANCE_REMOVE for orphan servers.")
196

  
197
    def reconcile_unsynced_servers(self):
198
        #log = self.log
199
        for server_id in self.db_servers_keys & self.gnt_servers_keys:
200
            db_server = self.db_servers[server_id]
201
            gnt_server = self.gnt_servers[server_id]
202
            if db_server.operstate == "BUILD":
203
                build_status = self.get_build_status(db_server)
204
                if build_status == "RUNNING":
205
                    # Do not reconcile building VMs
206
                    continue
207
                elif build_status == "ERROR":
208
                    # Special handling of build errors
209
                    self.reconcile_building_server(db_server)
210
                    continue
211

  
212
            self.reconcile_unsynced_operstate(server_id, db_server,
213
                                              gnt_server)
214
            self.reconcile_unsynced_flavor(server_id, db_server,
215
                                           gnt_server)
216
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
217
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
218

  
219
    def reconcile_building_server(self, db_server):
220
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
221
                      db_server.id)
222
        if self.options["fix_unsynced"]:
223
            fix_opcode = "OP_INSTANCE_CREATE"
224
            backend_mod.process_op_status(
225
                vm=db_server,
226
                etime=self.event_time,
227
                jobid=-0,
228
                opcode=fix_opcode, status='error',
229
                logmsg='Reconciliation: simulated Ganeti event')
230
            self.log.debug("Simulated Ganeti error build event for"
231
                           " server '%s'", db_server.id)
232

  
233
    def reconcile_unsynced_operstate(self, server_id, db_server, gnt_server):
234
        if db_server.operstate != gnt_server["state"]:
235
            self.log.info("Server '%s' is '%s' in DB and '%s' in Ganeti.",
236
                          server_id, db_server.operstate, gnt_server["state"])
237
            if self.options["fix_unsynced"]:
238
                fix_opcode = \
239
                    "OP_INSTANCE_STARTUP" if gnt_server["state"] == "STARTED"\
240
                    else "OP_INSTANCE_SHUTDOWN"
241
                backend_mod.process_op_status(
242
                    vm=db_server,
243
                    etime=self.event_time,
244
                    jobid=-0,
245
                    opcode=fix_opcode, status='success',
246
                    logmsg='Reconciliation: simulated Ganeti event')
247
                self.log.debug("Simulated Ganeti state event for server '%s'",
248
                               server_id)
249

  
250
    def reconcile_unsynced_flavor(self, server_id, db_server, gnt_server):
251
        db_flavor = db_server.flavor
252
        gnt_flavor = gnt_server["flavor"]
253
        if (db_flavor.ram != gnt_flavor["ram"] or
254
            db_flavor.cpu != gnt_flavor["vcpus"]):
166 255
            try:
167 256
                gnt_flavor = Flavor.objects.get(
168
                                    ram=G[i].ram, cpu=G[i].cpu,
169
                                    disk=db_flavor.disk,
170
                                    disk_template=db_flavor.disk_template)
257
                    ram=gnt_flavor["ram"],
258
                    cpu=gnt_flavor["vcpus"],
259
                    disk=db_flavor.disk,
260
                    disk_template=db_flavor.disk_template)
171 261
            except Flavor.DoesNotExist:
172
                gnt_flavor = None
173
            unsynced.add((i, db_flavor, gnt_flavor))
174
    return unsynced
175

  
176

  
177
def instances_with_build_errors(D, G):
178
    failed = set()
179
    idD = set(D.keys())
180
    idG = set(G.keys())
181

  
182
    for i in idD & idG:
183
        if not G[i] and D[i] == 'BUILD':
184
            vm = VirtualMachine.objects.get(id=i)
185
            if not vm.backendjobid:  # VM has not been enqueued in the backend
186
                if datetime.now() > vm.created + timedelta(seconds=120):
187
                    # If a job has not been enqueued after 2 minutues, then
188
                    # it must be a stale entry..
189
                    failed.add(i)
190
            elif needs_reconciliation(vm):
191
                # Check time to avoid many rapi calls
192
                with pooled_rapi_client(vm) as c:
193
                    try:
194
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
195
                        if job_info['status'] == 'error':
196
                            failed.add(i)
197
                    except GanetiApiError:
198
                        failed.add(i)
199

  
200
    return failed
201

  
202

  
203
def get_servers_from_db(backends, with_nics=True):
204
    vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends)
205
    vm_info = vms.values_list("id", "operstate", "flavor__cpu", "flavor__ram")
206
    if with_nics:
207
        nics = NetworkInterface.objects.filter(machine__in=vms)\
208
                               .order_by("machine")\
209
                               .values_list("machine", "index", "mac", "ipv4",
210
                                            "network")
211
        vm_nics = {}
212
        for machine, machine_nics in itertools.groupby(nics,
213
                                                       lambda nic: nic[0]):
214
            vm_nics[machine] = {}
215
            for machine, index, mac, ipv4, network in machine_nics:
216
                nic = {'mac':      mac,
217
                       'network':  utils.id_to_network_name(network),
218
                       'ipv4':     ipv4 if ipv4 != '' else None
219
                       }
220
                vm_nics[machine][index] = nic
221
    servers = dict([(vm_id, VMState(state=state,
222
                                    cpu=cpu,
223
                                    ram=ram,
224
                                    nics=vm_nics.get(vm_id, [])))
225
                    for vm_id, state, cpu, ram in vm_info])
226
    return servers
227

  
228

  
229
def get_instances_from_ganeti(backends):
230
    instances = []
231
    for backend in backends:
232
        instances.append(get_instances(backend))
233
    ganeti_instances = reduce(list.__add__, instances, [])
234
    snf_instances = {}
235

  
236
    prefix = settings.BACKEND_PREFIX_ID
237
    for i in ganeti_instances:
238
        if i['name'].startswith(prefix):
239
            try:
240
                id = utils.id_from_instance_name(i['name'])
241
            except Exception:
242
                log.error("Ignoring instance with malformed name %s",
243
                          i['name'])
244
                continue
245

  
246
            if id in snf_instances:
247
                log.error("Ignoring instance with duplicate Synnefo id %s",
248
                          i['name'])
249
                continue
250

  
251
            nics = get_nics_from_instance(i)
252
            beparams = i["beparams"]
253
            vcpus = beparams["vcpus"]
254
            ram = beparams["maxmem"]
255
            snf_instances[id] = VMState(state=i["oper_state"],
256
                                        cpu=vcpus,
257
                                        ram=ram,
258
                                        nics=nics)
259

  
260
    return snf_instances
262
                self.log.warning("Server '%s' has unknown flavor.", server_id)
263
                return
264

  
265
            self.log.info("Server '%s' has flavor '%' in DB and '%s' in"
266
                          " Ganeti", server_id, db_flavor, gnt_flavor)
267
            if self.options["fix_unsynced_flavors"]:
268
                old_state = db_server.operstate
269
                opcode = "OP_INSTANCE_SET_PARAMS"
270
                beparams = {"vcpus": gnt_flavor.cpu,
271
                            "minmem": gnt_flavor.ram,
272
                            "maxmem": gnt_flavor.ram}
273
                backend_mod.process_op_status(
274
                    vm=db_server, etime=self.event_time, jobid=-0,
275
                    opcode=opcode, status='success',
276
                    beparams=beparams,
277
                    logmsg='Reconciliation: simulated Ganeti event')
278
                # process_op_status with beparams will set the vmstate to
279
                # shutdown. Fix this be returning it to old state
280
                vm = VirtualMachine.objects.get(pk=server_id)
281
                vm.operstate = old_state
282
                vm.save()
283
                self.log.debug("Simulated Ganeti flavor event for server '%s'",
284
                               server_id)
285

  
286
    def reconcile_unsynced_nics(self, server_id, db_server, gnt_server):
287
        db_nics = db_server.nics.order_by("index")
288
        gnt_nics = gnt_server["nics"]
289
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
290
        if backend_mod.nics_changed(db_nics, gnt_nics_parsed):
291
            msg = "Found unsynced NICs for server '%s'.\n\t"\
292
                  "DB: %s\n\tGaneti: %s"
293
            db_nics_str = ", ".join(map(format_db_nic, db_nics))
294
            gnt_nics_str = ", ".join(map(format_gnt_nic, gnt_nics_parsed))
295
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
296
            if self.options["fix_unsynced_nics"]:
297
                backend_mod.process_net_status(vm=db_server,
298
                                               etime=self.event_time,
299
                                               nics=gnt_nics)
300

  
301
    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
302
        pass
303

  
304

  
305
def format_db_nic(nic):
306
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % (nic.index,
307
           nic.ipv4, nic.network_id, nic.mac, nic.firewall_profile)
308

  
309

  
310
def format_gnt_nic(nic):
311
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" %\
312
           (nic["index"], nic["ipv4"], nic["network"], nic["mac"],
313
            nic["firewall_profile"])
261 314

  
262 315

  
263 316
#
264
# Nics
265
#
266
def get_nics_from_ganeti(backends):
267
    """Get network interfaces for each ganeti instance.
268

  
269
    """
270
    instances = []
271
    for backend in backends:
272
        instances.append(get_instances(backend))
273
    instances = reduce(list.__add__, instances, [])
274
    prefix = settings.BACKEND_PREFIX_ID
275

  
276
    snf_instances_nics = {}
277
    for i in instances:
278
        if i['name'].startswith(prefix):
279
            try:
280
                id = utils.id_from_instance_name(i['name'])
281
            except Exception:
282
                log.error("Ignoring instance with malformed name %s",
283
                          i['name'])
284
                continue
285
            if id in snf_instances_nics:
286
                log.error("Ignoring instance with duplicate Synnefo id %s",
287
                          i['name'])
288
                continue
289

  
290
            snf_instances_nics[id] = get_nics_from_instance(i)
291

  
292
    return snf_instances_nics
293

  
294

  
295
def get_nics_from_instance(i):
296
    ips = zip(itertools.repeat('ipv4'), i['nic.ips'])
297
    macs = zip(itertools.repeat('mac'), i['nic.macs'])
298
    networks = zip(itertools.repeat('network'), i['nic.networks'])
299
    # modes = zip(itertools.repeat('mode'), i['nic.modes'])
300
    # links = zip(itertools.repeat('link'), i['nic.links'])
301
    # nics = zip(ips,macs,modes,networks,links)
302
    nics = zip(ips, macs, networks)
303
    nics = map(lambda x: dict(x), nics)
304
    nics = dict(enumerate(nics))
305
    return nics
306

  
307

  
308
def unsynced_nics(DBVMs, GanetiVMs):
309
    """Find unsynced network interfaces between DB and Ganeti.
310

  
311
    @ rtype: dict; {instance_id: ganeti_nics}
312
    @ return Dictionary containing the instances ids that have unsynced network
313
    interfaces between DB and Ganeti and the network interfaces in Ganeti.
314

  
315
    """
316
    idD = set(DBVMs.keys())
317
    idG = set(GanetiVMs.keys())
318

  
319
    unsynced = {}
320
    for i in idD & idG:
321
        nicsD = DBVMs[i].nics
322
        nicsG = GanetiVMs[i].nics
323
        if len(nicsD) != len(nicsG):
324
            unsynced[i] = (nicsD, nicsG)
325
            continue
326
        for index in nicsG.keys():
327
            nicD = nicsD[index]
328
            nicG = nicsG[index]
329
            diff = (nicD['ipv4'] != nicG['ipv4'] or
330
                    nicD['mac'] != nicG['mac'] or
331
                    nicD['network'] != nicG['network'])
332
            if diff:
333
                    unsynced[i] = (nicsD, nicsG)
334
                    break
335

  
336
    return unsynced
337

  
338
#
339 317
# Networks
340 318
#
341 319

  
......
375 353
    return hanging
376 354

  
377 355

  
378
# Only for testing this module individually
379
def main():
380
    print get_instances_from_ganeti()
356
def get_online_backends():
357
    return Backend.objects.filter(offline=False)
358

  
359

  
360
def get_database_servers(backend):
361
    servers = backend.virtual_machines.select_related("nics", "flavor")\
362
                                      .filter(deleted=False)
363
    return dict([(s.id, s) for s in servers])
364

  
365

  
366
def get_ganeti_servers(backend):
367
    gnt_instances = backend_mod.get_instances(backend)
368
    # Filter out non-synnefo instances
369
    snf_backend_prefix = settings.BACKEND_PREFIX_ID
370
    gnt_instances = filter(lambda i: i["name"].startswith(snf_backend_prefix),
371
                           gnt_instances)
372
    gnt_instances = map(parse_gnt_instance, gnt_instances)
373
    return dict([(i["id"], i) for i in gnt_instances if i["id"] is not None])
374

  
375

  
376
def parse_gnt_instance(instance):
377
    try:
378
        instance_id = utils.id_from_instance_name(instance['name'])
379
    except Exception:
380
        logger.error("Ignoring instance with malformed name %s",
381
                     instance['name'])
382
        return (None, None)
383

  
384
    beparams = instance["beparams"]
385

  
386
    vcpus = beparams["vcpus"]
387
    ram = beparams["maxmem"]
388
    state = instance["oper_state"] and "STARTED" or "STOPPED"
389

  
390
    return {
391
        "id": instance_id,
392
        "state": state,  # FIX
393
        "updated": datetime.fromtimestamp(instance["mtime"]),
394
        "disks": disks_from_instance(instance),
395
        "nics": nics_from_instance(instance),
396
        "flavor": {"vcpus": vcpus,
397
                   "ram": ram},
398
        "tags": instance["tags"]
399
    }
400

  
401

  
402
def nics_from_instance(i):
403
    ips = zip(itertools.repeat('ip'), i['nic.ips'])
404
    macs = zip(itertools.repeat('mac'), i['nic.macs'])
405
    networks = zip(itertools.repeat('network'), i['nic.networks'])
406
    # modes = zip(itertools.repeat('mode'), i['nic.modes'])
407
    # links = zip(itertools.repeat('link'), i['nic.links'])
408
    # nics = zip(ips,macs,modes,networks,links)
409
    nics = zip(ips, macs, networks)
410
    nics = map(lambda x: dict(x), nics)
411
    #nics = dict(enumerate(nics))
412
    tags = i["tags"]
413
    for tag in tags:
414
        t = tag.split(":")
415
        if t[0:2] == ["synnefo", "network"]:
416
            if len(t) != 4:
417
                logger.error("Malformed synefo tag %s", tag)
418
                continue
419
            try:
420
                index = int(t[2])
421
                nics[index]['firewall'] = t[3]
422
            except ValueError:
423
                logger.error("Malformed synnefo tag %s", tag)
424
            except IndexError:
425
                logger.error("Found tag %s for non-existent NIC %d",
426
                             tag, index)
427
    return nics
381 428

  
382 429

  
383
if __name__ == "__main__":
384
    sys.exit(main())
430
def disks_from_instance(i):
431
    return dict([(index, {"size": size})
432
                 for index, size in enumerate(i["disk.sizes"])])

Also available in: Unified diff