Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 198d91c3

History | View | Annotate | Download (27 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic)
40
from synnefo.logic import utils
41
from synnefo import quotas
42
from synnefo.api.util import release_resource
43
from synnefo.util.mac2eui64 import mac2eui64
44
from synnefo.logic.rapi import GanetiApiError
45

    
46
from logging import getLogger
47
log = getLogger(__name__)
48

    
49

    
50
_firewall_tags = {
51
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
52
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
53
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
54

    
55
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
@transaction.commit_on_success
59
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
60
    """Process a job progress notification from the backend
61

62
    Process an incoming message from the backend (currently Ganeti).
63
    Job notifications with a terminating status (sucess, error, or canceled),
64
    also update the operating state of the VM.
65

66
    """
67
    # See #1492, #1031, #1111 why this line has been removed
68
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
69
    if status not in [x[0] for x in BACKEND_STATUSES]:
70
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
71

    
72
    vm.backendjobid = jobid
73
    vm.backendjobstatus = status
74
    vm.backendopcode = opcode
75
    vm.backendlogmsg = logmsg
76

    
77
    # Notifications of success change the operating state
78
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
79
    if status == 'success' and state_for_success is not None:
80
        vm.operstate = state_for_success
81

    
82
    # Update the NICs of the VM
83
    if status == "success" and nics is not None:
84
        _process_net_status(vm, etime, nics)
85

    
86
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
87
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
88
        vm.operstate = 'ERROR'
89
        vm.backendtime = etime
90
    elif opcode == 'OP_INSTANCE_REMOVE':
91
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
        # when no instance exists at the Ganeti backend.
93
        if status == "success" or (status == "error" and
94
                                   not vm_exists_in_backend(vm)):
95
            _process_net_status(vm, etime, nics=[])
96
            vm.deleted = True
97
            vm.operstate = state_for_success
98
            vm.backendtime = etime
99
            # Issue and accept commission to Quotaholder
100
            quotas.issue_and_accept_commission(vm, delete=True)
101

    
102
    # Update backendtime only for jobs that have been successfully completed,
103
    # since only these jobs update the state of the VM. Else a "race condition"
104
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
105
    # before an error job and messages arrive in reversed order.
106
    if status == 'success':
107
        vm.backendtime = etime
108

    
109
    vm.save()
110

    
111

    
112
@transaction.commit_on_success
113
def process_net_status(vm, etime, nics):
114
    """Wrap _process_net_status inside transaction."""
115
    _process_net_status(vm, etime, nics)
116

    
117

    
118
def _process_net_status(vm, etime, nics):
119
    """Process a net status notification from the backend
120

121
    Process an incoming message from the Ganeti backend,
122
    detailing the NIC configuration of a VM instance.
123

124
    Update the state of the VM in the DB accordingly.
125
    """
126

    
127
    ganeti_nics = process_ganeti_nics(nics)
128
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
129
        log.debug("NICs for VM %s have not changed", vm)
130
        return
131

    
132
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
133
    # guarantee that no deadlock will occur with Backend allocator.
134
    Backend.objects.select_for_update().get(id=vm.backend_id)
135

    
136
    release_instance_nics(vm)
137

    
138
    for nic in ganeti_nics:
139
        ipv4 = nic.get('ipv4', '')
140
        net = nic['network']
141
        if ipv4:
142
            net.reserve_address(ipv4)
143

    
144
        nic['dirty'] = False
145
        vm.nics.create(**nic)
146
        # Dummy save the network, because UI uses changed-since for VMs
147
        # and Networks in order to show the VM NICs
148
        net.save()
149

    
150
    vm.backendtime = etime
151
    vm.save()
152

    
153

    
154
def process_ganeti_nics(ganeti_nics):
155
    """Process NIC dict from ganeti hooks."""
156
    new_nics = []
157
    for i, new_nic in enumerate(ganeti_nics):
158
        network = new_nic.get('network', '')
159
        n = str(network)
160
        pk = utils.id_from_network_name(n)
161

    
162
        net = Network.objects.get(pk=pk)
163

    
164
        # Get the new nic info
165
        mac = new_nic.get('mac', '')
166
        ipv4 = new_nic.get('ip', '')
167
        if net.subnet6:
168
            ipv6 = mac2eui64(mac, net.subnet6)
169
        else:
170
            ipv6 = ''
171

    
172
        firewall = new_nic.get('firewall', '')
173
        firewall_profile = _reverse_tags.get(firewall, '')
174
        if not firewall_profile and net.public:
175
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
176

    
177
        nic = {
178
            'index': i,
179
            'network': net,
180
            'mac': mac,
181
            'ipv4': ipv4,
182
            'ipv6': ipv6,
183
            'firewall_profile': firewall_profile,
184
            'state': 'ACTIVE'}
185

    
186
        new_nics.append(nic)
187
    return new_nics
188

    
189

    
190
def nics_changed(old_nics, new_nics):
191
    """Return True if NICs have changed in any way."""
192
    if len(old_nics) != len(new_nics):
193
        return True
194
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
195
    for old_nic, new_nic in zip(old_nics, new_nics):
196
        for field in fields:
197
            if getattr(old_nic, field) != new_nic[field]:
198
                return True
199
    return False
200

    
201

    
202
def release_instance_nics(vm):
203
    for nic in vm.nics.all():
204
        net = nic.network
205
        if nic.ipv4:
206
            net.release_address(nic.ipv4)
207
        nic.delete()
208
        net.save()
209

    
210

    
211
@transaction.commit_on_success
212
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
213
    if status not in [x[0] for x in BACKEND_STATUSES]:
214
        raise Network.InvalidBackendMsgError(opcode, status)
215

    
216
    back_network.backendjobid = jobid
217
    back_network.backendjobstatus = status
218
    back_network.backendopcode = opcode
219
    back_network.backendlogmsg = logmsg
220

    
221
    network = back_network.network
222

    
223
    # Notifications of success change the operating state
224
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
225
    if status == 'success' and state_for_success is not None:
226
        back_network.operstate = state_for_success
227

    
228
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
229
        back_network.operstate = 'ERROR'
230
        back_network.backendtime = etime
231

    
232
    if opcode == 'OP_NETWORK_REMOVE':
233
        network_is_deleted = (status == "success")
234
        if network_is_deleted or (status == "error" and not
235
                                  network_exists_in_backend(back_network)):
236
            back_network.operstate = state_for_success
237
            back_network.deleted = True
238
            back_network.backendtime = etime
239

    
240
    if status == 'success':
241
        back_network.backendtime = etime
242
    back_network.save()
243
    # Also you must update the state of the Network!!
244
    update_network_state(network)
245

    
246

    
247
def update_network_state(network):
248
    """Update the state of a Network based on BackendNetwork states.
249

250
    Update the state of a Network based on the operstate of the networks in the
251
    backends that network exists.
252

253
    The state of the network is:
254
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
255
    * DELETED: If it is is 'DELETED' in all backends that have been created.
256

257
    This function also releases the resources (MAC prefix or Bridge) and the
258
    quotas for the network.
259

260
    """
261
    if network.deleted:
262
        # Network has already been deleted. Just assert that state is also
263
        # DELETED
264
        if not network.state == "DELETED":
265
            network.state = "DELETED"
266
            network.save()
267
        return
268

    
269
    backend_states = [s.operstate for s in network.backend_networks.all()]
270
    if not backend_states and network.action != "DESTROY":
271
        if network.state != "ACTIVE":
272
            network.state = "ACTIVE"
273
            network.save()
274
            return
275

    
276
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
277
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
278
                     "DELETED")
279

    
280
    # Release the resources on the deletion of the Network
281
    if deleted:
282
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
283
                 network.id, network.mac_prefix, network.link)
284
        network.deleted = True
285
        network.state = "DELETED"
286
        if network.mac_prefix:
287
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
288
                release_resource(res_type="mac_prefix",
289
                                 value=network.mac_prefix)
290
        if network.link:
291
            if network.FLAVORS[network.flavor]["link"] == "pool":
292
                release_resource(res_type="bridge", value=network.link)
293

    
294
        # Issue commission
295
        if network.userid:
296
            quotas.issue_and_accept_commission(network, delete=True)
297
        elif not network.public:
298
            log.warning("Network %s does not have an owner!", network.id)
299
    network.save()
300

    
301

    
302
@transaction.commit_on_success
303
def process_network_modify(back_network, etime, jobid, opcode, status,
304
                           add_reserved_ips, remove_reserved_ips):
305
    assert (opcode == "OP_NETWORK_SET_PARAMS")
306
    if status not in [x[0] for x in BACKEND_STATUSES]:
307
        raise Network.InvalidBackendMsgError(opcode, status)
308

    
309
    back_network.backendjobid = jobid
310
    back_network.backendjobstatus = status
311
    back_network.opcode = opcode
312

    
313
    if add_reserved_ips or remove_reserved_ips:
314
        net = back_network.network
315
        pool = net.get_pool()
316
        if add_reserved_ips:
317
            for ip in add_reserved_ips:
318
                pool.reserve(ip, external=True)
319
        if remove_reserved_ips:
320
            for ip in remove_reserved_ips:
321
                pool.put(ip, external=True)
322
        pool.save()
323

    
324
    if status == 'success':
325
        back_network.backendtime = etime
326
    back_network.save()
327

    
328

    
329
@transaction.commit_on_success
330
def process_create_progress(vm, etime, progress):
331

    
332
    percentage = int(progress)
333

    
334
    # The percentage may exceed 100%, due to the way
335
    # snf-image:copy-progress tracks bytes read by image handling processes
336
    percentage = 100 if percentage > 100 else percentage
337
    if percentage < 0:
338
        raise ValueError("Percentage cannot be negative")
339

    
340
    # FIXME: log a warning here, see #1033
341
#   if last_update > percentage:
342
#       raise ValueError("Build percentage should increase monotonically " \
343
#                        "(old = %d, new = %d)" % (last_update, percentage))
344

    
345
    # This assumes that no message of type 'ganeti-create-progress' is going to
346
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
347
    # the instance is STARTED.  What if the two messages are processed by two
348
    # separate dispatcher threads, and the 'ganeti-op-status' message for
349
    # successful creation gets processed before the 'ganeti-create-progress'
350
    # message? [vkoukis]
351
    #
352
    #if not vm.operstate == 'BUILD':
353
    #    raise VirtualMachine.IllegalState("VM is not in building state")
354

    
355
    vm.buildpercentage = percentage
356
    vm.backendtime = etime
357
    vm.save()
358

    
359

    
360
@transaction.commit_on_success
361
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
362
                               details=None):
363
    """
364
    Create virtual machine instance diagnostic entry.
365

366
    :param vm: VirtualMachine instance to create diagnostic for.
367
    :param message: Diagnostic message.
368
    :param source: Diagnostic source identifier (e.g. image-helper).
369
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
370
    :param etime: The time the message occured (if available).
371
    :param details: Additional details or debug information.
372
    """
373
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
374
                                                   source_date=etime,
375
                                                   message=message,
376
                                                   details=details)
377

    
378

    
379
def create_instance(vm, public_nic, flavor, image):
380
    """`image` is a dictionary which should contain the keys:
381
            'backend_id', 'format' and 'metadata'
382

383
        metadata value should be a dictionary.
384
    """
385

    
386
    # Handle arguments to CreateInstance() as a dictionary,
387
    # initialize it based on a deployment-specific value.
388
    # This enables the administrator to override deployment-specific
389
    # arguments, such as the disk template to use, name of os provider
390
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
391
    #
392
    kw = vm.backend.get_create_params()
393
    kw['mode'] = 'create'
394
    kw['name'] = vm.backend_vm_id
395
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
396

    
397
    kw['disk_template'] = flavor.disk_template
398
    kw['disks'] = [{"size": flavor.disk * 1024}]
399
    provider = flavor.disk_provider
400
    if provider:
401
        kw['disks'][0]['provider'] = provider
402
        kw['disks'][0]['origin'] = flavor.disk_origin
403

    
404
    kw['nics'] = [public_nic]
405
    if vm.backend.use_hotplug():
406
        kw['hotplug'] = True
407
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
408
    # kw['os'] = settings.GANETI_OS_PROVIDER
409
    kw['ip_check'] = False
410
    kw['name_check'] = False
411

    
412
    # Do not specific a node explicitly, have
413
    # Ganeti use an iallocator instead
414
    #kw['pnode'] = rapi.GetNodes()[0]
415

    
416
    kw['dry_run'] = settings.TEST
417

    
418
    kw['beparams'] = {
419
        'auto_balance': True,
420
        'vcpus': flavor.cpu,
421
        'memory': flavor.ram}
422

    
423
    kw['osparams'] = {
424
        'config_url': vm.config_url,
425
        # Store image id and format to Ganeti
426
        'img_id': image['backend_id'],
427
        'img_format': image['format']}
428

    
429
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
430
    # kw['hvparams'] = dict(serial_console=False)
431

    
432
    log.debug("Creating instance %s", utils.hide_pass(kw))
433
    with pooled_rapi_client(vm) as client:
434
        return client.CreateInstance(**kw)
435

    
436

    
437
def delete_instance(vm):
438
    with pooled_rapi_client(vm) as client:
439
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
440

    
441

    
442
def reboot_instance(vm, reboot_type):
443
    assert reboot_type in ('soft', 'hard')
444
    with pooled_rapi_client(vm) as client:
445
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
446
                                     dry_run=settings.TEST)
447

    
448

    
449
def startup_instance(vm):
450
    with pooled_rapi_client(vm) as client:
451
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
452

    
453

    
454
def shutdown_instance(vm):
455
    with pooled_rapi_client(vm) as client:
456
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
457

    
458

    
459
def get_instance_console(vm):
460
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
461
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
462
    # useless (see #783).
463
    #
464
    # Until this is fixed on the Ganeti side, construct a console info reply
465
    # directly.
466
    #
467
    # WARNING: This assumes that VNC runs on port network_port on
468
    #          the instance's primary node, and is probably
469
    #          hypervisor-specific.
470
    #
471
    log.debug("Getting console for vm %s", vm)
472

    
473
    console = {}
474
    console['kind'] = 'vnc'
475

    
476
    with pooled_rapi_client(vm) as client:
477
        i = client.GetInstance(vm.backend_vm_id)
478

    
479
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
480
        raise Exception("hv parameter serial_console cannot be true")
481
    console['host'] = i['pnode']
482
    console['port'] = i['network_port']
483

    
484
    return console
485

    
486

    
487
def get_instance_info(vm):
488
    with pooled_rapi_client(vm) as client:
489
        return client.GetInstance(vm.backend_vm_id)
490

    
491

    
492
def vm_exists_in_backend(vm):
493
    try:
494
        get_instance_info(vm)
495
        return True
496
    except GanetiApiError as e:
497
        if e.code == 404:
498
            return False
499
        raise e
500

    
501

    
502
def get_network_info(backend_network):
503
    with pooled_rapi_client(backend_network) as client:
504
        return client.GetNetwork(backend_network.network.backend_id)
505

    
506

    
507
def network_exists_in_backend(backend_network):
508
    try:
509
        get_network_info(backend_network)
510
        return True
511
    except GanetiApiError as e:
512
        if e.code == 404:
513
            return False
514

    
515

    
516
def create_network(network, backend, connect=True):
517
    """Create a network in a Ganeti backend"""
518
    log.debug("Creating network %s in backend %s", network, backend)
519

    
520
    job_id = _create_network(network, backend)
521

    
522
    if connect:
523
        job_ids = connect_network(network, backend, depends=[job_id])
524
        return job_ids
525
    else:
526
        return [job_id]
527

    
528

    
529
def _create_network(network, backend):
530
    """Create a network."""
531

    
532
    network_type = network.public and 'public' or 'private'
533

    
534
    tags = network.backend_tag
535
    if network.dhcp:
536
        tags.append('nfdhcpd')
537

    
538
    if network.public:
539
        conflicts_check = True
540
    else:
541
        conflicts_check = False
542

    
543
    try:
544
        bn = BackendNetwork.objects.get(network=network, backend=backend)
545
        mac_prefix = bn.mac_prefix
546
    except BackendNetwork.DoesNotExist:
547
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
548
                        " does not exist" % (network.id, backend.id))
549

    
550
    with pooled_rapi_client(backend) as client:
551
        return client.CreateNetwork(network_name=network.backend_id,
552
                                    network=network.subnet,
553
                                    network6=network.subnet6,
554
                                    gateway=network.gateway,
555
                                    gateway6=network.gateway6,
556
                                    network_type=network_type,
557
                                    mac_prefix=mac_prefix,
558
                                    conflicts_check=conflicts_check,
559
                                    tags=tags)
560

    
561

    
562
def connect_network(network, backend, depends=[], group=None):
563
    """Connect a network to nodegroups."""
564
    log.debug("Connecting network %s to backend %s", network, backend)
565

    
566
    if network.public:
567
        conflicts_check = True
568
    else:
569
        conflicts_check = False
570

    
571
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
572
    with pooled_rapi_client(backend) as client:
573
        groups = [group] if group is not None else client.GetGroups()
574
        job_ids = []
575
        for group in groups:
576
            job_id = client.ConnectNetwork(network.backend_id, group,
577
                                           network.mode, network.link,
578
                                           conflicts_check,
579
                                           depends=depends)
580
            job_ids.append(job_id)
581
    return job_ids
582

    
583

    
584
def delete_network(network, backend, disconnect=True):
585
    log.debug("Deleting network %s from backend %s", network, backend)
586

    
587
    depends = []
588
    if disconnect:
589
        depends = disconnect_network(network, backend)
590
    _delete_network(network, backend, depends=depends)
591

    
592

    
593
def _delete_network(network, backend, depends=[]):
594
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
595
    with pooled_rapi_client(backend) as client:
596
        return client.DeleteNetwork(network.backend_id, depends)
597

    
598

    
599
def disconnect_network(network, backend, group=None):
600
    log.debug("Disconnecting network %s to backend %s", network, backend)
601

    
602
    with pooled_rapi_client(backend) as client:
603
        groups = [group] if group is not None else client.GetGroups()
604
        job_ids = []
605
        for group in groups:
606
            job_id = client.DisconnectNetwork(network.backend_id, group)
607
            job_ids.append(job_id)
608
    return job_ids
609

    
610

    
611
def connect_to_network(vm, network, address=None):
612
    backend = vm.backend
613
    network = Network.objects.select_for_update().get(id=network.id)
614
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
615
                                                         network=network)
616
    depend_jobs = []
617
    if bnet.operstate != "ACTIVE":
618
        depend_jobs = create_network(network, backend, connect=True)
619

    
620
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
621

    
622
    nic = {'ip': address, 'network': network.backend_id}
623

    
624
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
625

    
626
    with pooled_rapi_client(vm) as client:
627
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
628
                                     hotplug=vm.backend.use_hotplug(),
629
                                     depends=depends,
630
                                     dry_run=settings.TEST)
631

    
632

    
633
def disconnect_from_network(vm, nic):
634
    op = [('remove', nic.index, {})]
635

    
636
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
637

    
638
    with pooled_rapi_client(vm) as client:
639
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
640
                                     hotplug=vm.backend.use_hotplug(),
641
                                     dry_run=settings.TEST)
642

    
643

    
644
def set_firewall_profile(vm, profile):
645
    try:
646
        tag = _firewall_tags[profile]
647
    except KeyError:
648
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
649

    
650
    log.debug("Setting tag of VM %s to %s", vm, profile)
651

    
652
    with pooled_rapi_client(vm) as client:
653
        # Delete all firewall tags
654
        for t in _firewall_tags.values():
655
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
656
                                      dry_run=settings.TEST)
657

    
658
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
659

    
660
        # XXX NOP ModifyInstance call to force process_net_status to run
661
        # on the dispatcher
662
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
663
        client.ModifyInstance(vm.backend_vm_id,
664
                              os_name=os_name)
665

    
666

    
667
def get_ganeti_instances(backend=None, bulk=False):
668
    instances = []
669
    for backend in get_backends(backend):
670
        with pooled_rapi_client(backend) as client:
671
            instances.append(client.GetInstances(bulk=bulk))
672

    
673
    return reduce(list.__add__, instances, [])
674

    
675

    
676
def get_ganeti_nodes(backend=None, bulk=False):
677
    nodes = []
678
    for backend in get_backends(backend):
679
        with pooled_rapi_client(backend) as client:
680
            nodes.append(client.GetNodes(bulk=bulk))
681

    
682
    return reduce(list.__add__, nodes, [])
683

    
684

    
685
def get_ganeti_jobs(backend=None, bulk=False):
686
    jobs = []
687
    for backend in get_backends(backend):
688
        with pooled_rapi_client(backend) as client:
689
            jobs.append(client.GetJobs(bulk=bulk))
690
    return reduce(list.__add__, jobs, [])
691

    
692
##
693
##
694
##
695

    
696

    
697
def get_backends(backend=None):
698
    if backend:
699
        if backend.offline:
700
            return []
701
        return [backend]
702
    return Backend.objects.filter(offline=False)
703

    
704

    
705
def get_physical_resources(backend):
706
    """ Get the physical resources of a backend.
707

708
    Get the resources of a backend as reported by the backend (not the db).
709

710
    """
711
    nodes = get_ganeti_nodes(backend, bulk=True)
712
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
713
    res = {}
714
    for a in attr:
715
        res[a] = 0
716
    for n in nodes:
717
        # Filter out drained, offline and not vm_capable nodes since they will
718
        # not take part in the vm allocation process
719
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
720
        if can_host_vms and n['cnodes']:
721
            for a in attr:
722
                res[a] += int(n[a])
723
    return res
724

    
725

    
726
def update_resources(backend, resources=None):
727
    """ Update the state of the backend resources in db.
728

729
    """
730

    
731
    if not resources:
732
        resources = get_physical_resources(backend)
733

    
734
    backend.mfree = resources['mfree']
735
    backend.mtotal = resources['mtotal']
736
    backend.dfree = resources['dfree']
737
    backend.dtotal = resources['dtotal']
738
    backend.pinst_cnt = resources['pinst_cnt']
739
    backend.ctotal = resources['ctotal']
740
    backend.updated = datetime.now()
741
    backend.save()
742

    
743

    
744
def get_memory_from_instances(backend):
745
    """ Get the memory that is used from instances.
746

747
    Get the used memory of a backend. Note: This is different for
748
    the real memory used, due to kvm's memory de-duplication.
749

750
    """
751
    with pooled_rapi_client(backend) as client:
752
        instances = client.GetInstances(bulk=True)
753
    mem = 0
754
    for i in instances:
755
        mem += i['oper_ram']
756
    return mem
757

    
758
##
759
## Synchronized operations for reconciliation
760
##
761

    
762

    
763
def create_network_synced(network, backend):
764
    result = _create_network_synced(network, backend)
765
    if result[0] != 'success':
766
        return result
767
    result = connect_network_synced(network, backend)
768
    return result
769

    
770

    
771
def _create_network_synced(network, backend):
772
    with pooled_rapi_client(backend) as client:
773
        job = _create_network(network, backend)
774
        result = wait_for_job(client, job)
775
    return result
776

    
777

    
778
def connect_network_synced(network, backend):
779
    with pooled_rapi_client(backend) as client:
780
        for group in client.GetGroups():
781
            job = client.ConnectNetwork(network.backend_id, group,
782
                                        network.mode, network.link)
783
            result = wait_for_job(client, job)
784
            if result[0] != 'success':
785
                return result
786

    
787
    return result
788

    
789

    
790
def wait_for_job(client, jobid):
791
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
792
    status = result['job_info'][0]
793
    while status not in ['success', 'error', 'cancel']:
794
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
795
                                         [result], None)
796
        status = result['job_info'][0]
797

    
798
    if status == 'success':
799
        return (status, None)
800
    else:
801
        error = result['job_info'][1]
802
        return (status, error)