Revision d05e5324
b/snf-cyclades-app/synnefo/db/models.py | ||
---|---|---|
994 | 994 |
("DETACHING", "The volume is detaching from an instance"), |
995 | 995 |
("IN_USE", "The volume is attached to an instance"), |
996 | 996 |
("DELETING", "The volume is being deleted"), |
997 |
("DELETED", "The volume has been deleted"), |
|
997 | 998 |
("ERROR", "An error has occured with the volume"), |
998 | 999 |
("ERROR_DELETING", "There was an error deleting this volume"), |
999 | 1000 |
("BACKING_UP", "The volume is being backed up"), |
b/snf-cyclades-app/synnefo/logic/backend.py | ||
---|---|---|
44 | 44 |
from synnefo.api.util import release_resource |
45 | 45 |
from synnefo.util.mac2eui64 import mac2eui64 |
46 | 46 |
from synnefo.logic import rapi |
47 |
from synnefo.volume.util import update_snapshot_status
|
|
47 |
from synnefo import volume
|
|
48 | 48 |
|
49 | 49 |
from logging import getLogger |
50 | 50 |
log = getLogger(__name__) |
... | ... | |
61 | 61 |
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"] |
62 | 62 |
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS |
63 | 63 |
DISK_FIELDS = ["status", "size", "index"] |
64 |
UNKNOWN_NIC_PREFIX = "unknown-" |
|
65 |
UNKNOWN_DISK_PREFIX = "unknown-" |
|
64 |
UNKNOWN_NIC_PREFIX = "unknown-nic-"
|
|
65 |
UNKNOWN_DISK_PREFIX = "unknown-disk-"
|
|
66 | 66 |
|
67 | 67 |
|
68 | 68 |
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields): |
... | ... | |
111 | 111 |
vm.task_job_id, job_id, vm.serial) |
112 | 112 |
reason = ("client: dispatcher, resource: %s, ganeti_job: %s" |
113 | 113 |
% (vm, job_id)) |
114 |
serial = quotas.handle_resource_commission( |
|
115 |
vm, action, |
|
116 |
action_fields=job_fields, |
|
117 |
commission_name=reason, |
|
118 |
force=True, |
|
119 |
auto_accept=True) |
|
114 |
try: |
|
115 |
serial = quotas.handle_resource_commission( |
|
116 |
vm, action, |
|
117 |
action_fields=job_fields, |
|
118 |
commission_name=reason, |
|
119 |
force=True, |
|
120 |
auto_accept=True) |
|
121 |
except: |
|
122 |
log.exception("Error while handling new commission") |
|
123 |
raise |
|
120 | 124 |
log.debug("Issued new commission: %s", serial) |
121 | 125 |
return vm |
122 | 126 |
|
... | ... | |
138 | 142 |
|
139 | 143 |
if opcode == "OP_INSTANCE_SNAPSHOT": |
140 | 144 |
for disk_id, disk_info in job_fields.get("disks", []): |
141 |
snapshot_name = disk_info.get("snapshot_name") |
|
142 |
snapshot_info = json.loads(disk_info["snapshot_info"]) |
|
143 |
user_id = vm.userid |
|
144 |
_process_snapshot_status(snapshot_name, snapshot_info, |
|
145 |
user_id, etime, jobid, status) |
|
145 |
snap_info = json.loads(disk_info["snapshot_info"]) |
|
146 |
snap_id = snap_info["snapshot_id"] |
|
147 |
update_snapshot(snap_id, user_id=vm.userid, job_id=jobid, |
|
148 |
job_status=status, etime=etime) |
|
146 | 149 |
return |
147 | 150 |
|
148 | 151 |
vm.backendjobid = jobid |
... | ... | |
162 | 165 |
state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode) |
163 | 166 |
|
164 | 167 |
if status == rapi.JOB_STATUS_SUCCESS: |
165 |
# If job succeeds, change operating state if needed |
|
166 | 168 |
if state_for_success is not None: |
167 | 169 |
new_operstate = state_for_success |
168 | 170 |
|
169 |
beparams = job_fields.get("beparams", None)
|
|
171 |
beparams = job_fields.get("beparams") |
|
170 | 172 |
if beparams: |
171 |
# Change the flavor of the VM |
|
172 |
new_flavor = _process_resize(vm, beparams) |
|
173 |
cpu = beparams.get("vcpus") |
|
174 |
ram = beparams.get("maxmem") |
|
175 |
new_flavor = find_new_flavor(vm, cpu=cpu, ram=ram) |
|
173 | 176 |
|
174 |
# Update backendtime only for jobs that have been successfully |
|
177 |
# XXX: Update backendtime only for jobs that have been successfully
|
|
175 | 178 |
# completed, since only these jobs update the state of the VM. Else a |
176 | 179 |
# "race condition" may occur when a successful job (e.g. |
177 | 180 |
# OP_INSTANCE_REMOVE) completes before an error job and messages arrive |
... | ... | |
179 | 182 |
vm.backendtime = etime |
180 | 183 |
|
181 | 184 |
if status in rapi.JOB_STATUS_FINALIZED: |
182 |
if nics is not None: # Update the NICs of the VM |
|
183 |
_process_net_status(vm, etime, nics) |
|
184 |
if disks is not None: # Update the disks of the VM |
|
185 |
_process_disks_status(vm, etime, disks) |
|
185 |
if nics is not None: |
|
186 |
update_vm_nics(vm, nics, etime) |
|
187 |
if disks is not None: |
|
188 |
# XXX: Replace the job fields with mocked changes as produced by |
|
189 |
# the diff between the DB and Ganeti disks. This is required in |
|
190 |
# order to update quotas for disks that changed, but not from this |
|
191 |
# job! |
|
192 |
disk_changes = update_vm_disks(vm, disks, etime) |
|
193 |
job_fields["disks"] = disk_changes |
|
186 | 194 |
|
187 | 195 |
# Special case: if OP_INSTANCE_CREATE fails --> ERROR |
188 | 196 |
if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED, |
... | ... | |
199 | 207 |
if (status == rapi.JOB_STATUS_SUCCESS or |
200 | 208 |
(status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))): |
201 | 209 |
# server has been deleted, so delete the server's attachments |
202 |
vm.volumes.all().update(deleted=True, machine=None) |
|
210 |
vm.volumes.all().update(deleted=True, status="DELETED", |
|
211 |
machine=None) |
|
203 | 212 |
for nic in vm.nics.all(): |
204 | 213 |
# but first release the IP |
205 | 214 |
remove_nic_ips(nic) |
... | ... | |
218 | 227 |
vm.task = None |
219 | 228 |
vm.task_job_id = None |
220 | 229 |
|
230 |
# Update VM's state and flavor after handling of quotas, since computation |
|
231 |
# of quotas depends on these attributes |
|
221 | 232 |
if new_operstate is not None: |
222 | 233 |
vm.operstate = new_operstate |
223 | 234 |
if new_flavor is not None: |
... | ... | |
226 | 237 |
vm.save() |
227 | 238 |
|
228 | 239 |
|
229 |
def _process_resize(vm, beparams): |
|
230 |
"""Change flavor of a VirtualMachine based on new beparams.""" |
|
240 |
def find_new_flavor(vm, cpu=None, ram=None): |
|
241 |
"""Find VM's new flavor based on the new CPU and RAM""" |
|
242 |
if cpu is None and ram is None: |
|
243 |
return None |
|
244 |
|
|
231 | 245 |
old_flavor = vm.flavor |
232 |
vcpus = beparams.get("vcpus", old_flavor.cpu) |
|
233 |
ram = beparams.get("maxmem", old_flavor.ram) |
|
234 |
if vcpus == old_flavor.cpu and ram == old_flavor.ram: |
|
235 |
return |
|
246 |
ram = ram if ram is not None else old_flavor.ram |
|
247 |
cpu = cpu if cpu is not None else old_flavor.cpu |
|
248 |
if cpu == old_flavor.cpu and ram == old_flavor.ram: |
|
249 |
return None |
|
250 |
|
|
236 | 251 |
try: |
237 |
new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
|
|
252 |
new_flavor = Flavor.objects.get(cpu=cpu, ram=ram,
|
|
238 | 253 |
disk=old_flavor.disk, |
239 | 254 |
disk_template=old_flavor.disk_template) |
240 | 255 |
except Flavor.DoesNotExist: |
241 |
raise Exception("Cannot find flavor for VM") |
|
256 |
raise Exception("There is no flavor to match the instance specs!" |
|
257 |
" Instance: %s CPU: %s RAM %s: Disk: %s Template: %s" |
|
258 |
% (vm.backend_vm_id, cpu, ram, old_flavor.disk, |
|
259 |
old_flavor.disk_template)) |
|
260 |
log.info("Flavor of VM '%s' changed from '%s' to '%s'", vm, |
|
261 |
old_flavor.name, new_flavor.name) |
|
242 | 262 |
return new_flavor |
243 | 263 |
|
244 | 264 |
|
245 |
@transaction.commit_on_success |
|
246 |
def process_net_status(vm, etime, nics): |
|
247 |
"""Wrap _process_net_status inside transaction.""" |
|
248 |
_process_net_status(vm, etime, nics) |
|
265 |
def nics_are_equal(db_nic, gnt_nic): |
|
266 |
"""Check if DB and Ganeti NICs are equal.""" |
|
267 |
for field in NIC_FIELDS: |
|
268 |
if getattr(db_nic, field) != gnt_nic[field]: |
|
269 |
return False |
|
270 |
return True |
|
271 |
|
|
272 |
|
|
273 |
def parse_instance_nics(gnt_nics): |
|
274 |
"""Parse NICs of a Ganeti instance""" |
|
275 |
nics = [] |
|
276 |
for index, gnic in enumerate(gnt_nics): |
|
277 |
nic_name = gnic.get("name", None) |
|
278 |
if nic_name is not None: |
|
279 |
nic_id = utils.id_from_nic_name(nic_name) |
|
280 |
else: |
|
281 |
# Unknown NIC |
|
282 |
nic_id = UNKNOWN_NIC_PREFIX + str(index) |
|
283 |
|
|
284 |
network_name = gnic.get('network', '') |
|
285 |
network_id = utils.id_from_network_name(network_name) |
|
286 |
network = Network.objects.get(id=network_id) |
|
287 |
subnet6 = network.subnet6 |
|
288 |
|
|
289 |
# Get the new nic info |
|
290 |
mac = gnic.get('mac') |
|
291 |
ipv4 = gnic.get('ip') |
|
292 |
ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None |
|
293 |
|
|
294 |
firewall = gnic.get('firewall') |
|
295 |
firewall_profile = _reverse_tags.get(firewall) |
|
296 |
if not firewall_profile and network.public: |
|
297 |
firewall_profile = settings.DEFAULT_FIREWALL_PROFILE |
|
298 |
|
|
299 |
nic_info = { |
|
300 |
'index': index, |
|
301 |
'network': network, |
|
302 |
'mac': mac, |
|
303 |
'ipv4_address': ipv4, |
|
304 |
'ipv6_address': ipv6, |
|
305 |
'firewall_profile': firewall_profile, |
|
306 |
'state': 'ACTIVE'} |
|
307 |
|
|
308 |
nics.append((nic_id, nic_info)) |
|
309 |
return dict(nics) |
|
310 |
|
|
249 | 311 |
|
312 |
def update_vm_nics(vm, nics, etime=None): |
|
313 |
"""Update VM's NICs to match with the NICs of the Ganeti instance |
|
250 | 314 |
|
251 |
def _process_net_status(vm, etime, nics):
|
|
252 |
"""Process a net status notification from the backend
|
|
315 |
This function will update the VM's NICs(update, delete or create) and
|
|
316 |
return a list of quotable changes.
|
|
253 | 317 |
|
254 |
Process an incoming message from the Ganeti backend, |
|
255 |
detailing the NIC configuration of a VM instance. |
|
318 |
@param vm: The VirtualMachine the NICs belong to |
|
319 |
@type vm: VirtualMachine object |
|
320 |
@param nics: The NICs of the Ganeti instance |
|
321 |
@type nics: List of dictionaries with NIC information |
|
322 |
@param etime: The datetime the Ganeti instance had these NICs |
|
323 |
@type etime: datetime |
|
256 | 324 |
|
257 |
Update the state of the VM in the DB accordingly. |
|
325 |
@return: List of quotable changes (add/remove NIC) (currently empty list) |
|
326 |
@rtype: List of dictionaries |
|
258 | 327 |
|
259 | 328 |
""" |
260 |
ganeti_nics = process_ganeti_nics(nics) |
|
261 |
db_nics = dict([(nic.id, nic) |
|
262 |
for nic in vm.nics.select_related("network") |
|
263 |
.prefetch_related("ips")]) |
|
329 |
ganeti_nics = parse_instance_nics(nics) |
|
330 |
db_nics = dict([(nic.id, nic) for nic in vm.nics.select_related("network") |
|
331 |
.prefetch_related("ips")]) |
|
264 | 332 |
|
265 | 333 |
for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()): |
266 | 334 |
db_nic = db_nics.get(nic_name) |
... | ... | |
301 | 369 |
new_address=gnt_ipv6_address, |
302 | 370 |
version=6) |
303 | 371 |
|
304 |
vm.backendtime = etime |
|
305 |
vm.save() |
|
306 |
|
|
307 |
|
|
308 |
def change_address_of_port(port, userid, old_address, new_address, version): |
|
309 |
"""Change.""" |
|
310 |
if old_address is not None: |
|
311 |
msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'" |
|
312 |
% (version, port.machine_id, old_address, new_address)) |
|
313 |
log.error(msg) |
|
314 |
|
|
315 |
# Remove the old IP address |
|
316 |
remove_nic_ips(port, version=version) |
|
317 |
|
|
318 |
if version == 4: |
|
319 |
ipaddress = ips.allocate_ip(port.network, userid, address=new_address) |
|
320 |
ipaddress.nic = port |
|
321 |
ipaddress.save() |
|
322 |
elif version == 6: |
|
323 |
subnet6 = port.network.subnet6 |
|
324 |
ipaddress = IPAddress.objects.create(userid=userid, |
|
325 |
network=port.network, |
|
326 |
subnet=subnet6, |
|
327 |
nic=port, |
|
328 |
address=new_address, |
|
329 |
ipversion=6) |
|
330 |
else: |
|
331 |
raise ValueError("Unknown version: %s" % version) |
|
332 |
|
|
333 |
# New address log |
|
334 |
ip_log = IPAddressLog.objects.create(server_id=port.machine_id, |
|
335 |
network_id=port.network_id, |
|
336 |
address=new_address, |
|
337 |
active=True) |
|
338 |
log.info("Created IP log entry '%s' for address '%s' to server '%s'", |
|
339 |
ip_log.id, new_address, port.machine_id) |
|
340 |
|
|
341 |
return ipaddress |
|
342 |
|
|
343 |
|
|
344 |
def nics_are_equal(db_nic, gnt_nic): |
|
345 |
for field in NIC_FIELDS: |
|
346 |
if getattr(db_nic, field) != gnt_nic[field]: |
|
347 |
return False |
|
348 |
return True |
|
349 |
|
|
350 |
|
|
351 |
def process_ganeti_nics(ganeti_nics): |
|
352 |
"""Process NIC dict from ganeti""" |
|
353 |
new_nics = [] |
|
354 |
for index, gnic in enumerate(ganeti_nics): |
|
355 |
nic_name = gnic.get("name", None) |
|
356 |
if nic_name is not None: |
|
357 |
nic_id = utils.id_from_nic_name(nic_name) |
|
358 |
else: |
|
359 |
# Put as default value the index. If it is an unknown NIC to |
|
360 |
# synnefo it will be created automaticaly. |
|
361 |
nic_id = UNKNOWN_NIC_PREFIX + str(index) |
|
362 |
network_name = gnic.get('network', '') |
|
363 |
network_id = utils.id_from_network_name(network_name) |
|
364 |
network = Network.objects.get(id=network_id) |
|
365 |
|
|
366 |
# Get the new nic info |
|
367 |
mac = gnic.get('mac') |
|
368 |
ipv4 = gnic.get('ip') |
|
369 |
subnet6 = network.subnet6 |
|
370 |
ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None |
|
371 |
|
|
372 |
firewall = gnic.get('firewall') |
|
373 |
firewall_profile = _reverse_tags.get(firewall) |
|
374 |
if not firewall_profile and network.public: |
|
375 |
firewall_profile = settings.DEFAULT_FIREWALL_PROFILE |
|
376 |
|
|
377 |
nic_info = { |
|
378 |
'index': index, |
|
379 |
'network': network, |
|
380 |
'mac': mac, |
|
381 |
'ipv4_address': ipv4, |
|
382 |
'ipv6_address': ipv6, |
|
383 |
'firewall_profile': firewall_profile, |
|
384 |
'state': 'ACTIVE'} |
|
385 |
|
|
386 |
new_nics.append((nic_id, nic_info)) |
|
387 |
return dict(new_nics) |
|
372 |
return [] |
|
388 | 373 |
|
389 | 374 |
|
390 | 375 |
def remove_nic_ips(nic, version=None): |
... | ... | |
440 | 425 |
ip_log.save() |
441 | 426 |
|
442 | 427 |
|
443 |
@transaction.commit_on_success |
|
444 |
def process_disks_status(vm, etime, disks): |
|
445 |
"""Wrap _process_disks_status inside transaction.""" |
|
446 |
_process_disks_status(vm, etime, disks) |
|
428 |
def change_address_of_port(port, userid, old_address, new_address, version): |
|
429 |
"""Change.""" |
|
430 |
if old_address is not None: |
|
431 |
msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'" |
|
432 |
% (version, port.machine_id, old_address, new_address)) |
|
433 |
log.error(msg) |
|
447 | 434 |
|
435 |
# Remove the old IP address |
|
436 |
remove_nic_ips(port, version=version) |
|
448 | 437 |
|
449 |
def _process_disks_status(vm, etime, disks): |
|
450 |
"""Process a disks status notification from the backend |
|
438 |
if version == 4: |
|
439 |
ipaddress = ips.allocate_ip(port.network, userid, address=new_address) |
|
440 |
ipaddress.nic = port |
|
441 |
ipaddress.save() |
|
442 |
elif version == 6: |
|
443 |
subnet6 = port.network.subnet6 |
|
444 |
ipaddress = IPAddress.objects.create(userid=userid, |
|
445 |
network=port.network, |
|
446 |
subnet=subnet6, |
|
447 |
nic=port, |
|
448 |
address=new_address, |
|
449 |
ipversion=6) |
|
450 |
else: |
|
451 |
raise ValueError("Unknown version: %s" % version) |
|
452 |
|
|
453 |
# New address log |
|
454 |
ip_log = IPAddressLog.objects.create(server_id=port.machine_id, |
|
455 |
network_id=port.network_id, |
|
456 |
address=new_address, |
|
457 |
active=True) |
|
458 |
log.info("Created IP log entry '%s' for address '%s' to server '%s'", |
|
459 |
ip_log.id, new_address, port.machine_id) |
|
460 |
|
|
461 |
return ipaddress |
|
451 | 462 |
|
452 |
Process an incoming message from the Ganeti backend, |
|
453 |
detailing the disk configuration of a VM instance. |
|
454 | 463 |
|
455 |
Update the state of the VM in the DB accordingly. |
|
464 |
def update_vm_disks(vm, disks, etime=None): |
|
465 |
"""Update VM's disks to match with the disks of the Ganeti instance |
|
466 |
|
|
467 |
This function will update the VM's disks(update, delete or create) and |
|
468 |
return a list of quotable changes. |
|
469 |
|
|
470 |
@param vm: The VirtualMachine the disks belong to |
|
471 |
@type vm: VirtualMachine object |
|
472 |
@param disks: The disks of the Ganeti instance |
|
473 |
@type disks: List of dictionaries with disk information |
|
474 |
@param etime: The datetime the Ganeti instance had these disks |
|
475 |
@type etime: datetime |
|
476 |
|
|
477 |
@return: List of quotable changes (add/remove disk) |
|
478 |
@rtype: List of dictionaries |
|
456 | 479 |
|
457 | 480 |
""" |
458 |
ganeti_disks = process_ganeti_disks(disks)
|
|
481 |
gnt_disks = parse_instance_disks(disks)
|
|
459 | 482 |
db_disks = dict([(disk.id, disk) |
460 | 483 |
for disk in vm.volumes.filter(deleted=False)]) |
461 | 484 |
|
462 |
for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()): |
|
485 |
changes = [] |
|
486 |
for disk_name in set(db_disks.keys()) | set(gnt_disks.keys()): |
|
463 | 487 |
db_disk = db_disks.get(disk_name) |
464 |
ganeti_disk = ganeti_disks.get(disk_name) |
|
465 |
if ganeti_disk is None: |
|
488 |
gnt_disk = gnt_disks.get(disk_name) |
|
489 |
if gnt_disk is None: |
|
490 |
# Disk exists in DB but not in Ganeti |
|
466 | 491 |
if disk_is_stale(vm, disk): |
467 | 492 |
log.debug("Removing stale disk '%s'" % db_disk) |
468 |
# TODO: Handle disk deletion
|
|
493 |
db_disk.status = "DELETED"
|
|
469 | 494 |
db_disk.deleted = True |
470 | 495 |
db_disk.save() |
496 |
changes.append(("remove", db_disk, {})) |
|
471 | 497 |
else: |
472 | 498 |
log.info("disk '%s' is still being created" % db_disk) |
473 | 499 |
elif db_disk is None: |
500 |
# Disk exists in Ganeti but not in DB |
|
501 |
# TODO: Automatically import disk! |
|
474 | 502 |
msg = ("disk/%s of VM %s does not exist in DB! Cannot" |
475 | 503 |
" automatically fix this issue!" % (disk_name, vm)) |
476 | 504 |
log.error(msg) |
477 | 505 |
continue |
478 |
elif not disks_are_equal(db_disk, ganeti_disk): |
|
479 |
for f in DISK_FIELDS: |
|
480 |
# Update the disk in DB with the values from Ganeti disk |
|
481 |
setattr(db_disk, f, ganeti_disk[f]) |
|
482 |
db_disk.save() |
|
483 |
|
|
484 |
# TODO: Special case where the size of the disk has changed!! |
|
485 |
assert(ganeti_disk["size"] == db_disk.size) |
|
486 |
|
|
487 |
vm.backendtime = etime |
|
488 |
vm.save() |
|
506 |
elif not disks_are_equal(db_disk, gnt_disk): |
|
507 |
# Disk has changed |
|
508 |
if gnt_disk["size"] != db_disk.size: |
|
509 |
# Size of the disk has changed! TODO: Fix flavor! |
|
510 |
size_delta = gnt_disk["size"] - db_disk.size |
|
511 |
changes.append(("modify", db_disk, {"size_delta": size_delta})) |
|
512 |
if db_disk.status == "CREATING": |
|
513 |
# Disk has been created |
|
514 |
changes.append(("add", db_disk, {})) |
|
515 |
# Update the disk in DB with the values from Ganeti disk |
|
516 |
[setattr(db_disk, f, gnt_disk[f]) for f in DISK_FIELDS] |
|
517 |
db_disk.save() |
|
518 |
|
|
519 |
return changes |
|
489 | 520 |
|
490 | 521 |
|
491 | 522 |
def disks_are_equal(db_disk, gnt_disk): |
523 |
"""Check if DB and Ganeti disks are equal""" |
|
492 | 524 |
for field in DISK_FIELDS: |
493 | 525 |
if getattr(db_disk, field) != gnt_disk[field]: |
494 | 526 |
return False |
495 | 527 |
return True |
496 | 528 |
|
497 | 529 |
|
498 |
def process_ganeti_disks(ganeti_disks):
|
|
499 |
"""Process disk dict from ganeti"""
|
|
500 |
new_disks = []
|
|
501 |
for index, gdisk in enumerate(ganeti_disks):
|
|
502 |
disk_name = gdisk.get("name", None) |
|
530 |
def parse_instance_disks(gnt_disks):
|
|
531 |
"""Parse disks of a Ganeti instance"""
|
|
532 |
disks = [] |
|
533 |
for index, gnt_disk in enumerate(gnt_disks):
|
|
534 |
disk_name = gnt_disk.get("name", None)
|
|
503 | 535 |
if disk_name is not None: |
504 | 536 |
disk_id = utils.id_from_disk_name(disk_name) |
505 |
else: |
|
506 |
# Put as default value the index. If it is an unknown disk to |
|
507 |
# synnefo it will be created automaticaly. |
|
537 |
else: # Unknown disk |
|
508 | 538 |
disk_id = UNKNOWN_DISK_PREFIX + str(index) |
509 | 539 |
|
510 |
# Get disk size in GB |
|
511 |
size = gdisk.get("size") >> 10 |
|
512 |
|
|
513 | 540 |
disk_info = { |
514 | 541 |
'index': index, |
515 |
'size': size,
|
|
542 |
'size': gnt_disk["size"] >> 10, # Size in GB
|
|
516 | 543 |
'status': "IN_USE"} |
517 | 544 |
|
518 |
new_disks.append((disk_id, disk_info)) |
|
519 |
return dict(new_disks) |
|
520 |
|
|
545 |
disks.append((disk_id, disk_info)) |
|
546 |
return dict(disks) |
|
521 | 547 |
|
522 |
@transaction.commit_on_success |
|
523 |
def process_snapshot_status(*args, **kwargs): |
|
524 |
return _process_snapshot_status(*args, **kwargs) |
|
525 | 548 |
|
526 |
|
|
527 |
def _process_snapshot_status(snapshot_name, snapshot_info, user_id, etime, |
|
528 |
jobid, status): |
|
529 |
"""Process a notification for a snapshot.""" |
|
530 |
snapshot_id = snapshot_info.get("snapshot_id") |
|
531 |
assert(snapshot_id is not None), "Missing snapshot_id" |
|
532 |
if status in rapi.JOB_STATUS_FINALIZED: |
|
533 |
snapshot_status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR" |
|
534 |
log.debug("Updating status of snapshot '%s' to '%s'", snapshot_id, |
|
535 |
snapshot_status) |
|
536 |
update_snapshot_status(snapshot_id, user_id, status=snapshot_status) |
|
549 |
def update_snapshot(snap_id, user_id, job_id, job_status, etime): |
|
550 |
"""Update a snapshot based on result of a Ganeti job.""" |
|
551 |
if job_status in rapi.JOB_STATUS_FINALIZED: |
|
552 |
status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR" |
|
553 |
log.debug("Updating status of snapshot '%s' to '%s'", snap_id, status) |
|
554 |
volume.util.update_snapshot_status(snap_id, user_id, status=status) |
|
537 | 555 |
|
538 | 556 |
|
539 | 557 |
@transaction.commit_on_success |
b/snf-cyclades-app/synnefo/logic/reconciliation.py | ||
---|---|---|
302 | 302 |
created__lte=building_time) \ |
303 | 303 |
.order_by("id") |
304 | 304 |
gnt_nics = gnt_server["nics"] |
305 |
gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
|
|
305 |
gnt_nics_parsed = backend_mod.parse_instance_nics(gnt_nics)
|
|
306 | 306 |
nics_changed = len(db_nics) != len(gnt_nics) |
307 | 307 |
for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())): |
308 | 308 |
gnt_nic_id, gnt_nic = gnt_nic |
... | ... | |
321 | 321 |
self.log.info(msg, server_id, db_nics_str, gnt_nics_str) |
322 | 322 |
if self.options["fix_unsynced_nics"]: |
323 | 323 |
vm = get_locked_server(server_id) |
324 |
backend_mod.process_net_status(vm=vm, |
|
325 |
etime=self.event_time, |
|
326 |
nics=gnt_nics) |
|
324 |
backend_mod.process_op_status( |
|
325 |
vm=vm, etime=self.event_time, jobid=-0, |
|
326 |
opcode="OP_INSTANCE_SET_PARAMS", status='success', |
|
327 |
logmsg="Reconciliation: simulated Ganeti event", |
|
328 |
nics=gnt_nics) |
|
327 | 329 |
|
328 | 330 |
def reconcile_unsynced_disks(self, server_id, db_server, gnt_server): |
329 | 331 |
building_time = self.event_time - BUILDING_NIC_TIMEOUT |
... | ... | |
332 | 334 |
.filter(deleted=False)\ |
333 | 335 |
.order_by("id") |
334 | 336 |
gnt_disks = gnt_server["disks"] |
335 |
gnt_disks_parsed = backend_mod.process_ganeti_disks(gnt_disks)
|
|
337 |
gnt_disks_parsed = backend_mod.parse_instance_disks(gnt_disks)
|
|
336 | 338 |
disks_changed = len(db_disks) != len(gnt_disks) |
337 | 339 |
for db_disk, gnt_disk in zip(db_disks, |
338 | 340 |
sorted(gnt_disks_parsed.items())): |
... | ... | |
352 | 354 |
self.log.info(msg, server_id, db_disks_str, gnt_disks_str) |
353 | 355 |
if self.options["fix_unsynced_disks"]: |
354 | 356 |
vm = get_locked_server(server_id) |
355 |
backend_mod.process_disks_status(vm=vm, |
|
356 |
etime=self.event_time, |
|
357 |
disks=gnt_disks) |
|
357 |
backend_mod.process_op_status( |
|
358 |
vm=vm, etime=self.event_time, jobid=-0, |
|
359 |
opcode="OP_INSTANCE_SET_PARAMS", status='success', |
|
360 |
logmsg="Reconciliation: simulated Ganeti event", |
|
361 |
disks=gnt_disks) |
|
358 | 362 |
|
359 | 363 |
def reconcile_pending_task(self, server_id, db_server): |
360 | 364 |
job_id = db_server.task_job_id |
b/snf-cyclades-app/synnefo/logic/server_attachments.py | ||
---|---|---|
36 | 36 |
log = logging.getLogger(__name__) |
37 | 37 |
|
38 | 38 |
|
39 |
@commands.server_command("ATTACH_VOLUME") |
|
40 | 39 |
def attach_volume(vm, volume): |
41 | 40 |
"""Attach a volume to a server. |
42 | 41 |
|
... | ... | |
60 | 59 |
raise faults.BadRequest(msg) |
61 | 60 |
|
62 | 61 |
# Check maximum disk per instance hard limit |
63 |
if vm.volumes.filter(deleted=False).count() == settings.GANETI_MAX_DISKS_PER_INSTANCE: |
|
62 |
vm_volumes_num = vm.volumes.filter(deleted=False).count() |
|
63 |
if vm_volumes_num == settings.GANETI_MAX_DISKS_PER_INSTANCE: |
|
64 | 64 |
raise faults.BadRequest("Maximum volumes per server limit reached") |
65 | 65 |
|
66 |
jobid = backend.attach_volume(vm, volume) |
|
66 |
if volume.status == "CREATING": |
|
67 |
action_fields = {"disks": [("add", volume, {})]} |
|
68 |
comm = commands.server_command("ATTACH_VOLUME", |
|
69 |
action_fields=action_fields) |
|
70 |
return comm(_attach_volume)(vm, volume) |
|
71 |
|
|
67 | 72 |
|
73 |
def _attach_volume(vm, volume): |
|
74 |
"""Attach a Volume to a VM and update the Volume's status.""" |
|
75 |
jobid = backend.attach_volume(vm, volume) |
|
68 | 76 |
log.info("Attached volume '%s' to server '%s'. JobID: '%s'", volume.id, |
69 | 77 |
volume.machine_id, jobid) |
70 |
|
|
71 | 78 |
volume.backendjobid = jobid |
72 | 79 |
volume.machine = vm |
73 |
volume.status = "ATTACHING" |
|
80 |
if volume.status == "AVAILALBE": |
|
81 |
volume.status = "ATTACHING" |
|
82 |
else: |
|
83 |
volume.status = "CREATING" |
|
74 | 84 |
volume.save() |
75 | 85 |
return jobid |
76 | 86 |
|
77 | 87 |
|
78 |
@commands.server_command("DETACH_VOLUME") |
|
79 | 88 |
def detach_volume(vm, volume): |
80 |
"""Detach a volume to a server.
|
|
89 |
"""Detach a Volume from a VM
|
|
81 | 90 |
|
82 | 91 |
The volume must be in 'IN_USE' status in order to be detached. Also, |
83 | 92 |
the root volume of the instance (index=0) can not be detached. This |
... | ... | |
87 | 96 |
""" |
88 | 97 |
|
89 | 98 |
_check_attachment(vm, volume) |
90 |
if volume.status != "IN_USE": |
|
91 |
#TODO: Maybe allow other statuses as well ? |
|
99 |
if volume.status not in ["IN_USE", "ERROR"]: |
|
92 | 100 |
raise faults.BadRequest("Cannot detach volume while volume is in" |
93 | 101 |
" '%s' status." % volume.status) |
94 | 102 |
if volume.index == 0: |
95 | 103 |
raise faults.BadRequest("Cannot detach the root volume of a server") |
104 |
|
|
105 |
action_fields = {"disks": [("remove", volume, {})]} |
|
106 |
comm = commands.server_command("DETACH_VOLUME", |
|
107 |
action_fields=action_fields) |
|
108 |
return comm(_detach_volume)(vm, volume) |
|
109 |
|
|
110 |
|
|
111 |
def _detach_volume(vm, volume): |
|
112 |
"""Detach a Volume from a VM and update the Volume's status""" |
|
96 | 113 |
jobid = backend.detach_volume(vm, volume) |
97 | 114 |
log.info("Detached volume '%s' from server '%s'. JobID: '%s'", volume.id, |
98 | 115 |
volume.machine_id, jobid) |
99 | 116 |
volume.backendjobid = jobid |
100 |
volume.status = "DETACHING" |
|
117 |
if volume.delete_on_termination: |
|
118 |
volume.status = "DELETING" |
|
119 |
else: |
|
120 |
volume.status = "DETACHING" |
|
101 | 121 |
volume.save() |
102 | 122 |
return jobid |
103 | 123 |
|
104 | 124 |
|
105 | 125 |
def _check_attachment(vm, volume): |
106 |
"""Check that volume is attached to vm."""
|
|
126 |
"""Check that the Volume is attached to the VM"""
|
|
107 | 127 |
if volume.machine_id != vm.id: |
108 | 128 |
raise faults.BadRequest("Volume '%s' is not attached to server '%s'" |
109 | 129 |
% volume.id, vm.id) |
b/snf-cyclades-app/synnefo/logic/utils.py | ||
---|---|---|
101 | 101 |
return int(ns) |
102 | 102 |
|
103 | 103 |
|
104 |
def id_to_disk_name(id): |
|
105 |
return "%svol-%s" % (settings.BACKEND_PREFIX_ID, str(id)) |
|
106 |
|
|
107 |
|
|
104 | 108 |
def get_rsapi_state(vm): |
105 | 109 |
"""Returns the API state for a virtual machine |
106 | 110 |
|
... | ... | |
175 | 179 |
def get_action_from_opcode(opcode, job_fields): |
176 | 180 |
if opcode == "OP_INSTANCE_SET_PARAMS": |
177 | 181 |
nics = job_fields.get("nics") |
182 |
disks = job_fields.get("disks") |
|
178 | 183 |
beparams = job_fields.get("beparams") |
179 | 184 |
if nics: |
180 | 185 |
try: |
... | ... | |
187 | 192 |
return None |
188 | 193 |
except: |
189 | 194 |
return None |
195 |
if disks: |
|
196 |
try: |
|
197 |
disk_action = disks[0][0] |
|
198 |
if disk_action == "add": |
|
199 |
return "ATTACH_VOLUME" |
|
200 |
elif disk_action == "remove": |
|
201 |
return "DETACH_VOLUME" |
|
202 |
else: |
|
203 |
return None |
|
204 |
except: |
|
205 |
return None |
|
190 | 206 |
elif beparams: |
191 | 207 |
return "RESIZE" |
192 | 208 |
else: |
b/snf-cyclades-app/synnefo/quotas/__init__.py | ||
---|---|---|
29 | 29 |
|
30 | 30 |
from django.utils import simplejson as json |
31 | 31 |
from django.db import transaction |
32 |
from django.db.models import Sum |
|
32 | 33 |
|
33 | 34 |
from snf_django.lib.api import faults |
34 | 35 |
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network, |
35 |
IPAddress) |
|
36 |
IPAddress, Volume)
|
|
36 | 37 |
|
37 | 38 |
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN, |
38 | 39 |
ASTAKOS_AUTH_URL) |
39 | 40 |
from astakosclient import AstakosClient |
40 | 41 |
from astakosclient import errors |
42 |
from synnefo.logic.utils import id_from_disk_name |
|
41 | 43 |
|
42 | 44 |
import logging |
43 | 45 |
log = logging.getLogger(__name__) |
... | ... | |
300 | 302 |
flavor = resource.flavor |
301 | 303 |
resources = {"cyclades.vm": 1, |
302 | 304 |
"cyclades.total_cpu": flavor.cpu, |
303 |
"cyclades.disk": 1073741824 * flavor.disk, |
|
304 |
"cyclades.total_ram": 1048576 * flavor.ram} |
|
305 |
"cyclades.total_ram": flavor.ram << 20} |
|
305 | 306 |
online_resources = {"cyclades.cpu": flavor.cpu, |
306 |
"cyclades.ram": 1048576 * flavor.ram}
|
|
307 |
"cyclades.ram": flavor.ram << 20}
|
|
307 | 308 |
if action == "BUILD": |
309 |
new_volumes = resource.volumes.filter(status="CREATING") |
|
310 |
new_volumes_size = new_volumes.aggregate(Sum("size"))["size__sum"] |
|
311 |
resources["cyclades.disk"] = new_volumes_size << 30 |
|
308 | 312 |
resources.update(online_resources) |
309 | 313 |
return resources |
310 | 314 |
if action == "START": |
... | ... | |
323 | 327 |
else: |
324 | 328 |
return None |
325 | 329 |
elif action == "DESTROY": |
330 |
volumes = resource.volumes.filter(deleted=False) |
|
331 |
volumes_size = volumes.aggregate(Sum("size"))["size__sum"] |
|
332 |
resources["cyclades.disk"] = volumes_size << 30 |
|
333 |
resources.update(online_resources) |
|
326 | 334 |
if resource.operstate in ["STARTED", "BUILD", "ERROR"]: |
327 | 335 |
resources.update(online_resources) |
328 | 336 |
return reverse_quantities(resources) |
... | ... | |
331 | 339 |
cpu = beparams.get("vcpus", flavor.cpu) |
332 | 340 |
ram = beparams.get("maxmem", flavor.ram) |
333 | 341 |
return {"cyclades.total_cpu": cpu - flavor.cpu, |
334 |
"cyclades.total_ram": 1048576 * (ram - flavor.ram)} |
|
342 |
"cyclades.total_ram": (ram - flavor.ram) << 20} |
|
343 |
elif action in ["ATTACH_VOLUME", "DETACH_VOLUME"]: |
|
344 |
if action_fields is not None: |
|
345 |
volumes_changes = action_fields.get("disks") |
|
346 |
if volumes_changes is not None: |
|
347 |
size_delta = get_volumes_size_delta(volumes_changes) |
|
348 |
if size_delta: |
|
349 |
return {"cyclades.disk": size_delta << 30} |
|
335 | 350 |
else: |
336 | 351 |
#["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]: |
337 | 352 |
return None |
... | ... | |
350 | 365 |
return reverse_quantities(resources) |
351 | 366 |
else: |
352 | 367 |
return None |
368 |
elif isinstance(resource, Volume): |
|
369 |
size = resource.size |
|
370 |
resources = {"cyclades.disk": size << 30} |
|
371 |
if resource.status == "CREATING" and action == "BUILD": |
|
372 |
return resources |
|
373 |
elif action == "DESTROY": |
|
374 |
reverse_quantities(resources) |
|
375 |
else: |
|
376 |
return None |
|
377 |
|
|
378 |
|
|
379 |
def get_volumes_size_delta(volumes_changes): |
|
380 |
"""Compute the total change in the size of volumes""" |
|
381 |
size_delta = 0 |
|
382 |
for vchange in volumes_changes: |
|
383 |
action, db_volume, info = vchange |
|
384 |
if action == "add": |
|
385 |
size_delta += int(db_volume.size) |
|
386 |
elif action == "remove": |
|
387 |
size_delta -= int(db_volume.size) |
|
388 |
elif action == "modify": |
|
389 |
size_delta += info.get("size_delta", 0) |
|
390 |
else: |
|
391 |
raise ValueError("Unknwon volume action '%s'" % action) |
|
392 |
return size_delta |
|
353 | 393 |
|
354 | 394 |
|
355 | 395 |
def reverse_quantities(resources): |
b/snf-cyclades-app/synnefo/quotas/util.py | ||
---|---|---|
33 | 33 |
|
34 | 34 |
from django.db.models import Sum, Count, Q |
35 | 35 |
|
36 |
from synnefo.db.models import VirtualMachine, Network, IPAddress |
|
36 |
from synnefo.db.models import VirtualMachine, Network, IPAddress, Volume
|
|
37 | 37 |
from synnefo.quotas import Quotaholder |
38 | 38 |
|
39 | 39 |
|
... | ... | |
44 | 44 |
vms = VirtualMachine.objects.filter(deleted=False) |
45 | 45 |
networks = Network.objects.filter(deleted=False) |
46 | 46 |
floating_ips = IPAddress.objects.filter(deleted=False, floating_ip=True) |
47 |
volumes = Volume.objects.filter(deleted=False) |
|
47 | 48 |
|
48 | 49 |
if user is not None: |
49 | 50 |
vms = vms.filter(userid=user) |
... | ... | |
54 | 55 |
vm_resources = vms.values("userid")\ |
55 | 56 |
.annotate(num=Count("id"), |
56 | 57 |
total_ram=Sum("flavor__ram"), |
57 |
total_cpu=Sum("flavor__cpu"), |
|
58 |
disk=Sum("flavor__disk")) |
|
58 |
total_cpu=Sum("flavor__cpu")) |
|
59 | 59 |
vm_active_resources = \ |
60 | 60 |
vms.values("userid")\ |
61 | 61 |
.filter(Q(operstate="STARTED") | Q(operstate="BUILD") | |
... | ... | |
67 | 67 |
user = vm_res['userid'] |
68 | 68 |
res = {"cyclades.vm": vm_res["num"], |
69 | 69 |
"cyclades.total_cpu": vm_res["total_cpu"], |
70 |
"cyclades.disk": 1073741824 * vm_res["disk"], |
|
71 |
"cyclades.total_ram": 1048576 * vm_res["total_ram"]} |
|
70 |
"cyclades.total_ram": vm_res["total_ram"] << 20} |
|
72 | 71 |
holdings[user] = res |
73 | 72 |
|
74 | 73 |
for vm_res in vm_active_resources.iterator(): |
75 | 74 |
user = vm_res['userid'] |
76 | 75 |
holdings[user]["cyclades.cpu"] = vm_res["cpu"] |
77 |
holdings[user]["cyclades.ram"] = 1048576 * vm_res["ram"] |
|
76 |
holdings[user]["cyclades.ram"] = vm_res["ram"] << 20 |
|
77 |
|
|
78 |
# Get disk resource |
|
79 |
disk_resources = volumes.values("userid").annotate(Sum("size")) |
|
80 |
for disk_res in disk_resources.iterator(): |
|
81 |
user = disk_res["userid"] |
|
82 |
holdings.setdefault(user, {}) |
|
83 |
holdings[user]["cyclades.disk"] = disk_res["size__sum"] << 30 |
|
78 | 84 |
|
79 | 85 |
# Get resources related with networks |
80 | 86 |
net_resources = networks.values("userid")\ |
Also available in: Unified diff