root / snf-cyclades-app / synnefo / logic / backend.py @ c566f369

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               FloatingIP,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())


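# Editorial sketch (not part of the original module): _reverse_tags above maps
# the fourth colon-separated field of each firewall tag back to its profile
# name, so the GANETI_FIREWALL_*_TAG settings are expected to look like
# 'prefix:a:b:<profile>'. The tag values below are purely hypothetical and
# only illustrate the shape of the lookup.
def _sketch_reverse_tags():
    """Show how the reverse firewall-tag lookup is built (illustration only)."""
    tags = {'ENABLED': 'synnefo:network:0:protected',
            'DISABLED': 'synnefo:network:0:unprotected'}
    # Yields {'protected': 'ENABLED', 'unprotected': 'DISABLED'}
    return dict((v.split(':')[3], k) for k, v in tags.items())

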
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update the quotas of the VirtualMachine based on the job that ran on the
    Ganeti backend. If a commission has already been issued for this job,
    that commission is simply accepted or rejected according to the job
    status. Otherwise, a new commission for the given change is issued in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check whether successful completion of the job will trigger any
    # quotable change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued. Just
        # accept or reject it.
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode.
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear the VM's serial. The expected job may arrive later, but the
        # correlated serial must not be accepted, since it reflects a
        # previous VM state.
        vm.serial = None

    return vm


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 for why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have completed successfully,
        # since only these jobs update the state of the VM. Otherwise a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages
        # arrive in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, to cater for admin-initiated
        # removals.
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # The VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # and delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()


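# Editorial sketch (not part of the original module): roughly how a dispatcher
# could hand a terminating Ganeti job notification to process_op_status().
# All field values are hypothetical; only the parameter names come from the
# signature above.
def _sketch_dispatch_op_status(vm):
    """Feed one example 'ganeti-op-status' message to process_op_status()."""
    process_op_status(vm,
                      etime=datetime.now(),
                      jobid=12345,                   # Ganeti job id
                      opcode="OP_INSTANCE_STARTUP",  # opcode of the finished job
                      status="success",              # terminating status
                      logmsg="instance started")

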
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.flavor = new_flavor
    vm.save()


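# Editorial sketch (not part of the original module): _process_resize()
# consumes the same beparams layout that resize_instance() (defined further
# down) sends to Ganeti. Hypothetical example for a resize to 2 VCPUs and
# 2048 MB of RAM:
def _sketch_resize(vm):
    """Apply a hypothetical resize notification to a VM."""
    _process_resize(vm, beparams={"vcpus": 2, "maxmem": 2048})

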
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get an X-Lock on the backend before getting an X-Lock on the network IP
    # pools, to guarantee that no deadlock will occur with the Backend
    # allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # and delete the related NICs (must be performed after release!)
    vm.nics.all().delete()

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy-save the network, because the UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()


def process_ganeti_nics(ganeti_nics):
    """Process a NIC dict from the Ganeti hooks."""
    new_nics = []
    for i, new_nic in enumerate(ganeti_nics):
        network = new_nic.get('network', '')
        n = str(network)
        pk = utils.id_from_network_name(n)

        net = Network.objects.get(pk=pk)

        # Get the new NIC info
        mac = new_nic.get('mac', '')
        ipv4 = new_nic.get('ip', '')
        if net.subnet6:
            ipv6 = mac2eui64(mac, net.subnet6)
        else:
            ipv6 = ''

        firewall = new_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic = {
            'index': i,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append(nic)
    return new_nics


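# Editorial sketch (not part of the original module): shape of the data that
# process_ganeti_nics() consumes. The values are hypothetical; the keys are
# the ones the function actually reads ('network', 'mac', 'ip', 'firewall').
def _sketch_ganeti_nic():
    """Return one example NIC entry as reported by the Ganeti hooks."""
    return {'network': 'snf-net-42',        # resolved to a Network via its id
            'mac': 'aa:00:00:11:22:33',     # also used to derive the EUI-64 IPv6
            'ip': '192.0.2.10',             # stored on the NIC as 'ipv4'
            'firewall': 'protected'}        # mapped back via _reverse_tags

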
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    for old_nic, new_nic in zip(old_nics, new_nics):
        for field in fields:
            if getattr(old_nic, field) != new_nic[field]:
                return True
    return False


def release_instance_ips(vm, ganeti_nics):
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get an X-Lock before searching for the floating IP, in order to
            # search and release it exclusively. Otherwise a floating IP that
            # has just been reserved might be released.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the parent Network must also be updated.
    update_network_state(back_network.network)


def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.

    Update the state of a Network based on the operstate of its
    BackendNetworks in the backends where the network exists.

    The state of the network is:
    * ACTIVE: if it is 'ACTIVE' in at least one backend.
    * DELETED: if it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # The network has already been deleted. Just assert that its state is
        # also DELETED.
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return

    # The network is deleted when all BackendNetworks reach the "DELETED"
    # operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


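# Editorial sketch (not part of the original module): how the reduce()
# expression above collapses the BackendNetwork operstates into the "deleted"
# flag. It yields "DELETED" (truthy) only when every state equals "DELETED",
# and False as soon as any other state appears; an empty list also yields
# "DELETED".
def _sketch_all_deleted(backend_states):
    """Illustrate the all-DELETED check used by update_network_state()."""
    # _sketch_all_deleted(["DELETED", "DELETED"]) -> "DELETED"
    # _sketch_all_deleted(["DELETED", "ACTIVE"])  -> False
    return reduce(lambda x, y: x == y and "DELETED", backend_states,
                  "DELETED")

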
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create the diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """Create a new instance in the Ganeti backend.

    `image` is a dictionary which should contain the keys 'backend_id',
    'format' and 'metadata'; the 'metadata' value should itself be a
    dictionary.

    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, the name of the os
    # provider, and hypervisor-specific parameters, at will (see Synnefo
    # #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                # TODO: What to raise here ?
                raise Exception("LALA")
            else:
                depend_jobs.append(create_network(network, backend,
                                                  connect=True))
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store the image id and format in Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)


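# Editorial sketch (not part of the original module): the minimal `image`
# argument expected by create_instance(), based on its docstring and on the
# keys it actually reads. The values are hypothetical.
def _sketch_image_argument():
    """Return an example `image` dict for create_instance()."""
    return {'backend_id': 'debian_base-7.0-x86_64',  # passed as osparams img_id
            'format': 'diskdump',                    # passed as osparams img_format
            'metadata': {}}                          # free-form dictionary

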
def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console


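# Editorial sketch (not part of the original module): the reply built by
# get_instance_console() has the shape below; host and port values are
# hypothetical.
def _sketch_console_reply():
    """Return an example of what get_instance_console() produces."""
    return {'kind': 'vnc',
            'host': 'node1.example.org',  # primary node of the instance
            'port': 11000}                # the instance's Ganeti network_port

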
def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    network_type = network.public and 'public' or 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, network, address=None):
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add', nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)


def disconnect_from_network(vm, nic):
    op = [('remove', nic.index, {})]

    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)


def set_firewall_profile(vm, profile):
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs()


def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the VM allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res


def update_resources(backend, resources=None):
    """Update the state of the backend resources in the db."""

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: this is different from the real
    memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
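

# Editorial sketch (not part of the original module): typical synchronous use
# of wait_for_job(), mirroring _create_network_synced() above. The helper name
# is hypothetical.
def _sketch_wait_for_job(backend, jobid):
    """Block until a Ganeti job terminates and report its outcome."""
    with pooled_rapi_client(backend) as client:
        status, error = wait_for_job(client, jobid)
    if status != 'success':
        log.warning("Job %s failed: %s", jobid, error)
    return status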