Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ bbae3e45

History | View | Annotate | Download (34.2 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
60
    """Handle quotas for updated VirtualMachine.
61

62
    Update quotas for the updated VirtualMachine based on the job that run on
63
    the Ganeti backend. If a commission has been already issued for this job,
64
    then this commission is just accepted or rejected based on the job status.
65
    Otherwise, a new commission for the given change is issued, that is also in
66
    force and auto-accept mode. In this case, previous commissions are
67
    rejected, since they reflect a previous state of the VM.
68

69
    """
70
    if job_status not in ["success", "error", "canceled"]:
71
        return
72

    
73
    # Check successful completion of a job will trigger any quotable change in
74
    # the VM state.
75
    action = utils.get_action_from_opcode(job_opcode, job_fields)
76
    commission_info = quotas.get_commission_info(vm, action=action,
77
                                                 action_fields=job_fields)
78

    
79
    if vm.task_job_id == job_id and vm.serial is not None:
80
        # Commission for this change has already been issued. So just
81
        # accept/reject it
82
        serial = vm.serial
83
        if job_status == "success":
84
            quotas.accept_serial(serial)
85
        elif job_status in ["error", "canceled"]:
86
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
87
                      serial)
88
            quotas.reject_serial(serial)
89
        vm.serial = None
90
    elif job_status == "success" and commission_info is not None:
91
        log.debug("Expected job was %s. Processing job %s. Commission for"
92
                  " this job: %s", vm.task_job_id, job_id, commission_info)
93
        # Commission for this change has not been issued, or the issued
94
        # commission was unaware of the current change. Reject all previous
95
        # commissions and create a new one in forced mode!
96
        previous_serial = vm.serial
97
        if previous_serial and not previous_serial.resolved:
98
            quotas.resolve_vm_commission(previous_serial)
99
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
100
                           % (vm, job_id))
101
        serial = quotas.issue_commission(user=vm.userid,
102
                                         source=quotas.DEFAULT_SOURCE,
103
                                         provisions=commission_info,
104
                                         name=commission_name,
105
                                         force=True,
106
                                         auto_accept=True)
107
        # Clear VM's serial. Expected job may arrive later. However correlated
108
        # serial must not be accepted, since it reflects a previous VM state
109
        vm.serial = None
110

    
111
    return vm
112

    
113

    
114
@transaction.commit_on_success
115
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
116
                      beparams=None):
117
    """Process a job progress notification from the backend
118

119
    Process an incoming message from the backend (currently Ganeti).
120
    Job notifications with a terminating status (sucess, error, or canceled),
121
    also update the operating state of the VM.
122

123
    """
124
    # See #1492, #1031, #1111 why this line has been removed
125
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
126
    if status not in [x[0] for x in BACKEND_STATUSES]:
127
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
128

    
129
    vm.backendjobid = jobid
130
    vm.backendjobstatus = status
131
    vm.backendopcode = opcode
132
    vm.backendlogmsg = logmsg
133

    
134
    if status in ["queued", "waiting", "running"]:
135
        vm.save()
136
        return
137

    
138
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
139
    # Notifications of success change the operating state
140
    if status == "success":
141
        if state_for_success is not None:
142
            vm.operstate = state_for_success
143
        if nics is not None:
144
            # Update the NICs of the VM
145
            _process_net_status(vm, etime, nics)
146
        if beparams:
147
            # Change the flavor of the VM
148
            _process_resize(vm, beparams)
149
        # Update backendtime only for jobs that have been successfully
150
        # completed, since only these jobs update the state of the VM. Else a
151
        # "race condition" may occur when a successful job (e.g.
152
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
153
        # in reversed order.
154
        vm.backendtime = etime
155

    
156
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
157
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
158
        vm.operstate = 'ERROR'
159
        vm.backendtime = etime
160
    elif opcode == 'OP_INSTANCE_REMOVE':
161
        # Set the deleted flag explicitly, cater for admin-initiated removals
162
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
163
        # when no instance exists at the Ganeti backend.
164
        # See ticket #799 for all the details.
165
        if (status == 'success' or
166
           (status == 'error' and (vm.operstate == 'ERROR' or
167
                                   vm.action == 'DESTROY'))):
168
            # VM has been deleted. Release the instance IPs
169
            release_instance_ips(vm, [])
170
            # And delete the releated NICs (must be performed after release!)
171
            vm.nics.all().delete()
172
            vm.deleted = True
173
            vm.operstate = state_for_success
174
            vm.backendtime = etime
175
            status = "success"
176

    
177
    if status in ["success", "error", "canceled"]:
178
        # Job is finalized: Handle quotas/commissioning
179
        job_fields = {"nics": nics, "beparams": beparams}
180
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
181
                              job_status=status, job_fields=job_fields)
182
        # and clear task fields
183
        if vm.task_job_id == jobid:
184
            vm.task = None
185
            vm.task_job_id = None
186

    
187
    vm.save()
188

    
189

    
190
def _process_resize(vm, beparams):
191
    """Change flavor of a VirtualMachine based on new beparams."""
192
    old_flavor = vm.flavor
193
    vcpus = beparams.get("vcpus", old_flavor.cpu)
194
    ram = beparams.get("maxmem", old_flavor.ram)
195
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
196
        return
197
    try:
198
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
199
                                        disk=old_flavor.disk,
200
                                        disk_template=old_flavor.disk_template)
201
    except Flavor.DoesNotExist:
202
        raise Exception("Can not find flavor for VM")
203
    vm.flavor = new_flavor
204
    vm.save()
205

    
206

    
207
@transaction.commit_on_success
208
def process_net_status(vm, etime, nics):
209
    """Wrap _process_net_status inside transaction."""
210
    _process_net_status(vm, etime, nics)
211

    
212

    
213
def _process_net_status(vm, etime, nics):
214
    """Process a net status notification from the backend
215

216
    Process an incoming message from the Ganeti backend,
217
    detailing the NIC configuration of a VM instance.
218

219
    Update the state of the VM in the DB accordingly.
220
    """
221

    
222
    ganeti_nics = process_ganeti_nics(nics)
223
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
224
        log.debug("NICs for VM %s have not changed", vm)
225
        return
226

    
227
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
228
    # guarantee that no deadlock will occur with Backend allocator.
229
    Backend.objects.select_for_update().get(id=vm.backend_id)
230

    
231
    # NICs have changed. Release the instance IPs
232
    release_instance_ips(vm, ganeti_nics)
233
    # And delete the releated NICs (must be performed after release!)
234
    vm.nics.all().delete()
235

    
236
    for nic in ganeti_nics:
237
        ipv4 = nic["ipv4"]
238
        net = nic['network']
239
        if ipv4:
240
            net.reserve_address(ipv4)
241

    
242
        nic['dirty'] = False
243
        vm.nics.create(**nic)
244
        # Dummy save the network, because UI uses changed-since for VMs
245
        # and Networks in order to show the VM NICs
246
        net.save()
247

    
248
    vm.backendtime = etime
249
    vm.save()
250

    
251

    
252
def process_ganeti_nics(ganeti_nics):
253
    """Process NIC dict from ganeti hooks."""
254
    new_nics = []
255
    for i, new_nic in enumerate(ganeti_nics):
256
        network = new_nic.get('network', '')
257
        n = str(network)
258
        pk = utils.id_from_network_name(n)
259

    
260
        net = Network.objects.get(pk=pk)
261

    
262
        # Get the new nic info
263
        mac = new_nic.get('mac')
264
        ipv4 = new_nic.get('ip')
265
        ipv6 = mac2eui64(mac, net.subnet6) if net.subnet6 is not None else None
266

    
267
        firewall = new_nic.get('firewall')
268
        firewall_profile = _reverse_tags.get(firewall)
269
        if not firewall_profile and net.public:
270
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
271

    
272
        nic = {
273
            'index': i,
274
            'network': net,
275
            'mac': mac,
276
            'ipv4': ipv4,
277
            'ipv6': ipv6,
278
            'firewall_profile': firewall_profile,
279
            'state': 'ACTIVE'}
280

    
281
        new_nics.append(nic)
282
    return new_nics
283

    
284

    
285
def nics_changed(old_nics, new_nics):
286
    """Return True if NICs have changed in any way."""
287
    if len(old_nics) != len(new_nics):
288
        return True
289
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
290
    for old_nic, new_nic in zip(old_nics, new_nics):
291
        for field in fields:
292
            if getattr(old_nic, field) != new_nic[field]:
293
                return True
294
    return False
295

    
296

    
297
def release_instance_ips(vm, ganeti_nics):
298
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
299
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
300
                            ganeti_nics))
301
    to_release = old_addresses - new_addresses
302
    for (network_id, ipv4) in to_release:
303
        if ipv4:
304
            # Get X-Lock before searching floating IP, to exclusively search
305
            # and release floating IP. Otherwise you may release a floating IP
306
            # that has been just reserved.
307
            net = Network.objects.select_for_update().get(id=network_id)
308
            if net.floating_ip_pool:
309
                try:
310
                    floating_ip = net.floating_ips.select_for_update()\
311
                                                  .get(ipv4=ipv4, machine=vm,
312
                                                       deleted=False)
313
                    floating_ip.machine = None
314
                    floating_ip.save()
315
                except FloatingIP.DoesNotExist:
316
                    net.release_address(ipv4)
317
            else:
318
                net.release_address(ipv4)
319

    
320

    
321
@transaction.commit_on_success
322
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
323
    if status not in [x[0] for x in BACKEND_STATUSES]:
324
        raise Network.InvalidBackendMsgError(opcode, status)
325

    
326
    back_network.backendjobid = jobid
327
    back_network.backendjobstatus = status
328
    back_network.backendopcode = opcode
329
    back_network.backendlogmsg = logmsg
330

    
331
    network = back_network.network
332

    
333
    # Notifications of success change the operating state
334
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
335
    if status == 'success' and state_for_success is not None:
336
        back_network.operstate = state_for_success
337

    
338
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
339
        back_network.operstate = 'ERROR'
340
        back_network.backendtime = etime
341

    
342
    if opcode == 'OP_NETWORK_REMOVE':
343
        if (status == 'success' or
344
           (status == 'error' and (back_network.operstate == 'ERROR' or
345
                                   network.action == 'DESTROY'))):
346
            back_network.operstate = state_for_success
347
            back_network.deleted = True
348
            back_network.backendtime = etime
349

    
350
    if status == 'success':
351
        back_network.backendtime = etime
352
    back_network.save()
353
    # Also you must update the state of the Network!!
354
    update_network_state(network)
355

    
356

    
357
def update_network_state(network):
358
    """Update the state of a Network based on BackendNetwork states.
359

360
    Update the state of a Network based on the operstate of the networks in the
361
    backends that network exists.
362

363
    The state of the network is:
364
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
365
    * DELETED: If it is is 'DELETED' in all backends that have been created.
366

367
    This function also releases the resources (MAC prefix or Bridge) and the
368
    quotas for the network.
369

370
    """
371
    if network.deleted:
372
        # Network has already been deleted. Just assert that state is also
373
        # DELETED
374
        if not network.state == "DELETED":
375
            network.state = "DELETED"
376
            network.save()
377
        return
378

    
379
    backend_states = [s.operstate for s in network.backend_networks.all()]
380
    if not backend_states and network.action != "DESTROY":
381
        if network.state != "ACTIVE":
382
            network.state = "ACTIVE"
383
            network.save()
384
            return
385

    
386
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
387
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
388
                     "DELETED")
389

    
390
    # Release the resources on the deletion of the Network
391
    if deleted:
392
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
393
                 network.id, network.mac_prefix, network.link)
394
        network.deleted = True
395
        network.state = "DELETED"
396
        if network.mac_prefix:
397
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
398
                release_resource(res_type="mac_prefix",
399
                                 value=network.mac_prefix)
400
        if network.link:
401
            if network.FLAVORS[network.flavor]["link"] == "pool":
402
                release_resource(res_type="bridge", value=network.link)
403

    
404
        # Issue commission
405
        if network.userid:
406
            quotas.issue_and_accept_commission(network, delete=True)
407
        elif not network.public:
408
            log.warning("Network %s does not have an owner!", network.id)
409
    network.save()
410

    
411

    
412
@transaction.commit_on_success
413
def process_network_modify(back_network, etime, jobid, opcode, status,
414
                           add_reserved_ips, remove_reserved_ips):
415
    assert (opcode == "OP_NETWORK_SET_PARAMS")
416
    if status not in [x[0] for x in BACKEND_STATUSES]:
417
        raise Network.InvalidBackendMsgError(opcode, status)
418

    
419
    back_network.backendjobid = jobid
420
    back_network.backendjobstatus = status
421
    back_network.opcode = opcode
422

    
423
    if add_reserved_ips or remove_reserved_ips:
424
        net = back_network.network
425
        pool = net.get_pool()
426
        if add_reserved_ips:
427
            for ip in add_reserved_ips:
428
                pool.reserve(ip, external=True)
429
        if remove_reserved_ips:
430
            for ip in remove_reserved_ips:
431
                pool.put(ip, external=True)
432
        pool.save()
433

    
434
    if status == 'success':
435
        back_network.backendtime = etime
436
    back_network.save()
437

    
438

    
439
@transaction.commit_on_success
440
def process_create_progress(vm, etime, progress):
441

    
442
    percentage = int(progress)
443

    
444
    # The percentage may exceed 100%, due to the way
445
    # snf-image:copy-progress tracks bytes read by image handling processes
446
    percentage = 100 if percentage > 100 else percentage
447
    if percentage < 0:
448
        raise ValueError("Percentage cannot be negative")
449

    
450
    # FIXME: log a warning here, see #1033
451
#   if last_update > percentage:
452
#       raise ValueError("Build percentage should increase monotonically " \
453
#                        "(old = %d, new = %d)" % (last_update, percentage))
454

    
455
    # This assumes that no message of type 'ganeti-create-progress' is going to
456
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
457
    # the instance is STARTED.  What if the two messages are processed by two
458
    # separate dispatcher threads, and the 'ganeti-op-status' message for
459
    # successful creation gets processed before the 'ganeti-create-progress'
460
    # message? [vkoukis]
461
    #
462
    #if not vm.operstate == 'BUILD':
463
    #    raise VirtualMachine.IllegalState("VM is not in building state")
464

    
465
    vm.buildpercentage = percentage
466
    vm.backendtime = etime
467
    vm.save()
468

    
469

    
470
@transaction.commit_on_success
471
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
472
                               details=None):
473
    """
474
    Create virtual machine instance diagnostic entry.
475

476
    :param vm: VirtualMachine instance to create diagnostic for.
477
    :param message: Diagnostic message.
478
    :param source: Diagnostic source identifier (e.g. image-helper).
479
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
480
    :param etime: The time the message occured (if available).
481
    :param details: Additional details or debug information.
482
    """
483
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
484
                                                   source_date=etime,
485
                                                   message=message,
486
                                                   details=details)
487

    
488

    
489
def create_instance(vm, nics, flavor, image):
490
    """`image` is a dictionary which should contain the keys:
491
            'backend_id', 'format' and 'metadata'
492

493
        metadata value should be a dictionary.
494
    """
495

    
496
    # Handle arguments to CreateInstance() as a dictionary,
497
    # initialize it based on a deployment-specific value.
498
    # This enables the administrator to override deployment-specific
499
    # arguments, such as the disk template to use, name of os provider
500
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
501
    #
502
    kw = vm.backend.get_create_params()
503
    kw['mode'] = 'create'
504
    kw['name'] = vm.backend_vm_id
505
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
506

    
507
    kw['disk_template'] = flavor.disk_template
508
    kw['disks'] = [{"size": flavor.disk * 1024}]
509
    provider = flavor.disk_provider
510
    if provider:
511
        kw['disks'][0]['provider'] = provider
512
        kw['disks'][0]['origin'] = flavor.disk_origin
513

    
514
    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
515
                  for nic in nics]
516
    backend = vm.backend
517
    depend_jobs = []
518
    for nic in nics:
519
        network = Network.objects.select_for_update().get(id=nic.network.id)
520
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
521
                                                             network=network)
522
        if bnet.operstate != "ACTIVE":
523
            if network.public:
524
                msg = "Can not connect instance to network %s. Network is not"\
525
                      " ACTIVE in backend %s." % (network, backend)
526
                raise Exception(msg)
527
            else:
528
                jobs = create_network(network, backend, connect=True)
529
                if isinstance(jobs, list):
530
                    depend_jobs.extend(jobs)
531
                else:
532
                    depend_jobs.append(jobs)
533
    kw["depends"] = [[job, ["success", "error", "canceled"]]
534
                     for job in depend_jobs]
535

    
536
    if vm.backend.use_hotplug():
537
        kw['hotplug'] = True
538
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
539
    # kw['os'] = settings.GANETI_OS_PROVIDER
540
    kw['ip_check'] = False
541
    kw['name_check'] = False
542

    
543
    # Do not specific a node explicitly, have
544
    # Ganeti use an iallocator instead
545
    #kw['pnode'] = rapi.GetNodes()[0]
546

    
547
    kw['dry_run'] = settings.TEST
548

    
549
    kw['beparams'] = {
550
        'auto_balance': True,
551
        'vcpus': flavor.cpu,
552
        'memory': flavor.ram}
553

    
554
    kw['osparams'] = {
555
        'config_url': vm.config_url,
556
        # Store image id and format to Ganeti
557
        'img_id': image['backend_id'],
558
        'img_format': image['format']}
559

    
560
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
561
    # kw['hvparams'] = dict(serial_console=False)
562

    
563
    log.debug("Creating instance %s", utils.hide_pass(kw))
564
    with pooled_rapi_client(vm) as client:
565
        return client.CreateInstance(**kw)
566

    
567

    
568
def delete_instance(vm):
569
    with pooled_rapi_client(vm) as client:
570
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
571

    
572

    
573
def reboot_instance(vm, reboot_type):
574
    assert reboot_type in ('soft', 'hard')
575
    kwargs = {"instance": vm.backend_vm_id,
576
              "reboot_type": "hard"}
577
    # XXX: Currently shutdown_timeout parameter is not supported from the
578
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
579
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
580
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
581
    # of OS API is different from the one in Ganeti, and maps to
582
    # 'shutdown_timeout'.
583
    #if reboot_type == "hard":
584
    #    kwargs["shutdown_timeout"] = 0
585
    if settings.TEST:
586
        kwargs["dry_run"] = True
587
    with pooled_rapi_client(vm) as client:
588
        return client.RebootInstance(**kwargs)
589

    
590

    
591
def startup_instance(vm):
592
    with pooled_rapi_client(vm) as client:
593
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
594

    
595

    
596
def shutdown_instance(vm):
597
    with pooled_rapi_client(vm) as client:
598
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
599

    
600

    
601
def resize_instance(vm, vcpus, memory):
602
    beparams = {"vcpus": int(vcpus),
603
                "minmem": int(memory),
604
                "maxmem": int(memory)}
605
    with pooled_rapi_client(vm) as client:
606
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
607

    
608

    
609
def get_instance_console(vm):
610
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
611
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
612
    # useless (see #783).
613
    #
614
    # Until this is fixed on the Ganeti side, construct a console info reply
615
    # directly.
616
    #
617
    # WARNING: This assumes that VNC runs on port network_port on
618
    #          the instance's primary node, and is probably
619
    #          hypervisor-specific.
620
    #
621
    log.debug("Getting console for vm %s", vm)
622

    
623
    console = {}
624
    console['kind'] = 'vnc'
625

    
626
    with pooled_rapi_client(vm) as client:
627
        i = client.GetInstance(vm.backend_vm_id)
628

    
629
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
630
        raise Exception("hv parameter serial_console cannot be true")
631
    console['host'] = i['pnode']
632
    console['port'] = i['network_port']
633

    
634
    return console
635

    
636

    
637
def get_instance_info(vm):
638
    with pooled_rapi_client(vm) as client:
639
        return client.GetInstanceInfo(vm.backend_vm_id)
640

    
641

    
642
def create_network(network, backend, connect=True):
643
    """Create a network in a Ganeti backend"""
644
    log.debug("Creating network %s in backend %s", network, backend)
645

    
646
    job_id = _create_network(network, backend)
647

    
648
    if connect:
649
        job_ids = connect_network(network, backend, depends=[job_id])
650
        return job_ids
651
    else:
652
        return [job_id]
653

    
654

    
655
def _create_network(network, backend):
656
    """Create a network."""
657

    
658
    network_type = network.public and 'public' or 'private'
659

    
660
    tags = network.backend_tag
661
    if network.dhcp:
662
        tags.append('nfdhcpd')
663

    
664
    if network.public:
665
        conflicts_check = True
666
    else:
667
        conflicts_check = False
668

    
669
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
670
    # not support IPv6 only networks. To bypass this limitation, we create the
671
    # network with a dummy network subnet, and make Cyclades connect instances
672
    # to such networks, with address=None.
673
    subnet = network.subnet
674
    if subnet is None:
675
        subnet = "10.0.0.0/24"
676

    
677
    try:
678
        bn = BackendNetwork.objects.get(network=network, backend=backend)
679
        mac_prefix = bn.mac_prefix
680
    except BackendNetwork.DoesNotExist:
681
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
682
                        " does not exist" % (network.id, backend.id))
683

    
684
    with pooled_rapi_client(backend) as client:
685
        return client.CreateNetwork(network_name=network.backend_id,
686
                                    network=subnet,
687
                                    network6=network.subnet6,
688
                                    gateway=network.gateway,
689
                                    gateway6=network.gateway6,
690
                                    network_type=network_type,
691
                                    mac_prefix=mac_prefix,
692
                                    conflicts_check=conflicts_check,
693
                                    tags=tags)
694

    
695

    
696
def connect_network(network, backend, depends=[], group=None):
697
    """Connect a network to nodegroups."""
698
    log.debug("Connecting network %s to backend %s", network, backend)
699

    
700
    if network.public:
701
        conflicts_check = True
702
    else:
703
        conflicts_check = False
704

    
705
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
706
    with pooled_rapi_client(backend) as client:
707
        groups = [group] if group is not None else client.GetGroups()
708
        job_ids = []
709
        for group in groups:
710
            job_id = client.ConnectNetwork(network.backend_id, group,
711
                                           network.mode, network.link,
712
                                           conflicts_check,
713
                                           depends=depends)
714
            job_ids.append(job_id)
715
    return job_ids
716

    
717

    
718
def delete_network(network, backend, disconnect=True):
719
    log.debug("Deleting network %s from backend %s", network, backend)
720

    
721
    depends = []
722
    if disconnect:
723
        depends = disconnect_network(network, backend)
724
    _delete_network(network, backend, depends=depends)
725

    
726

    
727
def _delete_network(network, backend, depends=[]):
728
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
729
    with pooled_rapi_client(backend) as client:
730
        return client.DeleteNetwork(network.backend_id, depends)
731

    
732

    
733
def disconnect_network(network, backend, group=None):
734
    log.debug("Disconnecting network %s to backend %s", network, backend)
735

    
736
    with pooled_rapi_client(backend) as client:
737
        groups = [group] if group is not None else client.GetGroups()
738
        job_ids = []
739
        for group in groups:
740
            job_id = client.DisconnectNetwork(network.backend_id, group)
741
            job_ids.append(job_id)
742
    return job_ids
743

    
744

    
745
def connect_to_network(vm, network, address=None):
746
    backend = vm.backend
747
    network = Network.objects.select_for_update().get(id=network.id)
748
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
749
                                                         network=network)
750
    depend_jobs = []
751
    if bnet.operstate != "ACTIVE":
752
        depend_jobs = create_network(network, backend, connect=True)
753

    
754
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
755

    
756
    nic = {'ip': address, 'network': network.backend_id}
757

    
758
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
759

    
760
    kwargs = {
761
        "instance": vm.backend_vm_id,
762
        "nics": [("add", nic)],
763
        "depends": depends,
764
    }
765
    if vm.backend.use_hotplug():
766
        kwargs["hotplug"] = True
767
    if settings.TEST:
768
        kwargs["dry_run"] = True
769

    
770
    with pooled_rapi_client(vm) as client:
771
        return client.ModifyInstance(**kwargs)
772

    
773

    
774
def disconnect_from_network(vm, nic):
775
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
776

    
777
    kwargs = {
778
        "instance": vm.backend_vm_id,
779
        "nics": [("remove", nic.index, {})],
780
    }
781
    if vm.backend.use_hotplug():
782
        kwargs["hotplug"] = True
783
    if settings.TEST:
784
        kwargs["dry_run"] = True
785

    
786
    with pooled_rapi_client(vm) as client:
787
        jobID = client.ModifyInstance(**kwargs)
788
        # If the NIC has a tag for a firewall profile it must be deleted,
789
        # otherwise it may affect another NIC. XXX: Deleting the tag should
790
        # depend on the removing the NIC, but currently RAPI client does not
791
        # support this, this may result in clearing the firewall profile
792
        # without successfully removing the NIC. This issue will be fixed with
793
        # use of NIC UUIDs.
794
        firewall_profile = nic.firewall_profile
795
        if firewall_profile and firewall_profile != "DISABLED":
796
            tag = _firewall_tags[firewall_profile] % nic.index
797
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
798
                                      dry_run=settings.TEST)
799

    
800
        return jobID
801

    
802

    
803
def set_firewall_profile(vm, profile, index=0):
804
    try:
805
        tag = _firewall_tags[profile] % index
806
    except KeyError:
807
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
808

    
809
    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)
810

    
811
    with pooled_rapi_client(vm) as client:
812
        # Delete previous firewall tags
813
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
814
        delete_tags = [(t % index) for t in _firewall_tags.values()
815
                       if (t % index) in old_tags]
816
        if delete_tags:
817
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
818
                                      dry_run=settings.TEST)
819

    
820
        if profile != "DISABLED":
821
            client.AddInstanceTags(vm.backend_vm_id, [tag],
822
                                   dry_run=settings.TEST)
823

    
824
        # XXX NOP ModifyInstance call to force process_net_status to run
825
        # on the dispatcher
826
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
827
        client.ModifyInstance(vm.backend_vm_id,
828
                              os_name=os_name)
829
    return None
830

    
831

    
832
def get_instances(backend, bulk=True):
833
    with pooled_rapi_client(backend) as c:
834
        return c.GetInstances(bulk=bulk)
835

    
836

    
837
def get_nodes(backend, bulk=True):
838
    with pooled_rapi_client(backend) as c:
839
        return c.GetNodes(bulk=bulk)
840

    
841

    
842
def get_jobs(backend, bulk=True):
843
    with pooled_rapi_client(backend) as c:
844
        return c.GetJobs(bulk=bulk)
845

    
846

    
847
def get_physical_resources(backend):
848
    """ Get the physical resources of a backend.
849

850
    Get the resources of a backend as reported by the backend (not the db).
851

852
    """
853
    nodes = get_nodes(backend, bulk=True)
854
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
855
    res = {}
856
    for a in attr:
857
        res[a] = 0
858
    for n in nodes:
859
        # Filter out drained, offline and not vm_capable nodes since they will
860
        # not take part in the vm allocation process
861
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
862
        if can_host_vms and n['cnodes']:
863
            for a in attr:
864
                res[a] += int(n[a])
865
    return res
866

    
867

    
868
def update_resources(backend, resources=None):
869
    """ Update the state of the backend resources in db.
870

871
    """
872

    
873
    if not resources:
874
        resources = get_physical_resources(backend)
875

    
876
    backend.mfree = resources['mfree']
877
    backend.mtotal = resources['mtotal']
878
    backend.dfree = resources['dfree']
879
    backend.dtotal = resources['dtotal']
880
    backend.pinst_cnt = resources['pinst_cnt']
881
    backend.ctotal = resources['ctotal']
882
    backend.updated = datetime.now()
883
    backend.save()
884

    
885

    
886
def get_memory_from_instances(backend):
887
    """ Get the memory that is used from instances.
888

889
    Get the used memory of a backend. Note: This is different for
890
    the real memory used, due to kvm's memory de-duplication.
891

892
    """
893
    with pooled_rapi_client(backend) as client:
894
        instances = client.GetInstances(bulk=True)
895
    mem = 0
896
    for i in instances:
897
        mem += i['oper_ram']
898
    return mem
899

    
900
##
901
## Synchronized operations for reconciliation
902
##
903

    
904

    
905
def create_network_synced(network, backend):
906
    result = _create_network_synced(network, backend)
907
    if result[0] != 'success':
908
        return result
909
    result = connect_network_synced(network, backend)
910
    return result
911

    
912

    
913
def _create_network_synced(network, backend):
914
    with pooled_rapi_client(backend) as client:
915
        job = _create_network(network, backend)
916
        result = wait_for_job(client, job)
917
    return result
918

    
919

    
920
def connect_network_synced(network, backend):
921
    with pooled_rapi_client(backend) as client:
922
        for group in client.GetGroups():
923
            job = client.ConnectNetwork(network.backend_id, group,
924
                                        network.mode, network.link)
925
            result = wait_for_job(client, job)
926
            if result[0] != 'success':
927
                return result
928

    
929
    return result
930

    
931

    
932
def wait_for_job(client, jobid):
933
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
934
    status = result['job_info'][0]
935
    while status not in ['success', 'error', 'cancel']:
936
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
937
                                         [result], None)
938
        status = result['job_info'][0]
939

    
940
    if status == 'success':
941
        return (status, None)
942
    else:
943
        error = result['job_info'][1]
944
        return (status, error)