Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ d05e5324

History | View | Annotate | Download (50.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from django.utils import simplejson as json
36
from datetime import datetime, timedelta
37

    
38
from synnefo.db.models import (VirtualMachine, Network,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor, IPAddress, IPAddressLog)
42
from synnefo.logic import utils, ips
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46
from synnefo.logic import rapi
47
from synnefo import volume
48

    
49
from logging import getLogger
50
log = getLogger(__name__)
51

    
52

    
53
_firewall_tags = {
54
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
55
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
56
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
57

    
58
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
59

    
60
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
61
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
62
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
63
DISK_FIELDS = ["status", "size", "index"]
64
UNKNOWN_NIC_PREFIX = "unknown-nic-"
65
UNKNOWN_DISK_PREFIX = "unknown-disk-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    @param vm: the VirtualMachine whose quotas are affected.
    @param job_id: id of the Ganeti job that triggered the update.
    @param job_opcode: the Ganeti opcode of the job.
    @param job_status: the (finalized) status of the job.
    @param job_fields: the modified fields of the job.
    @return: the (possibly updated) VirtualMachine object.

    """
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        # Only terminal job states (success/error/canceled) affect quotas.
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_resource_serial(vm)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_resource_serial(vm)
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s. "
                      "Attached serial %s",
                      vm.task_job_id, job_id, vm.serial)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            try:
                serial = quotas.handle_resource_commission(
                    vm, action,
                    action_fields=job_fields,
                    commission_name=reason,
                    force=True,
                    auto_accept=True)
            except Exception:
                # Narrowed from a bare 'except:' so that SystemExit /
                # KeyboardInterrupt are not intercepted here; the exception
                # is logged and re-raised either way.
                log.exception("Error while handling new commission")
                raise
            log.debug("Issued new commission: %s", serial)
    return vm
126

    
127

    
128
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      disks=None, job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: time the event was generated by the backend.
    :param jobid: id of the Ganeti job.
    :param opcode: the Ganeti opcode of the job (e.g. 'OP_INSTANCE_CREATE').
    :param status: the Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: the raw job log message from the backend.
    :param nics: NICs of the instance as reported by Ganeti, if included.
    :param disks: disks of the instance as reported by Ganeti, if included.
    :param job_fields: the modified fields of the job, if any.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    if opcode == "OP_INSTANCE_SNAPSHOT":
        # Snapshot jobs only update the snapshots' status; the rest of the
        # VM bookkeeping below is skipped entirely.
        for disk_id, disk_info in job_fields.get("disks", []):
            snap_info = json.loads(disk_info["snapshot_info"])
            snap_id = snap_info["snapshot_id"]
            update_snapshot(snap_id, user_id=vm.userid, job_id=jobid,
                            job_status=status, etime=etime)
        return

    # Record the latest job info on the VM regardless of job status.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status not in rapi.JOB_STATUS_FINALIZED:
        # Non-terminal status: just persist the job information.
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    # Deferred state/flavor updates, applied only after quota handling
    # (see the comment near the end of this function).
    new_operstate = None
    new_flavor = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        if state_for_success is not None:
            new_operstate = state_for_success

        # A successful job carrying 'beparams' may have changed the VM's
        # CPU/RAM, i.e. its flavor.
        beparams = job_fields.get("beparams")
        if beparams:
            cpu = beparams.get("vcpus")
            ram = beparams.get("maxmem")
            new_flavor = find_new_flavor(vm, cpu=cpu, ram=ram)

        # XXX: Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED:
        if nics is not None:
            update_vm_nics(vm, nics, etime)
        if disks is not None:
            # XXX: Replace the job fields with mocked changes as produced by
            # the diff between the DB and Ganeti disks. This is required in
            # order to update quotas for disks that changed, but not from this
            # job!
            disk_changes = update_vm_disks(vm, disks, etime)
            job_fields["disks"] = disk_changes

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated attachments
        vm.nics.all().update(state="ERROR")
        vm.volumes.all().update(status="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # server has been deleted, so delete the server's attachments
            vm.volumes.all().update(deleted=True, status="DELETED",
                                    machine=None)
            for nic in vm.nics.all():
                # but first release the IP
                remove_nic_ips(nic)
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful from here on, so that quota
            # handling below releases the server's resources.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    # Update VM's state and flavor after handling of quotas, since computation
    # of quotas depends on these attributes
    if new_operstate is not None:
        vm.operstate = new_operstate
    if new_flavor is not None:
        vm.flavor = new_flavor

    vm.save()
238

    
239

    
240
def find_new_flavor(vm, cpu=None, ram=None):
    """Return the flavor matching the VM's new CPU/RAM, or None.

    When neither cpu nor ram is given, or when the resulting specs are
    identical to the current flavor's, no flavor change is needed and None
    is returned. Raises Exception if no flavor with the requested specs
    exists (disk size and disk template are kept from the current flavor).
    """
    if cpu is None and ram is None:
        return None

    current = vm.flavor
    new_ram = current.ram if ram is None else ram
    new_cpu = current.cpu if cpu is None else cpu
    if (new_cpu, new_ram) == (current.cpu, current.ram):
        # Specs did not actually change
        return None

    try:
        new_flavor = Flavor.objects.get(cpu=new_cpu, ram=new_ram,
                                        disk=current.disk,
                                        disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("There is no flavor to match the instance specs!"
                        " Instance: %s CPU: %s RAM %s: Disk: %s Template: %s"
                        % (vm.backend_vm_id, new_cpu, new_ram, current.disk,
                           current.disk_template))
    log.info("Flavor of VM '%s' changed from '%s' to '%s'", vm,
             current.name, new_flavor.name)
    return new_flavor
263

    
264

    
265
def nics_are_equal(db_nic, gnt_nic):
    """Return True iff the DB NIC matches the Ganeti NIC on all NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
271

    
272

    
273
def parse_instance_nics(gnt_nics):
    """Parse NICs of a Ganeti instance.

    Returns a dict mapping each NIC id (or an 'unknown-nic-<index>' key for
    NICs without a synnefo name) to a dict with the NIC's attributes.
    """
    nics = {}
    for index, gnic in enumerate(gnt_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # NIC without a synnefo-generated name: mark it as unknown
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        network = Network.objects.get(id=network_id)
        subnet6 = network.subnet6

        # Collect the new NIC info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics
310

    
311

    
312
def update_vm_nics(vm, nics, etime=None):
    """Update VM's NICs to match with the NICs of the Ganeti instance

    This function will update the VM's NICs(update, delete or create) and
    return a list of quotable changes.

    @param vm: The VirtualMachine the NICs belong to
    @type vm: VirtualMachine object
    @param nics: The NICs of the Ganeti instance
    @type nics: List of dictionaries with NIC information
    @param etime: The datetime the Ganeti instance had these NICs
    @type etime: datetime

    @return: List of quotable changes (add/remove NIC) (currently empty list)
    @rtype: List of dictionaries

    """
    ganeti_nics = parse_instance_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.select_related("network")
                                                    .prefetch_related("ips")])

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti.
            # BUGFIX: this used to call nic_is_stale(vm, nic), where 'nic'
            # was only the variable leaked from the list comprehension above
            # (Python 2), i.e. always the *last* NIC of the VM -- or a
            # NameError if the VM had no NICs. Check the NIC at hand instead.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            # Save once after all fields are set (the save used to run once
            # per field inside the loop -- redundant DB writes).
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
            db_ipv4_address = db_nic.ipv4_address
            if db_ipv4_address != gnt_ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv4_address,
                                       new_address=gnt_ipv4_address,
                                       version=4)

            gnt_ipv6_address = ganeti_nic["ipv6_address"]
            db_ipv6_address = db_nic.ipv6_address
            if db_ipv6_address != gnt_ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv6_address,
                                       new_address=gnt_ipv6_address,
                                       version=6)

    return []
373

    
374

    
375
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Each address attached to the NIC is returned to its pool and its
    IPAddress object is deleted; floating IPs are only disassociated from
    the NIC. If version is given, only addresses of that IP version are
    handled.
    """
    for ip_address in nic.ips.all():
        # Skip addresses of the wrong IP version, when one was requested
        if version and ip_address.ipversion != version:
            continue

        # Close the active entry in the IP address log table
        terminate_active_ipaddress_log(nic, ip_address)

        if ip_address.floating_ip:
            # Floating IPs outlive the NIC: just detach them
            ip_address.nic = None
            ip_address.save()
        else:
            # Return the address to its pool and drop the DB object
            ip_address.release_address()
            ip_address.delete()
399

    
400

    
401
def terminate_active_ipaddress_log(nic, ip):
    """Close the active DB logging entry for this IP address.

    Marks the active IPAddressLog entry for (server, network, address) as
    released now. Addresses on non-public networks, or NICs without a
    machine, are not logged at all.
    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = IPAddressLog.objects.get_or_create(
            server_id=nic.machine_id,
            network_id=ip.network_id,
            address=ip.address,
            active=True)
    except IPAddressLog.MultipleObjectsReturned:
        message = ("Multiple active log entries for IP %s, Network %s,"
                   "Server %s. Cannot proceed!"
                   % (ip.address, ip.network, nic.machine))
        log.error(message)
        raise

    if created:
        # No pre-existing active entry: we just created one whose creation
        # timestamp is wrong (now, instead of when the IP was attached).
        message = ("No log entry for IP %s, Network %s, Server %s. Created new"
                   " but with wrong creation timestamp."
                   % (ip.address, ip.network, nic.machine))
        log.error(message)
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
426

    
427

    
428
def change_address_of_port(port, userid, old_address, new_address, version):
    """Replace an IP address of a port with a new one.

    Releases the port's old address of the given IP version, allocates (v4)
    or creates (v6) the new address, attaches it to the port, and opens a
    fresh IP log entry for it. Returns the new IPAddress object.
    """
    if old_address is not None:
        # An address change detected from the backend is unexpected; keep
        # it visible at error level.
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.error(msg)

    # Drop the old address of this IP version first
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses come from the network's pool
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are derived (EUI-64), so just create the object
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=port.network.subnet6,
                                             nic=port,
                                             address=new_address,
                                             ipversion=6)
    else:
        raise ValueError("Unknown version: %s" % version)

    # Open the log entry for the new address
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
462

    
463

    
464
def update_vm_disks(vm, disks, etime=None):
    """Update VM's disks to match with the disks of the Ganeti instance

    This function will update the VM's disks(update, delete or create) and
    return a list of quotable changes.

    @param vm: The VirtualMachine the disks belong to
    @type vm: VirtualMachine object
    @param disks: The disks of the Ganeti instance
    @type disks: List of dictionaries with disk information
    @param etime: The datetime the Ganeti instance had these disks
    @type etime: datetime

    @return: List of quotable changes (add/remove disk)
    @rtype: List of dictionaries

    """
    gnt_disks = parse_instance_disks(disks)
    db_disks = dict([(disk.id, disk)
                     for disk in vm.volumes.filter(deleted=False)])

    changes = []
    for disk_name in set(db_disks.keys()) | set(gnt_disks.keys()):
        db_disk = db_disks.get(disk_name)
        gnt_disk = gnt_disks.get(disk_name)
        if gnt_disk is None:
            # Disk exists in DB but not in Ganeti
            # BUGFIX: this used to call disk_is_stale(vm, disk), where 'disk'
            # was only the variable leaked from the list comprehension above
            # (Python 2), i.e. always the *last* volume of the VM -- or a
            # NameError if the VM had no volumes. Check the disk at hand.
            if disk_is_stale(vm, db_disk):
                log.debug("Removing stale disk '%s'" % db_disk)
                db_disk.status = "DELETED"
                db_disk.deleted = True
                db_disk.save()
                changes.append(("remove", db_disk, {}))
            else:
                log.info("disk '%s' is still being created" % db_disk)
        elif db_disk is None:
            # Disk exists in Ganeti but not in DB
            # TODO: Automatically import disk!
            msg = ("disk/%s of VM %s does not exist in DB! Cannot"
                   " automatically fix this issue!" % (disk_name, vm))
            log.error(msg)
            continue
        elif not disks_are_equal(db_disk, gnt_disk):
            # Disk has changed
            if gnt_disk["size"] != db_disk.size:
                # Size of the disk has changed! TODO: Fix flavor!
                size_delta = gnt_disk["size"] - db_disk.size
                changes.append(("modify", db_disk, {"size_delta": size_delta}))
            if db_disk.status == "CREATING":
                # Disk has been created
                changes.append(("add", db_disk, {}))
            # Update the disk in DB with the values from Ganeti disk.
            # (Was a side-effecting list comprehension; a plain loop is the
            # idiomatic form for statements executed for effect.)
            for f in DISK_FIELDS:
                setattr(db_disk, f, gnt_disk[f])
            db_disk.save()

    return changes
520

    
521

    
522
def disks_are_equal(db_disk, gnt_disk):
    """Return True iff the DB disk matches the Ganeti disk on DISK_FIELDS."""
    return all(getattr(db_disk, field) == gnt_disk[field]
               for field in DISK_FIELDS)
528

    
529

    
530
def parse_instance_disks(gnt_disks):
    """Parse disks of a Ganeti instance.

    Returns a dict mapping each disk id (or an 'unknown-disk-<index>' key
    for disks without a synnefo name) to a dict with the disk's attributes.
    """
    disks = {}
    for index, gnt_disk in enumerate(gnt_disks):
        disk_name = gnt_disk.get("name", None)
        if disk_name is None:
            # Disk without a synnefo-generated name: mark it as unknown
            disk_id = UNKNOWN_DISK_PREFIX + str(index)
        else:
            disk_id = utils.id_from_disk_name(disk_name)

        disks[disk_id] = {
            'index': index,
            'size': gnt_disk["size"] >> 10,  # Ganeti reports MB; convert to GB
            'status': "IN_USE"}
    return disks
547

    
548

    
549
def update_snapshot(snap_id, user_id, job_id, job_status, etime):
    """Update a snapshot based on result of a Ganeti job.

    Marks the snapshot AVAILABLE when the snapshot job succeeded and ERROR
    when it failed or was canceled; non-terminal job states are ignored.
    """
    if job_status in rapi.JOB_STATUS_FINALIZED:
        # BUGFIX: the previous expression
        #     rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR"
        # never inspected job_status at all and always yielded "AVAILABLE",
        # so failed snapshots were reported as available. Compare the actual
        # job status instead.
        if job_status == rapi.JOB_STATUS_SUCCESS:
            status = "AVAILABLE"
        else:
            status = "ERROR"
        log.debug("Updating status of snapshot '%s' to '%s'", snap_id, status)
        volume.util.update_snapshot_status(snap_id, user_id, status=status)
555

    
556

    
557
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Records the job information on the BackendNetwork, updates its operating
    state according to the job result, and finally refreshes the state of
    the parent Network.

    :param back_network: the BackendNetwork the job refers to.
    :param etime: time the event was generated by the backend.
    :param jobid: id of the Ganeti job.
    :param opcode: the Ganeti opcode of the job.
    :param status: the Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: the raw job log message from the backend.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job info regardless of job status.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # The network counts as removed if the job succeeded, or if it
        # failed but the network no longer exists at the Ganeti backend
        # (cf. the analogous OP_INSTANCE_REMOVE handling for VMs).
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
593

    
594

    
595
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return
        # NOTE(review): when there are no backend networks but the state is
        # already ACTIVE, execution falls through to the reduce() below,
        # which evaluates to "DELETED" on an empty list and thus deletes the
        # network -- confirm this fall-through is intended.

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    # (reduce yields a truthy value only if every element equals "DELETED";
    # for an empty backend_states list it yields the initial "DELETED").
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            # Refuse to delete while floating IPs from this network are alive
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        # Return pooled resources (MAC prefix / bridge) for pool-backed
        # flavors only.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
668

    
669

    
670
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a Ganeti OP_NETWORK_SET_PARAMS job notification.

    Records the job information on the BackendNetwork and reserves any
    externally reserved IP addresses that were added to the network.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # BUGFIX: this used to assign to 'back_network.opcode', which is not the
    # field used everywhere else (cf. process_network_status assigning to
    # 'backendopcode'), so the opcode was set on a transient attribute and
    # never persisted by save().
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            # Mark the address as externally reserved in the network's pools
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
690

    
691

    
692
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Store the build progress reported for a VM under creation.

    Clamps the reported percentage to [0, 100] (values above 100 are
    silently capped; negative values raise ValueError) and records it,
    together with the event time, on the VM.
    """
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033 — the build percentage is not
    # checked to increase monotonically against the previous value.

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    # For that reason there is deliberately no check that the VM is still
    # in the BUILD state here.

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
721

    
722

    
723
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
740

    
741

    
742
def create_instance(vm, nics, volumes, flavor, image):
    """Submit an OP_INSTANCE_CREATE job to Ganeti for a new VM.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Returns the Ganeti job ID of the creation job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # All volumes of one instance share a single disk template; Ganeti
    # disk sizes are expressed in MiB (volume.size is in GiB).
    kw['disk_template'] = volumes[0].template
    disks = []
    for volume in volumes:
        disk = {"name": volume.backend_volume_uuid,
                "size": volume.size * 1024}
        provider = volume.provider
        if provider is not None:
            disk["provider"] = provider
            disk["origin"] = volume.origin
            # Allow per-provider extra disk parameters from settings.
            extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS\
                                        .get(provider)
            if extra_disk_params is not None:
                disk.update(extra_disk_params)
        disks.append(disk)

    kw["disks"] = disks

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # Make sure every network the instance connects to exists and is
    # active in this backend; the creation job depends on those jobs.
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass strips sensitive values before logging the kwargs.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
821

    
822

    
823
def delete_instance(vm, shutdown_timeout=None):
    """Ask Ganeti to remove the instance backing the given VM."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.DeleteInstance(vm.backend_vm_id,
                                          shutdown_timeout=shutdown_timeout,
                                          dry_run=settings.TEST)
828

    
829

    
830
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
    """Reboot a Ganeti instance.

    The reboot type submitted to Ganeti is always "hard": the OS API
    'soft'/'hard' distinction is different from Ganeti's and maps to
    'shutdown_timeout' instead (a hard reboot shuts down immediately).
    """
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # 'shutdown_timeout' is only supported by snf-ganeti >= 2.8.2 and
    # Ganeti > 2.10; older versions ignore the parameter and fall back
    # to Ganeti's default timeout (120s).
    if reboot_type == "hard":
        kwargs["shutdown_timeout"] = 0
    elif shutdown_timeout is not None:
        kwargs["shutdown_timeout"] = shutdown_timeout
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
848

    
849

    
850
def startup_instance(vm):
    """Start a stopped Ganeti instance."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.StartupInstance(vm.backend_vm_id,
                                           dry_run=settings.TEST)
853

    
854

    
855
def shutdown_instance(vm, shutdown_timeout=None):
    """Shut down a running Ganeti instance."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.ShutdownInstance(vm.backend_vm_id,
                                            timeout=shutdown_timeout,
                                            dry_run=settings.TEST)
860

    
861

    
862
def resize_instance(vm, vcpus, memory):
    """Modify the vcpus/memory of an instance (min memory == max memory)."""
    beparams = dict(vcpus=int(vcpus),
                    minmem=int(memory),
                    maxmem=int(memory))
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
868

    
869

    
870
def get_instance_console(vm):
    """Return VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783).  Until this is fixed on the Ganeti side,
    construct a console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    serial_console = instance['hvparams']['serial_console']
    if vm.backend.hypervisor == "kvm" and serial_console:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
896

    
897

    
898
def get_instance_info(vm):
    """Fetch the Ganeti view of a VM's instance."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
901

    
902

    
903
def vm_exists_in_backend(vm):
    """Return True if the VM's instance exists in its Ganeti backend.

    A 404 from RAPI means the instance is gone; any other API error is
    propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise err
    return True
911

    
912

    
913
def get_network_info(backend_network):
    """Fetch the Ganeti view of a backend network."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)
916

    
917

    
918
def network_exists_in_backend(backend_network):
    """Return True if the network is defined in the Ganeti backend.

    A 404 from RAPI means the network does not exist.  Any other API
    error is re-raised instead of being silently swallowed (the original
    code fell off the end and returned None, which callers would treat
    as "network missing") — consistent with vm_exists_in_backend().
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
925

    
926

    
927
def job_is_still_running(vm, job_id=None):
    """Return True if a Ganeti job of the VM has not yet finalized.

    Defaults to the VM's last recorded backend job; any RAPI error is
    treated as "not running".
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False
936

    
937

    
938
def disk_is_stale(vm, disk, timeout=60):
    """Check if a disk is stale or exists in the Ganeti backend.

    :param vm: the VirtualMachine the disk belongs to.
    :param disk: the disk (Volume) DB object to check.
    :param timeout: seconds to wait after disk creation before probing
        the backend at all.
    :returns: True if the disk should be considered stale.
    """
    # First check the state of the disk
    if disk.status == "CREATING":
        if datetime.now() < disk.created + timedelta(seconds=timeout):
            # Do not check for too recent disks to avoid the time overhead
            return False
        if job_is_still_running(vm, job_id=disk.backendjobid):
            return False
        else:
            # If job has finished, check that the disk exists, because the
            # message may have been lost or stuck in the queue.
            vm_info = get_instance_info(vm)
            if disk.backend_volume_uuid in vm_info["disk.names"]:
                return False
    return True
954

    
955

    
956
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend.

    :param vm: the VirtualMachine the NIC belongs to.
    :param nic: the NIC DB object to check.
    :param timeout: seconds to wait after NIC creation before probing
        the backend at all.
    :returns: True if the NIC should be considered stale.
    """
    # First check the state of the NIC and if there is a pending CONNECT
    if nic.state == "BUILD" and vm.task == "CONNECT":
        if datetime.now() < nic.created + timedelta(seconds=timeout):
            # Do not check for too recent NICs to avoid the time overhead
            return False
        if job_is_still_running(vm, job_id=vm.task_job_id):
            return False
        else:
            # If job has finished, check that the NIC exists, because the
            # message may have been lost or stuck in the queue.
            vm_info = get_instance_info(vm)
            if nic.backend_uuid in vm_info["nic.names"]:
                return False
    return True
972

    
973

    
974
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend

    Check that a network exists and is active in the specified backend. If not
    (re-)create the network. Return the corresponding BackendNetwork object
    and the IDs of the Ganeti job to create the network.

    """
    job_ids = []
    try:
        bnet = BackendNetwork.objects.select_related("network")\
                                     .get(backend=backend, network=network_id)
        if bnet.operstate != "ACTIVE":
            # Network exists in the backend but is not active: resubmit
            # the creation/connection jobs.
            job_ids = create_network(bnet.network, backend, connect=True)
    except BackendNetwork.DoesNotExist:
        # No BackendNetwork entry yet: lock the Network row, create the DB
        # entry and then create the network in Ganeti.
        network = Network.objects.select_for_update().get(id=network_id)
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
        job_ids = create_network(network, backend, connect=True)

    return bnet, job_ids
994

    
995

    
996
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job IDs: only the creation job if
    `connect` is False, otherwise the connection jobs (which depend on
    the creation job).
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
1007

    
1008

    
1009
def _create_network(network, backend):
    """Create a network in Ganeti; return the Ganeti job ID.

    Collects the IPv4/IPv6 subnets and gateways of the network, builds
    the backend tags and submits a CreateNetwork RAPI call.
    """

    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        # Any DHCP-enabled subnet makes the network served by nfdhcpd.
        if _subnet.dhcp and not "nfdhcpd" in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Ganeti checks for IP conflicts only on public IPv4 networks.
    conflicts_check = False
    if network.public:
        tags.append('public')
        if subnet is not None:
            conflicts_check = True
    else:
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
1058

    
1059

    
1060
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param network: the Network to connect.
    :param backend: the Ganeti backend.
    :param depends: optional list of Ganeti job IDs the connection jobs
        must wait for (was a mutable default `[]`; now a None sentinel).
    :param group: connect only this nodegroup; by default connect to all
        nodegroups of the backend.
    :returns: list of Ganeti job IDs, one per nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    # Ganeti checks for IP conflicts only on public IPv4 networks.
    conflicts_check = False
    if network.public and (network.subnet4 is not None):
        conflicts_check = True

    depends = create_job_dependencies(depends)
    job_ids = []
    with pooled_rapi_client(backend) as client:
        # Note: do not shadow the `group` parameter with the loop variable.
        groups = [group] if group is not None else client.GetGroups()
        for nodegroup in groups:
            job_id = client.ConnectNetwork(network.backend_id, nodegroup,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
1079

    
1080

    
1081
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first.

    When `disconnect` is True the deletion job depends on the
    disconnection jobs.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
1088

    
1089

    
1090
def _delete_network(network, backend, depends=[]):
    """Submit the Ganeti job that deletes the network definition."""
    job_deps = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, job_deps)
1094

    
1095

    
1096
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    :param group: disconnect only this nodegroup; by default disconnect
        from every nodegroup of the backend.
    :returns: list of Ganeti job IDs, one per nodegroup.
    """
    # Fixed log wording: "from", not "to" (copy-paste from connect_network).
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
1106

    
1107

    
1108
def connect_to_network(vm, nic):
    """Add a NIC to a Ganeti instance (hotplugged if the backend allows).

    Makes sure the NIC's network is active in the VM's backend first; the
    ModifyInstance job depends on any network-creation jobs.
    """
    network = nic.network
    bnet, depend_jobs = ensure_network_is_active(vm.backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Ganeti-side description of the new NIC (do not shadow `nic`).
    nic_info = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_info, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_info)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
1133

    
1134

    
1135
def disconnect_from_network(vm, nic):
    """Remove a NIC from a Ganeti instance and drop its firewall tag.

    Returns the Ganeti job ID of the ModifyInstance call.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)

        # Clean up the instance tag carrying the NIC's firewall profile.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
1156

    
1157

    
1158
def set_firewall_profile(vm, profile, nic):
    """Apply a firewall profile to a NIC by (re)setting instance tags.

    :param vm: the VirtualMachine owning the NIC.
    :param profile: one of the keys of _firewall_tags.
    :param nic: the NIC whose firewall profile changes.
    :raises ValueError: if `profile` is not a known firewall profile.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fixed typo in user-visible message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
1186

    
1187

    
1188
def attach_volume(vm, volume, depends=[]):
    """Add a disk backed by `volume` to a Ganeti instance.

    Returns the Ganeti job ID of the ModifyInstance call; it depends on
    the jobs listed in `depends`.
    """
    log.debug("Attaching volume %s to vm %s", volume, vm)

    # Ganeti expects the disk size in MiB (volume.size is in GiB).
    disk = {"name": volume.backend_volume_uuid,
            "size": int(volume.size) << 10}

    if volume.provider is not None:
        disk["provider"] = volume.provider
    if volume.origin is not None:
        disk["origin"] = volume.origin

    kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("add", "-1", disk)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
1213

    
1214

    
1215
def detach_volume(vm, volume, depends=[]):
    """Remove the disk backed by `volume` from a Ganeti instance."""
    log.debug("Removing volume %s from vm %s", volume, vm)

    modify_kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("remove", volume.backend_volume_uuid, {})],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        modify_kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        modify_kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**modify_kwargs)
1229

    
1230

    
1231
def snapshot_instance(vm, snapshot_name, snapshot_id):
    """Snapshot a Ganeti instance.

    The snapshot id is passed through the job 'reason' field so the
    dispatcher can associate the job with the snapshot.
    """
    with pooled_rapi_client(vm) as client:
        return client.SnapshotInstance(
            instance=vm.backend_vm_id,
            snapshot_name=snapshot_name,
            reason=json.dumps({"snapshot_id": snapshot_id}))
1238

    
1239

    
1240
def get_instances(backend, bulk=True):
    """Fetch all instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
1243

    
1244

    
1245
def get_nodes(backend, bulk=True):
    """Fetch all nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
1248

    
1249

    
1250
def get_jobs(backend, bulk=True):
    """Fetch all jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
1253

    
1254

    
1255
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the
    db), summed over the nodes that can actually host VMs.
    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process.
        if not node['vm_capable'] or node['drained'] or node['offline']:
            continue
        if not node['cnodes']:
            continue
        for attr in attrs:
            totals[attr] += int(node[attr] or 0)
    return totals
1274

    
1275

    
1276
def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    If `resources` is not given, they are fetched from the backend via
    get_physical_resources().
    """
    if not resources:
        resources = get_physical_resources(backend)

    for field in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                  'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
1292

    
1293

    
1294
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
1307

    
1308

    
1309
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.
    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    enabled_templates = info.get("enabled_disk_templates")
    if enabled_templates is None:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in enabled_templates
            if tmpl in ipolicy_templates]
1326

    
1327

    
1328
def update_backend_disk_templates(backend):
    """Refresh the backend's cached list of available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1332

    
1333

    
1334
##
1335
## Synchronized operations for reconciliation
1336
##
1337

    
1338

    
1339
def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job.

    Returns the (status, error) tuple of the first failed step, or of
    the final connection step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] == rapi.JOB_STATUS_SUCCESS:
        result = connect_network_synced(network, backend)
    return result
1345

    
1346

    
1347
def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
1352

    
1353

    
1354
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting for each job.

    Stops at the first failed job and returns its (status, error) tuple;
    returns a success tuple if every job succeeded.
    """
    # Start from a success tuple so that a backend with no nodegroups does
    # not leave `result` unbound (the original raised UnboundLocalError in
    # that corner case).
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
1364

    
1365

    
1366
def wait_for_job(client, jobid):
    """Block until a Ganeti job finalizes; return (status, error).

    Polls the job's 'status' and 'opresult' fields via WaitForJobChange.
    `error` is None on success, otherwise the job's opresult.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        # Pass the previous result so WaitForJobChange blocks until the
        # watched fields actually change.
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
1379

    
1380

    
1381
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    :param job_ids: iterable of Ganeti job IDs (default: no dependencies;
        was a mutable default `[]`, now a None sentinel).
    :param job_states: list of job states that satisfy each dependency;
        defaults to all finalized states.
    :returns: list of [job_id, job_states] pairs as expected by the
        Ganeti 'depends' job attribute.
    """
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    # Ganeti expects a real list of states, not an arbitrary iterable.
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]