Revision 7ab30015
b/snf-cyclades-app/synnefo/logic/management/commands/reconcile-servers.py | ||
---|---|---|
123 | 123 |
else: |
124 | 124 |
backends = Backend.objects.filter(offline=False) |
125 | 125 |
|
126 |
D = reconciliation.get_servers_from_db(backends) |
|
127 |
G, GNics = reconciliation.get_instances_from_ganeti(backends) |
|
126 |
with_nics = options["detect_unsynced_nics"] |
|
128 | 127 |
|
129 |
DBNics = reconciliation.get_nics_from_db(backends) |
|
128 |
DBVMs = reconciliation.get_servers_from_db(backend, with_nics) |
|
129 |
GanetiVMs = reconciliation.get_instances_from_ganeti(backend) |
|
130 | 130 |
|
131 | 131 |
# |
132 | 132 |
# Detect problems |
133 | 133 |
# |
134 | 134 |
if options['detect_stale']: |
135 |
stale = reconciliation.stale_servers_in_db(D, G)
|
|
135 |
stale = reconciliation.stale_servers_in_db(DBVMs, GanetiVMs)
|
|
136 | 136 |
if len(stale) > 0: |
137 | 137 |
print >> sys.stderr, "Found the following stale server IDs: " |
138 | 138 |
print " " + "\n ".join( |
... | ... | |
141 | 141 |
print >> sys.stderr, "Found no stale server IDs in DB." |
142 | 142 |
|
143 | 143 |
if options['detect_orphans']: |
144 |
orphans = reconciliation.orphan_instances_in_ganeti(D, G) |
|
144 |
orphans = reconciliation.orphan_instances_in_ganeti(DBVMs, |
|
145 |
GanetiVMs) |
|
145 | 146 |
if len(orphans) > 0: |
146 | 147 |
print >> sys.stderr, "Found orphan Ganeti instances with IDs: " |
147 | 148 |
print " " + "\n ".join( |
... | ... | |
150 | 151 |
print >> sys.stderr, "Found no orphan Ganeti instances." |
151 | 152 |
|
152 | 153 |
if options['detect_unsynced']: |
153 |
unsynced = reconciliation.unsynced_operstate(D, G)
|
|
154 |
unsynced = reconciliation.unsynced_operstate(DBVMs, GanetiVMs)
|
|
154 | 155 |
if len(unsynced) > 0: |
155 | 156 |
print >> sys.stderr, "The operstate of the following server" \ |
156 | 157 |
" IDs is out-of-sync:" |
... | ... | |
162 | 163 |
print >> sys.stderr, "The operstate of all servers is in sync." |
163 | 164 |
|
164 | 165 |
if options['detect_build_errors']: |
165 |
build_errors = reconciliation.instances_with_build_errors(D, G) |
|
166 |
build_errors = reconciliation.\ |
|
167 |
instances_with_build_errors(DBVMs, GanetiVMs) |
|
166 | 168 |
if len(build_errors) > 0: |
167 | 169 |
msg = "The os for the following server IDs was not build"\ |
168 | 170 |
" successfully:" |
... | ... | |
181 | 183 |
': MAC: %s, IP: %s, Network: %s' % \ |
182 | 184 |
(info['mac'], info['ipv4'], info['network']) |
183 | 185 |
|
184 |
unsynced_nics = reconciliation.unsynced_nics(DBNics, GNics)
|
|
186 |
unsynced_nics = reconciliation.unsynced_nics(DBVMs, GanetiVMs)
|
|
185 | 187 |
if len(unsynced_nics) > 0: |
186 | 188 |
msg = "The NICs of the servers with the following IDs are"\ |
187 | 189 |
" unsynced:" |
b/snf-cyclades-app/synnefo/logic/reconciliation.py | ||
---|---|---|
70 | 70 |
|
71 | 71 |
|
72 | 72 |
from datetime import datetime, timedelta |
73 |
from collections import namedtuple |
|
73 | 74 |
|
74 |
from synnefo.db.models import (VirtualMachine, pooled_rapi_client) |
|
75 |
from synnefo.db.models import (VirtualMachine, NetworkInterface, |
|
76 |
pooled_rapi_client) |
|
75 | 77 |
from synnefo.logic.rapi import GanetiApiError |
76 | 78 |
from synnefo.logic.backend import get_instances |
77 | 79 |
from synnefo.logic import utils |
... | ... | |
90 | 92 |
return (now > vm.updated + timedelta(seconds=CHECK_INTERVAL)) or\ |
91 | 93 |
(now > vm.backendtime + timedelta(seconds=2*CHECK_INTERVAL)) |
92 | 94 |
|
95 |
VMState = namedtuple("VMState", ["state", "nics"]) |
|
96 |
|
|
93 | 97 |
|
94 | 98 |
def stale_servers_in_db(D, G): |
95 | 99 |
idD = set(D.keys()) |
... | ... | |
131 | 135 |
idG = set(G.keys()) |
132 | 136 |
|
133 | 137 |
for i in idD & idG: |
134 |
vm_unsynced = (G[i] and D[i] != "STARTED") or\ |
|
135 |
(not G[i] and D[i] not in ('BUILD', 'ERROR', 'STOPPED')) |
|
138 |
dbstate = D[i].state |
|
139 |
gntstate = G[i].state |
|
140 |
vm_unsynced = (gntstate and dbstate != "STARTED") or\ |
|
141 |
(not gntstate and dbstate not in ('BUILD', 'ERROR', 'STOPPED')) |
|
136 | 142 |
if vm_unsynced: |
137 |
unsynced.add((i, D[i], G[i]))
|
|
138 |
if not G[i] and D[i] == 'BUILD':
|
|
143 |
unsynced.add((i, dbstate, gntstate))
|
|
144 |
if not gntstate and dbstate == 'BUILD':
|
|
139 | 145 |
vm = VirtualMachine.objects.get(id=i) |
140 | 146 |
if needs_reconciliation(vm): |
141 | 147 |
with pooled_rapi_client(vm) as c: |
142 | 148 |
try: |
143 | 149 |
job_info = c.GetJobStatus(job_id=vm.backendjobid) |
144 | 150 |
if job_info['status'] == 'success': |
145 |
unsynced.add((i, D[i], G[i]))
|
|
151 |
unsynced.add((i, dbstate, gntstate))
|
|
146 | 152 |
except GanetiApiError: |
147 | 153 |
pass |
148 | 154 |
|
... | ... | |
175 | 181 |
return failed |
176 | 182 |
|
177 | 183 |
|
178 |
def get_servers_from_db(backends): |
|
184 |
def get_servers_from_db(backends, with_nics=True):
|
|
179 | 185 |
vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends) |
180 |
return dict(map(lambda x: (x.id, x.operstate), vms)) |
|
186 |
vm_info = vms.values_list("id", "operstate") |
|
187 |
if with_nics: |
|
188 |
nics = NetworkInterface.objects.filter(machine__in=vms)\ |
|
189 |
.order_by("machine")\ |
|
190 |
.values_list("machine", "index", "mac", "ipv4", |
|
191 |
"network") |
|
192 |
vm_nics = {} |
|
193 |
for machine, machine_nics in itertools.groupby(nics, |
|
194 |
lambda nic: nic[0]): |
|
195 |
vm_nics[machine] = {} |
|
196 |
for machine, index, mac, ipv4, network in machine_nics: |
|
197 |
nic = {'mac': mac, |
|
198 |
'network': utils.id_to_network_name(network), |
|
199 |
'ipv4': ipv4 if ipv4 != '' else None |
|
200 |
} |
|
201 |
vm_nics[machine][index] = nic |
|
202 |
servers = dict([(vm_id, VMState(state=state, nics=vm_nics.get(vm_id, []))) |
|
203 |
for vm_id, state in vm_info]) |
|
204 |
return servers |
|
181 | 205 |
|
182 | 206 |
|
183 | 207 |
def get_instances_from_ganeti(backends): |
... | ... | |
186 | 210 |
instances.append(get_instances(backend)) |
187 | 211 |
ganeti_instances = reduce(list.__add__, instances, []) |
188 | 212 |
snf_instances = {} |
189 |
snf_nics = {} |
|
190 | 213 |
|
191 | 214 |
prefix = settings.BACKEND_PREFIX_ID |
192 | 215 |
for i in ganeti_instances: |
... | ... | |
203 | 226 |
i['name']) |
204 | 227 |
continue |
205 | 228 |
|
206 |
snf_instances[id] = i['oper_state'] |
|
207 |
snf_nics[id] = get_nics_from_instance(i) |
|
229 |
nics = get_nics_from_instance(i) |
|
230 |
snf_instances[id] = VMState(state=i["oper_state"], |
|
231 |
nics=nics) |
|
208 | 232 |
|
209 |
return snf_instances, snf_nics
|
|
233 |
return snf_instances |
|
210 | 234 |
|
211 | 235 |
|
212 | 236 |
# |
... | ... | |
254 | 278 |
return nics |
255 | 279 |
|
256 | 280 |
|
257 |
def get_nics_from_db(backends): |
|
258 |
"""Get network interfaces for each vm in DB. |
|
259 |
|
|
260 |
""" |
|
261 |
instances = VirtualMachine.objects.filter(deleted=False, |
|
262 |
backend__in=backends) |
|
263 |
instances_nics = {} |
|
264 |
for instance in instances: |
|
265 |
nics = {} |
|
266 |
for n in instance.nics.all(): |
|
267 |
ipv4 = n.ipv4 |
|
268 |
nic = {'mac': n.mac, |
|
269 |
'network': n.network.backend_id, |
|
270 |
'ipv4': ipv4 if ipv4 != '' else None |
|
271 |
} |
|
272 |
nics[n.index] = nic |
|
273 |
instances_nics[instance.id] = nics |
|
274 |
return instances_nics |
|
275 |
|
|
276 |
|
|
277 |
def unsynced_nics(DBNics, GNics): |
|
281 |
def unsynced_nics(DBVMs, GanetiVMs): |
|
278 | 282 |
"""Find unsynced network interfaces between DB and Ganeti. |
279 | 283 |
|
280 | 284 |
@ rtype: dict; {instance_id: ganeti_nics} |
... | ... | |
282 | 286 |
interfaces between DB and Ganeti and the network interfaces in Ganeti. |
283 | 287 |
|
284 | 288 |
""" |
285 |
idD = set(DBNics.keys())
|
|
286 |
idG = set(GNics.keys())
|
|
289 |
idD = set(DBVMs.keys())
|
|
290 |
idG = set(GanetiVMs.keys())
|
|
287 | 291 |
|
288 | 292 |
unsynced = {} |
289 | 293 |
for i in idD & idG: |
290 |
nicsD = DBNics[i]
|
|
291 |
nicsG = GNics[i]
|
|
294 |
nicsD = DBVMs[i].nics
|
|
295 |
nicsG = GanetiVMs[i].nics
|
|
292 | 296 |
if len(nicsD) != len(nicsG): |
293 | 297 |
unsynced[i] = (nicsD, nicsG) |
294 | 298 |
continue |
b/snf-cyclades-app/synnefo/logic/tests.py | ||
---|---|---|
610 | 610 |
self.assertEqual(vm.buildpercentage, old) |
611 | 611 |
|
612 | 612 |
|
613 |
from synnefo.logic.reconciliation import VMState |
|
613 | 614 |
class ReconciliationTest(TestCase): |
614 |
SERVERS = 1000 |
|
615 |
fixtures = ['db_test_data'] |
|
615 |
def get_vm(self, operstate, deleted=False): |
|
616 |
flavor = mfactory.FlavorFactory(cpu=2, ram=1024) |
|
617 |
vm = mfactory.VirtualMachineFactory(deleted=deleted, flavor=flavor) |
|
618 |
vm.operstate = operstate |
|
619 |
vm.save() |
|
620 |
return vm |
|
616 | 621 |
|
617 | 622 |
def test_get_servers_from_db(self): |
618 | 623 |
"""Test getting a dictionary from each server to its operstate""" |
619 | 624 |
backend = 30000 |
620 |
self.assertEquals(reconciliation.get_servers_from_db(backends=[backend]), |
|
621 |
{30000: 'STARTED', 30001: 'STOPPED', 30002: 'BUILD'}) |
|
625 |
vm1 = self.get_vm('STARTED') |
|
626 |
vm2 = self.get_vm('DESTROYED', deleted=True) |
|
627 |
vm3 = self.get_vm('STOPPED') |
|
628 |
self.assertEquals(reconciliation.get_servers_from_db(), |
|
629 |
{vm1.id: VMState(state='STARTED', cpu=2, ram=1024, nics=[]), |
|
630 |
vm3.id: VMState(state='STOPPED', cpu=2, ram=1024, nics=[])} |
|
631 |
) |
|
622 | 632 |
|
623 | 633 |
def test_stale_servers_in_db(self): |
624 | 634 |
"""Test discovery of stale entries in DB""" |
625 | 635 |
|
626 |
D = {1: 'STARTED', 2: 'STOPPED', 3: 'STARTED', 30000: 'BUILD',
|
|
627 |
30002: 'STOPPED'}
|
|
636 |
D = {1: None, 2: 'None', 3: None, 30000: 'BUILD',
|
|
637 |
30002: 'None'}
|
|
628 | 638 |
G = {1: True, 3: True, 30000: True} |
629 | 639 |
self.assertEquals(reconciliation.stale_servers_in_db(D, G), |
630 | 640 |
set([2, 30002])) |
... | ... | |
666 | 676 |
|
667 | 677 |
def test_unsynced_operstate(self): |
668 | 678 |
"""Test discovery of unsynced operstate between the DB and Ganeti""" |
669 |
|
|
670 |
G = {1: True, 2: False, 3: True, 4: False, 50: True} |
|
671 |
D = {1: 'STARTED', 2: 'STARTED', 3: 'BUILD', 4: 'STARTED', 50: 'BUILD'} |
|
679 |
mkstate = lambda state: VMState(state=state, cpu=1, ram=1024, nics=[]) |
|
680 |
vm1 = self.get_vm("STARTED") |
|
681 |
vm2 = self.get_vm("STARTED") |
|
682 |
vm3= self.get_vm("BUILD") |
|
683 |
vm4 = self.get_vm("STARTED") |
|
684 |
vm5 = self.get_vm("BUILD") |
|
685 |
|
|
686 |
D = {1: mkstate("STARTED"), 2: mkstate("STARTED"), 3: mkstate("BUILD"), |
|
687 |
4: mkstate("STARTED"), 50: mkstate("BUILD")} |
|
688 |
G = {vm1.id: mkstate(True), vm2.id: mkstate(False), |
|
689 |
vm4.id: mkstate(True), vm4.id: mkstate(False), |
|
690 |
vm5.id: mkstate(False)} |
|
672 | 691 |
self.assertEquals(reconciliation.unsynced_operstate(D, G), |
673 |
set([(2, 'STARTED', False), |
|
674 |
(3, 'BUILD', True), (4, 'STARTED', False), |
|
675 |
(50, 'BUILD', True)])) |
|
692 |
set([(vm2.id, "STARTED", False), |
|
693 |
(vm4.id, "STARTED", False)])) |
|
676 | 694 |
|
677 | 695 |
from synnefo.logic.test.rapi_pool_tests import * |
678 | 696 |
from synnefo.logic.test.utils_tests import * |
Also available in: Unified diff