Revision c583d487

b/snf-cyclades-app/synnefo/logic/backend.py
58 58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59 59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60 60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
61
DISK_FIELDS = ["status", "size", "index"]
61 62
UNKNOWN_NIC_PREFIX = "unknown-"
63
UNKNOWN_DISK_PREFIX = "unknown-"
62 64

  
63 65

  
64 66
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
......
119 121

  
120 122
@transaction.commit_on_success
121 123
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
122
                      job_fields=None):
124
                      disks=None, job_fields=None):
123 125
    """Process a job progress notification from the backend
124 126

  
125 127
    Process an incoming message from the backend (currently Ganeti).
......
165 167
        # in reversed order.
166 168
        vm.backendtime = etime
167 169

  
168
    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
169
        # Update the NICs of the VM
170
        _process_net_status(vm, etime, nics)
170
    if status in rapi.JOB_STATUS_FINALIZED:
171
        if nics is not None:  # Update the NICs of the VM
172
            _process_net_status(vm, etime, nics)
173
        if disks is not None:  # Update the disks of the VM
174
            _process_disks_status(vm, etime, disks)
171 175

  
172 176
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
173 177
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
174 178
                                                     rapi.JOB_STATUS_ERROR):
175 179
        new_operstate = "ERROR"
176 180
        vm.backendtime = etime
177
        # Update state of associated NICs
181
        # Update state of associated attachments
178 182
        vm.nics.all().update(state="ERROR")
183
        vm.volumes.all().update(status="ERROR")
179 184
    elif opcode == 'OP_INSTANCE_REMOVE':
180 185
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
181 186
        # when no instance exists at the Ganeti backend.
182 187
        # See ticket #799 for all the details.
183 188
        if (status == rapi.JOB_STATUS_SUCCESS or
184 189
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
185
            # VM has been deleted
190
            # server has been deleted, so delete the server's attachments
191
            vm.volumes.all().update(deleted=True, machine=None)
186 192
            for nic in vm.nics.all():
187
                # Release the IP
193
                # but first release the IP
188 194
                remove_nic_ips(nic)
189
                # And delete the NIC.
190 195
                nic.delete()
191 196
            vm.deleted = True
192 197
            new_operstate = state_for_success
193 198
            vm.backendtime = etime
194 199
            status = rapi.JOB_STATUS_SUCCESS
195
            #status = "success"
196
            vm.volumes.all().update(deleted=True, machine=None)
197 200

  
198 201
    if status in rapi.JOB_STATUS_FINALIZED:
199 202
        # Job is finalized: Handle quotas/commissioning
......
427 430

  
428 431

  
429 432
@transaction.commit_on_success
433
def process_disks_status(vm, etime, disks):
434
    """Wrap _process_disks_status inside transaction."""
435
    _process_disks_status(vm, etime, disks)
436

  
437

  
438
def _process_disks_status(vm, etime, disks):
439
    """Process a disks status notification from the backend
440

  
441
    Process an incoming message from the Ganeti backend,
442
    detailing the disk configuration of a VM instance.
443

  
444
    Update the state of the VM in the DB accordingly.
445

  
446
    """
447
    ganeti_disks = process_ganeti_disks(disks)
448
    db_disks = dict([(disk.id, disk)
449
                     for disk in vm.volumes.filter(deleted=False)])
450

  
451
    for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()):
452
        db_disk = db_disks.get(disk_name)
453
        ganeti_disk = ganeti_disks.get(disk_name)
454
        if ganeti_disk is None:
455
            if disk_is_stale(vm, disk):
456
                log.debug("Removing stale disk '%s'" % db_disk)
457
                # TODO: Handle disk deletion
458
                db_disk.deleted = True
459
                db_disk.save()
460
            else:
461
                log.info("disk '%s' is still being created" % db_disk)
462
        elif db_disk is None:
463
            msg = ("disk/%s of VM %s does not exist in DB! Cannot"
464
                   " automatically fix this issue!" % (disk_name, vm))
465
            log.error(msg)
466
            continue
467
        elif not disks_are_equal(db_disk, ganeti_disk):
468
            for f in DISK_FIELDS:
469
                # Update the disk in DB with the values from Ganeti disk
470
                setattr(db_disk, f, ganeti_disk[f])
471
                db_disk.save()
472

  
473
            # TODO: Special case where the size of the disk has changed!!
474
            assert(ganeti_disk["size"] == db_disk.size)
475

  
476
    vm.backendtime = etime
477
    vm.save()
478

  
479

  
480
def disks_are_equal(db_disk, gnt_disk):
481
    for field in DISK_FIELDS:
482
        if getattr(db_disk, field) != gnt_disk[field]:
483
            return False
484
    return True
485

  
486

  
487
def process_ganeti_disks(ganeti_disks):
488
    """Process disk dict from ganeti"""
489
    new_disks = []
490
    for index, gdisk in enumerate(ganeti_disks):
491
        disk_name = gdisk.get("name", None)
492
        if disk_name is not None:
493
            disk_id = utils.id_from_disk_name(disk_name)
494
        else:
495
            # Put as default value the index. If it is an unknown disk to
496
            # synnefo it will be created automaticaly.
497
            disk_id = UNKNOWN_DISK_PREFIX + str(index)
498

  
499
        # Get disk size in GB
500
        size = gdisk.get("size") >> 10
501

  
502
        disk_info = {
503
            'index': index,
504
            'size': size,
505
            'status': "IN_USE"}
506

  
507
        new_disks.append((disk_id, disk_info))
508
    return dict(new_disks)
509

  
510

  
511
@transaction.commit_on_success
430 512
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
431 513
    if status not in [x[0] for x in BACKEND_STATUSES]:
432 514
        raise Network.InvalidBackendMsgError(opcode, status)
......
632 714
    kw['disk_template'] = flavor.disk_template
633 715
    disks = []
634 716
    for volume in volumes:
635
        disk = {}
636
        disk["size"] = volume.size * 1024
717
        disk = {"name": volume.backend_volume_uuid,
718
                "size": volume.size * 1024}
637 719
        provider = flavor.disk_provider
638 720
        if provider is not None:
639 721
            disk["provider"] = provider
......
807 889
            return False
808 890

  
809 891

  
892
def disk_is_stale(vm, disk, timeout=60):
893
    """Check if a disk is stale or exists in the Ganeti backend."""
894
    # First check the state of the disk
895
    if disk.status == "CREATING":
896
        if datetime.now() < disk.created + timedelta(seconds=timeout):
897
            # Do not check for too recent disks to avoid the time overhead
898
            return False
899
        if job_is_still_running(vm, job_id=disk.backendjobid):
900
            return False
901
        else:
902
            # If job has finished, check that the disk exists, because the
903
            # message may have been lost or stuck in the queue.
904
            vm_info = get_instance_info(vm)
905
            if disk.backend_volume_uuid in vm_info["disk.names"]:
906
                return False
907
    return True
908

  
909

  
810 910
def nic_is_stale(vm, nic, timeout=60):
811 911
    """Check if a NIC is stale or exists in the Ganeti backend."""
812 912
    # First check the state of the NIC and if there is a pending CONNECT
b/snf-cyclades-app/synnefo/logic/callbacks.py
183 183
    jobID = msg["jobId"]
184 184
    logmsg = msg["logmsg"]
185 185
    nics = msg.get("instance_nics", None)
186
    disks = msg.get("instance_disks", None)
186 187
    job_fields = msg.get("job_fields", {})
187 188
    result = msg.get("result", [])
188 189

  
......
223 224
    backend_mod.process_op_status(vm, event_time, jobID,
224 225
                                  operation, status,
225 226
                                  logmsg, nics=nics,
227
                                  disks=disks,
226 228
                                  job_fields=job_fields)
227 229

  
228 230
    log.debug("Done processing ganeti-op-status msg for vm %s.",
b/snf-cyclades-app/synnefo/logic/utils.py
88 88
    return int(ns)
89 89

  
90 90

  
91
def id_from_disk_name(name):
92
    """Returns Disk Django id, given a Ganeti's Disk name.
93

  
94
    """
95
    if not str(name).startswith(settings.BACKEND_PREFIX_ID):
96
        raise ValueError("Invalid Disk name: %s" % name)
97
    ns = str(name).replace(settings.BACKEND_PREFIX_ID + 'volume-', "", 1)
98
    if not ns.isdigit():
99
        raise ValueError("Invalid Disk name: %s" % name)
100

  
101
    return int(ns)
102

  
103

  
91 104
def get_rsapi_state(vm):
92 105
    """Returns the API state for a virtual machine
93 106

  

Also available in: Unified diff