Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 4a990147

History | View | Annotate | Download (31.9 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map each firewall profile name to the Ganeti instance tag that encodes it
# (tag values come from deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Inverse mapping: the fourth ':'-separated component of each tag back to its
# profile name. Assumes tags are of the form 'a:b:c:<value>' — TODO confirm
# against the GANETI_FIREWALL_*_TAG settings.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (mutated) `vm`; the caller is responsible for saving it.

    """
    # Only terminal job states can affect quotas.
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    return vm
109

    
110

    
111
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (sucess, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode of the job (e.g. OP_INSTANCE_CREATE).
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: free-form log message from the backend.
    :param nics: new NIC configuration (only for jobs that change NICs).
    :param beparams: new backend parameters (only for resize jobs).

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal states only update the bookkeeping fields.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the releated NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for the quota handling below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
184

    
185

    
186
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    # Nothing that maps to a flavor attribute changed; keep the flavor.
    if (new_vcpus, new_ram) == (current.cpu, current.ram):
        return
    try:
        # Disk size and template never change on resize, so look up the
        # flavor matching the new CPU/RAM with the current disk settings.
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
201

    
202

    
203
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction."""
    _process_net_status(vm, etime, nics)
207

    
208

    
209
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    Must run inside a transaction (see process_net_status).
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # And delete the releated NICs (must be performed after release!)
    vm.nics.all().delete()

    # Recreate the NICs from the Ganeti-reported state, reserving each
    # reported IPv4 address in its network's pool.
    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
246

    
247

    
248
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks into DB-ready NIC dicts."""
    processed = []
    for index, raw_nic in enumerate(ganeti_nics):
        network_name = str(raw_nic.get('network', ''))
        network_pk = utils.id_from_network_name(network_name)
        net = Network.objects.get(pk=network_pk)

        # Extract the new NIC info reported by Ganeti.
        mac = raw_nic.get('mac', '')
        ipv4 = raw_nic.get('ip', '')
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet; otherwise leave it empty.
        ipv6 = mac2eui64(mac, net.subnet6) if net.subnet6 else ''

        # Translate the Ganeti firewall tag back to a profile name; public
        # networks fall back to the deployment's default profile.
        profile = _reverse_tags.get(raw_nic.get('firewall', ''), '')
        if not profile and net.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({
            'index': index,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': profile,
            'state': 'ACTIVE'})
    return processed
282

    
283

    
284
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way.

    `old_nics` are NIC objects from the DB (attribute access), `new_nics`
    are dicts produced by process_ganeti_nics (key access).
    """
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    for db_nic, ganeti_nic in zip(old_nics, new_nics):
        if any(getattr(db_nic, f) != ganeti_nic[f] for f in fields):
            return True
    return False
294

    
295

    
296
def release_instance_ips(vm, ganeti_nics):
    """Release IPv4 addresses the VM no longer holds according to Ganeti.

    Compute the (network, ipv4) pairs that exist in the DB NICs but are
    absent from `ganeti_nics`, and for each one either detach the matching
    floating IP from the VM or return the address to the network's pool.
    """
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get X-Lock before searching floating IP, to exclusively search
            # and release floating IP. Otherwise you may release a floating IP
            # that has been just reserved.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    # Detach the floating IP instead of releasing the address:
                    # the IP stays reserved for its owner.
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)
318

    
319

    
320
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Updates the BackendNetwork's job bookkeeping and operating state, then
    recomputes the aggregate state of the parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A removal that succeeded — or failed while already in ERROR (i.e.
        # the network never existed on the backend) — marks it deleted.
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(back_network.network)
351

    
352

    
353
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks exist yet and no destruction has been requested,
        # so the network counts as ACTIVE. Return unconditionally: previously
        # the early return was nested inside the state check, so a network
        # that was already "ACTIVE" fell through to the reduce() below, which
        # yields "DELETED" for an empty list and wrongly deleted the network
        # and released its resources.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        # NOTE: arguments now match the format string (link before
        # mac_prefix); they were previously swapped.
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
406

    
407

    
408
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process a Ganeti OP_NETWORK_SET_PARAMS notification.

    Updates the backend-network job bookkeeping and applies externally
    reserved/released IPs to the network's address pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        pool = back_network.network.get_pool()
        # Mark as externally reserved the addresses Ganeti now holds, and
        # return to the pool the ones it released. Either argument may be
        # None/empty, hence the `or []` guards.
        for address in (add_reserved_ips or []):
            pool.reserve(address, external=True)
        for address in (remove_reserved_ips or []):
            pool.put(address, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
433

    
434

    
435
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the build progress (percentage) of a VM under creation."""
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes,
    # so clamp it to 100.
    percentage = min(percentage, 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    # (a monotonicity check comparing against the previous value used to
    # live here and was disabled)

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
464

    
465

    
466
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
482

    
483

    
484
def create_instance(vm, nics, flavor, image):
    """Create a new instance in the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
        'backend_id', 'format' and 'metadata'

    metadata value should be a dictionary.

    Returns the Ganeti job id of the submitted OP_INSTANCE_CREATE job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        # Lock the network row to serialize with concurrent network jobs.
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                # Public networks are expected to be pre-created and active on
                # every backend; never create them implicitly. (Replaces the
                # old placeholder `raise Exception("LALA")`.)
                raise Exception("Public network '%s' is not active in"
                                " backend '%s'" % (network.id, backend.id))
            else:
                # Private network: create/connect it on this backend and make
                # the instance creation depend on those jobs.
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
560

    
561

    
562
def delete_instance(vm):
    """Submit an instance removal job to the VM's backend; return job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
565

    
566

    
567
def reboot_instance(vm, reboot_type):
    """Submit a reboot job for the instance; return the Ganeti job id.

    `reboot_type` must be 'soft' or 'hard' (NOTE: checked only with
    `assert`, which is stripped under `python -O`).
    """
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)
572

    
573

    
574
def startup_instance(vm):
    """Submit a startup job for the instance; return the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
577

    
578

    
579
def shutdown_instance(vm):
    """Submit a shutdown job for the instance; return the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
582

    
583

    
584
def resize_instance(vm, vcpus, memory):
    """Modify an instance's backend parameters (vCPUs and memory).

    Sets minmem and maxmem to the same value; returns the Ganeti job id.
    """
    memory = int(memory)
    beparams = dict(vcpus=int(vcpus), minmem=memory, maxmem=memory)
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
590

    
591

    
592
def get_instance_console(vm):
    """Return console connection info {'kind', 'host', 'port'} for a VM."""
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    # VNC console is unavailable when KVM exposes a serial console instead.
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
618

    
619

    
620
def get_instance_info(vm):
    """Return the Ganeti RAPI info dict for the VM's instance."""
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)
623

    
624

    
625
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    # Connect the network to all nodegroups, making the connect jobs depend
    # on the creation job; returns the list of connect job ids.
    return connect_network(network, backend, depends=[job_id])
636

    
637

    
638
def _create_network(network, backend):
    """Submit an OP_NETWORK_ADD job to the backend; return the job id."""

    # Ternary replaces the old `cond and a or b` idiom.
    network_type = 'public' if network.public else 'private'

    # Copy so that appending 'nfdhcpd' does not mutate the list returned by
    # network.backend_tag in place.
    tags = list(network.backend_tag)
    if network.dhcp:
        tags.append('nfdhcpd')

    # Check for conflicting IPs only on public networks.
    conflicts_check = bool(network.public)

    # The BackendNetwork must pre-exist: it carries the (possibly
    # backend-specific) MAC prefix for this network.
    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
669

    
670

    
671
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param depends: optional list of Ganeti job ids the connect jobs must
        wait for. (Default changed from a mutable `[]` to `None` to avoid
        the shared-mutable-default pitfall; behavior is unchanged.)
    :param group: connect only this nodegroup; default is all groups.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    # Check for conflicting IPs only on public networks.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
691

    
692

    
693
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend, optionally disconnecting first.

    NOTE(review): unlike create_network, the removal job id is not returned
    to the caller — confirm that callers do not need it.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        # Make the removal job wait for all disconnect jobs to finish.
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)
700

    
701

    
702
def _delete_network(network, backend, depends=None):
    """Submit an OP_NETWORK_REMOVE job; return the Ganeti job id.

    `depends` default changed from a mutable `[]` to `None` to avoid the
    shared-mutable-default pitfall; behavior is unchanged.
    """
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
706

    
707

    
708
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    :param group: disconnect only this nodegroup; default is all groups.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    # Log message fixed: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
718

    
719

    
720
def connect_to_network(vm, network, address=None):
    """Add a NIC on `network` (optionally with `address`) to a VM.

    Ensures the network exists on the VM's backend first (creating and
    connecting it if needed) and makes the NIC-add job depend on those
    jobs. Returns the Ganeti job id of the ModifyInstance job.
    """
    backend = vm.backend
    # Lock the network row to serialize with concurrent network jobs.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
740

    
741

    
742
def disconnect_from_network(vm, nic):
    """Remove NIC 'nic' (identified by its index) from instance 'vm'.

    Returns the Ganeti job id of the ModifyInstance call.
    """
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    removal = [('remove', nic.index, {})]
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=removal,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)
751

    
752

    
753
def set_firewall_profile(vm, profile):
    """Set the firewall profile of 'vm' by re-tagging the Ganeti instance.

    All known firewall tags are removed first, then the tag matching
    'profile' is added.  A NOP ModifyInstance follows so the dispatcher
    picks up the change.

    Raises:
        ValueError: if 'profile' is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
775

    
776

    
777
def get_instances(backend, bulk=True):
    """Return the instances of 'backend' as reported by its RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
780

    
781

    
782
def get_nodes(backend, bulk=True):
    """Return the nodes of 'backend' as reported by its RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
785

    
786

    
787
def get_jobs(backend):
    """Return the job list of 'backend' as reported by its RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs()
790

    
791

    
792
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Sum selected node attributes as reported by the backend itself
    (not the DB), counting only nodes that can actually host VMs.
    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Drained, offline and non-vm_capable nodes take no part in the
        # VM allocation process, so leave them out of the totals.
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr])
    return totals
811

    
812

    
813
def update_resources(backend, resources=None):
    """Update the state of the backend resources in the DB.

    If 'resources' is not given, they are fetched live from the backend.
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported resource figure onto the Backend row.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
829

    
830

    
831
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Sums 'oper_ram' over the backend's instances.  Note: this differs
    from the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
844

    
845
##
846
## Synchronized operations for reconciliation
847
##
848

    
849

    
850
def create_network_synced(network, backend):
    """Create then connect 'network' on 'backend', waiting on each job.

    Returns the first non-success (status, error) tuple, or the result
    of the connect step when creation succeeds.
    """
    outcome = _create_network_synced(network, backend)
    if outcome[0] != 'success':
        return outcome
    return connect_network_synced(network, backend)
856

    
857

    
858
def _create_network_synced(network, backend):
    """Submit network creation on 'backend' and block until it finishes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        outcome = wait_for_job(client, job_id)
    return outcome
863

    
864

    
865
def connect_network_synced(network, backend):
    """Connect 'network' to every node group of 'backend', one job at a time.

    Waits for each ConnectNetwork job to finish before submitting the
    next.  Returns the first non-success (status, error) tuple, or
    ('success', None) when all groups succeed.
    """
    # Initialize the result so that a backend with zero node groups
    # returns cleanly instead of raising UnboundLocalError.
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
875

    
876

    
877
def wait_for_job(client, jobid):
    """Block until Ganeti job 'jobid' reaches a final state.

    Polls the job via WaitForJobChange until its status becomes one of
    the final states.  Returns a (status, error) tuple, where error is
    None on success and the job's 'opresult' field otherwise.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti's final state for aborted jobs is 'canceled' (the same
    # spelling this module uses in job 'depends' lists); the original
    # 'cancel' would never match, so the loop spun forever on a
    # canceled job.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)