Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 2432c417

History | View | Annotate | Download (49 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from django.utils import simplejson as json
36
from datetime import datetime, timedelta
37

    
38
from synnefo.db.models import (VirtualMachine, Network,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor, IPAddress, IPAddressLog)
42
from synnefo.logic import utils, ips
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46
from synnefo.logic import rapi
47
from synnefo.volume.util import update_snapshot_status
48

    
49
from logging import getLogger
50
log = getLogger(__name__)
51

    
52

    
53
_firewall_tags = {
54
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
55
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
56
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
57

    
58
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
59

    
60
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
61
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
62
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
63
DISK_FIELDS = ["status", "size", "index"]
64
UNKNOWN_NIC_PREFIX = "unknown-"
65
UNKNOWN_DISK_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
69
    """Handle quotas for updated VirtualMachine.
70

71
    Update quotas for the updated VirtualMachine based on the job that run on
72
    the Ganeti backend. If a commission has been already issued for this job,
73
    then this commission is just accepted or rejected based on the job status.
74
    Otherwise, a new commission for the given change is issued, that is also in
75
    force and auto-accept mode. In this case, previous commissions are
76
    rejected, since they reflect a previous state of the VM.
77

78
    """
79
    if job_status not in rapi.JOB_STATUS_FINALIZED:
80
        return vm
81

    
82
    # Check successful completion of a job will trigger any quotable change in
83
    # the VM state.
84
    action = utils.get_action_from_opcode(job_opcode, job_fields)
85
    if action == "BUILD":
86
        # Quotas for new VMs are automatically accepted by the API
87
        return vm
88

    
89
    if vm.task_job_id == job_id and vm.serial is not None:
90
        # Commission for this change has already been issued. So just
91
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
92
        # if fails, must be accepted, as the user must manually remove the
93
        # failed server
94
        serial = vm.serial
95
        if job_status == rapi.JOB_STATUS_SUCCESS:
96
            quotas.accept_resource_serial(vm)
97
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
98
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
99
                      serial)
100
            quotas.reject_resource_serial(vm)
101
    elif job_status == rapi.JOB_STATUS_SUCCESS:
102
        commission_info = quotas.get_commission_info(resource=vm,
103
                                                     action=action,
104
                                                     action_fields=job_fields)
105
        if commission_info is not None:
106
            # Commission for this change has not been issued, or the issued
107
            # commission was unaware of the current change. Reject all previous
108
            # commissions and create a new one in forced mode!
109
            log.debug("Expected job was %s. Processing job %s. "
110
                      "Attached serial %s",
111
                      vm.task_job_id, job_id, vm.serial)
112
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
113
                      % (vm, job_id))
114
            serial = quotas.handle_resource_commission(
115
                vm, action,
116
                action_fields=job_fields,
117
                commission_name=reason,
118
                force=True,
119
                auto_accept=True)
120
            log.debug("Issued new commission: %s", serial)
121
    return vm
122

    
123

    
124
@transaction.commit_on_success
125
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
126
                      disks=None, job_fields=None):
127
    """Process a job progress notification from the backend
128

129
    Process an incoming message from the backend (currently Ganeti).
130
    Job notifications with a terminating status (sucess, error, or canceled),
131
    also update the operating state of the VM.
132

133
    """
134
    # See #1492, #1031, #1111 why this line has been removed
135
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
136
    if status not in [x[0] for x in BACKEND_STATUSES]:
137
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
138

    
139
    if opcode == "OP_INSTANCE_SNAPSHOT":
140
        for disk_id, disk_info in job_fields.get("disks", []):
141
            snapshot_name = disk_info.get("snapshot_name")
142
            snapshot_info = json.loads(disk_info["snapshot_info"])
143
            user_id = vm.userid
144
            _process_snapshot_status(snapshot_name, snapshot_info,
145
                                     user_id, etime, jobid, status)
146
        return
147

    
148
    vm.backendjobid = jobid
149
    vm.backendjobstatus = status
150
    vm.backendopcode = opcode
151
    vm.backendlogmsg = logmsg
152

    
153
    if status not in rapi.JOB_STATUS_FINALIZED:
154
        vm.save()
155
        return
156

    
157
    if job_fields is None:
158
        job_fields = {}
159

    
160
    new_operstate = None
161
    new_flavor = None
162
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
163

    
164
    if status == rapi.JOB_STATUS_SUCCESS:
165
        # If job succeeds, change operating state if needed
166
        if state_for_success is not None:
167
            new_operstate = state_for_success
168

    
169
        beparams = job_fields.get("beparams", None)
170
        if beparams:
171
            # Change the flavor of the VM
172
            new_flavor = _process_resize(vm, beparams)
173

    
174
        # Update backendtime only for jobs that have been successfully
175
        # completed, since only these jobs update the state of the VM. Else a
176
        # "race condition" may occur when a successful job (e.g.
177
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
178
        # in reversed order.
179
        vm.backendtime = etime
180

    
181
    if status in rapi.JOB_STATUS_FINALIZED:
182
        if nics is not None:  # Update the NICs of the VM
183
            _process_net_status(vm, etime, nics)
184
        if disks is not None:  # Update the disks of the VM
185
            _process_disks_status(vm, etime, disks)
186

    
187
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
188
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
189
                                                     rapi.JOB_STATUS_ERROR):
190
        new_operstate = "ERROR"
191
        vm.backendtime = etime
192
        # Update state of associated attachments
193
        vm.nics.all().update(state="ERROR")
194
        vm.volumes.all().update(status="ERROR")
195
    elif opcode == 'OP_INSTANCE_REMOVE':
196
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
197
        # when no instance exists at the Ganeti backend.
198
        # See ticket #799 for all the details.
199
        if (status == rapi.JOB_STATUS_SUCCESS or
200
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
201
            # server has been deleted, so delete the server's attachments
202
            vm.volumes.all().update(deleted=True, machine=None)
203
            for nic in vm.nics.all():
204
                # but first release the IP
205
                remove_nic_ips(nic)
206
                nic.delete()
207
            vm.deleted = True
208
            new_operstate = state_for_success
209
            vm.backendtime = etime
210
            status = rapi.JOB_STATUS_SUCCESS
211

    
212
    if status in rapi.JOB_STATUS_FINALIZED:
213
        # Job is finalized: Handle quotas/commissioning
214
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
215
                              job_status=status, job_fields=job_fields)
216
        # and clear task fields
217
        if vm.task_job_id == jobid:
218
            vm.task = None
219
            vm.task_job_id = None
220

    
221
    if new_operstate is not None:
222
        vm.operstate = new_operstate
223
    if new_flavor is not None:
224
        vm.flavor = new_flavor
225

    
226
    vm.save()
227

    
228

    
229
def _process_resize(vm, beparams):
230
    """Change flavor of a VirtualMachine based on new beparams."""
231
    old_flavor = vm.flavor
232
    vcpus = beparams.get("vcpus", old_flavor.cpu)
233
    ram = beparams.get("maxmem", old_flavor.ram)
234
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
235
        return
236
    try:
237
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
238
                                        disk=old_flavor.disk,
239
                                        disk_template=old_flavor.disk_template)
240
    except Flavor.DoesNotExist:
241
        raise Exception("Cannot find flavor for VM")
242
    return new_flavor
243

    
244

    
245
@transaction.commit_on_success
246
def process_net_status(vm, etime, nics):
247
    """Wrap _process_net_status inside transaction."""
248
    _process_net_status(vm, etime, nics)
249

    
250

    
251
def _process_net_status(vm, etime, nics):
252
    """Process a net status notification from the backend
253

254
    Process an incoming message from the Ganeti backend,
255
    detailing the NIC configuration of a VM instance.
256

257
    Update the state of the VM in the DB accordingly.
258

259
    """
260
    ganeti_nics = process_ganeti_nics(nics)
261
    db_nics = dict([(nic.id, nic)
262
                    for nic in vm.nics.select_related("network")
263
                                      .prefetch_related("ips")])
264

    
265
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
266
        db_nic = db_nics.get(nic_name)
267
        ganeti_nic = ganeti_nics.get(nic_name)
268
        if ganeti_nic is None:
269
            if nic_is_stale(vm, nic):
270
                log.debug("Removing stale NIC '%s'" % db_nic)
271
                remove_nic_ips(db_nic)
272
                db_nic.delete()
273
            else:
274
                log.info("NIC '%s' is still being created" % db_nic)
275
        elif db_nic is None:
276
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
277
                   " fix this issue!" % (nic_name, vm))
278
            log.error(msg)
279
            continue
280
        elif not nics_are_equal(db_nic, ganeti_nic):
281
            for f in SIMPLE_NIC_FIELDS:
282
                # Update the NIC in DB with the values from Ganeti NIC
283
                setattr(db_nic, f, ganeti_nic[f])
284
                db_nic.save()
285

    
286
            # Special case where the IPv4 address has changed, because you
287
            # need to release the old IPv4 address and reserve the new one
288
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
289
            db_ipv4_address = db_nic.ipv4_address
290
            if db_ipv4_address != gnt_ipv4_address:
291
                change_address_of_port(db_nic, vm.userid,
292
                                       old_address=db_ipv4_address,
293
                                       new_address=gnt_ipv4_address,
294
                                       version=4)
295

    
296
            gnt_ipv6_address = ganeti_nic["ipv6_address"]
297
            db_ipv6_address = db_nic.ipv6_address
298
            if db_ipv6_address != gnt_ipv6_address:
299
                change_address_of_port(db_nic, vm.userid,
300
                                       old_address=db_ipv6_address,
301
                                       new_address=gnt_ipv6_address,
302
                                       version=6)
303

    
304
    vm.backendtime = etime
305
    vm.save()
306

    
307

    
308
def change_address_of_port(port, userid, old_address, new_address, version):
309
    """Change."""
310
    if old_address is not None:
311
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
312
               % (version, port.machine_id, old_address, new_address))
313
        log.error(msg)
314

    
315
    # Remove the old IP address
316
    remove_nic_ips(port, version=version)
317

    
318
    if version == 4:
319
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
320
        ipaddress.nic = port
321
        ipaddress.save()
322
    elif version == 6:
323
        subnet6 = port.network.subnet6
324
        ipaddress = IPAddress.objects.create(userid=userid,
325
                                             network=port.network,
326
                                             subnet=subnet6,
327
                                             nic=port,
328
                                             address=new_address,
329
                                             ipversion=6)
330
    else:
331
        raise ValueError("Unknown version: %s" % version)
332

    
333
    # New address log
334
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
335
                                         network_id=port.network_id,
336
                                         address=new_address,
337
                                         active=True)
338
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
339
             ip_log.id, new_address, port.machine_id)
340

    
341
    return ipaddress
342

    
343

    
344
def nics_are_equal(db_nic, gnt_nic):
345
    for field in NIC_FIELDS:
346
        if getattr(db_nic, field) != gnt_nic[field]:
347
            return False
348
    return True
349

    
350

    
351
def process_ganeti_nics(ganeti_nics):
352
    """Process NIC dict from ganeti"""
353
    new_nics = []
354
    for index, gnic in enumerate(ganeti_nics):
355
        nic_name = gnic.get("name", None)
356
        if nic_name is not None:
357
            nic_id = utils.id_from_nic_name(nic_name)
358
        else:
359
            # Put as default value the index. If it is an unknown NIC to
360
            # synnefo it will be created automaticaly.
361
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
362
        network_name = gnic.get('network', '')
363
        network_id = utils.id_from_network_name(network_name)
364
        network = Network.objects.get(id=network_id)
365

    
366
        # Get the new nic info
367
        mac = gnic.get('mac')
368
        ipv4 = gnic.get('ip')
369
        subnet6 = network.subnet6
370
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
371

    
372
        firewall = gnic.get('firewall')
373
        firewall_profile = _reverse_tags.get(firewall)
374
        if not firewall_profile and network.public:
375
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
376

    
377
        nic_info = {
378
            'index': index,
379
            'network': network,
380
            'mac': mac,
381
            'ipv4_address': ipv4,
382
            'ipv6_address': ipv6,
383
            'firewall_profile': firewall_profile,
384
            'state': 'ACTIVE'}
385

    
386
        new_nics.append((nic_id, nic_info))
387
    return dict(new_nics)
388

    
389

    
390
def remove_nic_ips(nic, version=None):
391
    """Remove IP addresses associated with a NetworkInterface.
392

393
    Remove all IP addresses that are associated with the NetworkInterface
394
    object, by returning them to the pool and deleting the IPAddress object. If
395
    the IP is a floating IP, then it is just disassociated from the NIC.
396
    If version is specified, then only IP addressses of that version will be
397
    removed.
398

399
    """
400
    for ip in nic.ips.all():
401
        if version and ip.ipversion != version:
402
            continue
403

    
404
        # Update the DB table holding the logging of all IP addresses
405
        terminate_active_ipaddress_log(nic, ip)
406

    
407
        if ip.floating_ip:
408
            ip.nic = None
409
            ip.save()
410
        else:
411
            # Release the IPv4 address
412
            ip.release_address()
413
            ip.delete()
414

    
415

    
416
def terminate_active_ipaddress_log(nic, ip):
417
    """Update DB logging entry for this IP address."""
418
    if not ip.network.public or nic.machine is None:
419
        return
420
    try:
421
        ip_log, created = \
422
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
423
                                               network_id=ip.network_id,
424
                                               address=ip.address,
425
                                               active=True)
426
    except IPAddressLog.MultipleObjectsReturned:
427
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
428
                  "Server %s. Cannot proceed!"
429
                  % (ip.address, ip.network, nic.machine))
430
        log.error(logmsg)
431
        raise
432

    
433
    if created:
434
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
435
                  " but with wrong creation timestamp."
436
                  % (ip.address, ip.network, nic.machine))
437
        log.error(logmsg)
438
    ip_log.released_at = datetime.now()
439
    ip_log.active = False
440
    ip_log.save()
441

    
442

    
443
@transaction.commit_on_success
444
def process_disks_status(vm, etime, disks):
445
    """Wrap _process_disks_status inside transaction."""
446
    _process_disks_status(vm, etime, disks)
447

    
448

    
449
def _process_disks_status(vm, etime, disks):
450
    """Process a disks status notification from the backend
451

452
    Process an incoming message from the Ganeti backend,
453
    detailing the disk configuration of a VM instance.
454

455
    Update the state of the VM in the DB accordingly.
456

457
    """
458
    ganeti_disks = process_ganeti_disks(disks)
459
    db_disks = dict([(disk.id, disk)
460
                     for disk in vm.volumes.filter(deleted=False)])
461

    
462
    for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()):
463
        db_disk = db_disks.get(disk_name)
464
        ganeti_disk = ganeti_disks.get(disk_name)
465
        if ganeti_disk is None:
466
            if disk_is_stale(vm, disk):
467
                log.debug("Removing stale disk '%s'" % db_disk)
468
                # TODO: Handle disk deletion
469
                db_disk.deleted = True
470
                db_disk.save()
471
            else:
472
                log.info("disk '%s' is still being created" % db_disk)
473
        elif db_disk is None:
474
            msg = ("disk/%s of VM %s does not exist in DB! Cannot"
475
                   " automatically fix this issue!" % (disk_name, vm))
476
            log.error(msg)
477
            continue
478
        elif not disks_are_equal(db_disk, ganeti_disk):
479
            for f in DISK_FIELDS:
480
                # Update the disk in DB with the values from Ganeti disk
481
                setattr(db_disk, f, ganeti_disk[f])
482
                db_disk.save()
483

    
484
            # TODO: Special case where the size of the disk has changed!!
485
            assert(ganeti_disk["size"] == db_disk.size)
486

    
487
    vm.backendtime = etime
488
    vm.save()
489

    
490

    
491
def disks_are_equal(db_disk, gnt_disk):
492
    for field in DISK_FIELDS:
493
        if getattr(db_disk, field) != gnt_disk[field]:
494
            return False
495
    return True
496

    
497

    
498
def process_ganeti_disks(ganeti_disks):
499
    """Process disk dict from ganeti"""
500
    new_disks = []
501
    for index, gdisk in enumerate(ganeti_disks):
502
        disk_name = gdisk.get("name", None)
503
        if disk_name is not None:
504
            disk_id = utils.id_from_disk_name(disk_name)
505
        else:
506
            # Put as default value the index. If it is an unknown disk to
507
            # synnefo it will be created automaticaly.
508
            disk_id = UNKNOWN_DISK_PREFIX + str(index)
509

    
510
        # Get disk size in GB
511
        size = gdisk.get("size") >> 10
512

    
513
        disk_info = {
514
            'index': index,
515
            'size': size,
516
            'status': "IN_USE"}
517

    
518
        new_disks.append((disk_id, disk_info))
519
    return dict(new_disks)
520

    
521

    
522
@transaction.commit_on_success
523
def process_snapshot_status(*args, **kwargs):
524
    return _process_snapshot_status(*args, **kwargs)
525

    
526

    
527
def _process_snapshot_status(snapshot_name, snapshot_info, user_id, etime,
528
                             jobid, status):
529
    """Process a notification for a snapshot."""
530
    snapshot_id = snapshot_info.get("snapshot_id")
531
    assert(snapshot_id is not None), "Missing snapshot_id"
532
    if status in rapi.JOB_STATUS_FINALIZED:
533
        snapshot_status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR"
534
        log.debug("Updating status of snapshot '%s' to '%s'", snapshot_id,
535
                  snapshot_status)
536
        update_snapshot_status(snapshot_id, user_id, status=snapshot_status)
537

    
538

    
539
@transaction.commit_on_success
540
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
541
    if status not in [x[0] for x in BACKEND_STATUSES]:
542
        raise Network.InvalidBackendMsgError(opcode, status)
543

    
544
    back_network.backendjobid = jobid
545
    back_network.backendjobstatus = status
546
    back_network.backendopcode = opcode
547
    back_network.backendlogmsg = logmsg
548

    
549
    # Note: Network is already locked!
550
    network = back_network.network
551

    
552
    # Notifications of success change the operating state
553
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
554
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
555
        back_network.operstate = state_for_success
556

    
557
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
558
       and opcode == 'OP_NETWORK_ADD'):
559
        back_network.operstate = 'ERROR'
560
        back_network.backendtime = etime
561

    
562
    if opcode == 'OP_NETWORK_REMOVE':
563
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
564
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
565
                                  network_exists_in_backend(back_network)):
566
            back_network.operstate = state_for_success
567
            back_network.deleted = True
568
            back_network.backendtime = etime
569

    
570
    if status == rapi.JOB_STATUS_SUCCESS:
571
        back_network.backendtime = etime
572
    back_network.save()
573
    # Also you must update the state of the Network!!
574
    update_network_state(network)
575

    
576

    
577
def update_network_state(network):
578
    """Update the state of a Network based on BackendNetwork states.
579

580
    Update the state of a Network based on the operstate of the networks in the
581
    backends that network exists.
582

583
    The state of the network is:
584
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
585
    * DELETED: If it is is 'DELETED' in all backends that have been created.
586

587
    This function also releases the resources (MAC prefix or Bridge) and the
588
    quotas for the network.
589

590
    """
591
    if network.deleted:
592
        # Network has already been deleted. Just assert that state is also
593
        # DELETED
594
        if not network.state == "DELETED":
595
            network.state = "DELETED"
596
            network.save()
597
        return
598

    
599
    backend_states = [s.operstate for s in network.backend_networks.all()]
600
    if not backend_states and network.action != "DESTROY":
601
        if network.state != "ACTIVE":
602
            network.state = "ACTIVE"
603
            network.save()
604
            return
605

    
606
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
607
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
608
                     "DELETED")
609

    
610
    # Release the resources on the deletion of the Network
611
    if deleted:
612
        if network.ips.filter(deleted=False, floating_ip=True).exists():
613
            msg = "Cannot delete network %s! Floating IPs still in use!"
614
            log.error(msg % network)
615
            raise Exception(msg % network)
616
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
617
                 network.id, network.mac_prefix, network.link)
618
        network.deleted = True
619
        network.state = "DELETED"
620
        # Undrain the network, otherwise the network state will remain
621
        # as 'SNF:DRAINED'
622
        network.drained = False
623
        if network.mac_prefix:
624
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
625
                release_resource(res_type="mac_prefix",
626
                                 value=network.mac_prefix)
627
        if network.link:
628
            if network.FLAVORS[network.flavor]["link"] == "pool":
629
                release_resource(res_type="bridge", value=network.link)
630

    
631
        # Set all subnets as deleted
632
        network.subnets.update(deleted=True)
633
        # And delete the IP pools
634
        for subnet in network.subnets.all():
635
            if subnet.ipversion == 4:
636
                subnet.ip_pools.all().delete()
637
        # And all the backend networks since there are useless
638
        network.backend_networks.all().delete()
639

    
640
        # Issue commission
641
        if network.userid:
642
            quotas.issue_and_accept_commission(network, action="DESTROY")
643
            # the above has already saved the object and committed;
644
            # a second save would override others' changes, since the
645
            # object is now unlocked
646
            return
647
        elif not network.public:
648
            log.warning("Network %s does not have an owner!", network.id)
649
    network.save()
650

    
651

    
652
@transaction.commit_on_success
653
def process_network_modify(back_network, etime, jobid, opcode, status,
654
                           job_fields):
655
    assert (opcode == "OP_NETWORK_SET_PARAMS")
656
    if status not in [x[0] for x in BACKEND_STATUSES]:
657
        raise Network.InvalidBackendMsgError(opcode, status)
658

    
659
    back_network.backendjobid = jobid
660
    back_network.backendjobstatus = status
661
    back_network.opcode = opcode
662

    
663
    add_reserved_ips = job_fields.get("add_reserved_ips")
664
    if add_reserved_ips:
665
        network = back_network.network
666
        for ip in add_reserved_ips:
667
            network.reserve_address(ip, external=True)
668

    
669
    if status == rapi.JOB_STATUS_SUCCESS:
670
        back_network.backendtime = etime
671
    back_network.save()
672

    
673

    
674
@transaction.commit_on_success
675
def process_create_progress(vm, etime, progress):
676

    
677
    percentage = int(progress)
678

    
679
    # The percentage may exceed 100%, due to the way
680
    # snf-image:copy-progress tracks bytes read by image handling processes
681
    percentage = 100 if percentage > 100 else percentage
682
    if percentage < 0:
683
        raise ValueError("Percentage cannot be negative")
684

    
685
    # FIXME: log a warning here, see #1033
686
#   if last_update > percentage:
687
#       raise ValueError("Build percentage should increase monotonically " \
688
#                        "(old = %d, new = %d)" % (last_update, percentage))
689

    
690
    # This assumes that no message of type 'ganeti-create-progress' is going to
691
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
692
    # the instance is STARTED.  What if the two messages are processed by two
693
    # separate dispatcher threads, and the 'ganeti-op-status' message for
694
    # successful creation gets processed before the 'ganeti-create-progress'
695
    # message? [vkoukis]
696
    #
697
    #if not vm.operstate == 'BUILD':
698
    #    raise VirtualMachine.IllegalState("VM is not in building state")
699

    
700
    vm.buildpercentage = percentage
701
    vm.backendtime = etime
702
    vm.save()
703

    
704

    
705
@transaction.commit_on_success
706
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
707
                               details=None):
708
    """
709
    Create virtual machine instance diagnostic entry.
710

711
    :param vm: VirtualMachine instance to create diagnostic for.
712
    :param message: Diagnostic message.
713
    :param source: Diagnostic source identifier (e.g. image-helper).
714
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
715
    :param etime: The time the message occured (if available).
716
    :param details: Additional details or debug information.
717
    """
718
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
719
                                                   source_date=etime,
720
                                                   message=message,
721
                                                   details=details)
722

    
723

    
724
def create_instance(vm, nics, volumes, flavor, image):
725
    """`image` is a dictionary which should contain the keys:
726
            'backend_id', 'format' and 'metadata'
727

728
        metadata value should be a dictionary.
729
    """
730

    
731
    # Handle arguments to CreateInstance() as a dictionary,
732
    # initialize it based on a deployment-specific value.
733
    # This enables the administrator to override deployment-specific
734
    # arguments, such as the disk template to use, name of os provider
735
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
736
    #
737
    kw = vm.backend.get_create_params()
738
    kw['mode'] = 'create'
739
    kw['name'] = vm.backend_vm_id
740
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
741

    
742
    kw['disk_template'] = volumes[0].template
743
    disks = []
744
    for volume in volumes:
745
        disk = {"name": volume.backend_volume_uuid,
746
                "size": volume.size * 1024}
747
        provider = volume.provider
748
        if provider is not None:
749
            disk["provider"] = provider
750
            disk["origin"] = volume.origin
751
            extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS\
752
                                        .get(provider)
753
            if extra_disk_params is not None:
754
                disk.update(extra_disk_params)
755
        disks.append(disk)
756

    
757
    kw["disks"] = disks
758

    
759
    kw['nics'] = [{"name": nic.backend_uuid,
760
                   "network": nic.network.backend_id,
761
                   "ip": nic.ipv4_address}
762
                  for nic in nics]
763

    
764
    backend = vm.backend
765
    depend_jobs = []
766
    for nic in nics:
767
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
768
        depend_jobs.extend(job_ids)
769

    
770
    kw["depends"] = create_job_dependencies(depend_jobs)
771

    
772
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
773
    # kw['os'] = settings.GANETI_OS_PROVIDER
774
    kw['ip_check'] = False
775
    kw['name_check'] = False
776

    
777
    # Do not specific a node explicitly, have
778
    # Ganeti use an iallocator instead
779
    #kw['pnode'] = rapi.GetNodes()[0]
780

    
781
    kw['dry_run'] = settings.TEST
782

    
783
    kw['beparams'] = {
784
        'auto_balance': True,
785
        'vcpus': flavor.cpu,
786
        'memory': flavor.ram}
787

    
788
    kw['osparams'] = {
789
        'config_url': vm.config_url,
790
        # Store image id and format to Ganeti
791
        'img_id': image['backend_id'],
792
        'img_format': image['format']}
793

    
794
    # Use opportunistic locking
795
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING
796

    
797
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
798
    # kw['hvparams'] = dict(serial_console=False)
799

    
800
    log.debug("Creating instance %s", utils.hide_pass(kw))
801
    with pooled_rapi_client(vm) as client:
802
        return client.CreateInstance(**kw)
803

    
804

    
805
def delete_instance(vm, shutdown_timeout=None):
806
    with pooled_rapi_client(vm) as client:
807
        return client.DeleteInstance(vm.backend_vm_id,
808
                                     shutdown_timeout=shutdown_timeout,
809
                                     dry_run=settings.TEST)
810

    
811

    
812
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
813
    assert reboot_type in ('soft', 'hard')
814
    # Note that reboot type of Ganeti job must be always hard. The 'soft' and
815
    # 'hard' type of OS API is different from the one in Ganeti, and maps to
816
    # 'shutdown_timeout'.
817
    kwargs = {"instance": vm.backend_vm_id,
818
              "reboot_type": "hard"}
819
    # 'shutdown_timeout' parameter is only support from snf-ganeti>=2.8.2 and
820
    # Ganeti > 2.10. In other versions this parameter will be ignored and
821
    # we will fallback to default timeout of Ganeti (120s).
822
    if shutdown_timeout is not None:
823
        kwargs["shutdown_timeout"] = shutdown_timeout
824
    if reboot_type == "hard":
825
        kwargs["shutdown_timeout"] = 0
826
    if settings.TEST:
827
        kwargs["dry_run"] = True
828
    with pooled_rapi_client(vm) as client:
829
        return client.RebootInstance(**kwargs)
830

    
831

    
832
def startup_instance(vm):
833
    with pooled_rapi_client(vm) as client:
834
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
835

    
836

    
837
def shutdown_instance(vm, shutdown_timeout=None):
838
    with pooled_rapi_client(vm) as client:
839
        return client.ShutdownInstance(vm.backend_vm_id,
840
                                       timeout=shutdown_timeout,
841
                                       dry_run=settings.TEST)
842

    
843

    
844
def resize_instance(vm, vcpus, memory):
845
    beparams = {"vcpus": int(vcpus),
846
                "minmem": int(memory),
847
                "maxmem": int(memory)}
848
    with pooled_rapi_client(vm) as client:
849
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
850

    
851

    
852
def get_instance_console(vm):
853
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
854
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
855
    # useless (see #783).
856
    #
857
    # Until this is fixed on the Ganeti side, construct a console info reply
858
    # directly.
859
    #
860
    # WARNING: This assumes that VNC runs on port network_port on
861
    #          the instance's primary node, and is probably
862
    #          hypervisor-specific.
863
    #
864
    log.debug("Getting console for vm %s", vm)
865

    
866
    console = {}
867
    console['kind'] = 'vnc'
868

    
869
    with pooled_rapi_client(vm) as client:
870
        i = client.GetInstance(vm.backend_vm_id)
871

    
872
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
873
        raise Exception("hv parameter serial_console cannot be true")
874
    console['host'] = i['pnode']
875
    console['port'] = i['network_port']
876

    
877
    return console
878

    
879

    
880
def get_instance_info(vm):
881
    with pooled_rapi_client(vm) as client:
882
        return client.GetInstance(vm.backend_vm_id)
883

    
884

    
885
def vm_exists_in_backend(vm):
886
    try:
887
        get_instance_info(vm)
888
        return True
889
    except rapi.GanetiApiError as e:
890
        if e.code == 404:
891
            return False
892
        raise e
893

    
894

    
895
def get_network_info(backend_network):
896
    with pooled_rapi_client(backend_network) as client:
897
        return client.GetNetwork(backend_network.network.backend_id)
898

    
899

    
900
def network_exists_in_backend(backend_network):
901
    try:
902
        get_network_info(backend_network)
903
        return True
904
    except rapi.GanetiApiError as e:
905
        if e.code == 404:
906
            return False
907

    
908

    
909
def job_is_still_running(vm, job_id=None):
910
    with pooled_rapi_client(vm) as c:
911
        try:
912
            if job_id is None:
913
                job_id = vm.backendjobid
914
            job_info = c.GetJobStatus(job_id)
915
            return not (job_info["status"] in rapi.JOB_STATUS_FINALIZED)
916
        except rapi.GanetiApiError:
917
            return False
918

    
919

    
920
def disk_is_stale(vm, disk, timeout=60):
921
    """Check if a disk is stale or exists in the Ganeti backend."""
922
    # First check the state of the disk
923
    if disk.status == "CREATING":
924
        if datetime.now() < disk.created + timedelta(seconds=timeout):
925
            # Do not check for too recent disks to avoid the time overhead
926
            return False
927
        if job_is_still_running(vm, job_id=disk.backendjobid):
928
            return False
929
        else:
930
            # If job has finished, check that the disk exists, because the
931
            # message may have been lost or stuck in the queue.
932
            vm_info = get_instance_info(vm)
933
            if disk.backend_volume_uuid in vm_info["disk.names"]:
934
                return False
935
    return True
936

    
937

    
938
def nic_is_stale(vm, nic, timeout=60):
939
    """Check if a NIC is stale or exists in the Ganeti backend."""
940
    # First check the state of the NIC and if there is a pending CONNECT
941
    if nic.state == "BUILD" and vm.task == "CONNECT":
942
        if datetime.now() < nic.created + timedelta(seconds=timeout):
943
            # Do not check for too recent NICs to avoid the time overhead
944
            return False
945
        if job_is_still_running(vm, job_id=vm.task_job_id):
946
            return False
947
        else:
948
            # If job has finished, check that the NIC exists, because the
949
            # message may have been lost or stuck in the queue.
950
            vm_info = get_instance_info(vm)
951
            if nic.backend_uuid in vm_info["nic.names"]:
952
                return False
953
    return True
954

    
955

    
956
def ensure_network_is_active(backend, network_id):
957
    """Ensure that a network is active in the specified backend
958

959
    Check that a network exists and is active in the specified backend. If not
960
    (re-)create the network. Return the corresponding BackendNetwork object
961
    and the IDs of the Ganeti job to create the network.
962

963
    """
964
    job_ids = []
965
    try:
966
        bnet = BackendNetwork.objects.select_related("network")\
967
                                     .get(backend=backend, network=network_id)
968
        if bnet.operstate != "ACTIVE":
969
            job_ids = create_network(bnet.network, backend, connect=True)
970
    except BackendNetwork.DoesNotExist:
971
        network = Network.objects.select_for_update().get(id=network_id)
972
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
973
        job_ids = create_network(network, backend, connect=True)
974

    
975
    return bnet, job_ids
976

    
977

    
978
def create_network(network, backend, connect=True):
979
    """Create a network in a Ganeti backend"""
980
    log.debug("Creating network %s in backend %s", network, backend)
981

    
982
    job_id = _create_network(network, backend)
983

    
984
    if connect:
985
        job_ids = connect_network(network, backend, depends=[job_id])
986
        return job_ids
987
    else:
988
        return [job_id]
989

    
990

    
991
def _create_network(network, backend):
992
    """Create a network."""
993

    
994
    tags = network.backend_tag
995
    subnet = None
996
    subnet6 = None
997
    gateway = None
998
    gateway6 = None
999
    for _subnet in network.subnets.all():
1000
        if _subnet.dhcp and not "nfdhcpd" in tags:
1001
            tags.append("nfdhcpd")
1002
        if _subnet.ipversion == 4:
1003
            subnet = _subnet.cidr
1004
            gateway = _subnet.gateway
1005
        elif _subnet.ipversion == 6:
1006
            subnet6 = _subnet.cidr
1007
            gateway6 = _subnet.gateway
1008

    
1009
    conflicts_check = False
1010
    if network.public:
1011
        tags.append('public')
1012
        if subnet is not None:
1013
            conflicts_check = True
1014
    else:
1015
        tags.append('private')
1016

    
1017
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
1018
    # not support IPv6 only networks. To bypass this limitation, we create the
1019
    # network with a dummy network subnet, and make Cyclades connect instances
1020
    # to such networks, with address=None.
1021
    if subnet is None:
1022
        subnet = "10.0.0.0/29"
1023

    
1024
    try:
1025
        bn = BackendNetwork.objects.get(network=network, backend=backend)
1026
        mac_prefix = bn.mac_prefix
1027
    except BackendNetwork.DoesNotExist:
1028
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
1029
                        " does not exist" % (network.id, backend.id))
1030

    
1031
    with pooled_rapi_client(backend) as client:
1032
        return client.CreateNetwork(network_name=network.backend_id,
1033
                                    network=subnet,
1034
                                    network6=subnet6,
1035
                                    gateway=gateway,
1036
                                    gateway6=gateway6,
1037
                                    mac_prefix=mac_prefix,
1038
                                    conflicts_check=conflicts_check,
1039
                                    tags=tags)
1040

    
1041

    
1042
def connect_network(network, backend, depends=[], group=None):
1043
    """Connect a network to nodegroups."""
1044
    log.debug("Connecting network %s to backend %s", network, backend)
1045

    
1046
    conflicts_check = False
1047
    if network.public and (network.subnet4 is not None):
1048
        conflicts_check = True
1049

    
1050
    depends = create_job_dependencies(depends)
1051
    with pooled_rapi_client(backend) as client:
1052
        groups = [group] if group is not None else client.GetGroups()
1053
        job_ids = []
1054
        for group in groups:
1055
            job_id = client.ConnectNetwork(network.backend_id, group,
1056
                                           network.mode, network.link,
1057
                                           conflicts_check,
1058
                                           depends=depends)
1059
            job_ids.append(job_id)
1060
    return job_ids
1061

    
1062

    
1063
def delete_network(network, backend, disconnect=True):
1064
    log.debug("Deleting network %s from backend %s", network, backend)
1065

    
1066
    depends = []
1067
    if disconnect:
1068
        depends = disconnect_network(network, backend)
1069
    _delete_network(network, backend, depends=depends)
1070

    
1071

    
1072
def _delete_network(network, backend, depends=[]):
1073
    depends = create_job_dependencies(depends)
1074
    with pooled_rapi_client(backend) as client:
1075
        return client.DeleteNetwork(network.backend_id, depends)
1076

    
1077

    
1078
def disconnect_network(network, backend, group=None):
1079
    log.debug("Disconnecting network %s to backend %s", network, backend)
1080

    
1081
    with pooled_rapi_client(backend) as client:
1082
        groups = [group] if group is not None else client.GetGroups()
1083
        job_ids = []
1084
        for group in groups:
1085
            job_id = client.DisconnectNetwork(network.backend_id, group)
1086
            job_ids.append(job_id)
1087
    return job_ids
1088

    
1089

    
1090
def connect_to_network(vm, nic):
1091
    network = nic.network
1092
    backend = vm.backend
1093
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)
1094

    
1095
    depends = create_job_dependencies(depend_jobs)
1096

    
1097
    nic = {'name': nic.backend_uuid,
1098
           'network': network.backend_id,
1099
           'ip': nic.ipv4_address}
1100

    
1101
    log.debug("Adding NIC %s to VM %s", nic, vm)
1102

    
1103
    kwargs = {
1104
        "instance": vm.backend_vm_id,
1105
        "nics": [("add", "-1", nic)],
1106
        "depends": depends,
1107
    }
1108
    if vm.backend.use_hotplug():
1109
        kwargs["hotplug_if_possible"] = True
1110
    if settings.TEST:
1111
        kwargs["dry_run"] = True
1112

    
1113
    with pooled_rapi_client(vm) as client:
1114
        return client.ModifyInstance(**kwargs)
1115

    
1116

    
1117
def disconnect_from_network(vm, nic):
1118
    log.debug("Removing NIC %s of VM %s", nic, vm)
1119

    
1120
    kwargs = {
1121
        "instance": vm.backend_vm_id,
1122
        "nics": [("remove", nic.backend_uuid, {})],
1123
    }
1124
    if vm.backend.use_hotplug():
1125
        kwargs["hotplug_if_possible"] = True
1126
    if settings.TEST:
1127
        kwargs["dry_run"] = True
1128

    
1129
    with pooled_rapi_client(vm) as client:
1130
        jobID = client.ModifyInstance(**kwargs)
1131
        firewall_profile = nic.firewall_profile
1132
        if firewall_profile and firewall_profile != "DISABLED":
1133
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
1134
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
1135
                                      dry_run=settings.TEST)
1136

    
1137
        return jobID
1138

    
1139

    
1140
def set_firewall_profile(vm, profile, nic):
1141
    uuid = nic.backend_uuid
1142
    try:
1143
        tag = _firewall_tags[profile] % uuid
1144
    except KeyError:
1145
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
1146

    
1147
    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)
1148

    
1149
    with pooled_rapi_client(vm) as client:
1150
        # Delete previous firewall tags
1151
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
1152
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
1153
                       if (t % uuid) in old_tags]
1154
        if delete_tags:
1155
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
1156
                                      dry_run=settings.TEST)
1157

    
1158
        if profile != "DISABLED":
1159
            client.AddInstanceTags(vm.backend_vm_id, [tag],
1160
                                   dry_run=settings.TEST)
1161

    
1162
        # XXX NOP ModifyInstance call to force process_net_status to run
1163
        # on the dispatcher
1164
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
1165
        client.ModifyInstance(vm.backend_vm_id,
1166
                              os_name=os_name)
1167
    return None
1168

    
1169

    
1170
def attach_volume(vm, volume, depends=[]):
1171
    log.debug("Attaching volume %s to vm %s", volume, vm)
1172

    
1173
    disk = {"size": int(volume.size) << 10,
1174
            "name": volume.backend_volume_uuid,
1175
            "volume_name": volume.backend_volume_uuid}
1176

    
1177
    disk_provider = volume.provider
1178
    if disk_provider is not None:
1179
        disk["provider"] = disk_provider
1180

    
1181
    if volume.origin is not None:
1182
        disk["origin"] = volume.origin
1183

    
1184
    kwargs = {
1185
        "instance": vm.backend_vm_id,
1186
        "disks": [("add", "-1", disk)],
1187
        "depends": depends,
1188
    }
1189
    if vm.backend.use_hotplug():
1190
        kwargs["hotplug_if_possible"] = True
1191
    if settings.TEST:
1192
        kwargs["dry_run"] = True
1193

    
1194
    with pooled_rapi_client(vm) as client:
1195
        return client.ModifyInstance(**kwargs)
1196

    
1197

    
1198
def detach_volume(vm, volume, depends=[]):
1199
    log.debug("Removing volume %s from vm %s", volume, vm)
1200
    kwargs = {
1201
        "instance": vm.backend_vm_id,
1202
        "disks": [("remove", volume.backend_volume_uuid, {})],
1203
        "depends": depends,
1204
    }
1205
    if vm.backend.use_hotplug():
1206
        kwargs["hotplug_if_possible"] = True
1207
    if settings.TEST:
1208
        kwargs["dry_run"] = True
1209

    
1210
    with pooled_rapi_client(vm) as client:
1211
        return client.ModifyInstance(**kwargs)
1212

    
1213

    
1214
def snapshot_instance(vm, snapshot_name, snapshot_id):
1215
    #volume = instance.volumes.all()[0]
1216
    reason = json.dumps({"snapshot_id": snapshot_id})
1217
    with pooled_rapi_client(vm) as client:
1218
        return client.SnapshotInstance(instance=vm.backend_vm_id,
1219
                                       snapshot_name=snapshot_name,
1220
                                       reason=reason)
1221

    
1222

    
1223
def get_instances(backend, bulk=True):
1224
    with pooled_rapi_client(backend) as c:
1225
        return c.GetInstances(bulk=bulk)
1226

    
1227

    
1228
def get_nodes(backend, bulk=True):
1229
    with pooled_rapi_client(backend) as c:
1230
        return c.GetNodes(bulk=bulk)
1231

    
1232

    
1233
def get_jobs(backend, bulk=True):
1234
    with pooled_rapi_client(backend) as c:
1235
        return c.GetJobs(bulk=bulk)
1236

    
1237

    
1238
def get_physical_resources(backend):
1239
    """ Get the physical resources of a backend.
1240

1241
    Get the resources of a backend as reported by the backend (not the db).
1242

1243
    """
1244
    nodes = get_nodes(backend, bulk=True)
1245
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
1246
    res = {}
1247
    for a in attr:
1248
        res[a] = 0
1249
    for n in nodes:
1250
        # Filter out drained, offline and not vm_capable nodes since they will
1251
        # not take part in the vm allocation process
1252
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
1253
        if can_host_vms and n['cnodes']:
1254
            for a in attr:
1255
                res[a] += int(n[a] or 0)
1256
    return res
1257

    
1258

    
1259
def update_backend_resources(backend, resources=None):
1260
    """ Update the state of the backend resources in db.
1261

1262
    """
1263

    
1264
    if not resources:
1265
        resources = get_physical_resources(backend)
1266

    
1267
    backend.mfree = resources['mfree']
1268
    backend.mtotal = resources['mtotal']
1269
    backend.dfree = resources['dfree']
1270
    backend.dtotal = resources['dtotal']
1271
    backend.pinst_cnt = resources['pinst_cnt']
1272
    backend.ctotal = resources['ctotal']
1273
    backend.updated = datetime.now()
1274
    backend.save()
1275

    
1276

    
1277
def get_memory_from_instances(backend):
1278
    """ Get the memory that is used from instances.
1279

1280
    Get the used memory of a backend. Note: This is different for
1281
    the real memory used, due to kvm's memory de-duplication.
1282

1283
    """
1284
    with pooled_rapi_client(backend) as client:
1285
        instances = client.GetInstances(bulk=True)
1286
    mem = 0
1287
    for i in instances:
1288
        mem += i['oper_ram']
1289
    return mem
1290

    
1291

    
1292
def get_available_disk_templates(backend):
1293
    """Get the list of available disk templates of a Ganeti backend.
1294

1295
    The list contains the disk templates that are enabled in the Ganeti backend
1296
    and also included in ipolicy-disk-templates.
1297

1298
    """
1299
    with pooled_rapi_client(backend) as c:
1300
        info = c.GetInfo()
1301
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
1302
    try:
1303
        enabled_disk_templates = info["enabled_disk_templates"]
1304
        return [dp for dp in enabled_disk_templates
1305
                if dp in ipolicy_disk_templates]
1306
    except KeyError:
1307
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
1308
        return ipolicy_disk_templates
1309

    
1310

    
1311
def update_backend_disk_templates(backend):
1312
    disk_templates = get_available_disk_templates(backend)
1313
    backend.disk_templates = disk_templates
1314
    backend.save()
1315

    
1316

    
1317
##
1318
## Synchronized operations for reconciliation
1319
##
1320

    
1321

    
1322
def create_network_synced(network, backend):
1323
    result = _create_network_synced(network, backend)
1324
    if result[0] != rapi.JOB_STATUS_SUCCESS:
1325
        return result
1326
    result = connect_network_synced(network, backend)
1327
    return result
1328

    
1329

    
1330
def _create_network_synced(network, backend):
1331
    with pooled_rapi_client(backend) as client:
1332
        job = _create_network(network, backend)
1333
        result = wait_for_job(client, job)
1334
    return result
1335

    
1336

    
1337
def connect_network_synced(network, backend):
1338
    with pooled_rapi_client(backend) as client:
1339
        for group in client.GetGroups():
1340
            job = client.ConnectNetwork(network.backend_id, group,
1341
                                        network.mode, network.link)
1342
            result = wait_for_job(client, job)
1343
            if result[0] != rapi.JOB_STATUS_SUCCESS:
1344
                return result
1345

    
1346
    return result
1347

    
1348

    
1349
def wait_for_job(client, jobid):
1350
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1351
    status = result['job_info'][0]
1352
    while status not in rapi.JOB_STATUS_FINALIZED:
1353
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1354
                                         [result], None)
1355
        status = result['job_info'][0]
1356

    
1357
    if status == rapi.JOB_STATUS_SUCCESS:
1358
        return (status, None)
1359
    else:
1360
        error = result['job_info'][1]
1361
        return (status, error)
1362

    
1363

    
1364
def create_job_dependencies(job_ids=[], job_states=None):
1365
    """Transform a list of job IDs to Ganeti 'depends' attribute."""
1366
    if job_states is None:
1367
        job_states = list(rapi.JOB_STATUS_FINALIZED)
1368
    assert(type(job_states) == list)
1369
    return [[job_id, job_states] for job_id in job_ids]