# snf-cyclades-app/synnefo/logic/backend.py @ revision 5c8076b6
# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               FloatingIP,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic.rapi import GanetiApiError

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
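# Note (assumption, for illustration): each GANETI_FIREWALL_*_TAG setting is a
# colon-separated Ganeti tag containing a '%s' placeholder for the NIC index,
# e.g. something like 'synnefo:network:%s:protected'. _reverse_tags maps the
# fourth colon-separated field of each tag back to its profile name
# ('ENABLED', 'DISABLED' or 'PROTECTED').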


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that ran on
    the Ganeti backend. If a commission has already been issued for this job,
    that commission is simply accepted or rejected according to the job
    status. Otherwise, a new commission for the change is issued in force and
    auto-accept mode; in that case any previous commissions are rejected,
    since they reflect an older state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check whether successful completion of the job triggers any quotable
    # change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued, so just
        # accept or reject it. Special case: OP_INSTANCE_CREATE must be
        # accepted even if it fails, since the user must manually remove the
        # failed server.
        serial = vm.serial
        if job_status == "success" or job_opcode == "OP_INSTANCE_CREATE":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode.
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 for why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM.
        # Otherwise a race condition may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and the messages
        # arrive in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # The VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # and delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear the task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()


def _process_resize(vm, beparams):
    """Change the flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.flavor = new_flavor
    vm.save()


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Take an exclusive lock on the Backend before locking the network IP
    # pools, to guarantee that no deadlock will occur with the backend
    # allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # The NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # and delete the related NICs (must be performed after release!)
    vm.nics.all().delete()

    for nic in ganeti_nics:
        ipv4 = nic["ipv4"]
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy-save the network, because the UI uses changed-since on both
        # VMs and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()


def process_ganeti_nics(ganeti_nics):
    """Process the list of NIC dicts reported by the Ganeti hooks."""
    new_nics = []
    for i, new_nic in enumerate(ganeti_nics):
        network = new_nic.get('network', '')
        n = str(network)
        pk = utils.id_from_network_name(n)

        net = Network.objects.get(pk=pk)

        # Get the new NIC info
        mac = new_nic.get('mac')
        ipv4 = new_nic.get('ip')
        ipv6 = mac2eui64(mac, net.subnet6) if net.subnet6 is not None else None

        firewall = new_nic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic = {
            'index': i,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append(nic)
    return new_nics


def nics_changed(old_nics, new_nics):
    """Return True if the NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    for old_nic, new_nic in zip(old_nics, new_nics):
        for field in fields:
            if getattr(old_nic, field) != new_nic[field]:
                return True
    return False


def release_instance_ips(vm, ganeti_nics):
    """Release the IPs of VM NICs that no longer appear in the Ganeti NICs."""
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
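    # e.g. (assumption, illustrative values): old_addresses =
    # {(1, '10.0.0.2'), (2, '10.0.1.5')} and new_addresses =
    # {(1, '10.0.0.2')} leave to_release = {(2, '10.0.1.5')}.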
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Take an exclusive lock before searching for the floating IP, so
            # that the search and the release happen atomically. Otherwise a
            # floating IP that has just been reserved could be released.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a BackendNetwork."""
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Special case: a failed OP_NETWORK_REMOVE is also treated as a
        # deletion when the network no longer exists at the Ganeti backend.
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the parent Network must also be updated.
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.

    Update the state of a Network based on the operstate of its
    BackendNetworks in the backends where the network exists.

    The state of the network is:
    * ACTIVE: if it is 'ACTIVE' in at least one backend.
    * DELETED: if it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # The network has already been deleted. Just assert that its state is
        # also DELETED.
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # The network is deleted when all BackendNetworks are in the "DELETED"
    # operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing mac_prefix %r link %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # The call above has already saved the object and committed the
            # transaction; a second save would override other processes'
            # changes, since the object is no longer locked.
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS notification (reserved IP changes)."""
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from a create-progress message."""

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes.
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' will
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create the diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """Create a new instance in the backend.

    `image` is a dictionary which must contain the keys 'backend_id',
    'format' and 'metadata'; the 'metadata' value must itself be a
    dictionary.
    """

    # Handle the arguments to CreateInstance() as a dictionary, initialized
    # from deployment-specific settings. This enables the administrator to
    # override deployment-specific arguments, such as the disk template to
    # use, the name of the OS provider and hypervisor-specific parameters,
    # at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]
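    # Each "depends" entry is a [job_id, [statuses]] pair: the creation job
    # will not start until the referenced network jobs reach one of the
    # listed terminating statuses.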

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store the image id and format in Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is supported, we fall back, for both reboot types,
    # to the default Ganeti shutdown timeout (120s). Note that the reboot
    # type of the Ganeti job must always be 'hard'. The 'soft' and 'hard'
    # reboot types of the OS API differ from the Ganeti ones and map to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)


def vm_exists_in_backend(vm):
    try:
        get_instance_info(vm)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def get_network_info(backend_network):
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)


def network_exists_in_backend(backend_network):
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    network_type = network.public and 'public' or 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    # Use a dummy network subnet for IPv6-only networks. Currently Ganeti
    # does not support IPv6-only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, nic):
    network = nic.network
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': nic.ipv4, 'network': network.backend_id}

    log.debug("Connecting NIC %s to VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)


def disconnect_from_network(vm, nic):
    log.debug("Removing NIC of VM %s, with index %s", vm, str(nic.index))

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.index, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        jobID = client.ModifyInstance(**kwargs)
        # If the NIC has a tag for a firewall profile, it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on removing the NIC, but currently the RAPI client does not
        # support this; as a result the firewall profile may be cleared
        # without the NIC having been successfully removed. This issue will
        # be fixed with the use of NIC UUIDs.
        firewall_profile = nic.firewall_profile
        if firewall_profile and firewall_profile != "DISABLED":
            tag = _firewall_tags[firewall_profile] % nic.index
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return jobID


def set_firewall_profile(vm, profile, index=0):
    try:
        tag = _firewall_tags[profile] % index
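        # e.g. (assumption, illustrative value): a tag template like
        # 'synnefo:network:%s:protected' formatted with index 0 becomes
        # 'synnefo:network:0:protected'.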
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs(bulk=bulk)


def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
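    # As reported by Ganeti node queries: mfree/mtotal are free/total memory,
    # dfree/dtotal free/total disk, pinst_cnt the number of primary instances
    # and ctotal the total number of logical CPUs.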
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the VM allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res


def update_resources(backend, resources=None):
    """Update the state of the backend resources in the db."""

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: this is different from the real
    memory used, due to KVM's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem

##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
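    # WaitForJobChange returns the requested fields in order, so
    # result['job_info'] is [status, opresult] here.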
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)