Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ f8675683

History | View | Annotate | Download (31.9 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map firewall-profile name -> Ganeti instance tag, as configured in settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: the 4th ':'-separated field of each tag -> profile name.
# Used to recover the profile from a tag reported by Ganeti.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param job_id: the Ganeti job id of the finished job.
    :param job_opcode: the Ganeti opcode of the job.
    :param job_status: terminal job status ("success"/"error"/"canceled");
        any other status is ignored.
    :param job_fields: dict with job parameters (e.g. nics, beparams) used
        to compute the quotable change.
    :returns: the (possibly modified) VirtualMachine; note the caller is
        responsible for saving it.
    """
    # Only terminal job statuses can resolve or trigger commissions.
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # Serial is resolved either way; detach it from the VM.
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        # force + auto_accept: the change has already happened on the
        # backend, so the commission cannot be refused.
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    return vm
109

    
110

    
111
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: the time the event occurred at the backend.
    :param jobid: the Ganeti job id.
    :param opcode: the Ganeti opcode (e.g. OP_INSTANCE_CREATE).
    :param status: the Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: the backend log message for the job.
    :param nics: optional NIC list for jobs carrying a net-status update.
    :param beparams: optional backend parameters for resize jobs.
    :raises VirtualMachine.InvalidBackendMsgError: on an unknown status.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal statuses only record the job info; no state change yet.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for quota purposes below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
184

    
185

    
186
def _process_resize(vm, beparams):
    """Update the VM's flavor to reflect new Ganeti backend parameters.

    Looks up a flavor matching the new CPU/RAM values while keeping the
    current disk size and disk template; does nothing if CPU and RAM are
    unchanged. Raises if no matching flavor exists.
    """
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    # Nothing to do when neither CPU nor RAM changed.
    if (new_vcpus, new_ram) == (current.cpu, current.ram):
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
201

    
202

    
203
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Transactional entry point around _process_net_status."""
    return _process_net_status(vm, etime, nics)
207

    
208

    
209
def _process_net_status(vm, etime, nics):
    """Sync the VM's NICs in the DB with a net-status report from Ganeti.

    Translates the incoming NIC list, and if it differs from what the DB
    holds, replaces the VM's NICs and updates the IP address pools
    accordingly.
    """
    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs differ: first give back the addresses the VM no longer holds...
    release_instance_ips(vm, ganeti_nics)
    # ...and only then drop the old NIC rows (order matters!).
    vm.nics.all().delete()

    for gnic in ganeti_nics:
        address = gnic.get('ipv4', '')
        network = gnic['network']
        if address:
            network.reserve_address(address)

        gnic['dirty'] = False
        vm.nics.create(**gnic)
        # Dummy save of the network: the UI polls VMs and Networks with
        # changed-since in order to show the VM NICs.
        network.save()

    vm.backendtime = etime
    vm.save()
246

    
247

    
248
def process_ganeti_nics(ganeti_nics):
    """Translate NIC dicts from the Ganeti hooks into DB-ready dicts."""
    processed = []
    for index, raw_nic in enumerate(ganeti_nics):
        backend_name = str(raw_nic.get('network', ''))
        network_id = utils.id_from_network_name(backend_name)

        network = Network.objects.get(pk=network_id)

        # Extract the new NIC info.
        mac = raw_nic.get('mac', '')
        ipv4 = raw_nic.get('ip', '')
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, network.subnet6) if network.subnet6 else ''

        profile = _reverse_tags.get(raw_nic.get('firewall', ''), '')
        # Public networks always get a firewall profile.
        if not profile and network.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': profile,
            'state': 'ACTIVE'})
    return processed
282

    
283

    
284
def nics_changed(old_nics, new_nics):
    """Return True if the NIC lists differ in any tracked field.

    ``old_nics`` are DB NIC objects (attribute access); ``new_nics`` are
    dicts produced by process_ganeti_nics.
    """
    if len(old_nics) != len(new_nics):
        return True
    tracked = ("ipv4", "ipv6", "mac", "firewall_profile", "index", "network")
    return any(getattr(db_nic, attr) != ganeti_nic[attr]
               for db_nic, ganeti_nic in zip(old_nics, new_nics)
               for attr in tracked)
294

    
295

    
296
def release_instance_ips(vm, ganeti_nics):
    """Release (network, ipv4) pairs the VM holds but Ganeti no longer has.

    Floating IPs are detached from the VM instead of being returned to the
    address pool.
    """
    current = set(vm.nics.values_list("network", "ipv4"))
    reported = set((gnic["network"].id, gnic["ipv4"]) for gnic in ganeti_nics)
    for network_id, address in current - reported:
        if not address:
            continue
        net = Network.objects.get(id=network_id)
        # Important: Take exclusive lock in pool before checking if there
        # is a floating IP with this ipv4 address, otherwise there is a
        # race condition, where you may release a floating IP that has been
        # created after search floating IPs and before you get exclusively
        # the pool
        pool = net.get_pool()
        try:
            floating_ip = net.floating_ips.select_for_update()\
                                          .get(ipv4=address, machine=vm,
                                               deleted=False)
            # The address is a floating IP: detach it, keep it allocated.
            floating_ip.machine = None
            floating_ip.save()
        except FloatingIP.DoesNotExist:
            net.release_address(address)
            pool.save()
319

    
320

    
321
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Records the job information on the BackendNetwork, updates its
    operating state for terminal statuses and finally recomputes the state
    of the parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job info on the BackendNetwork.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled creation leaves the BackendNetwork in ERROR.
    if opcode == 'OP_NETWORK_ADD' and status in ('canceled', 'error'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Removal also counts for networks already in ERROR, where no
        # network may actually exist at the Ganeti backend.
        removed = (status == 'success' or
                   (status == 'error' and back_network.operstate == 'ERROR'))
        if removed:
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The aggregate Network state depends on its BackendNetworks: refresh it.
    update_network_state(back_network.network)
352

    
353

    
354
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No BackendNetwork exists yet and the network is not being
        # destroyed, so it is considered ACTIVE. Return unconditionally:
        # otherwise the empty backend_states list would reduce to "DELETED"
        # below and wrongly mark the network as deleted (bug fix: the
        # return used to be inside the inner if).
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        # Bug fix: format string said "link ... mac_prefix" while the
        # arguments were (mac_prefix, link); now they agree.
        log.info("Network %r deleted. Releasing mac_prefix %r link %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources only if this flavor draws them from a pool
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission to release the network's quota
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
407

    
408

    
409
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process a Ganeti OP_NETWORK_SET_PARAMS notification.

    Records the job information on the BackendNetwork and applies any
    externally reserved/released IPs to the network's address pool.

    :param add_reserved_ips: iterable of IPs reserved outside synnefo.
    :param remove_reserved_ips: iterable of externally reserved IPs to free.
    :raises Network.InvalidBackendMsgError: on an unknown job status.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Bug fix: store the opcode in 'backendopcode' as all other job
    # handlers in this module do; 'opcode' set a stray attribute and the
    # real field was left stale.
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
434

    
435

    
436
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record an image-copy progress notification for a building VM."""
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by the image handling
    # processes, so the reported value can overshoot 100%: clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' arrives
    # once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and the
    # instance is STARTED. What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
465

    
466

    
467
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime,
        message=message, details=details)
483

    
484

    
485
def create_instance(vm, nics, flavor, image):
    """Create a new instance in the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys
    'backend_id', 'format' and 'metadata', where the metadata value should
    be a dictionary.

    Networks of NICs whose BackendNetwork is not yet ACTIVE are created
    first, and the instance-creation job depends on those jobs.

    :returns: the Ganeti job id of the submitted OP_INSTANCE_CREATE job.
    :raises Exception: if a NIC refers to a public network that is not
        ACTIVE in the backend.
    """
    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]  # Ganeti expects MiB
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        # Lock the network row to serialize with concurrent (dis)connects
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                # Public networks are created by the admin; we cannot
                # create them on demand here. (Replaces placeholder
                # 'LALA' exception message.)
                raise Exception("Can not connect instance to network %s:"
                                " network is not ACTIVE in backend %s"
                                % (network, backend))
            else:
                # create_network(..., connect=True) returns a *list* of job
                # ids, so extend — appending would nest a list inside the
                # "depends" entries (cf. connect_to_network).
                depend_jobs.extend(create_network(network, backend,
                                                  connect=True))
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
558

    
559

    
560
def delete_instance(vm):
    """Submit an instance-removal job for the VM; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
563

    
564

    
565
def reboot_instance(vm, reboot_type):
    """Submit a reboot job for the VM; reboot_type is 'soft' or 'hard'."""
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)
570

    
571

    
572
def startup_instance(vm):
    """Submit a start-up job for the VM; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
575

    
576

    
577
def shutdown_instance(vm):
    """Submit a shutdown job for the VM; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
580

    
581

    
582
def resize_instance(vm, vcpus, memory):
    """Modify the VM's backend parameters (CPUs and memory); return job id."""
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": int(memory),
                    "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
588

    
589

    
590
def get_instance_console(vm):
    """Return VNC console info for the VM: {'kind', 'host', 'port'}.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    console = {'kind': 'vnc'}

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = instance['pnode']
    console['port'] = instance['network_port']

    return console
616

    
617

    
618
def get_instance_info(vm):
    """Fetch the instance's info dict from its Ganeti backend."""
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)
621

    
622

    
623
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend; return a list of job ids.

    When `connect` is True the network is also connected to all
    nodegroups, and the returned jobs include the connect jobs.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
634

    
635

    
636
def _create_network(network, backend):
    """Submit a CreateNetwork job to Ganeti; return the job id.

    :raises Exception: if no BackendNetwork entry exists for the
        (network, backend) pair.
    """
    network_type = network.public and 'public' or 'private'

    # Copy the tag list before appending, so that 'nfdhcpd' cannot leak
    # into a list shared through network.backend_tag (defensive; the
    # original mutated the returned list in place).
    tags = list(network.backend_tag)
    if network.dhcp:
        tags.append('nfdhcpd')

    # Check for conflicting IPs only on public networks
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
667

    
668

    
669
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups; return the list of connect job ids.

    :param depends: optional list of Ganeti job ids the connect jobs must
        wait for.
    :param group: connect only this nodegroup; by default all groups.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Bug fix: use a None sentinel instead of a mutable default argument.
    if depends is None:
        depends = []

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
689

    
690

    
691
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    # The delete job must wait for the disconnect jobs, if any.
    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
698

    
699

    
700
def _delete_network(network, backend, depends=None):
    """Submit a DeleteNetwork job (waiting on `depends`); return the job id."""
    # Bug fix: use a None sentinel instead of a mutable default argument.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
704

    
705

    
706
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups; return the list of job ids.

    :param group: disconnect only this nodegroup; by default all groups.
    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        target_groups = [group] if group is not None else client.GetGroups()
        job_ids = [client.DisconnectNetwork(network.backend_id, g)
                   for g in target_groups]
    return job_ids
716

    
717

    
718
def connect_to_network(vm, network, address=None):
    """Add a NIC on `network` (optionally with `address`) to the VM."""
    backend = vm.backend
    # Lock the network row to serialize with concurrent (dis)connects.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        # Network not yet present in this backend: create/connect it first
        # and make the NIC addition depend on those jobs.
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    new_nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=[('add', new_nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
738

    
739

    
740
def disconnect_from_network(vm, nic):
    """Remove the NIC at `nic.index` from the VM."""
    modification = [('remove', nic.index, {})]

    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=modification,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)
749

    
750

    
751
def set_firewall_profile(vm, profile):
    """Apply firewall *profile* to *vm* by tagging the Ganeti instance.

    All known firewall tags are removed first, then the tag for the
    requested profile is added.

    Raises ValueError if *profile* is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
775
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported by its RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
780
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported by its RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
785
def get_jobs(backend):
    """Return the backend's jobs as reported by its RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs()
790
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process.
        can_host_vms = (node['vm_capable'] and
                        not (node['drained'] or node['offline']))
        if can_host_vms and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr])
    return totals
811
def update_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    If *resources* is not supplied, they are fetched from the backend
    itself via get_physical_resources().
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported metric onto the Backend row, stamp and persist.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
829
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
##
## Synchronized operations for reconciliation
##
def create_network_synced(network, backend):
    """Create and then connect a network on *backend*, synchronously.

    Returns the (status, error) tuple of the first failing step, or of
    the connect step when creation succeeds.
    """
    result = _create_network_synced(network, backend)
    if result[0] == 'success':
        result = connect_network_synced(network, backend)
    return result
def _create_network_synced(network, backend):
    """Submit network creation on *backend* and block until the job ends."""
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        return wait_for_job(client, job)
def connect_network_synced(network, backend):
    """Connect *network* to every node group of *backend*, synchronously.

    Waits for each ConnectNetwork job; returns the first failing job's
    (status, error) tuple, or the last successful one otherwise.
    """
    # Default result: guards against a backend with no node groups, which
    # previously left 'result' unbound at the final return (UnboundLocalError).
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
def wait_for_job(client, jobid):
    """Block until Ganeti job *jobid* reaches a final status.

    Polls client.WaitForJobChange on the 'status' and 'opresult' fields.
    Returns (status, None) on success, or (status, opresult) for any
    other final status.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti's final job statuses are "success", "error" and "canceled".
    # The original tested for "cancel", which never matches, so a canceled
    # job would be polled forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)