Revision 7ab30015

b/snf-cyclades-app/synnefo/logic/management/commands/reconcile-servers.py
123 123
        else:
124 124
            backends = Backend.objects.filter(offline=False)
125 125

  
126
        D = reconciliation.get_servers_from_db(backends)
127
        G, GNics = reconciliation.get_instances_from_ganeti(backends)
126
        with_nics = options["detect_unsynced_nics"]
128 127

  
129
        DBNics = reconciliation.get_nics_from_db(backends)
128
        DBVMs = reconciliation.get_servers_from_db(backend, with_nics)
129
        GanetiVMs = reconciliation.get_instances_from_ganeti(backend)
130 130

  
131 131
        #
132 132
        # Detect problems
133 133
        #
134 134
        if options['detect_stale']:
135
            stale = reconciliation.stale_servers_in_db(D, G)
135
            stale = reconciliation.stale_servers_in_db(DBVMs, GanetiVMs)
136 136
            if len(stale) > 0:
137 137
                print >> sys.stderr, "Found the following stale server IDs: "
138 138
                print "    " + "\n    ".join(
......
141 141
                print >> sys.stderr, "Found no stale server IDs in DB."
142 142

  
143 143
        if options['detect_orphans']:
144
            orphans = reconciliation.orphan_instances_in_ganeti(D, G)
144
            orphans = reconciliation.orphan_instances_in_ganeti(DBVMs,
145
                                                                GanetiVMs)
145 146
            if len(orphans) > 0:
146 147
                print >> sys.stderr, "Found orphan Ganeti instances with IDs: "
147 148
                print "    " + "\n    ".join(
......
150 151
                print >> sys.stderr, "Found no orphan Ganeti instances."
151 152

  
152 153
        if options['detect_unsynced']:
153
            unsynced = reconciliation.unsynced_operstate(D, G)
154
            unsynced = reconciliation.unsynced_operstate(DBVMs, GanetiVMs)
154 155
            if len(unsynced) > 0:
155 156
                print >> sys.stderr, "The operstate of the following server" \
156 157
                                     " IDs is out-of-sync:"
......
162 163
                print >> sys.stderr, "The operstate of all servers is in sync."
163 164

  
164 165
        if options['detect_build_errors']:
165
            build_errors = reconciliation.instances_with_build_errors(D, G)
166
            build_errors = reconciliation.\
167
                instances_with_build_errors(DBVMs, GanetiVMs)
166 168
            if len(build_errors) > 0:
167 169
                msg = "The os for the following server IDs was not build"\
168 170
                      " successfully:"
......
181 183
                          ': MAC: %s, IP: %s, Network: %s' % \
182 184
                          (info['mac'], info['ipv4'], info['network'])
183 185

  
184
            unsynced_nics = reconciliation.unsynced_nics(DBNics, GNics)
186
            unsynced_nics = reconciliation.unsynced_nics(DBVMs, GanetiVMs)
185 187
            if len(unsynced_nics) > 0:
186 188
                msg = "The NICs of the servers with the following IDs are"\
187 189
                      " unsynced:"
b/snf-cyclades-app/synnefo/logic/reconciliation.py
70 70

  
71 71

  
72 72
from datetime import datetime, timedelta
73
from collections import namedtuple
73 74

  
74
from synnefo.db.models import (VirtualMachine, pooled_rapi_client)
75
from synnefo.db.models import (VirtualMachine, NetworkInterface,
76
                               pooled_rapi_client)
75 77
from synnefo.logic.rapi import GanetiApiError
76 78
from synnefo.logic.backend import get_instances
77 79
from synnefo.logic import utils
......
90 92
    return (now > vm.updated + timedelta(seconds=CHECK_INTERVAL)) or\
91 93
           (now > vm.backendtime + timedelta(seconds=2*CHECK_INTERVAL))
92 94

  
95
VMState = namedtuple("VMState", ["state", "nics"])
96

  
93 97

  
94 98
def stale_servers_in_db(D, G):
95 99
    idD = set(D.keys())
......
131 135
    idG = set(G.keys())
132 136

  
133 137
    for i in idD & idG:
134
        vm_unsynced = (G[i] and D[i] != "STARTED") or\
135
                      (not G[i] and D[i] not in ('BUILD', 'ERROR', 'STOPPED'))
138
        dbstate = D[i].state
139
        gntstate = G[i].state
140
        vm_unsynced = (gntstate and dbstate != "STARTED") or\
141
            (not gntstate and dbstate not in ('BUILD', 'ERROR', 'STOPPED'))
136 142
        if vm_unsynced:
137
            unsynced.add((i, D[i], G[i]))
138
        if not G[i] and D[i] == 'BUILD':
143
            unsynced.add((i, dbstate, gntstate))
144
        if not gntstate and dbstate == 'BUILD':
139 145
            vm = VirtualMachine.objects.get(id=i)
140 146
            if needs_reconciliation(vm):
141 147
                with pooled_rapi_client(vm) as c:
142 148
                    try:
143 149
                        job_info = c.GetJobStatus(job_id=vm.backendjobid)
144 150
                        if job_info['status'] == 'success':
145
                            unsynced.add((i, D[i], G[i]))
151
                            unsynced.add((i, dbstate, gntstate))
146 152
                    except GanetiApiError:
147 153
                        pass
148 154

  
......
175 181
    return failed
176 182

  
177 183

  
178
def get_servers_from_db(backends):
184
def get_servers_from_db(backends, with_nics=True):
179 185
    vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends)
180
    return dict(map(lambda x: (x.id, x.operstate), vms))
186
    vm_info = vms.values_list("id", "operstate")
187
    if with_nics:
188
        nics = NetworkInterface.objects.filter(machine__in=vms)\
189
                               .order_by("machine")\
190
                               .values_list("machine", "index", "mac", "ipv4",
191
                                            "network")
192
        vm_nics = {}
193
        for machine, machine_nics in itertools.groupby(nics,
194
                                                       lambda nic: nic[0]):
195
            vm_nics[machine] = {}
196
            for machine, index, mac, ipv4, network in machine_nics:
197
                nic = {'mac':      mac,
198
                       'network':  utils.id_to_network_name(network),
199
                       'ipv4':     ipv4 if ipv4 != '' else None
200
                       }
201
                vm_nics[machine][index] = nic
202
    servers = dict([(vm_id, VMState(state=state, nics=vm_nics.get(vm_id, [])))
203
                    for vm_id, state in vm_info])
204
    return servers
181 205

  
182 206

  
183 207
def get_instances_from_ganeti(backends):
......
186 210
        instances.append(get_instances(backend))
187 211
    ganeti_instances = reduce(list.__add__, instances, [])
188 212
    snf_instances = {}
189
    snf_nics = {}
190 213

  
191 214
    prefix = settings.BACKEND_PREFIX_ID
192 215
    for i in ganeti_instances:
......
203 226
                          i['name'])
204 227
                continue
205 228

  
206
            snf_instances[id] = i['oper_state']
207
            snf_nics[id] = get_nics_from_instance(i)
229
            nics = get_nics_from_instance(i)
230
            snf_instances[id] = VMState(state=i["oper_state"],
231
                                        nics=nics)
208 232

  
209
    return snf_instances, snf_nics
233
    return snf_instances
210 234

  
211 235

  
212 236
#
......
254 278
    return nics
255 279

  
256 280

  
257
def get_nics_from_db(backends):
258
    """Get network interfaces for each vm in DB.
259

  
260
    """
261
    instances = VirtualMachine.objects.filter(deleted=False,
262
                                              backend__in=backends)
263
    instances_nics = {}
264
    for instance in instances:
265
        nics = {}
266
        for n in instance.nics.all():
267
            ipv4 = n.ipv4
268
            nic = {'mac':      n.mac,
269
                   'network':  n.network.backend_id,
270
                   'ipv4':     ipv4 if ipv4 != '' else None
271
                   }
272
            nics[n.index] = nic
273
        instances_nics[instance.id] = nics
274
    return instances_nics
275

  
276

  
277
def unsynced_nics(DBNics, GNics):
281
def unsynced_nics(DBVMs, GanetiVMs):
278 282
    """Find unsynced network interfaces between DB and Ganeti.
279 283

  
280 284
    @ rtype: dict; {instance_id: ganeti_nics}
......
282 286
    interfaces between DB and Ganeti and the network interfaces in Ganeti.
283 287

  
284 288
    """
285
    idD = set(DBNics.keys())
286
    idG = set(GNics.keys())
289
    idD = set(DBVMs.keys())
290
    idG = set(GanetiVMs.keys())
287 291

  
288 292
    unsynced = {}
289 293
    for i in idD & idG:
290
        nicsD = DBNics[i]
291
        nicsG = GNics[i]
294
        nicsD = DBVMs[i].nics
295
        nicsG = GanetiVMs[i].nics
292 296
        if len(nicsD) != len(nicsG):
293 297
            unsynced[i] = (nicsD, nicsG)
294 298
            continue
b/snf-cyclades-app/synnefo/logic/tests.py
610 610
            self.assertEqual(vm.buildpercentage, old)
611 611

  
612 612

  
613
from synnefo.logic.reconciliation import VMState
613 614
class ReconciliationTest(TestCase):
614
    SERVERS = 1000
615
    fixtures = ['db_test_data']
615
    def get_vm(self, operstate, deleted=False):
616
        flavor = mfactory.FlavorFactory(cpu=2, ram=1024)
617
        vm = mfactory.VirtualMachineFactory(deleted=deleted, flavor=flavor)
618
        vm.operstate = operstate
619
        vm.save()
620
        return vm
616 621

  
617 622
    def test_get_servers_from_db(self):
618 623
        """Test getting a dictionary from each server to its operstate"""
619 624
        backend = 30000
620
        self.assertEquals(reconciliation.get_servers_from_db(backends=[backend]),
621
                          {30000: 'STARTED', 30001: 'STOPPED', 30002: 'BUILD'})
625
        vm1 = self.get_vm('STARTED')
626
        vm2 = self.get_vm('DESTROYED', deleted=True)
627
        vm3 = self.get_vm('STOPPED')
628
        self.assertEquals(reconciliation.get_servers_from_db(),
629
                    {vm1.id: VMState(state='STARTED', cpu=2, ram=1024, nics=[]),
630
                     vm3.id: VMState(state='STOPPED', cpu=2, ram=1024, nics=[])}
631
                    )
622 632

  
623 633
    def test_stale_servers_in_db(self):
624 634
        """Test discovery of stale entries in DB"""
625 635

  
626
        D = {1: 'STARTED', 2: 'STOPPED', 3: 'STARTED', 30000: 'BUILD',
627
             30002: 'STOPPED'}
636
        D = {1: None, 2: 'None', 3: None, 30000: 'BUILD',
637
             30002: 'None'}
628 638
        G = {1: True, 3: True, 30000: True}
629 639
        self.assertEquals(reconciliation.stale_servers_in_db(D, G),
630 640
                          set([2, 30002]))
......
666 676

  
667 677
    def test_unsynced_operstate(self):
668 678
        """Test discovery of unsynced operstate between the DB and Ganeti"""
669

  
670
        G = {1: True, 2: False, 3: True, 4: False, 50: True}
671
        D = {1: 'STARTED', 2: 'STARTED', 3: 'BUILD', 4: 'STARTED', 50: 'BUILD'}
679
        mkstate = lambda state: VMState(state=state, cpu=1, ram=1024, nics=[])
680
        vm1 = self.get_vm("STARTED")
681
        vm2 = self.get_vm("STARTED")
682
        vm3= self.get_vm("BUILD")
683
        vm4 = self.get_vm("STARTED")
684
        vm5 = self.get_vm("BUILD")
685

  
686
        D = {1: mkstate("STARTED"), 2: mkstate("STARTED"), 3: mkstate("BUILD"),
687
             4: mkstate("STARTED"), 50: mkstate("BUILD")}
688
        G = {vm1.id: mkstate(True), vm2.id: mkstate(False),
689
             vm4.id: mkstate(True), vm4.id: mkstate(False),
690
             vm5.id: mkstate(False)}
672 691
        self.assertEquals(reconciliation.unsynced_operstate(D, G),
673
                          set([(2, 'STARTED', False),
674
                               (3, 'BUILD', True), (4, 'STARTED', False),
675
                               (50, 'BUILD', True)]))
692
                          set([(vm2.id, "STARTED", False),
693
                               (vm4.id, "STARTED", False)]))
676 694

  
677 695
from synnefo.logic.test.rapi_pool_tests import *
678 696
from synnefo.logic.test.utils_tests import *

Also available in: Unified diff