Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 7c714455

History | View | Annotate | Download (35.5 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46
from synnefo.logic.rapi import GanetiApiError
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly modified) vm object.
    """
    if job_status not in ["success", "error", "canceled"]:
        # Job has not reached a terminal state; nothing to commission yet.
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # The serial has been resolved either way; forget it.
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
112

    
113

    
114
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest job info on the VM unconditionally.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        # Non-terminal status: persist the job info and stop here.
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for the quota handling below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
188

    
189

    
190
def _process_resize(vm, beparams):
    """Switch the VM to the Flavor matching freshly applied beparams."""
    current = vm.flavor
    # Parameters Ganeti did not report fall back to the current flavor.
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if new_vcpus == current.cpu and new_ram == current.ram:
        # Flavor already matches; nothing to update.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
205

    
206

    
207
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Run _process_net_status inside a database transaction."""
    _process_net_status(vm, etime, nics)
211

    
212

    
213
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # And delete the related NICs (must be performed after release!)
    vm.nics.all().delete()

    # Recreate the NICs from the Ganeti view of the instance.
    for nic in ganeti_nics:
        ipv4 = nic["ipv4"]
        net = nic['network']
        if ipv4:
            # Mark the address as used in the network's pool.
            net.reserve_address(ipv4)

        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
249

    
250

    
251
def process_ganeti_nics(ganeti_nics):
    """Translate NIC dicts from Ganeti hooks into DB-ready NIC dicts."""
    nics = []
    for index, gnic in enumerate(ganeti_nics):
        # Map the Ganeti network name back to the Network DB object.
        network_name = str(gnic.get('network', ''))
        net = Network.objects.get(pk=utils.id_from_network_name(network_name))

        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        # Derive the IPv6 address from the MAC when the network has a
        # v6 subnet; otherwise the NIC has no IPv6 address.
        if net.subnet6 is not None:
            ipv6 = mac2eui64(mac, net.subnet6)
        else:
            ipv6 = None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics.append({'index': index,
                     'network': net,
                     'mac': mac,
                     'ipv4': ipv4,
                     'ipv6': ipv6,
                     'firewall_profile': firewall_profile,
                     'state': 'ACTIVE'})
    return nics
282

    
283

    
284
def nics_changed(old_nics, new_nics):
    """Return True if the two NIC sequences differ in any tracked field."""
    if len(old_nics) != len(new_nics):
        return True
    # Old NICs are DB objects (attribute access); new ones are plain dicts.
    return any(getattr(old, field) != new[field]
               for old, new in zip(old_nics, new_nics)
               for field in ("ipv4", "ipv6", "mac", "firewall_profile",
                             "index", "network"))
294

    
295

    
296
def release_instance_ips(vm, ganeti_nics):
    """Release (network, ipv4) pairs the VM no longer holds.

    Compare the NICs currently recorded in the DB with the NICs reported by
    Ganeti (ganeti_nics) and release every IPv4 address that disappeared,
    either by detaching the floating IP or by returning the address to the
    network's pool.
    """
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get X-Lock before searching floating IP, to exclusively search
            # and release floating IP. Otherwise you may release a floating IP
            # that has been just reserved.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    # Detach the floating IP; the address itself stays
                    # reserved for the floating IP.
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    # Not a floating IP of this VM: return it to the pool.
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)
318

    
319

    
320
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti network job notification for a BackendNetwork.

    Record the job info on the BackendNetwork, update its operstate for
    terminal statuses, and finally refresh the aggregate state of the
    parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed OP_NETWORK_ADD leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Treat a failed removal as deletion when the network no longer
        # exists in the Ganeti backend.
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
354

    
355

    
356
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # Not created in any backend yet and not being destroyed: the
        # network counts as ACTIVE. Return unconditionally here. Previously
        # the return was nested inside the state-change check, so an
        # already-ACTIVE network fell through to the reduce() below, which
        # yields "DELETED" for an empty list and wrongly deleted it.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        # Fixed log argument order to match the "link ... mac_prefix ..."
        # placeholders (they were swapped).
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources only if the flavor drew them from a pool.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
413

    
414

    
415
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS notification for a BackendNetwork.

    Record the job info on the BackendNetwork and mark any externally
    reserved IPs in the network's address pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fixed: was 'back_network.opcode', which is not the model field used
    # everywhere else (see process_network_status) and was never persisted.
    back_network.backendopcode = opcode

    if add_reserved_ips:
        # Mark the externally reserved addresses in the network's pool.
        # (The redundant duplicate 'if add_reserved_ips' check was removed.)
        net = back_network.network
        pool = net.get_pool()
        for ip in add_reserved_ips:
            pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
437

    
438

    
439
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from a create-progress message."""
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by the image handling
    # processes, so the reported value may exceed 100%: clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED. What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis] Hence the operstate check below stays disabled:
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
468

    
469

    
470
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Store a diagnostic entry for a virtual machine.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
487

    
488

    
489
def create_instance(vm, nics, flavor, image):
    """Submit an OP_INSTANCE_CREATE job for the given VM to its backend.

    `image` is a dictionary which should contain the keys:
        'backend_id', 'format' and 'metadata'

    metadata value should be a dictionary.

    Returns the Ganeti job id of the creation job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Flavor disk size is in GiB; Ganeti expects MiB.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Make sure every network the VM connects to is ACTIVE in this backend;
    # private networks are created on demand and the creation jobs become
    # dependencies of the instance-creation job.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
569

    
570

    
571
def delete_instance(vm):
    """Submit a Ganeti job to delete the instance backing `vm`."""
    with pooled_rapi_client(vm) as client:
        job_id = client.DeleteInstance(vm.backend_vm_id,
                                       dry_run=settings.TEST)
    return job_id
574

    
575

    
576
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti reboot job for `vm`.

    `reboot_type` is the OS API type ('soft' or 'hard').
    """
    assert reboot_type in ('soft', 'hard')
    # XXX: The shutdown_timeout parameter is not yet supported by the
    # Ganeti RAPI, so both OS API reboot types fall back to Ganeti's
    # default shutdown timeout (120s). The Ganeti-level reboot type must
    # always be "hard"; the OS API 'soft'/'hard' distinction maps to
    # 'shutdown_timeout', not to Ganeti's reboot type.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
592

    
593

    
594
def startup_instance(vm):
    """Submit a Ganeti job to start the instance backing `vm`."""
    with pooled_rapi_client(vm) as client:
        job_id = client.StartupInstance(vm.backend_vm_id,
                                        dry_run=settings.TEST)
    return job_id
597

    
598

    
599
def shutdown_instance(vm):
    """Submit a Ganeti job to shut down the instance backing `vm`."""
    with pooled_rapi_client(vm) as client:
        job_id = client.ShutdownInstance(vm.backend_vm_id,
                                         dry_run=settings.TEST)
    return job_id
602

    
603

    
604
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job changing the vcpus/memory beparams of `vm`."""
    memory = int(memory)
    # minmem and maxmem are kept equal: Cyclades flavors have a single
    # memory size.
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": memory,
                    "maxmem": memory}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
610

    
611

    
612
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    Returns a dict with 'kind', 'host' and 'port' keys.
    """
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    # A serial console on KVM would conflict with the VNC console.
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
638

    
639

    
640
def get_instance_info(vm):
    """Fetch the Ganeti instance info dict for `vm`."""
    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)
    return info
643

    
644

    
645
def vm_exists_in_backend(vm):
    """Return True if the Ganeti backend knows an instance for `vm`.

    A 404 from the RAPI means the instance does not exist; any other
    backend error is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
653

    
654

    
655
def get_network_info(backend_network):
    """Fetch the Ganeti network info dict for a BackendNetwork."""
    with pooled_rapi_client(backend_network) as client:
        info = client.GetNetwork(backend_network.network.backend_id)
    return info
658

    
659

    
660
def network_exists_in_backend(backend_network):
    """Return True if the network exists in the Ganeti backend.

    A 404 from the RAPI means the network does not exist. Any other
    GanetiApiError is re-raised (previously it was silently swallowed and
    the function implicitly returned None, unlike the sibling
    vm_exists_in_backend).
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
667

    
668

    
669
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend, optionally connecting it.

    Returns the list of submitted Ganeti job ids.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    # Connection jobs depend on the creation job.
    return connect_network(network, backend, depends=[job_id])
680

    
681

    
682
def _create_network(network, backend):
    """Create a network.

    Submits an OP_NETWORK_ADD job to the backend and returns the Ganeti
    job id.
    """

    # Tags carry the network type information to the Ganeti hooks.
    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
720

    
721

    
722
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param depends: Ganeti job ids the connect jobs should depend on
        (default: no dependencies; the mutable default `[]` was replaced
        with None to avoid the shared-default pitfall).
    :param group: Connect only to this nodegroup; all groups if None.
    :returns: List of submitted Ganeti job ids.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Loop variable renamed so it no longer shadows the `group` param.
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
742

    
743

    
744
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    # The removal job depends on the disconnect jobs, when any were issued.
    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
751

    
752

    
753
def _delete_network(network, backend, depends=[]):
    """Submit the Ganeti job that deletes `network`, after `depends` finish.

    Returns the Ganeti job id.
    """
    # Guard against the shared mutable default; the list is rebuilt anyway.
    depends = [[job, ["success", "error", "canceled"]]
               for job in (depends or [])]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
757

    
758

    
759
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    Submits one DisconnectNetwork job per nodegroup (or only for `group`
    if one is given). Returns the list of Ganeti job ids.
    """
    # Log wording fixed: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Loop variable renamed so it no longer shadows the `group` parameter.
        for grp in groups:
            job_id = client.DisconnectNetwork(network.backend_id, grp)
            job_ids.append(job_id)
    return job_ids
769

    
770

    
771
def connect_to_network(vm, nic):
    """Attach a NIC to a VM, creating the network on the backend if needed.

    If the network's BackendNetwork is not ACTIVE yet, the network is first
    created on the backend and the NIC-add job depends on those jobs.

    Returns the Ganeti job id of the ModifyInstance call.
    """
    network = nic.network
    backend = vm.backend
    # Lock the network row to serialize concurrent BackendNetwork creation.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    # Renamed from `nic` so it no longer shadows the NIC model parameter.
    nic_dict = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4}

    log.debug("Adding NIC %s to VM %s", nic_dict, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_dict)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
801

    
802

    
803
def disconnect_from_network(vm, nic):
    """Remove a NIC from a VM and clear the NIC's firewall tag, if any.

    Returns the Ganeti job id of the ModifyInstance call.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.index, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        # If the NIC has a tag for a firewall profile it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on the removing the NIC, but currently RAPI client does not
        # support this, this may result in clearing the firewall profile
        # without successfully removing the NIC. This issue will be fixed with
        # use of NIC UUIDs.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            client.DeleteInstanceTags(vm.backend_vm_id,
                                      [_firewall_tags[profile] % nic.index],
                                      dry_run=settings.TEST)

        return job_id
830

    
831

    
832
def set_firewall_profile(vm, profile, index=0):
    """Set the firewall profile of a VM NIC via Ganeti instance tags.

    Removes any previous firewall tags for the NIC at `index` and, unless
    the profile is "DISABLED", adds the tag for the requested profile.

    Raises ValueError if `profile` is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        # Error-message typo fixed ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
859

    
860

    
861
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
864

    
865

    
866
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
869

    
870

    
871
def get_jobs(backend, bulk=True):
    """Return the backend's jobs as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
874

    
875

    
876
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        if not node['vm_capable'] or node['drained'] or node['offline']:
            continue
        if not node['cnodes']:
            continue
        for attr in attrs:
            totals[attr] += int(node[attr])
    return totals
895

    
896

    
897
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy every reported resource field onto the Backend row.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
913

    
914

    
915
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Sum the runtime RAM of every instance reported by Ganeti.
    return sum(instance['oper_ram'] for instance in instances)
928

    
929

    
930
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    allowed_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed_templates
    return [template for template in info["enabled_disk_templates"]
            if template in allowed_templates]
947

    
948

    
949
def update_backend_disk_templates(backend):
    """Refresh the backend's stored list of available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
953

    
954

    
955
##
956
## Synchronized operations for reconciliation
957
##
958

    
959

    
960
def create_network_synced(network, backend):
    """Synchronously create and then connect a network on a backend.

    Returns a (status, error) tuple; stops early if creation fails.
    """
    status = _create_network_synced(network, backend)
    if status[0] != 'success':
        return status
    return connect_network_synced(network, backend)
966

    
967

    
968
def _create_network_synced(network, backend):
    """Create the network on the backend and wait for the job to finish.

    Returns the (status, error) tuple from wait_for_job.
    """
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
973

    
974

    
975
def connect_network_synced(network, backend):
    """Connect a network to all nodegroups, waiting for each job to finish.

    Returns the (status, error) tuple of the first failing job, or of the
    last job if all succeed. If the backend has no nodegroups, returns
    ("success", None).
    """
    # Initialize so `result` is defined even when GetGroups() is empty;
    # previously that case raised NameError at the final return.
    result = ("success", None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
985

    
986

    
987
def wait_for_job(client, jobid):
    """Block until the Ganeti job `jobid` reaches a final status.

    Polls via WaitForJobChange and returns a (status, error) tuple: error is
    None on success, otherwise the job's opresult.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # "canceled" is the final status Ganeti reports for canceled jobs (and the
    # spelling used for `depends` elsewhere in this module); the previous
    # check for "cancel" never matched, so a canceled job looped forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
        return (status, error)