Revision d05e5324

b/snf-cyclades-app/synnefo/db/models.py
994 994
        ("DETACHING", "The volume is detaching from an instance"),
995 995
        ("IN_USE", "The volume is attached to an instance"),
996 996
        ("DELETING", "The volume is being deleted"),
997
        ("DELETED", "The volume has been deleted"),
997 998
        ("ERROR", "An error has occurred with the volume"),
998 999
        ("ERROR_DELETING", "There was an error deleting this volume"),
999 1000
        ("BACKING_UP", "The volume is being backed up"),
b/snf-cyclades-app/synnefo/logic/backend.py
44 44
from synnefo.api.util import release_resource
45 45
from synnefo.util.mac2eui64 import mac2eui64
46 46
from synnefo.logic import rapi
47
from synnefo.volume.util import update_snapshot_status
47
from synnefo import volume
48 48

  
49 49
from logging import getLogger
50 50
log = getLogger(__name__)
......
61 61
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
62 62
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
63 63
DISK_FIELDS = ["status", "size", "index"]
64
UNKNOWN_NIC_PREFIX = "unknown-"
65
UNKNOWN_DISK_PREFIX = "unknown-"
64
UNKNOWN_NIC_PREFIX = "unknown-nic-"
65
UNKNOWN_DISK_PREFIX = "unknown-disk-"
66 66

  
67 67

  
68 68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
......
111 111
                      vm.task_job_id, job_id, vm.serial)
112 112
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
113 113
                      % (vm, job_id))
114
            serial = quotas.handle_resource_commission(
115
                vm, action,
116
                action_fields=job_fields,
117
                commission_name=reason,
118
                force=True,
119
                auto_accept=True)
114
            try:
115
                serial = quotas.handle_resource_commission(
116
                    vm, action,
117
                    action_fields=job_fields,
118
                    commission_name=reason,
119
                    force=True,
120
                    auto_accept=True)
121
            except:
122
                log.exception("Error while handling new commission")
123
                raise
120 124
            log.debug("Issued new commission: %s", serial)
121 125
    return vm
122 126

  
......
138 142

  
139 143
    if opcode == "OP_INSTANCE_SNAPSHOT":
140 144
        for disk_id, disk_info in job_fields.get("disks", []):
141
            snapshot_name = disk_info.get("snapshot_name")
142
            snapshot_info = json.loads(disk_info["snapshot_info"])
143
            user_id = vm.userid
144
            _process_snapshot_status(snapshot_name, snapshot_info,
145
                                     user_id, etime, jobid, status)
145
            snap_info = json.loads(disk_info["snapshot_info"])
146
            snap_id = snap_info["snapshot_id"]
147
            update_snapshot(snap_id, user_id=vm.userid, job_id=jobid,
148
                            job_status=status, etime=etime)
146 149
        return
147 150

  
148 151
    vm.backendjobid = jobid
......
162 165
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
163 166

  
164 167
    if status == rapi.JOB_STATUS_SUCCESS:
165
        # If job succeeds, change operating state if needed
166 168
        if state_for_success is not None:
167 169
            new_operstate = state_for_success
168 170

  
169
        beparams = job_fields.get("beparams", None)
171
        beparams = job_fields.get("beparams")
170 172
        if beparams:
171
            # Change the flavor of the VM
172
            new_flavor = _process_resize(vm, beparams)
173
            cpu = beparams.get("vcpus")
174
            ram = beparams.get("maxmem")
175
            new_flavor = find_new_flavor(vm, cpu=cpu, ram=ram)
173 176

  
174
        # Update backendtime only for jobs that have been successfully
177
        # XXX: Update backendtime only for jobs that have been successfully
175 178
        # completed, since only these jobs update the state of the VM. Else a
176 179
        # "race condition" may occur when a successful job (e.g.
177 180
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
......
179 182
        vm.backendtime = etime
180 183

  
181 184
    if status in rapi.JOB_STATUS_FINALIZED:
182
        if nics is not None:  # Update the NICs of the VM
183
            _process_net_status(vm, etime, nics)
184
        if disks is not None:  # Update the disks of the VM
185
            _process_disks_status(vm, etime, disks)
185
        if nics is not None:
186
            update_vm_nics(vm, nics, etime)
187
        if disks is not None:
188
            # XXX: Replace the job fields with mocked changes as produced by
189
            # the diff between the DB and Ganeti disks. This is required in
190
            # order to update quotas for disks that changed, but not from this
191
            # job!
192
            disk_changes = update_vm_disks(vm, disks, etime)
193
            job_fields["disks"] = disk_changes
186 194

  
187 195
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
188 196
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
......
199 207
        if (status == rapi.JOB_STATUS_SUCCESS or
200 208
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
201 209
            # server has been deleted, so delete the server's attachments
202
            vm.volumes.all().update(deleted=True, machine=None)
210
            vm.volumes.all().update(deleted=True, status="DELETED",
211
                                    machine=None)
203 212
            for nic in vm.nics.all():
204 213
                # but first release the IP
205 214
                remove_nic_ips(nic)
......
218 227
            vm.task = None
219 228
            vm.task_job_id = None
220 229

  
230
    # Update VM's state and flavor after handling of quotas, since computation
231
    # of quotas depends on these attributes
221 232
    if new_operstate is not None:
222 233
        vm.operstate = new_operstate
223 234
    if new_flavor is not None:
......
226 237
    vm.save()
227 238

  
228 239

  
229
def _process_resize(vm, beparams):
230
    """Change flavor of a VirtualMachine based on new beparams."""
240
def find_new_flavor(vm, cpu=None, ram=None):
241
    """Find VM's new flavor based on the new CPU and RAM"""
242
    if cpu is None and ram is None:
243
        return None
244

  
231 245
    old_flavor = vm.flavor
232
    vcpus = beparams.get("vcpus", old_flavor.cpu)
233
    ram = beparams.get("maxmem", old_flavor.ram)
234
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
235
        return
246
    ram = ram if ram is not None else old_flavor.ram
247
    cpu = cpu if cpu is not None else old_flavor.cpu
248
    if cpu == old_flavor.cpu and ram == old_flavor.ram:
249
        return None
250

  
236 251
    try:
237
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
252
        new_flavor = Flavor.objects.get(cpu=cpu, ram=ram,
238 253
                                        disk=old_flavor.disk,
239 254
                                        disk_template=old_flavor.disk_template)
240 255
    except Flavor.DoesNotExist:
241
        raise Exception("Cannot find flavor for VM")
256
        raise Exception("There is no flavor to match the instance specs!"
257
                        " Instance: %s CPU: %s RAM %s: Disk: %s Template: %s"
258
                        % (vm.backend_vm_id, cpu, ram, old_flavor.disk,
259
                           old_flavor.disk_template))
260
    log.info("Flavor of VM '%s' changed from '%s' to '%s'", vm,
261
             old_flavor.name, new_flavor.name)
242 262
    return new_flavor
243 263

  
244 264

  
245
@transaction.commit_on_success
246
def process_net_status(vm, etime, nics):
247
    """Wrap _process_net_status inside transaction."""
248
    _process_net_status(vm, etime, nics)
265
def nics_are_equal(db_nic, gnt_nic):
266
    """Check if DB and Ganeti NICs are equal."""
267
    for field in NIC_FIELDS:
268
        if getattr(db_nic, field) != gnt_nic[field]:
269
            return False
270
    return True
271

  
272

  
273
def parse_instance_nics(gnt_nics):
274
    """Parse NICs of a Ganeti instance"""
275
    nics = []
276
    for index, gnic in enumerate(gnt_nics):
277
        nic_name = gnic.get("name", None)
278
        if nic_name is not None:
279
            nic_id = utils.id_from_nic_name(nic_name)
280
        else:
281
            # Unknown NIC
282
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
283

  
284
        network_name = gnic.get('network', '')
285
        network_id = utils.id_from_network_name(network_name)
286
        network = Network.objects.get(id=network_id)
287
        subnet6 = network.subnet6
288

  
289
        # Get the new nic info
290
        mac = gnic.get('mac')
291
        ipv4 = gnic.get('ip')
292
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
293

  
294
        firewall = gnic.get('firewall')
295
        firewall_profile = _reverse_tags.get(firewall)
296
        if not firewall_profile and network.public:
297
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
298

  
299
        nic_info = {
300
            'index': index,
301
            'network': network,
302
            'mac': mac,
303
            'ipv4_address': ipv4,
304
            'ipv6_address': ipv6,
305
            'firewall_profile': firewall_profile,
306
            'state': 'ACTIVE'}
307

  
308
        nics.append((nic_id, nic_info))
309
    return dict(nics)
310

  
249 311

  
312
def update_vm_nics(vm, nics, etime=None):
313
    """Update VM's NICs to match with the NICs of the Ganeti instance
250 314

  
251
def _process_net_status(vm, etime, nics):
252
    """Process a net status notification from the backend
315
    This function will update the VM's NICs(update, delete or create) and
316
    return a list of quotable changes.
253 317

  
254
    Process an incoming message from the Ganeti backend,
255
    detailing the NIC configuration of a VM instance.
318
    @param vm: The VirtualMachine the NICs belong to
319
    @type vm: VirtualMachine object
320
    @param nics: The NICs of the Ganeti instance
321
    @type nics: List of dictionaries with NIC information
322
    @param etime: The datetime the Ganeti instance had these NICs
323
    @type etime: datetime
256 324

  
257
    Update the state of the VM in the DB accordingly.
325
    @return: List of quotable changes (add/remove NIC) (currently empty list)
326
    @rtype: List of dictionaries
258 327

  
259 328
    """
260
    ganeti_nics = process_ganeti_nics(nics)
261
    db_nics = dict([(nic.id, nic)
262
                    for nic in vm.nics.select_related("network")
263
                                      .prefetch_related("ips")])
329
    ganeti_nics = parse_instance_nics(nics)
330
    db_nics = dict([(nic.id, nic) for nic in vm.nics.select_related("network")
331
                                                    .prefetch_related("ips")])
264 332

  
265 333
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
266 334
        db_nic = db_nics.get(nic_name)
......
301 369
                                       new_address=gnt_ipv6_address,
302 370
                                       version=6)
303 371

  
304
    vm.backendtime = etime
305
    vm.save()
306

  
307

  
308
def change_address_of_port(port, userid, old_address, new_address, version):
309
    """Change."""
310
    if old_address is not None:
311
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
312
               % (version, port.machine_id, old_address, new_address))
313
        log.error(msg)
314

  
315
    # Remove the old IP address
316
    remove_nic_ips(port, version=version)
317

  
318
    if version == 4:
319
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
320
        ipaddress.nic = port
321
        ipaddress.save()
322
    elif version == 6:
323
        subnet6 = port.network.subnet6
324
        ipaddress = IPAddress.objects.create(userid=userid,
325
                                             network=port.network,
326
                                             subnet=subnet6,
327
                                             nic=port,
328
                                             address=new_address,
329
                                             ipversion=6)
330
    else:
331
        raise ValueError("Unknown version: %s" % version)
332

  
333
    # New address log
334
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
335
                                         network_id=port.network_id,
336
                                         address=new_address,
337
                                         active=True)
338
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
339
             ip_log.id, new_address, port.machine_id)
340

  
341
    return ipaddress
342

  
343

  
344
def nics_are_equal(db_nic, gnt_nic):
345
    for field in NIC_FIELDS:
346
        if getattr(db_nic, field) != gnt_nic[field]:
347
            return False
348
    return True
349

  
350

  
351
def process_ganeti_nics(ganeti_nics):
352
    """Process NIC dict from ganeti"""
353
    new_nics = []
354
    for index, gnic in enumerate(ganeti_nics):
355
        nic_name = gnic.get("name", None)
356
        if nic_name is not None:
357
            nic_id = utils.id_from_nic_name(nic_name)
358
        else:
359
            # Put as default value the index. If it is an unknown NIC to
360
            # synnefo it will be created automaticaly.
361
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
362
        network_name = gnic.get('network', '')
363
        network_id = utils.id_from_network_name(network_name)
364
        network = Network.objects.get(id=network_id)
365

  
366
        # Get the new nic info
367
        mac = gnic.get('mac')
368
        ipv4 = gnic.get('ip')
369
        subnet6 = network.subnet6
370
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
371

  
372
        firewall = gnic.get('firewall')
373
        firewall_profile = _reverse_tags.get(firewall)
374
        if not firewall_profile and network.public:
375
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
376

  
377
        nic_info = {
378
            'index': index,
379
            'network': network,
380
            'mac': mac,
381
            'ipv4_address': ipv4,
382
            'ipv6_address': ipv6,
383
            'firewall_profile': firewall_profile,
384
            'state': 'ACTIVE'}
385

  
386
        new_nics.append((nic_id, nic_info))
387
    return dict(new_nics)
372
    return []
388 373

  
389 374

  
390 375
def remove_nic_ips(nic, version=None):
......
440 425
    ip_log.save()
441 426

  
442 427

  
443
@transaction.commit_on_success
444
def process_disks_status(vm, etime, disks):
445
    """Wrap _process_disks_status inside transaction."""
446
    _process_disks_status(vm, etime, disks)
428
def change_address_of_port(port, userid, old_address, new_address, version):
429
    """Change."""
430
    if old_address is not None:
431
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
432
               % (version, port.machine_id, old_address, new_address))
433
        log.error(msg)
447 434

  
435
    # Remove the old IP address
436
    remove_nic_ips(port, version=version)
448 437

  
449
def _process_disks_status(vm, etime, disks):
450
    """Process a disks status notification from the backend
438
    if version == 4:
439
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
440
        ipaddress.nic = port
441
        ipaddress.save()
442
    elif version == 6:
443
        subnet6 = port.network.subnet6
444
        ipaddress = IPAddress.objects.create(userid=userid,
445
                                             network=port.network,
446
                                             subnet=subnet6,
447
                                             nic=port,
448
                                             address=new_address,
449
                                             ipversion=6)
450
    else:
451
        raise ValueError("Unknown version: %s" % version)
452

  
453
    # New address log
454
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
455
                                         network_id=port.network_id,
456
                                         address=new_address,
457
                                         active=True)
458
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
459
             ip_log.id, new_address, port.machine_id)
460

  
461
    return ipaddress
451 462

  
452
    Process an incoming message from the Ganeti backend,
453
    detailing the disk configuration of a VM instance.
454 463

  
455
    Update the state of the VM in the DB accordingly.
464
def update_vm_disks(vm, disks, etime=None):
465
    """Update VM's disks to match with the disks of the Ganeti instance
466

  
467
    This function will update the VM's disks(update, delete or create) and
468
    return a list of quotable changes.
469

  
470
    @param vm: The VirtualMachine the disks belong to
471
    @type vm: VirtualMachine object
472
    @param disks: The disks of the Ganeti instance
473
    @type disks: List of dictionaries with disk information
474
    @param etime: The datetime the Ganeti instance had these disks
475
    @type etime: datetime
476

  
477
    @return: List of quotable changes (add/remove disk)
478
    @rtype: List of dictionaries
456 479

  
457 480
    """
458
    ganeti_disks = process_ganeti_disks(disks)
481
    gnt_disks = parse_instance_disks(disks)
459 482
    db_disks = dict([(disk.id, disk)
460 483
                     for disk in vm.volumes.filter(deleted=False)])
461 484

  
462
    for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()):
485
    changes = []
486
    for disk_name in set(db_disks.keys()) | set(gnt_disks.keys()):
463 487
        db_disk = db_disks.get(disk_name)
464
        ganeti_disk = ganeti_disks.get(disk_name)
465
        if ganeti_disk is None:
488
        gnt_disk = gnt_disks.get(disk_name)
489
        if gnt_disk is None:
490
            # Disk exists in DB but not in Ganeti
466 491
            if disk_is_stale(vm, db_disk):
467 492
                log.debug("Removing stale disk '%s'" % db_disk)
468
                # TODO: Handle disk deletion
493
                db_disk.status = "DELETED"
469 494
                db_disk.deleted = True
470 495
                db_disk.save()
496
                changes.append(("remove", db_disk, {}))
471 497
            else:
472 498
                log.info("disk '%s' is still being created" % db_disk)
473 499
        elif db_disk is None:
500
            # Disk exists in Ganeti but not in DB
501
            # TODO: Automatically import disk!
474 502
            msg = ("disk/%s of VM %s does not exist in DB! Cannot"
475 503
                   " automatically fix this issue!" % (disk_name, vm))
476 504
            log.error(msg)
477 505
            continue
478
        elif not disks_are_equal(db_disk, ganeti_disk):
479
            for f in DISK_FIELDS:
480
                # Update the disk in DB with the values from Ganeti disk
481
                setattr(db_disk, f, ganeti_disk[f])
482
                db_disk.save()
483

  
484
            # TODO: Special case where the size of the disk has changed!!
485
            assert(ganeti_disk["size"] == db_disk.size)
486

  
487
    vm.backendtime = etime
488
    vm.save()
506
        elif not disks_are_equal(db_disk, gnt_disk):
507
            # Disk has changed
508
            if gnt_disk["size"] != db_disk.size:
509
                # Size of the disk has changed! TODO: Fix flavor!
510
                size_delta = gnt_disk["size"] - db_disk.size
511
                changes.append(("modify", db_disk, {"size_delta": size_delta}))
512
            if db_disk.status == "CREATING":
513
                # Disk has been created
514
                changes.append(("add", db_disk, {}))
515
            # Update the disk in DB with the values from Ganeti disk
516
            for f in DISK_FIELDS:
                setattr(db_disk, f, gnt_disk[f])
517
            db_disk.save()
518

  
519
    return changes
489 520

  
490 521

  
491 522
def disks_are_equal(db_disk, gnt_disk):
523
    """Check if DB and Ganeti disks are equal"""
492 524
    for field in DISK_FIELDS:
493 525
        if getattr(db_disk, field) != gnt_disk[field]:
494 526
            return False
495 527
    return True
496 528

  
497 529

  
498
def process_ganeti_disks(ganeti_disks):
499
    """Process disk dict from ganeti"""
500
    new_disks = []
501
    for index, gdisk in enumerate(ganeti_disks):
502
        disk_name = gdisk.get("name", None)
530
def parse_instance_disks(gnt_disks):
531
    """Parse disks of a Ganeti instance"""
532
    disks = []
533
    for index, gnt_disk in enumerate(gnt_disks):
534
        disk_name = gnt_disk.get("name", None)
503 535
        if disk_name is not None:
504 536
            disk_id = utils.id_from_disk_name(disk_name)
505
        else:
506
            # Put as default value the index. If it is an unknown disk to
507
            # synnefo it will be created automaticaly.
537
        else:  # Unknown disk
508 538
            disk_id = UNKNOWN_DISK_PREFIX + str(index)
509 539

  
510
        # Get disk size in GB
511
        size = gdisk.get("size") >> 10
512

  
513 540
        disk_info = {
514 541
            'index': index,
515
            'size': size,
542
            'size': gnt_disk["size"] >> 10,  # Size in GB
516 543
            'status': "IN_USE"}
517 544

  
518
        new_disks.append((disk_id, disk_info))
519
    return dict(new_disks)
520

  
545
        disks.append((disk_id, disk_info))
546
    return dict(disks)
521 547

  
522
@transaction.commit_on_success
523
def process_snapshot_status(*args, **kwargs):
524
    return _process_snapshot_status(*args, **kwargs)
525 548

  
526

  
527
def _process_snapshot_status(snapshot_name, snapshot_info, user_id, etime,
528
                             jobid, status):
529
    """Process a notification for a snapshot."""
530
    snapshot_id = snapshot_info.get("snapshot_id")
531
    assert(snapshot_id is not None), "Missing snapshot_id"
532
    if status in rapi.JOB_STATUS_FINALIZED:
533
        snapshot_status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR"
534
        log.debug("Updating status of snapshot '%s' to '%s'", snapshot_id,
535
                  snapshot_status)
536
        update_snapshot_status(snapshot_id, user_id, status=snapshot_status)
549
def update_snapshot(snap_id, user_id, job_id, job_status, etime):
550
    """Update a snapshot based on result of a Ganeti job."""
551
    if job_status in rapi.JOB_STATUS_FINALIZED:
552
        status = "AVAILABLE" if job_status == rapi.JOB_STATUS_SUCCESS else "ERROR"
553
        log.debug("Updating status of snapshot '%s' to '%s'", snap_id, status)
554
        volume.util.update_snapshot_status(snap_id, user_id, status=status)
537 555

  
538 556

  
539 557
@transaction.commit_on_success
b/snf-cyclades-app/synnefo/logic/reconciliation.py
302 302
                                         created__lte=building_time) \
303 303
                                .order_by("id")
304 304
        gnt_nics = gnt_server["nics"]
305
        gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
305
        gnt_nics_parsed = backend_mod.parse_instance_nics(gnt_nics)
306 306
        nics_changed = len(db_nics) != len(gnt_nics)
307 307
        for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
308 308
            gnt_nic_id, gnt_nic = gnt_nic
......
321 321
            self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
322 322
            if self.options["fix_unsynced_nics"]:
323 323
                vm = get_locked_server(server_id)
324
                backend_mod.process_net_status(vm=vm,
325
                                               etime=self.event_time,
326
                                               nics=gnt_nics)
324
                backend_mod.process_op_status(
325
                    vm=vm, etime=self.event_time, jobid=-0,
326
                    opcode="OP_INSTANCE_SET_PARAMS", status='success',
327
                    logmsg="Reconciliation: simulated Ganeti event",
328
                    nics=gnt_nics)
327 329

  
328 330
    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
329 331
        building_time = self.event_time - BUILDING_NIC_TIMEOUT
......
332 334
                                    .filter(deleted=False)\
333 335
                                    .order_by("id")
334 336
        gnt_disks = gnt_server["disks"]
335
        gnt_disks_parsed = backend_mod.process_ganeti_disks(gnt_disks)
337
        gnt_disks_parsed = backend_mod.parse_instance_disks(gnt_disks)
336 338
        disks_changed = len(db_disks) != len(gnt_disks)
337 339
        for db_disk, gnt_disk in zip(db_disks,
338 340
                                     sorted(gnt_disks_parsed.items())):
......
352 354
            self.log.info(msg, server_id, db_disks_str, gnt_disks_str)
353 355
            if self.options["fix_unsynced_disks"]:
354 356
                vm = get_locked_server(server_id)
355
                backend_mod.process_disks_status(vm=vm,
356
                                                 etime=self.event_time,
357
                                                 disks=gnt_disks)
357
                backend_mod.process_op_status(
358
                    vm=vm, etime=self.event_time, jobid=-0,
359
                    opcode="OP_INSTANCE_SET_PARAMS", status='success',
360
                    logmsg="Reconciliation: simulated Ganeti event",
361
                    disks=gnt_disks)
358 362

  
359 363
    def reconcile_pending_task(self, server_id, db_server):
360 364
        job_id = db_server.task_job_id
b/snf-cyclades-app/synnefo/logic/server_attachments.py
36 36
log = logging.getLogger(__name__)
37 37

  
38 38

  
39
@commands.server_command("ATTACH_VOLUME")
40 39
def attach_volume(vm, volume):
41 40
    """Attach a volume to a server.
42 41

  
......
60 59
        raise faults.BadRequest(msg)
61 60

  
62 61
    # Check maximum disk per instance hard limit
63
    if vm.volumes.filter(deleted=False).count() == settings.GANETI_MAX_DISKS_PER_INSTANCE:
62
    vm_volumes_num = vm.volumes.filter(deleted=False).count()
63
    if vm_volumes_num >= settings.GANETI_MAX_DISKS_PER_INSTANCE:
64 64
        raise faults.BadRequest("Maximum volumes per server limit reached")
65 65

  
66
    jobid = backend.attach_volume(vm, volume)
66
    if volume.status == "CREATING":
67
        action_fields = {"disks": [("add", volume, {})]}
68
    comm = commands.server_command("ATTACH_VOLUME",
69
                                   action_fields=action_fields)
70
    return comm(_attach_volume)(vm, volume)
71

  
67 72

  
73
def _attach_volume(vm, volume):
74
    """Attach a Volume to a VM and update the Volume's status."""
75
    jobid = backend.attach_volume(vm, volume)
68 76
    log.info("Attached volume '%s' to server '%s'. JobID: '%s'", volume.id,
69 77
             volume.machine_id, jobid)
70

  
71 78
    volume.backendjobid = jobid
72 79
    volume.machine = vm
73
    volume.status = "ATTACHING"
80
    if volume.status == "AVAILABLE":
81
        volume.status = "ATTACHING"
82
    else:
83
        volume.status = "CREATING"
74 84
    volume.save()
75 85
    return jobid
76 86

  
77 87

  
78
@commands.server_command("DETACH_VOLUME")
79 88
def detach_volume(vm, volume):
80
    """Detach a volume to a server.
89
    """Detach a Volume from a VM
81 90

  
82 91
    The volume must be in 'IN_USE' status in order to be detached. Also,
83 92
    the root volume of the instance (index=0) can not be detached. This
......
87 96
    """
88 97

  
89 98
    _check_attachment(vm, volume)
90
    if volume.status != "IN_USE":
91
        #TODO: Maybe allow other statuses as well ?
99
    if volume.status not in ["IN_USE", "ERROR"]:
92 100
        raise faults.BadRequest("Cannot detach volume while volume is in"
93 101
                                " '%s' status." % volume.status)
94 102
    if volume.index == 0:
95 103
        raise faults.BadRequest("Cannot detach the root volume of a server")
104

  
105
    action_fields = {"disks": [("remove", volume, {})]}
106
    comm = commands.server_command("DETACH_VOLUME",
107
                                   action_fields=action_fields)
108
    return comm(_detach_volume)(vm, volume)
109

  
110

  
111
def _detach_volume(vm, volume):
112
    """Detach a Volume from a VM and update the Volume's status"""
96 113
    jobid = backend.detach_volume(vm, volume)
97 114
    log.info("Detached volume '%s' from server '%s'. JobID: '%s'", volume.id,
98 115
             volume.machine_id, jobid)
99 116
    volume.backendjobid = jobid
100
    volume.status = "DETACHING"
117
    if volume.delete_on_termination:
118
        volume.status = "DELETING"
119
    else:
120
        volume.status = "DETACHING"
101 121
    volume.save()
102 122
    return jobid
103 123

  
104 124

  
105 125
def _check_attachment(vm, volume):
106
    """Check that volume is attached to vm."""
126
    """Check that the Volume is attached to the VM"""
107 127
    if volume.machine_id != vm.id:
108 128
        raise faults.BadRequest("Volume '%s' is not attached to server '%s'"
109 129
                                % (volume.id, vm.id))
b/snf-cyclades-app/synnefo/logic/utils.py
101 101
    return int(ns)
102 102

  
103 103

  
104
def id_to_disk_name(id):
105
    return "%svol-%s" % (settings.BACKEND_PREFIX_ID, str(id))
106

  
107

  
104 108
def get_rsapi_state(vm):
105 109
    """Returns the API state for a virtual machine
106 110

  
......
175 179
def get_action_from_opcode(opcode, job_fields):
176 180
    if opcode == "OP_INSTANCE_SET_PARAMS":
177 181
        nics = job_fields.get("nics")
182
        disks = job_fields.get("disks")
178 183
        beparams = job_fields.get("beparams")
179 184
        if nics:
180 185
            try:
......
187 192
                    return None
188 193
            except:
189 194
                return None
195
        if disks:
196
            try:
197
                disk_action = disks[0][0]
198
                if disk_action == "add":
199
                    return "ATTACH_VOLUME"
200
                elif disk_action == "remove":
201
                    return "DETACH_VOLUME"
202
                else:
203
                    return None
204
            except:
205
                return None
190 206
        elif beparams:
191 207
            return "RESIZE"
192 208
        else:
b/snf-cyclades-app/synnefo/quotas/__init__.py
29 29

  
30 30
from django.utils import simplejson as json
31 31
from django.db import transaction
32
from django.db.models import Sum
32 33

  
33 34
from snf_django.lib.api import faults
34 35
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36
                               IPAddress, Volume)
36 37

  
37 38
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38 39
                              ASTAKOS_AUTH_URL)
39 40
from astakosclient import AstakosClient
40 41
from astakosclient import errors
42
from synnefo.logic.utils import id_from_disk_name
41 43

  
42 44
import logging
43 45
log = logging.getLogger(__name__)
......
300 302
        flavor = resource.flavor
301 303
        resources = {"cyclades.vm": 1,
302 304
                     "cyclades.total_cpu": flavor.cpu,
303
                     "cyclades.disk": 1073741824 * flavor.disk,
304
                     "cyclades.total_ram": 1048576 * flavor.ram}
305
                     "cyclades.total_ram": flavor.ram << 20}
305 306
        online_resources = {"cyclades.cpu": flavor.cpu,
306
                            "cyclades.ram": 1048576 * flavor.ram}
307
                            "cyclades.ram": flavor.ram << 20}
307 308
        if action == "BUILD":
309
            new_volumes = resource.volumes.filter(status="CREATING")
310
            new_volumes_size = new_volumes.aggregate(Sum("size"))["size__sum"]
311
            resources["cyclades.disk"] = new_volumes_size << 30
308 312
            resources.update(online_resources)
309 313
            return resources
310 314
        if action == "START":
......
323 327
            else:
324 328
                return None
325 329
        elif action == "DESTROY":
330
            volumes = resource.volumes.filter(deleted=False)
331
            volumes_size = volumes.aggregate(Sum("size"))["size__sum"]
332
            resources["cyclades.disk"] = volumes_size << 30
333
            resources.update(online_resources)
326 334
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
327 335
                resources.update(online_resources)
328 336
            return reverse_quantities(resources)
......
331 339
            cpu = beparams.get("vcpus", flavor.cpu)
332 340
            ram = beparams.get("maxmem", flavor.ram)
333 341
            return {"cyclades.total_cpu": cpu - flavor.cpu,
334
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
342
                    "cyclades.total_ram": (ram - flavor.ram) << 20}
343
        elif action in ["ATTACH_VOLUME", "DETACH_VOLUME"]:
344
            if action_fields is not None:
345
                volumes_changes = action_fields.get("disks")
346
                if volumes_changes is not None:
347
                    size_delta = get_volumes_size_delta(volumes_changes)
348
                    if size_delta:
349
                        return {"cyclades.disk": size_delta << 30}
335 350
        else:
336 351
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
337 352
            return None
......
350 365
                return reverse_quantities(resources)
351 366
        else:
352 367
            return None
368
    elif isinstance(resource, Volume):
369
        size = resource.size
370
        resources = {"cyclades.disk": size << 30}
371
        if resource.status == "CREATING" and action == "BUILD":
372
            return resources
373
        elif action == "DESTROY":
374
            reverse_quantities(resources)
375
        else:
376
            return None
377

  
378

  
379
def get_volumes_size_delta(volumes_changes):
    """Compute the total change in the size of a server's volumes.

    Each entry in 'volumes_changes' is an '(action, db_volume, info)'
    triple, as produced for Ganeti OP_INSTANCE_SET_PARAMS disk
    modifications:

    * "add": 'db_volume.size' is added to the delta
    * "remove": 'db_volume.size' is subtracted from the delta
    * "modify": the optional 'size_delta' entry of 'info' is added

    Returns the signed total size change; raises ValueError for an
    unrecognized action.
    """
    size_delta = 0
    for vchange in volumes_changes:
        action, db_volume, info = vchange
        if action == "add":
            size_delta += int(db_volume.size)
        elif action == "remove":
            size_delta -= int(db_volume.size)
        elif action == "modify":
            # A plain resize carries the change in 'info'; absent means 0.
            size_delta += info.get("size_delta", 0)
        else:
            # Fixed typo in the error message ("Unknwon" -> "Unknown").
            raise ValueError("Unknown volume action '%s'" % action)
    return size_delta
353 393

  
354 394

  
355 395
def reverse_quantities(resources):
b/snf-cyclades-app/synnefo/quotas/util.py
33 33

  
34 34
from django.db.models import Sum, Count, Q
35 35

  
36
from synnefo.db.models import VirtualMachine, Network, IPAddress
36
from synnefo.db.models import VirtualMachine, Network, IPAddress, Volume
37 37
from synnefo.quotas import Quotaholder
38 38

  
39 39

  
......
44 44
    vms = VirtualMachine.objects.filter(deleted=False)
45 45
    networks = Network.objects.filter(deleted=False)
46 46
    floating_ips = IPAddress.objects.filter(deleted=False, floating_ip=True)
47
    volumes = Volume.objects.filter(deleted=False)
47 48

  
48 49
    if user is not None:
49 50
        vms = vms.filter(userid=user)
......
54 55
    vm_resources = vms.values("userid")\
55 56
                      .annotate(num=Count("id"),
56 57
                                total_ram=Sum("flavor__ram"),
57
                                total_cpu=Sum("flavor__cpu"),
58
                                disk=Sum("flavor__disk"))
58
                                total_cpu=Sum("flavor__cpu"))
59 59
    vm_active_resources = \
60 60
        vms.values("userid")\
61 61
           .filter(Q(operstate="STARTED") | Q(operstate="BUILD") |
......
67 67
        user = vm_res['userid']
68 68
        res = {"cyclades.vm": vm_res["num"],
69 69
               "cyclades.total_cpu": vm_res["total_cpu"],
70
               "cyclades.disk": 1073741824 * vm_res["disk"],
71
               "cyclades.total_ram": 1048576 * vm_res["total_ram"]}
70
               "cyclades.total_ram": vm_res["total_ram"] << 20}
72 71
        holdings[user] = res
73 72

  
74 73
    for vm_res in vm_active_resources.iterator():
75 74
        user = vm_res['userid']
76 75
        holdings[user]["cyclades.cpu"] = vm_res["cpu"]
77
        holdings[user]["cyclades.ram"] = 1048576 * vm_res["ram"]
76
        holdings[user]["cyclades.ram"] = vm_res["ram"] << 20
77

  
78
    # Get disk resource
79
    disk_resources = volumes.values("userid").annotate(Sum("size"))
80
    for disk_res in disk_resources.iterator():
81
        user = disk_res["userid"]
82
        holdings.setdefault(user, {})
83
        holdings[user]["cyclades.disk"] = disk_res["size__sum"] << 30
78 84

  
79 85
    # Get resources related with networks
80 86
    net_resources = networks.values("userid")\

Also available in: Unified diff