Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 58194535

History | View | Annotate | Download (47.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map of synnefo firewall profile name -> Ganeti instance tag that encodes it.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: 4th ':'-separated field of the tag -> profile name.
# NOTE(review): assumes every GANETI_FIREWALL_*_TAG setting contains at least
# four ':'-separated fields -- confirm against the deployment settings.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# NIC attributes copied verbatim from the Ganeti NIC dict into the DB NIC.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC attributes that need special handling (IP allocation/release).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
# All NIC attributes that are compared between Ganeti and the DB.
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Disk attributes that are synchronized between Ganeti and the DB.
DISK_FIELDS = ["status", "size", "index"]
# Key prefixes used for NICs/disks reported by Ganeti without a synnefo name.
UNKNOWN_NIC_PREFIX = "unknown-"
UNKNOWN_DISK_PREFIX = "unknown-"
64

    
65

    
66
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Reconcile quotas of a VirtualMachine with a finished Ganeti job.

    If a commission has already been issued for this job, it is accepted or
    rejected according to the job outcome. Otherwise, if the successful job
    implies a quotable change, a fresh commission is issued in forced and
    auto-accept mode, rejecting any previous commissions, since they reflect
    an older state of the VM.

    Returns the (possibly updated) VirtualMachine.

    """
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        # Nothing to reconcile until the job reaches a terminal state.
        return vm

    # Would a successful completion of this job change anything quotable?
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission is pending for exactly this job: just resolve it.
        # Special case is OP_INSTANCE_CREATE, which even if it fails must be
        # accepted, as the user must manually remove the failed server.
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_resource_serial(vm)
        elif job_status in (rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED):
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_resource_serial(vm)
        return vm

    if job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Either no commission was issued for this change, or the issued
            # one did not know about it. Reject all previous commissions and
            # create a new one in forced mode.
            log.debug("Expected job was %s. Processing job %s. "
                      "Attached serial %s",
                      vm.task_job_id, job_id, vm.serial)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            serial = quotas.handle_resource_commission(
                vm, action, action_fields=job_fields, commission_name=reason,
                force=True, auto_accept=True)
            log.debug("Issued new commission: %s", serial)
    return vm
120

    
121

    
122
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      disks=None, job_fields=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or
    canceled) also update the operating state of the VM, its NICs/disks,
    and its quota commissions.

    :param vm: the VirtualMachine the job refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode (e.g. 'OP_INSTANCE_CREATE').
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: raw log message from the backend.
    :param nics: NIC configuration reported by Ganeti, if any.
    :param disks: disk configuration reported by Ganeti, if any.
    :param job_fields: modified instance fields reported by the job.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Job still running: only the bookkeeping above is persisted.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    new_flavor = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            new_flavor = _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED:
        if nics is not None:  # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if disks is not None:  # Update the disks of the VM
            _process_disks_status(vm, etime, disks)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated attachments
        vm.nics.all().update(state="ERROR")
        vm.volumes.all().update(status="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # server has been deleted, so delete the server's attachments
            vm.volumes.all().update(deleted=True, machine=None)
            for nic in vm.nics.all():
                # but first release the IP
                remove_nic_ips(nic)
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the (possibly failed) removal as a success so that the
            # quota handling below accepts the pending commission.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate
    if new_flavor is not None:
        vm.flavor = new_flavor

    vm.save()
216

    
217

    
218
def _process_resize(vm, beparams):
219
    """Change flavor of a VirtualMachine based on new beparams."""
220
    old_flavor = vm.flavor
221
    vcpus = beparams.get("vcpus", old_flavor.cpu)
222
    ram = beparams.get("maxmem", old_flavor.ram)
223
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
224
        return
225
    try:
226
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
227
                                        disk=old_flavor.disk,
228
                                        disk_template=old_flavor.disk_template)
229
    except Flavor.DoesNotExist:
230
        raise Exception("Cannot find flavor for VM")
231
    return new_flavor
232

    
233

    
234
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Public entry point: run _process_net_status in its own transaction."""
    return _process_net_status(vm, etime, nics)
238

    
239

    
240
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend, detailing the NIC
    configuration of a VM instance, and update the state of the VM's NICs
    in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.select_related("network")
                                      .prefetch_related("ips")])

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but was not reported by Ganeti.
            # Fix: test staleness of 'db_nic' -- the original referenced an
            # undefined name 'nic', raising NameError on this code path.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Copy the simple fields from the Ganeti NIC and save once,
            # instead of issuing a DB write per attribute.
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
            db_ipv4_address = db_nic.ipv4_address
            if db_ipv4_address != gnt_ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv4_address,
                                       new_address=gnt_ipv4_address,
                                       version=4)

            # Same for the IPv6 address.
            gnt_ipv6_address = ganeti_nic["ipv6_address"]
            db_ipv6_address = db_nic.ipv6_address
            if db_ipv6_address != gnt_ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv6_address,
                                       new_address=gnt_ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
295

    
296

    
297
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IPv4 or IPv6 address of a port (NIC).

    Release the port's old address of the given IP version, allocate or
    create the new one, attach it to the port, and create an IPAddressLog
    entry for the new address.

    :param version: 4 or 6; any other value raises ValueError.
    :returns: the new IPAddress object.

    """
    # Fix: validate the version BEFORE mutating anything. Previously a bad
    # 'version' released the port's addresses via remove_nic_ips() and only
    # then failed, leaving the port without any address.
    if version not in (4, 6):
        raise ValueError("Unknown version: %s" % version)

    if old_address is not None:
        # An unexpected address change is worth an operator's attention.
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.error(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    else:
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address,
                                             ipversion=6)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
331

    
332

    
333
def nics_are_equal(db_nic, gnt_nic):
    """Return True iff the DB NIC matches the Ganeti NIC on all NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
338

    
339

    
340
def process_ganeti_nics(ganeti_nics):
    """Convert the NIC list reported by Ganeti into a dict keyed by NIC id."""
    nics = {}
    for idx, gnic in enumerate(ganeti_nics):
        name = gnic.get("name", None)
        if name is not None:
            nic_id = utils.id_from_nic_name(name)
        else:
            # NIC is unknown to synnefo: key it by its index so that it can
            # be created automatically later.
            nic_id = UNKNOWN_NIC_PREFIX + str(idx)

        network_name = gnic.get('network', '')
        network = Network.objects.get(
            id=utils.id_from_network_name(network_name))

        # Gather the new NIC info.
        mac = gnic.get('mac')
        subnet6 = network.subnet6
        # The IPv6 address is derived from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {
            'index': idx,
            'network': network,
            'mac': mac,
            'ipv4_address': gnic.get('ip'),
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics
377

    
378

    
379
def remove_nic_ips(nic, version=None):
    """Release the IP addresses associated with a NetworkInterface.

    Each address bound to the NIC is first marked as released in the IP
    address log. Floating IPs are merely detached from the NIC, while
    ordinary addresses are returned to their pool and deleted. If 'version'
    is given, only addresses of that IP version are handled.

    """
    for address in nic.ips.all():
        if version and address.ipversion != version:
            continue

        # Close the active entry in the IP address log table.
        terminate_active_ipaddress_log(nic, address)

        if address.floating_ip:
            # Floating IPs survive: just detach them from the NIC.
            address.nic = None
            address.save()
        else:
            # Return the address to its pool and drop the object.
            address.release_address()
            address.delete()
403

    
404

    
405
def terminate_active_ipaddress_log(nic, ip):
    """Close the active DB log entry for this IP address."""
    # Only public addresses of NICs that are attached to a server are logged.
    if not ip.network.public or nic.machine is None:
        return

    try:
        ip_log, created = IPAddressLog.objects.get_or_create(
            server_id=nic.machine_id,
            network_id=ip.network_id,
            address=ip.address,
            active=True)
    except IPAddressLog.MultipleObjectsReturned:
        log.error("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        raise

    if created:
        # There should have been an entry already; note that the creation
        # timestamp of this fresh one is wrong.
        log.error("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))

    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
430

    
431

    
432
@transaction.commit_on_success
def process_disks_status(vm, etime, disks):
    """Public entry point: run _process_disks_status in its own transaction."""
    return _process_disks_status(vm, etime, disks)
436

    
437

    
438
def _process_disks_status(vm, etime, disks):
    """Process a disks status notification from the backend.

    Process an incoming message from the Ganeti backend, detailing the disk
    configuration of a VM instance, and update the state of the VM's volumes
    in the DB accordingly.

    """
    ganeti_disks = process_ganeti_disks(disks)
    db_disks = dict([(disk.id, disk)
                     for disk in vm.volumes.filter(deleted=False)])

    for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()):
        db_disk = db_disks.get(disk_name)
        ganeti_disk = ganeti_disks.get(disk_name)
        if ganeti_disk is None:
            # Disk exists in the DB but was not reported by Ganeti.
            # Fix: test staleness of 'db_disk' -- the original referenced an
            # undefined name 'disk', raising NameError on this code path.
            if disk_is_stale(vm, db_disk):
                log.debug("Removing stale disk '%s'" % db_disk)
                # TODO: Handle disk deletion
                db_disk.deleted = True
                db_disk.save()
            else:
                log.info("disk '%s' is still being created" % db_disk)
        elif db_disk is None:
            msg = ("disk/%s of VM %s does not exist in DB! Cannot"
                   " automatically fix this issue!" % (disk_name, vm))
            log.error(msg)
            continue
        elif not disks_are_equal(db_disk, ganeti_disk):
            # Copy the fields from the Ganeti disk and save once, instead of
            # issuing a DB write per attribute.
            for f in DISK_FIELDS:
                setattr(db_disk, f, ganeti_disk[f])
            db_disk.save()

            # TODO: Special case where the size of the disk has changed!!
            assert(ganeti_disk["size"] == db_disk.size)

    vm.backendtime = etime
    vm.save()
478

    
479

    
480
def disks_are_equal(db_disk, gnt_disk):
    """Return True iff the DB disk matches the Ganeti disk on all DISK_FIELDS."""
    return all(getattr(db_disk, field) == gnt_disk[field]
               for field in DISK_FIELDS)
485

    
486

    
487
def process_ganeti_disks(ganeti_disks):
    """Convert the disk list reported by Ganeti into a dict keyed by disk id."""
    disks = {}
    for idx, gdisk in enumerate(ganeti_disks):
        name = gdisk.get("name", None)
        if name is not None:
            disk_id = utils.id_from_disk_name(name)
        else:
            # Disk is unknown to synnefo: key it by its index so that it can
            # be created automatically later.
            disk_id = UNKNOWN_DISK_PREFIX + str(idx)

        # Ganeti reports sizes in MB; store them in GB.
        disks[disk_id] = {
            'index': idx,
            'size': gdisk.get("size") >> 10,
            'status': "IN_USE"}
    return disks
509

    
510

    
511
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification for a BackendNetwork.

    Record the job information on the BackendNetwork, update its operating
    state according to the job outcome, and finally refresh the state of the
    parent Network.

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job information.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal still counts as a deletion when the network no
        # longer exists in the Ganeti backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
547

    
548

    
549
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that the network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network is not created in any backend yet. Fix: ALWAYS return
        # here. Previously the 'return' was taken only when the state had to
        # be updated, so a network that was already ACTIVE fell through to
        # the deletion logic below, where the fold over an EMPTY state list
        # evaluates to "DELETED" and the network was wrongly deleted.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks are in "DELETED" operstate.
    # (Replaces the original reduce()-based fold with the equivalent all().)
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        # Fix: log arguments were swapped (link slot got the MAC prefix).
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
622

    
623

    
624
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a job notification for an OP_NETWORK_SET_PARAMS job.

    Record the job information on the BackendNetwork and reserve in the DB
    any externally reserved IPs reported by the job.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fix: record the opcode in 'backendopcode', consistently with
    # process_network_status; the original wrote to 'opcode', which is not
    # the attribute the rest of this module uses, so the opcode was never
    # recorded where it is read back.
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
644

    
645

    
646
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from a snf-image notification."""
    percentage = int(progress)

    if percentage < 0:
        raise ValueError("Percentage cannot be negative")
    # snf-image:copy-progress tracks bytes read by the image handling
    # processes, so the reported value may exceed 100% -- clamp it.
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED. What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
675

    
676

    
677
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
694

    
695

    
696
def create_instance(vm, nics, volumes, flavor, image):
    """Submit an instance-creation job for `vm` to its Ganeti backend.

    `image` is a dictionary which should contain the keys:
    'backend_id', 'format' and 'metadata', where the metadata value
    should be a dictionary.
    """
    # Handle arguments to CreateInstance() as a dictionary, seeded from a
    # deployment-specific value.  This lets the administrator override
    # deployment-specific arguments, such as the disk template to use,
    # name of os provider and hypervisor-specific parameters, at will
    # (see Synnefo #785, #835).
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = volumes[0].template
    disks = []
    for vol in volumes:
        disk = {"name": vol.backend_volume_uuid,
                "size": vol.size * 1024}
        provider = vol.provider
        if provider is not None:
            disk["provider"] = provider
            disk["origin"] = vol.origin
            extra = settings.GANETI_DISK_PROVIDER_KWARGS.get(provider)
            if extra is not None:
                disk.update(extra)
        disks.append(disk)
    kw["disks"] = disks

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # Make sure each network this instance connects to is active in the
    # backend, and collect the job IDs to depend on.
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        _, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)
    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have Ganeti use an iallocator
    # instead.
    # kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {'auto_balance': True,
                      'vcpus': flavor.cpu,
                      'memory': flavor.ram}

    kw['osparams'] = {'config_url': vm.config_url,
                      # Store image id and format to Ganeti
                      'img_id': image['backend_id'],
                      'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
def delete_instance(vm, shutdown_timeout=None):
    """Submit a job to remove `vm` from its Ganeti backend."""
    with pooled_rapi_client(vm) as c:
        return c.DeleteInstance(vm.backend_vm_id,
                                shutdown_timeout=shutdown_timeout,
                                dry_run=settings.TEST)
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
    """Reboot `vm`, mapping the OS API reboot type onto Ganeti semantics."""
    assert reboot_type in ('soft', 'hard')
    # A Ganeti reboot is always of type "hard"; the 'soft'/'hard' notion of
    # the OS API differs from Ganeti's and maps to 'shutdown_timeout'.
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # 'shutdown_timeout' is only supported by snf-ganeti>=2.8.2 and
    # Ganeti > 2.10.  Other versions ignore it and fall back to Ganeti's
    # default timeout (120s).
    if shutdown_timeout is not None:
        kwargs["shutdown_timeout"] = shutdown_timeout
    # A 'hard' OS API reboot grants no shutdown grace period at all.
    if reboot_type == "hard":
        kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as c:
        return c.RebootInstance(**kwargs)
def startup_instance(vm):
    """Start `vm` in its Ganeti backend."""
    with pooled_rapi_client(vm) as c:
        return c.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
def shutdown_instance(vm, shutdown_timeout=None):
    """Stop `vm`, optionally overriding Ganeti's shutdown timeout."""
    with pooled_rapi_client(vm) as c:
        return c.ShutdownInstance(vm.backend_vm_id,
                                  timeout=shutdown_timeout,
                                  dry_run=settings.TEST)
def resize_instance(vm, vcpus, memory):
    """Change the CPU count and memory of `vm` via its backend parameters."""
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": int(memory),
                    "maxmem": int(memory)}
    with pooled_rapi_client(vm) as c:
        return c.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
def get_instance_console(vm):
    """Build VNC console connection info for `vm`.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783).  Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
def get_instance_info(vm):
    """Return the Ganeti view of `vm`."""
    with pooled_rapi_client(vm) as c:
        return c.GetInstance(vm.backend_vm_id)
def vm_exists_in_backend(vm):
    """Return whether `vm` exists in its Ganeti backend.

    Any RAPI error other than 404 is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
def get_network_info(backend_network):
    """Return the Ganeti view of a backend network."""
    with pooled_rapi_client(backend_network) as c:
        return c.GetNetwork(backend_network.network.backend_id)
def network_exists_in_backend(backend_network):
    """Return whether a network exists in its Ganeti backend.

    Returns False only when Ganeti reports 404.  Any other RAPI error is
    re-raised; previously it was silently swallowed and the function fell
    through to an implicit `return None`, masking backend failures as
    "network missing" — this also makes the behaviour consistent with
    vm_exists_in_backend().
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise
def job_is_still_running(vm, job_id=None):
    """Return True if a Ganeti job of `vm` has not reached a final state.

    Defaults to the job recorded in `vm.backendjobid`.  RAPI errors (e.g.
    unknown job) are reported as "not running".
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False
def disk_is_stale(vm, disk, timeout=60):
    """Check if a disk is stale or exists in the Ganeti backend."""
    # Only disks still in state CREATING get the benefit of the doubt.
    if disk.status == "CREATING":
        grace_end = disk.created + timedelta(seconds=timeout)
        if datetime.now() < grace_end:
            # Too recent: skip the check to avoid the time overhead.
            return False
        if job_is_still_running(vm, job_id=disk.backendjobid):
            return False
        # The creation job has finished; verify the disk exists, because the
        # success message may have been lost or stuck in the queue.
        vm_info = get_instance_info(vm)
        if disk.backend_volume_uuid in vm_info["disk.names"]:
            return False
    return True
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend."""
    # Only NICs still building while a CONNECT task is pending get the
    # benefit of the doubt.
    if nic.state == "BUILD" and vm.task == "CONNECT":
        if datetime.now() < nic.created + timedelta(seconds=timeout):
            # Too recent: skip the check to avoid the time overhead.
            return False
        if job_is_still_running(vm, job_id=vm.task_job_id):
            return False
        # The job has finished; verify the NIC exists, because the success
        # message may have been lost or stuck in the queue.
        if nic.backend_uuid in get_instance_info(vm)["nic.names"]:
            return False
    return True
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend.

    Check that a network exists and is active in the specified backend.
    If not, (re-)create the network.  Return the corresponding
    BackendNetwork object and the IDs of the Ganeti jobs that create the
    network.
    """
    job_ids = []
    try:
        bnet = BackendNetwork.objects.select_related("network")\
                                     .get(backend=backend, network=network_id)
        if bnet.operstate != "ACTIVE":
            job_ids = create_network(bnet.network, backend, connect=True)
    except BackendNetwork.DoesNotExist:
        # No BackendNetwork yet: lock the network row, create the DB object
        # and then the Ganeti network itself.
        network = Network.objects.select_for_update().get(id=network_id)
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
        job_ids = create_network(network, backend, connect=True)
    return bnet, job_ids
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend; return the job IDs."""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Connect the network to all nodegroups, depending on the creation job.
    return connect_network(network, backend, depends=[creation_job])
def _create_network(network, backend):
    """Submit a Ganeti CreateNetwork job for `network` in `backend`.

    Returns the Ganeti job id.  Raises if no BackendNetwork row exists for
    the (network, backend) pair.
    """
    # NOTE(review): tags is whatever network.backend_tag returns and is
    # appended to below — presumably a fresh list per call; verify it is not
    # a shared/stored list.
    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        # Advertise DHCP support through the nfdhcpd tag, at most once.
        # (Idiom fix: was `not "nfdhcpd" in tags`.)
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Only check for IP conflicts on public networks with an IPv4 subnet.
    conflicts_check = False
    if network.public:
        tags.append('public')
        if subnet is not None:
            conflicts_check = True
    else:
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti
    # does not support IPv6 only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups; return the list of job IDs.

    :param depends: optional list of Ganeti job IDs the connect jobs
        depend on (default: none).  The mutable default `[]` was replaced
        with None; behaviour is unchanged since the list was never mutated.
    :param group: connect only this nodegroup (default: all groups).
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    # Only check for IP conflicts on public networks with an IPv4 subnet.
    conflicts_check = False
    if network.public and (network.subnet4 is not None):
        conflicts_check = True

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When `disconnect` is True, first disconnect it from all nodegroups and
    make the deletion depend on those jobs.
    """
    log.debug("Deleting network %s from backend %s", network, backend)
    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
def _delete_network(network, backend, depends=None):
    """Submit a Ganeti DeleteNetwork job, depending on `depends` job IDs.

    The mutable default `depends=[]` was replaced with None; behaviour is
    unchanged since the list was never mutated.
    """
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups; return the list of job IDs.

    :param group: disconnect only this nodegroup (default: all groups).
    """
    # Wording fix: the network is disconnected *from* the backend
    # (previously logged "... to backend ...").
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
def connect_to_network(vm, nic):
    """Add NIC `nic` of `vm` to its network in the Ganeti backend.

    Ensures the network is active in the backend first and makes the
    ModifyInstance job depend on any network-creation jobs.
    """
    network = nic.network
    backend = vm.backend
    _, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Build the Ganeti NIC definition under a distinct name; the original
    # rebound the `nic` parameter here, shadowing the model object.
    nic_def = {'name': nic.backend_uuid,
               'network': network.backend_id,
               'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_def, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_def)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def disconnect_from_network(vm, nic):
    """Remove NIC `nic` from `vm`, cleaning up its firewall tag if set."""
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        # Drop the firewall tag of the removed NIC, if any was applied.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)
        return job_id
def set_firewall_profile(vm, profile, nic):
    """Apply firewall `profile` to `nic` of `vm` via Ganeti instance tags.

    :raises ValueError: if `profile` is not a known firewall profile.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Typo fix in the user-visible message (was "Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
def attach_volume(vm, volume, depends=None):
    """Attach `volume` to `vm` through a Ganeti ModifyInstance job.

    :param depends: optional list of Ganeti job dependencies.  The mutable
        default `[]` was replaced with None; behaviour is unchanged.
    """
    log.debug("Attaching volume %s to vm %s", volume, vm)

    if depends is None:
        depends = []

    # size is shifted by 10 bits (x1024) — presumably GiB -> MiB, matching
    # the `volume.size * 1024` in create_instance().
    disk = {"size": int(volume.size) << 10,
            "name": volume.backend_volume_uuid,
            "volume_name": volume.backend_volume_uuid}

    disk_provider = volume.disk_provider
    if disk_provider is not None:
        disk["provider"] = disk_provider

    if volume.origin is not None:
        disk["origin"] = volume.origin

    kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("add", "-1", disk)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def detach_volume(vm, volume, depends=None):
    """Detach `volume` from `vm` through a Ganeti ModifyInstance job.

    :param depends: optional list of Ganeti job dependencies.  The mutable
        default `[]` was replaced with None; behaviour is unchanged.
    """
    log.debug("Removing volume %s from vm %s", volume, vm)
    if depends is None:
        depends = []
    kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("remove", volume.backend_volume_uuid, {})],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def snapshot_instance(vm, snapshot_name):
    """Take a snapshot of `vm` under the given name."""
    with pooled_rapi_client(vm) as client:
        return client.SnapshotInstance(instance=vm.backend_vm_id,
                                       snapshot_name=snapshot_name)
def get_instances(backend, bulk=True):
    """List all instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
def get_nodes(backend, bulk=True):
    """List all nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
def get_jobs(backend, bulk=True):
    """List all jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Skip drained, offline and non-vm_capable nodes since they take no
        # part in the vm allocation process.
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for a in attrs:
                totals[a] += int(node[a] or 0)
    return totals
def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    If `resources` is falsy, fetch them from the backend itself first.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.
    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    allowed = info["ipolicy"]["disk-templates"]
    try:
        enabled = info["enabled_disk_templates"]
    except KeyError:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed
    return [template for template in enabled if template in allowed]
def update_backend_disk_templates(backend):
    """Refresh the cached disk templates of `backend` in the db."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
##
## Synchronized operations for reconciliation
##
def create_network_synced(network, backend):
    """Create and connect a network, waiting on each Ganeti job.

    Returns the first failing (status, error) pair, or the result of the
    final connect job.
    """
    result = _create_network_synced(network, backend)
    if result[0] != rapi.JOB_STATUS_SUCCESS:
        return result
    return connect_network_synced(network, backend)
def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting on each job.

    Returns the first non-successful (status, error) pair, otherwise the
    result of the last group's job.
    """
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result
    # NOTE(review): if GetGroups() returns no groups, `result` is unbound
    # here and a NameError is raised — preserved from the original.
    return result
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a finalized state.

    Returns (status, None) on success, or (status, error) otherwise.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                     None, None)
    status = result['job_info'][0]
    # Keep long-polling until the job settles.
    while status not in rapi.JOB_STATUS_FINALIZED:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    error = result['job_info'][1]
    return (status, error)
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    :param job_ids: list of Ganeti job IDs (default: no dependencies).
        The mutable default `[]` was replaced with None; behaviour is
        unchanged since the list was never mutated.
    :param job_states: list of job states that satisfy each dependency;
        defaults to all finalized states.
    """
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]