Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 70a0afab

History | View | Annotate | Download (32.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to (mutated, not saved here)
    :param job_id: Ganeti job id of the finished job
    :param job_opcode: Ganeti opcode of the job (e.g. 'OP_INSTANCE_REMOVE')
    :param job_status: terminal Ganeti job status
    :param job_fields: dict of job arguments ("nics", "beparams", ...)
    :returns: the (possibly mutated) vm; the caller must save it.
    """
    # Only terminal job states can affect quotas; ignore progress updates.
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    return vm
109

    
110

    
111
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the notification refers to
    :param etime: event time as reported by the backend
    :param jobid: Ganeti job id
    :param opcode: Ganeti opcode (e.g. 'OP_INSTANCE_CREATE')
    :param status: Ganeti job status; must be one of BACKEND_STATUSES
    :param logmsg: raw log message from the backend
    :param nics: optional NIC configuration accompanying the notification
    :param beparams: optional backend parameters (present on resize jobs)
    :raises VirtualMachine.InvalidBackendMsgError: on unknown job status
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Mirror the raw job information on the VM row for inspection/debugging.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal statuses only record progress; no state transition yet.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == 'success' or
           (status == 'error' and (vm.operstate == 'ERROR' or
                                   vm.action == 'DESTROY'))):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful so quotas below are released.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
185

    
186

    
187
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams.

    Look up a Flavor matching the new vcpus/ram (keeping the old disk and
    disk template) and assign it to the VM. No-op if the beparams describe
    the current flavor.

    :param vm: the VirtualMachine to update (saved on change)
    :param beparams: dict of Ganeti backend parameters; "vcpus" and
        "maxmem" are consulted, defaulting to the current flavor's values
    :raises Exception: if no matching Flavor exists in the DB
    """
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        # Include the requested specs so the failure is diagnosable from logs
        raise Exception("Can not find flavor for VM %s: cpu=%s ram=%s"
                        " disk=%s disk_template=%s"
                        % (vm, vcpus, ram, old_flavor.disk,
                           old_flavor.disk_template))
    vm.flavor = new_flavor
    vm.save()
202

    
203

    
204
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    Public entry point: runs the NIC-synchronization logic atomically so
    that IP pool updates and NIC rows are committed together.
    """
    _process_net_status(vm, etime, nics)
208

    
209

    
210
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    Must run inside a transaction (see process_net_status): the DB NICs
    are dropped and recreated from Ganeti's report, and IP pool
    reservations are updated to match.
    """

    ganeti_nics = process_ganeti_nics(nics)
    # Fast path: nothing to do when the DB already matches Ganeti's view.
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # And delete the related NICs (must be performed after release!)
    vm.nics.all().delete()

    # Recreate the NICs from Ganeti's report, reserving each IPv4 address.
    for nic in ganeti_nics:
        ipv4 = nic["ipv4"]
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
247

    
248

    
249
def process_ganeti_nics(ganeti_nics):
    """Translate the NIC list sent by Ganeti hooks into DB-ready dicts.

    Each returned dict maps directly onto the fields of a NIC row:
    index, network (Network instance), mac, ipv4, ipv6 (EUI-64 derived
    from the MAC when the network has an IPv6 subnet), firewall_profile
    and state.
    """
    processed = []
    for index, raw in enumerate(ganeti_nics):
        backend_name = str(raw.get('network', ''))
        network = Network.objects.get(
            pk=utils.id_from_network_name(backend_name))

        mac = raw.get('mac')
        ipv4 = raw.get('ip')
        if network.subnet6 is not None:
            ipv6 = mac2eui64(mac, network.subnet6)
        else:
            ipv6 = None

        # Firewall profile is encoded as a Ganeti tag; public networks
        # fall back to the deployment-wide default profile.
        profile = _reverse_tags.get(raw.get('firewall'))
        if not profile and network.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({'index': index,
                          'network': network,
                          'mac': mac,
                          'ipv4': ipv4,
                          'ipv6': ipv6,
                          'firewall_profile': profile,
                          'state': 'ACTIVE'})
    return processed
280

    
281

    
282
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way.

    old_nics are DB NIC objects (fields as attributes); new_nics are
    dicts as produced by process_ganeti_nics. Lists of different length
    always count as changed.
    """
    if len(old_nics) != len(new_nics):
        return True
    compared = ("ipv4", "ipv6", "mac", "firewall_profile", "index", "network")
    return any(getattr(old, field) != new[field]
               for old, new in zip(old_nics, new_nics)
               for field in compared)
292

    
293

    
294
def release_instance_ips(vm, ganeti_nics):
    """Release the VM's IPv4 addresses that no longer appear in ganeti_nics.

    Compare the (network, ipv4) pairs currently stored for the VM's NICs
    with the pairs reported by Ganeti, and release every address that is
    no longer in use. A floating IP is only detached from the machine
    (it stays reserved for the user); regular addresses are returned to
    the network's pool.
    """
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get X-Lock before searching floating IP, to exclusively search
            # and release floating IP. Otherwise you may release a floating IP
            # that has been just reserved.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    # Keep the floating IP reserved; just detach it from vm
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)
316

    
317

    
318
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the Ganeti backend.

    Record the job info on the BackendNetwork, update its operstate for
    terminal statuses, and then recompute the state of the parent Network.

    :raises Network.InvalidBackendMsgError: on unknown job status
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Mirror the raw job information for inspection/debugging.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled creation leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Removal may 'fail' when no network exists at the Ganeti backend
        # (network already in ERROR, or being destroyed); treat these
        # errors as successful removals.
        if (status == 'success' or
           (status == 'error' and (back_network.operstate == 'ERROR' or
                                   network.action == 'DESTROY'))):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
352

    
353

    
354
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # Network exists in no backend and is not being destroyed: it must
        # be ACTIVE. Return unconditionally here — falling through with an
        # empty backend_states list would make the reduce() below evaluate
        # to "DELETED" and wrongly delete the network. (Previously the
        # return only fired when the state actually changed.)
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources only if this network flavor draws them
        # from a pool (as opposed to fixed, deployment-assigned values).
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission to release the network's quota
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
407

    
408

    
409
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS notification from Ganeti.

    Record the job info on the BackendNetwork and apply any externally
    reserved/released IPs to the network's address pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [entry[0] for entry in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        pool = back_network.network.get_pool()
        # Reservations made outside Cyclades are marked as external.
        for address in (add_reserved_ips or []):
            pool.reserve(address, external=True)
        for address in (remove_reserved_ips or []):
            pool.put(address, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
434

    
435

    
436
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record an image-copy progress notification on the VM.

    :param progress: percentage reported by snf-image; values above 100
        are clamped (copy-progress tracks bytes read by image handling
        processes and may overshoot), negative values are rejected.
    :raises ValueError: if progress is negative
    """
    percentage = int(progress)

    if percentage < 0:
        raise ValueError("Percentage cannot be negative")
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
    #   (a monotonicity check against the previous value used to live here:
    #   build percentage should only increase)

    # NOTE: no check that the VM is still in BUILD state. A
    # 'ganeti-create-progress' message may race with the
    # 'ganeti-op-status' message for a successful OP_INSTANCE_CREATE when
    # they are handled by separate dispatcher threads. [vkoukis]

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
465

    
466

    
467
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
484

    
485

    
486
def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Build the Ganeti CreateInstance() keyword arguments from the VM, its
    NICs, the flavor and the image, then submit the creation job to the
    VM's backend and return the Ganeti job id.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # Ganeti expects disk sizes in MiB; flavor.disk is in GiB.
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Every network the instance connects to must be ACTIVE in this
    # backend. Private networks are created on demand and those creation
    # jobs become dependencies of the CreateInstance job below.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                # create_network may return a single job id or a list
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
563

    
564

    
565
def delete_instance(vm):
    """Submit a Ganeti job removing the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
568

    
569

    
570
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti job rebooting the VM's instance; return the job id.

    :param reboot_type: 'soft' or 'hard'
    :raises ValueError: on any other reboot_type (was an assert, which is
        stripped when Python runs with -O)
    """
    if reboot_type not in ('soft', 'hard'):
        raise ValueError("Invalid reboot type: %s" % reboot_type)
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)
575

    
576

    
577
def startup_instance(vm):
    """Submit a Ganeti job starting the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
580

    
581

    
582
def shutdown_instance(vm):
    """Submit a Ganeti job stopping the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
585

    
586

    
587
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job resizing the instance; return the job id.

    Sets both minmem and maxmem to the requested memory.
    """
    mem = int(memory)
    beparams = {"vcpus": int(vcpus), "minmem": mem, "maxmem": mem}
    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id, beparams=beparams)
593

    
594

    
595
def get_instance_console(vm):
    """Return VNC console info for the VM as {'kind', 'host', 'port'}.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    # A serial console on KVM would conflict with the VNC assumption above.
    if vm.backend.hypervisor == "kvm" and \
            instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
621

    
622

    
623
def get_instance_info(vm):
    """Return the Ganeti instance info dict for the VM."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstanceInfo(vm.backend_vm_id)
626

    
627

    
628
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job ids involved: just the creation job
    when connect is False, otherwise the nodegroup-connect jobs (which
    depend on the creation job).
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
639

    
640

    
641
def _create_network(network, backend):
    """Create the network itself in a Ganeti backend (no nodegroup connect).

    Returns the Ganeti job id of the CreateNetwork job.
    :raises Exception: if no BackendNetwork row exists for this pair.
    """
    # Public networks get the 'public' type and conflict checking.
    if network.public:
        network_type = 'public'
        conflicts_check = True
    else:
        network_type = 'private'
        conflicts_check = False

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti
    # does not support IPv6 only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks, with address=None.
    subnet = network.subnet if network.subnet is not None else "10.0.0.0/24"

    try:
        backend_net = BackendNetwork.objects.get(network=network,
                                                 backend=backend)
        mac_prefix = backend_net.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
680

    
681

    
682
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Connect the network to the given nodegroup, or to every nodegroup of
    the backend when group is None. Jobs in depends must reach a terminal
    state before the connect jobs run.

    :returns: list of Ganeti job ids, one per nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # None default instead of a (shared) mutable [] default argument
    if depends is None:
        depends = []

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
702

    
703

    
704
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When disconnect is True the network is first disconnected from all
    nodegroups, and the removal job depends on those jobs.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
711

    
712

    
713
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti job removing the network; return the job id.

    Jobs in depends must reach a terminal state before removal runs.
    """
    # None default instead of a (shared) mutable [] default argument
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
717

    
718

    
719
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    Disconnect from the given nodegroup, or from every nodegroup of the
    backend when group is None.

    :returns: list of Ganeti job ids, one per nodegroup.
    """
    # Fixed log message: this disconnects *from* the backend ("to" before)
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
729

    
730

    
731
def connect_to_network(vm, network, address=None):
    """Add a NIC connecting the VM to a network; return the Ganeti job id.

    If the network is not yet ACTIVE in the VM's backend it is first
    created/connected there, and those jobs become dependencies of the
    ModifyInstance job.

    :param address: optional IPv4 address for the new NIC (None lets
        Ganeti/Cyclades decide).
    """
    backend = vm.backend
    # X-Lock the network row to serialize with concurrent pool updates.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
751

    
752

    
753
def disconnect_from_network(vm, nic):
    """Remove the NIC at `nic.index` from `vm`.

    Submits a Ganeti ModifyInstance job and returns its job id.
    """
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    removal = [('remove', nic.index, {})]

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=removal,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)
762

    
763

    
764
def set_firewall_profile(vm, profile, index=0):
    """Apply firewall `profile` to the NIC at `index` of `vm` via Ganeti tags.

    Removes any previously set firewall tag for that NIC index before adding
    the new one.

    Args:
        vm: the virtual machine to tag.
        profile: a key of `_firewall_tags` naming the firewall profile.
        index: NIC index the profile applies to (default 0).

    Raises:
        ValueError: if `profile` is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        # BUG FIX: the original message misspelled "Unsupported".
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags for this NIC index
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
789

    
790

    
791
def get_instances(backend, bulk=True):
    """Return the instances of `backend` as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
794

    
795

    
796
def get_nodes(backend, bulk=True):
    """Return the nodes of `backend` as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
799

    
800

    
801
def get_jobs(backend, bulk=True):
    """Return the jobs of `backend` as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
804

    
805

    
806
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Sums the resources of a backend as reported by Ganeti itself (not the
    db), counting only nodes that can actually host VMs.
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr])
    return totals
825

    
826

    
827
def update_resources(backend, resources=None):
    """Update the state of the backend resources in the db.

    If `resources` is not supplied, fresh figures are fetched from the
    backend via `get_physical_resources`.
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported figure onto the corresponding model field.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
843

    
844

    
845
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Total the operational RAM across all reported instances.
    return sum(inst['oper_ram'] for inst in instances)
858

    
859
##
860
## Synchronized operations for reconciliation
861
##
862

    
863

    
864
def create_network_synced(network, backend):
    """Create and connect `network` on `backend`, blocking on each job.

    Returns a (status, error) tuple; stops early if creation fails.
    """
    creation = _create_network_synced(network, backend)
    if creation[0] != 'success':
        return creation
    return connect_network_synced(network, backend)
870

    
871

    
872
def _create_network_synced(network, backend):
    """Submit network creation and wait for the job's final state.

    Returns the (status, error) tuple produced by `wait_for_job`.
    """
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
877

    
878

    
879
def connect_network_synced(network, backend):
    """Connect `network` to every node group of `backend`, synchronously.

    Waits for each ConnectNetwork job and stops at the first failure.

    Returns:
        (status, error) tuple of the last job run, or ('success', None)
        when the cluster reports no node groups.
    """
    # BUG FIX: the original left `result` unbound (NameError on return)
    # when GetGroups() yielded no groups; treat that case as success.
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
889

    
890

    
891
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a terminal state.

    Polls via the RAPI client's WaitForJobChange until the job status is
    final.

    Args:
        client: a Ganeti RAPI client exposing WaitForJobChange.
        jobid: id of the job to wait for.

    Returns:
        (status, error): error is None on success, otherwise the job's
        opresult payload.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # BUG FIX: the original tested for 'cancel', which is not a Ganeti job
    # status — the terminal statuses are 'success'/'error'/'canceled' (as
    # used in the "depends" lists elsewhere in this module) — so canceled
    # jobs were waited on forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)