Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 90858bda

History | View | Annotate | Download (34.2 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly modified) ``vm``, or ``None`` when the job status
    is not terminal (nothing to do).

    """
    # Quotas are only affected by terminal job statuses.
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        # Issued in force + auto-accept mode, so the returned serial is
        # already settled and intentionally not stored on the VM (see below).
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         name=commission_name,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    return vm
112

    
113

    
114
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job info on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal statuses only record the job info; operstate is untouched.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == 'success' or
           (status == 'error' and (vm.operstate == 'ERROR' or
                                   vm.action == 'DESTROY'))):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat as success from here on, so quotas are handled below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
189

    
190

    
191
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    # Nothing to do when the beparams already match the current flavor.
    if (new_vcpus, new_ram) == (current.cpu, current.ram):
        return
    # Disk size and template never change on a resize; look up the flavor
    # matching the new CPU/RAM combination.
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
206

    
207

    
208
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification inside a DB transaction."""
    return _process_net_status(vm, etime, nics)
212

    
213

    
214
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    # Normalize the Ganeti NIC dicts into DB-shaped dicts.
    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # And delete the related NICs (must be performed after release!)
    vm.nics.all().delete()

    # Recreate the NICs from the Ganeti-reported state, reserving each
    # IPv4 address in its network's pool.
    for nic in ganeti_nics:
        ipv4 = nic["ipv4"]
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
251

    
252

    
253
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks.

    Translate the NIC dicts reported by Ganeti into dicts matching the
    fields of the Cyclades NIC model, resolving each Ganeti network name
    to its Network DB object.
    """
    processed = []
    for index, ganeti_nic in enumerate(ganeti_nics):
        backend_name = str(ganeti_nic.get('network', ''))
        network_id = utils.id_from_network_name(backend_name)
        net = Network.objects.get(pk=network_id)

        # Extract the new NIC info.
        mac = ganeti_nic.get('mac')
        ipv4 = ganeti_nic.get('ip')
        if net.subnet6 is not None:
            # Derive the IPv6 address from the MAC (EUI-64).
            ipv6 = mac2eui64(mac, net.subnet6)
        else:
            ipv6 = None

        firewall_profile = _reverse_tags.get(ganeti_nic.get('firewall'))
        # Public networks always get a firewall profile.
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({'index': index,
                          'network': net,
                          'mac': mac,
                          'ipv4': ipv4,
                          'ipv6': ipv6,
                          'firewall_profile': firewall_profile,
                          'state': 'ACTIVE'})
    return processed
284

    
285

    
286
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    # A differing NIC count is a change by itself.
    if len(old_nics) != len(new_nics):
        return True
    compared = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    # Compare NICs pairwise on every tracked attribute; old NICs are model
    # objects (attribute access), new ones are plain dicts (item access).
    return any(getattr(db_nic, attr) != ganeti_nic[attr]
               for db_nic, ganeti_nic in zip(old_nics, new_nics)
               for attr in compared)
296

    
297

    
298
def release_instance_ips(vm, ganeti_nics):
    """Release the IPv4 addresses that the VM no longer holds.

    Compare the (network, ipv4) pairs currently stored in the DB with the
    ones reported by Ganeti (``ganeti_nics``) and release every address
    that disappeared: floating IPs are detached from the VM, ordinary
    addresses are returned to the network's pool.
    """
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get X-Lock before searching floating IP, to exclusively search
            # and release floating IP. Otherwise you may release a floating IP
            # that has been just reserved.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    # Not a live floating IP of this VM: treat as a plain
                    # pool address.
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)
320

    
321

    
322
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a BackendNetwork.

    Record the job info on the BackendNetwork, update its operstate on
    terminal statuses, and finally recompute the state of the parent
    Network from all its BackendNetworks.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_ADD means the network never
    # materialized in this backend.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Treat a failed removal as success when the network is already in
        # ERROR or is being destroyed: it may not exist in Ganeti at all.
        if (status == 'success' or
           (status == 'error' and (back_network.operstate == 'ERROR' or
                                   network.action == 'DESTROY'))):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
356

    
357

    
358
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if network.state != "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network does not exist in any backend yet and is not being
        # destroyed, so it counts as ACTIVE. Return unconditionally here:
        # falling through with an empty 'backend_states' would make the
        # reduce() below yield its initializer ("DELETED") and wrongly
        # delete the network. (Previously the return was nested inside the
        # inner 'if', so already-ACTIVE networks hit exactly that bug.)
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        # NOTE: argument order fixed to match the message placeholders
        # (link before mac_prefix was previously swapped).
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Return pool-drawn resources to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission to release the network's quota.
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
411

    
412

    
413
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Record the job info on the BackendNetwork and apply external IP
    reservations/releases to the network's address pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Store the opcode in 'backendopcode', consistently with
    # process_network_status(). The previous code assigned to a plain
    # 'opcode' attribute, which is not a model field and was silently
    # dropped on save().
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                # Mark as externally reserved, so Cyclades never hands it out.
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
438

    
439

    
440
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from an image-copy notification."""
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by all image handling
    # processes, so the reported percentage may exceed 100%; clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    # A monotonicity check ("new percentage must not be lower than the last
    # one") is intentionally not enforced here.

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
469

    
470

    
471
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance the diagnostic refers to.
    :param message: Diagnostic message text.
    :param source: Identifier of the component emitting it (e.g. image-helper).
    :param level: Severity (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: Timestamp of the event, if known.
    :param details: Optional extra details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime,
        message=message, details=details)
488

    
489

    
490
def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Build the CreateInstance() keyword arguments from the flavor, NICs and
    image, creating any missing private networks first, and submit the job
    to the Ganeti backend. Returns the Ganeti job id.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk layout from the flavor (sizes in MiB on the Ganeti side).
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Make sure every referenced network exists in this backend; private
    # networks are created on demand and the creation jobs become
    # dependencies of the instance-creation job.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    # Wait for the dependency jobs to reach any terminal state.
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass() strips sensitive values before logging.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
567

    
568

    
569
def delete_instance(vm):
    """Submit an instance-removal job for `vm`; return the Ganeti job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
572

    
573

    
574
def reboot_instance(vm, reboot_type):
    """Submit a reboot job for `vm`; return the Ganeti job id."""
    assert reboot_type in ('soft', 'hard')
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id, "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as rapi:
        return rapi.RebootInstance(**kwargs)
590

    
591

    
592
def startup_instance(vm):
    """Submit a startup job for `vm`; return the Ganeti job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
595

    
596

    
597
def shutdown_instance(vm):
    """Submit a shutdown job for `vm`; return the Ganeti job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
600

    
601

    
602
def resize_instance(vm, vcpus, memory):
    """Modify the vcpus/memory backend parameters of a Ganeti instance."""
    mem = int(memory)
    # minmem and maxmem are kept equal: flavors define a fixed RAM size.
    beparams = {"vcpus": int(vcpus), "minmem": mem, "maxmem": mem}
    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id, beparams=beparams)
608

    
609

    
610
def get_instance_console(vm):
    """Return VNC console connection info for `vm`.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi:
        info = rapi.GetInstance(vm.backend_vm_id)

    # A serial console on KVM conflicts with VNC access.
    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
636

    
637

    
638
def get_instance_info(vm):
    """Return the Ganeti info dict for `vm`'s backend instance."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstanceInfo(vm.backend_vm_id)
641

    
642

    
643
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job ids involved: just the creation job,
    or, when `connect` is set, the nodegroup-connect jobs that depend on it.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    return connect_network(network, backend, depends=[job_id])
654

    
655

    
656
def _create_network(network, backend):
    """Create a network.

    Submit an OP_NETWORK_ADD job to the given Ganeti backend and return
    the job id. The BackendNetwork row must already exist, since its
    mac_prefix is sent to Ganeti.
    """

    network_type = network.public and 'public' or 'private'

    # NOTE(review): if backend_tag returns a shared list, the append below
    # mutates it in place — confirm this is intended.
    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # IP conflict checking only makes sense for public networks.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
695

    
696

    
697
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param network: the Network to connect.
    :param backend: the Backend whose nodegroups to connect to.
    :param depends: optional list of Ganeti job ids the connect jobs must
        wait for (any terminal state).
    :param group: connect only this nodegroup; by default all groups.
    :returns: list of Ganeti job ids, one per connected nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # 'depends=[]' was a mutable default argument; use None as the default.
    if depends is None:
        depends = []

    # IP conflict checking only makes sense for public networks.
    conflicts_check = bool(network.public)

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
717

    
718

    
719
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend, optionally disconnecting it
    from all nodegroups first (the removal job then depends on the
    disconnect jobs)."""
    log.debug("Deleting network %s from backend %s", network, backend)

    if disconnect:
        depends = disconnect_network(network, backend)
    else:
        depends = []
    _delete_network(network, backend, depends=depends)
726

    
727

    
728
def _delete_network(network, backend, depends=None):
    """Submit an OP_NETWORK_REMOVE job, depending on the given job ids.

    'depends=[]' was a mutable default argument; use None as the default.
    """
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
732

    
733

    
734
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    :param group: disconnect only this nodegroup; by default all groups.
    :returns: list of Ganeti job ids, one per disconnected nodegroup.
    """
    # Fixed log wording: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
744

    
745

    
746
def connect_to_network(vm, nic):
    """Attach a NIC of a VM to its network via a ModifyInstance job.

    If the network is not yet ACTIVE on the VM's backend, it is created
    (and connected) first, and the NIC addition depends on those jobs.
    Returns the Ganeti job id of the ModifyInstance call.

    """
    backend = vm.backend
    # Lock the network row for the duration of the transaction.
    network = Network.objects.select_for_update().get(id=nic.network.id)
    bnet, _created = BackendNetwork.objects.get_or_create(backend=backend,
                                                          network=network)
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)
    else:
        depend_jobs = []

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    # Use a separate name for the RAPI NIC description instead of
    # rebinding the `nic` parameter.
    nic_spec = {'ip': nic.ipv4, 'network': network.backend_id}

    log.debug("Connecting NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def disconnect_from_network(vm, nic):
    """Remove a NIC from a VM and clear its firewall tag, if any.

    Returns the Ganeti job id of the ModifyInstance (NIC removal) call.

    """
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    modify_args = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.index, {})],
    }
    if vm.backend.use_hotplug():
        modify_args["hotplug"] = True
    if settings.TEST:
        modify_args["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**modify_args)
        # If the NIC has a tag for a firewall profile it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on the removing the NIC, but currently RAPI client does not
        # support this, this may result in clearing the firewall profile
        # without successfully removing the NIC. This issue will be fixed with
        # use of NIC UUIDs.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            stale_tag = _firewall_tags[profile] % nic.index
            client.DeleteInstanceTags(vm.backend_vm_id, [stale_tag],
                                      dry_run=settings.TEST)

        return job_id
def set_firewall_profile(vm, profile, index=0):
    """Apply a firewall profile to a NIC of a VM via Ganeti instance tags.

    Any existing firewall tag for the NIC at `index` is removed first; a
    new tag is added unless the profile is "DISABLED".

    @raise ValueError: if `profile` is not a known firewall profile.

    """
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        # Message typo fixed ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags for this NIC index, otherwise the
        # old profile would remain in effect alongside the new one.
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported by Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported by Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
def get_jobs(backend, bulk=True):
    """Return the backend's jobs as reported by Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        if not node['vm_capable'] or node['drained'] or node['offline']:
            continue
        if not node['cnodes']:
            continue
        for attr in attrs:
            totals[attr] += int(node[attr])
    return totals
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    @param resources: pre-fetched resource mapping; when None the
                      physical resources are queried from the backend.

    """
    # Explicit None check: a caller-supplied (possibly falsy) mapping
    # must not silently trigger a re-fetch from the backend.
    if resources is None:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Sum of per-instance operational RAM as reported by Ganeti.
    return sum(instance['oper_ram'] for instance in instances)
##
## Synchronized operations for reconciliation
##

def create_network_synced(network, backend):
    """Create and then connect a network on a backend, synchronously.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step on success.

    """
    creation = _create_network_synced(network, backend)
    if creation[0] != 'success':
        return creation
    return connect_network_synced(network, backend)
def _create_network_synced(network, backend):
    """Submit a network-creation job and block until it finishes."""
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        return wait_for_job(client, job)
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup of a backend, synchronously.

    Stops at the first non-successful job and returns its (status, error)
    tuple. Returns ('success', None) when all jobs succeed — including
    the degenerate case of a backend with no nodegroups, which previously
    raised UnboundLocalError.

    """
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a final status.

    Polls the job via WaitForJobChange and returns a (status, error)
    tuple, where error is None on success and the job's opresult
    otherwise.

    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti reports canceled jobs with final status "canceled"; the old
    # check for "cancel" never matched, so canceled jobs looped forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)