Revision d05e5324 snf-cyclades-app/synnefo/logic/backend.py

b/snf-cyclades-app/synnefo/logic/backend.py
44 44
from synnefo.api.util import release_resource
45 45
from synnefo.util.mac2eui64 import mac2eui64
46 46
from synnefo.logic import rapi
47
from synnefo.volume.util import update_snapshot_status
47
from synnefo import volume
48 48

  
49 49
from logging import getLogger
50 50
log = getLogger(__name__)
......
61 61
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
62 62
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
63 63
DISK_FIELDS = ["status", "size", "index"]
64
UNKNOWN_NIC_PREFIX = "unknown-"
65
UNKNOWN_DISK_PREFIX = "unknown-"
64
UNKNOWN_NIC_PREFIX = "unknown-nic-"
65
UNKNOWN_DISK_PREFIX = "unknown-disk-"
66 66

  
67 67

  
68 68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
......
111 111
                      vm.task_job_id, job_id, vm.serial)
112 112
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
113 113
                      % (vm, job_id))
114
            serial = quotas.handle_resource_commission(
115
                vm, action,
116
                action_fields=job_fields,
117
                commission_name=reason,
118
                force=True,
119
                auto_accept=True)
114
            try:
115
                serial = quotas.handle_resource_commission(
116
                    vm, action,
117
                    action_fields=job_fields,
118
                    commission_name=reason,
119
                    force=True,
120
                    auto_accept=True)
121
            except:
122
                log.exception("Error while handling new commission")
123
                raise
120 124
            log.debug("Issued new commission: %s", serial)
121 125
    return vm
122 126

  
......
138 142

  
139 143
    if opcode == "OP_INSTANCE_SNAPSHOT":
140 144
        for disk_id, disk_info in job_fields.get("disks", []):
141
            snapshot_name = disk_info.get("snapshot_name")
142
            snapshot_info = json.loads(disk_info["snapshot_info"])
143
            user_id = vm.userid
144
            _process_snapshot_status(snapshot_name, snapshot_info,
145
                                     user_id, etime, jobid, status)
145
            snap_info = json.loads(disk_info["snapshot_info"])
146
            snap_id = snap_info["snapshot_id"]
147
            update_snapshot(snap_id, user_id=vm.userid, job_id=jobid,
148
                            job_status=status, etime=etime)
146 149
        return
147 150

  
148 151
    vm.backendjobid = jobid
......
162 165
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
163 166

  
164 167
    if status == rapi.JOB_STATUS_SUCCESS:
165
        # If job succeeds, change operating state if needed
166 168
        if state_for_success is not None:
167 169
            new_operstate = state_for_success
168 170

  
169
        beparams = job_fields.get("beparams", None)
171
        beparams = job_fields.get("beparams")
170 172
        if beparams:
171
            # Change the flavor of the VM
172
            new_flavor = _process_resize(vm, beparams)
173
            cpu = beparams.get("vcpus")
174
            ram = beparams.get("maxmem")
175
            new_flavor = find_new_flavor(vm, cpu=cpu, ram=ram)
173 176

  
174
        # Update backendtime only for jobs that have been successfully
177
        # XXX: Update backendtime only for jobs that have been successfully
175 178
        # completed, since only these jobs update the state of the VM. Else a
176 179
        # "race condition" may occur when a successful job (e.g.
177 180
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
......
179 182
        vm.backendtime = etime
180 183

  
181 184
    if status in rapi.JOB_STATUS_FINALIZED:
182
        if nics is not None:  # Update the NICs of the VM
183
            _process_net_status(vm, etime, nics)
184
        if disks is not None:  # Update the disks of the VM
185
            _process_disks_status(vm, etime, disks)
185
        if nics is not None:
186
            update_vm_nics(vm, nics, etime)
187
        if disks is not None:
188
            # XXX: Replace the job fields with mocked changes as produced by
189
            # the diff between the DB and Ganeti disks. This is required in
190
            # order to update quotas for disks that changed, but not from this
191
            # job!
192
            disk_changes = update_vm_disks(vm, disks, etime)
193
            job_fields["disks"] = disk_changes
186 194

  
187 195
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
188 196
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
......
199 207
        if (status == rapi.JOB_STATUS_SUCCESS or
200 208
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
201 209
            # server has been deleted, so delete the server's attachments
202
            vm.volumes.all().update(deleted=True, machine=None)
210
            vm.volumes.all().update(deleted=True, status="DELETED",
211
                                    machine=None)
203 212
            for nic in vm.nics.all():
204 213
                # but first release the IP
205 214
                remove_nic_ips(nic)
......
218 227
            vm.task = None
219 228
            vm.task_job_id = None
220 229

  
230
    # Update VM's state and flavor after handling of quotas, since computation
231
    # of quotas depends on these attributes
221 232
    if new_operstate is not None:
222 233
        vm.operstate = new_operstate
223 234
    if new_flavor is not None:
......
226 237
    vm.save()
227 238

  
228 239

  
229
def _process_resize(vm, beparams):
230
    """Change flavor of a VirtualMachine based on new beparams."""
240
def find_new_flavor(vm, cpu=None, ram=None):
241
    """Find VM's new flavor based on the new CPU and RAM"""
242
    if cpu is None and ram is None:
243
        return None
244

  
231 245
    old_flavor = vm.flavor
232
    vcpus = beparams.get("vcpus", old_flavor.cpu)
233
    ram = beparams.get("maxmem", old_flavor.ram)
234
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
235
        return
246
    ram = ram if ram is not None else old_flavor.ram
247
    cpu = cpu if cpu is not None else old_flavor.cpu
248
    if cpu == old_flavor.cpu and ram == old_flavor.ram:
249
        return None
250

  
236 251
    try:
237
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
252
        new_flavor = Flavor.objects.get(cpu=cpu, ram=ram,
238 253
                                        disk=old_flavor.disk,
239 254
                                        disk_template=old_flavor.disk_template)
240 255
    except Flavor.DoesNotExist:
241
        raise Exception("Cannot find flavor for VM")
256
        raise Exception("There is no flavor to match the instance specs!"
257
                        " Instance: %s CPU: %s RAM %s: Disk: %s Template: %s"
258
                        % (vm.backend_vm_id, cpu, ram, old_flavor.disk,
259
                           old_flavor.disk_template))
260
    log.info("Flavor of VM '%s' changed from '%s' to '%s'", vm,
261
             old_flavor.name, new_flavor.name)
242 262
    return new_flavor
243 263

  
244 264

  
245
@transaction.commit_on_success
246
def process_net_status(vm, etime, nics):
247
    """Wrap _process_net_status inside transaction."""
248
    _process_net_status(vm, etime, nics)
265
def nics_are_equal(db_nic, gnt_nic):
266
    """Check if DB and Ganeti NICs are equal."""
267
    for field in NIC_FIELDS:
268
        if getattr(db_nic, field) != gnt_nic[field]:
269
            return False
270
    return True
271

  
272

  
273
def parse_instance_nics(gnt_nics):
274
    """Parse NICs of a Ganeti instance"""
275
    nics = []
276
    for index, gnic in enumerate(gnt_nics):
277
        nic_name = gnic.get("name", None)
278
        if nic_name is not None:
279
            nic_id = utils.id_from_nic_name(nic_name)
280
        else:
281
            # Unknown NIC
282
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
283

  
284
        network_name = gnic.get('network', '')
285
        network_id = utils.id_from_network_name(network_name)
286
        network = Network.objects.get(id=network_id)
287
        subnet6 = network.subnet6
288

  
289
        # Get the new nic info
290
        mac = gnic.get('mac')
291
        ipv4 = gnic.get('ip')
292
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
293

  
294
        firewall = gnic.get('firewall')
295
        firewall_profile = _reverse_tags.get(firewall)
296
        if not firewall_profile and network.public:
297
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
298

  
299
        nic_info = {
300
            'index': index,
301
            'network': network,
302
            'mac': mac,
303
            'ipv4_address': ipv4,
304
            'ipv6_address': ipv6,
305
            'firewall_profile': firewall_profile,
306
            'state': 'ACTIVE'}
307

  
308
        nics.append((nic_id, nic_info))
309
    return dict(nics)
310

  
249 311

  
312
def update_vm_nics(vm, nics, etime=None):
313
    """Update VM's NICs to match with the NICs of the Ganeti instance
250 314

  
251
def _process_net_status(vm, etime, nics):
252
    """Process a net status notification from the backend
315
    This function will update the VM's NICs(update, delete or create) and
316
    return a list of quotable changes.
253 317

  
254
    Process an incoming message from the Ganeti backend,
255
    detailing the NIC configuration of a VM instance.
318
    @param vm: The VirtualMachine the NICs belong to
319
    @type vm: VirtualMachine object
320
    @param nics: The NICs of the Ganeti instance
321
    @type nics: List of dictionaries with NIC information
322
    @param etime: The datetime the Ganeti instance had these NICs
323
    @type etime: datetime
256 324

  
257
    Update the state of the VM in the DB accordingly.
325
    @return: List of quotable changes (add/remove NIC) (currently empty list)
326
    @rtype: List of dictionaries
258 327

  
259 328
    """
260
    ganeti_nics = process_ganeti_nics(nics)
261
    db_nics = dict([(nic.id, nic)
262
                    for nic in vm.nics.select_related("network")
263
                                      .prefetch_related("ips")])
329
    ganeti_nics = parse_instance_nics(nics)
330
    db_nics = dict([(nic.id, nic) for nic in vm.nics.select_related("network")
331
                                                    .prefetch_related("ips")])
264 332

  
265 333
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
266 334
        db_nic = db_nics.get(nic_name)
......
301 369
                                       new_address=gnt_ipv6_address,
302 370
                                       version=6)
303 371

  
304
    vm.backendtime = etime
305
    vm.save()
306

  
307

  
308
def change_address_of_port(port, userid, old_address, new_address, version):
309
    """Change."""
310
    if old_address is not None:
311
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
312
               % (version, port.machine_id, old_address, new_address))
313
        log.error(msg)
314

  
315
    # Remove the old IP address
316
    remove_nic_ips(port, version=version)
317

  
318
    if version == 4:
319
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
320
        ipaddress.nic = port
321
        ipaddress.save()
322
    elif version == 6:
323
        subnet6 = port.network.subnet6
324
        ipaddress = IPAddress.objects.create(userid=userid,
325
                                             network=port.network,
326
                                             subnet=subnet6,
327
                                             nic=port,
328
                                             address=new_address,
329
                                             ipversion=6)
330
    else:
331
        raise ValueError("Unknown version: %s" % version)
332

  
333
    # New address log
334
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
335
                                         network_id=port.network_id,
336
                                         address=new_address,
337
                                         active=True)
338
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
339
             ip_log.id, new_address, port.machine_id)
340

  
341
    return ipaddress
342

  
343

  
344
def nics_are_equal(db_nic, gnt_nic):
345
    for field in NIC_FIELDS:
346
        if getattr(db_nic, field) != gnt_nic[field]:
347
            return False
348
    return True
349

  
350

  
351
def process_ganeti_nics(ganeti_nics):
352
    """Process NIC dict from ganeti"""
353
    new_nics = []
354
    for index, gnic in enumerate(ganeti_nics):
355
        nic_name = gnic.get("name", None)
356
        if nic_name is not None:
357
            nic_id = utils.id_from_nic_name(nic_name)
358
        else:
359
            # Put as default value the index. If it is an unknown NIC to
360
            # synnefo it will be created automaticaly.
361
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
362
        network_name = gnic.get('network', '')
363
        network_id = utils.id_from_network_name(network_name)
364
        network = Network.objects.get(id=network_id)
365

  
366
        # Get the new nic info
367
        mac = gnic.get('mac')
368
        ipv4 = gnic.get('ip')
369
        subnet6 = network.subnet6
370
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
371

  
372
        firewall = gnic.get('firewall')
373
        firewall_profile = _reverse_tags.get(firewall)
374
        if not firewall_profile and network.public:
375
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
376

  
377
        nic_info = {
378
            'index': index,
379
            'network': network,
380
            'mac': mac,
381
            'ipv4_address': ipv4,
382
            'ipv6_address': ipv6,
383
            'firewall_profile': firewall_profile,
384
            'state': 'ACTIVE'}
385

  
386
        new_nics.append((nic_id, nic_info))
387
    return dict(new_nics)
372
    return []
388 373

  
389 374

  
390 375
def remove_nic_ips(nic, version=None):
......
440 425
    ip_log.save()
441 426

  
442 427

  
443
@transaction.commit_on_success
444
def process_disks_status(vm, etime, disks):
445
    """Wrap _process_disks_status inside transaction."""
446
    _process_disks_status(vm, etime, disks)
428
def change_address_of_port(port, userid, old_address, new_address, version):
429
    """Change."""
430
    if old_address is not None:
431
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
432
               % (version, port.machine_id, old_address, new_address))
433
        log.error(msg)
447 434

  
435
    # Remove the old IP address
436
    remove_nic_ips(port, version=version)
448 437

  
449
def _process_disks_status(vm, etime, disks):
450
    """Process a disks status notification from the backend
438
    if version == 4:
439
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
440
        ipaddress.nic = port
441
        ipaddress.save()
442
    elif version == 6:
443
        subnet6 = port.network.subnet6
444
        ipaddress = IPAddress.objects.create(userid=userid,
445
                                             network=port.network,
446
                                             subnet=subnet6,
447
                                             nic=port,
448
                                             address=new_address,
449
                                             ipversion=6)
450
    else:
451
        raise ValueError("Unknown version: %s" % version)
452

  
453
    # New address log
454
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
455
                                         network_id=port.network_id,
456
                                         address=new_address,
457
                                         active=True)
458
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
459
             ip_log.id, new_address, port.machine_id)
460

  
461
    return ipaddress
451 462

  
452
    Process an incoming message from the Ganeti backend,
453
    detailing the disk configuration of a VM instance.
454 463

  
455
    Update the state of the VM in the DB accordingly.
464
def update_vm_disks(vm, disks, etime=None):
465
    """Update VM's disks to match with the disks of the Ganeti instance
466

  
467
    This function will update the VM's disks(update, delete or create) and
468
    return a list of quotable changes.
469

  
470
    @param vm: The VirtualMachine the disks belong to
471
    @type vm: VirtualMachine object
472
    @param disks: The disks of the Ganeti instance
473
    @type disks: List of dictionaries with disk information
474
    @param etime: The datetime the Ganeti instance had these disks
475
    @type etime: datetime
476

  
477
    @return: List of quotable changes (add/remove disk)
478
    @rtype: List of dictionaries
456 479

  
457 480
    """
458
    ganeti_disks = process_ganeti_disks(disks)
481
    gnt_disks = parse_instance_disks(disks)
459 482
    db_disks = dict([(disk.id, disk)
460 483
                     for disk in vm.volumes.filter(deleted=False)])
461 484

  
462
    for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()):
485
    changes = []
486
    for disk_name in set(db_disks.keys()) | set(gnt_disks.keys()):
463 487
        db_disk = db_disks.get(disk_name)
464
        ganeti_disk = ganeti_disks.get(disk_name)
465
        if ganeti_disk is None:
488
        gnt_disk = gnt_disks.get(disk_name)
489
        if gnt_disk is None:
490
            # Disk exists in DB but not in Ganeti
466 491
            if disk_is_stale(vm, disk):
467 492
                log.debug("Removing stale disk '%s'" % db_disk)
468
                # TODO: Handle disk deletion
493
                db_disk.status = "DELETED"
469 494
                db_disk.deleted = True
470 495
                db_disk.save()
496
                changes.append(("remove", db_disk, {}))
471 497
            else:
472 498
                log.info("disk '%s' is still being created" % db_disk)
473 499
        elif db_disk is None:
500
            # Disk exists in Ganeti but not in DB
501
            # TODO: Automatically import disk!
474 502
            msg = ("disk/%s of VM %s does not exist in DB! Cannot"
475 503
                   " automatically fix this issue!" % (disk_name, vm))
476 504
            log.error(msg)
477 505
            continue
478
        elif not disks_are_equal(db_disk, ganeti_disk):
479
            for f in DISK_FIELDS:
480
                # Update the disk in DB with the values from Ganeti disk
481
                setattr(db_disk, f, ganeti_disk[f])
482
                db_disk.save()
483

  
484
            # TODO: Special case where the size of the disk has changed!!
485
            assert(ganeti_disk["size"] == db_disk.size)
486

  
487
    vm.backendtime = etime
488
    vm.save()
506
        elif not disks_are_equal(db_disk, gnt_disk):
507
            # Disk has changed
508
            if gnt_disk["size"] != db_disk.size:
509
                # Size of the disk has changed! TODO: Fix flavor!
510
                size_delta = gnt_disk["size"] - db_disk.size
511
                changes.append(("modify", db_disk, {"size_delta": size_delta}))
512
            if db_disk.status == "CREATING":
513
                # Disk has been created
514
                changes.append(("add", db_disk, {}))
515
            # Update the disk in DB with the values from Ganeti disk
516
            [setattr(db_disk, f, gnt_disk[f]) for f in DISK_FIELDS]
517
            db_disk.save()
518

  
519
    return changes
489 520

  
490 521

  
491 522
def disks_are_equal(db_disk, gnt_disk):
523
    """Check if DB and Ganeti disks are equal"""
492 524
    for field in DISK_FIELDS:
493 525
        if getattr(db_disk, field) != gnt_disk[field]:
494 526
            return False
495 527
    return True
496 528

  
497 529

  
498
def process_ganeti_disks(ganeti_disks):
499
    """Process disk dict from ganeti"""
500
    new_disks = []
501
    for index, gdisk in enumerate(ganeti_disks):
502
        disk_name = gdisk.get("name", None)
530
def parse_instance_disks(gnt_disks):
531
    """Parse disks of a Ganeti instance"""
532
    disks = []
533
    for index, gnt_disk in enumerate(gnt_disks):
534
        disk_name = gnt_disk.get("name", None)
503 535
        if disk_name is not None:
504 536
            disk_id = utils.id_from_disk_name(disk_name)
505
        else:
506
            # Put as default value the index. If it is an unknown disk to
507
            # synnefo it will be created automaticaly.
537
        else:  # Unknown disk
508 538
            disk_id = UNKNOWN_DISK_PREFIX + str(index)
509 539

  
510
        # Get disk size in GB
511
        size = gdisk.get("size") >> 10
512

  
513 540
        disk_info = {
514 541
            'index': index,
515
            'size': size,
542
            'size': gnt_disk["size"] >> 10,  # Size in GB
516 543
            'status': "IN_USE"}
517 544

  
518
        new_disks.append((disk_id, disk_info))
519
    return dict(new_disks)
520

  
545
        disks.append((disk_id, disk_info))
546
    return dict(disks)
521 547

  
522
@transaction.commit_on_success
523
def process_snapshot_status(*args, **kwargs):
524
    return _process_snapshot_status(*args, **kwargs)
525 548

  
526

  
527
def _process_snapshot_status(snapshot_name, snapshot_info, user_id, etime,
528
                             jobid, status):
529
    """Process a notification for a snapshot."""
530
    snapshot_id = snapshot_info.get("snapshot_id")
531
    assert(snapshot_id is not None), "Missing snapshot_id"
532
    if status in rapi.JOB_STATUS_FINALIZED:
533
        snapshot_status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR"
534
        log.debug("Updating status of snapshot '%s' to '%s'", snapshot_id,
535
                  snapshot_status)
536
        update_snapshot_status(snapshot_id, user_id, status=snapshot_status)
549
def update_snapshot(snap_id, user_id, job_id, job_status, etime):
550
    """Update a snapshot based on result of a Ganeti job."""
551
    if job_status in rapi.JOB_STATUS_FINALIZED:
552
        status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR"
553
        log.debug("Updating status of snapshot '%s' to '%s'", snap_id, status)
554
        volume.util.update_snapshot_status(snap_id, user_id, status=status)
537 555

  
538 556

  
539 557
@transaction.commit_on_success

Also available in: Unified diff