root / snf-cyclades-app / synnefo / logic / backend.py @ 07602322

# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
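# The reverse map above assumes each firewall tag carries the profile-specific
# part in its fourth ':'-separated field (for example a tag shaped roughly
# like "synnefo:network:0:protected" -- the exact values come from the
# GANETI_FIREWALL_*_TAG settings), so looking that field up yields the
# profile name ('ENABLED', 'DISABLED' or 'PROTECTED') consumed by
# process_ganeti_nics() below.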


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Update the NICs of the VM
    if status == "success" and nics is not None:
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, to cater for admin-initiated
        # removals.
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if (status == 'success' or
           (status == 'error' and (vm.operstate == 'ERROR' or
                                   vm.action == 'DESTROY'))):
            _process_net_status(vm, etime, nics=[])
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Issue and accept commission to Quotaholder
            quotas.issue_and_accept_commission(vm, delete=True)

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Otherwise a "race
    # condition" may occur when a successful job (e.g. OP_INSTANCE_REMOVE)
    # completes before an error job and the messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()
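
# Example (illustrative): for a notification with opcode 'OP_INSTANCE_STARTUP'
# and status 'success', the handler above records the job details on the VM,
# sets vm.operstate to the state mapped by OPER_STATE_FROM_OPCODE (typically
# 'STARTED') and updates vm.backendtime to etime.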


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save of the network, because the UI uses changed-since for
        # VMs and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()


def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks."""
    new_nics = []
    for i, new_nic in enumerate(ganeti_nics):
        network = new_nic.get('network', '')
        n = str(network)
        pk = utils.id_from_network_name(n)

        net = Network.objects.get(pk=pk)

        # Get the new nic info
        mac = new_nic.get('mac', '')
        ipv4 = new_nic.get('ip', '')
        if net.subnet6:
            ipv6 = mac2eui64(mac, net.subnet6)
        else:
            ipv6 = ''

        firewall = new_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic = {
            'index': i,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append(nic)
    return new_nics
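
# Illustrative example (field values are made up): a hook entry such as
#   {'network': 'snf-net-42', 'mac': 'aa:00:00:11:22:33', 'ip': '10.0.0.2',
#    'firewall': ''}
# becomes a DB-ready dict with keys 'index', 'network' (the Network object),
# 'mac', 'ipv4', 'ipv6', 'firewall_profile' and 'state'. The Ganeti network
# name is assumed to encode the Network's database id, which is what
# utils.id_from_network_name() extracts.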


def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    for old_nic, new_nic in zip(old_nics, new_nics):
        for field in fields:
            if getattr(old_nic, field) != new_nic[field]:
                return True
    return False


def release_instance_nics(vm):
    for nic in vm.nics.all():
        net = nic.network
        if nic.ipv4:
            net.release_address(nic.ipv4)
        nic.delete()
        net.save()


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        if (status == 'success' or
           (status == 'error' and (back_network.operstate == 'ERROR' or
                                   network.action == 'DESTROY'))):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the parent Network must also be updated
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of its
    BackendNetworks in the backends where the network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends in which it has been created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, public_nic, flavor, image):
    """`image` is a dictionary which should contain the keys
            'backend_id', 'format' and 'metadata'.

        The 'metadata' value should itself be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, the name of the os provider
    # and hypervisor-specific parameters, at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [public_nic]
    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
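
# Illustrative sketch of the keyword arguments passed to CreateInstance()
# (the concrete values are made up; deployment defaults coming from
# get_create_params() / GANETI_CREATEINSTANCE_KWARGS are merged in first):
#   {'mode': 'create', 'name': vm.backend_vm_id, 'disk_template': 'drbd',
#    'disks': [{'size': 20480}], 'nics': [public_nic], 'ip_check': False,
#    'name_check': False, 'dry_run': False,
#    'beparams': {'auto_balance': True, 'vcpus': 1, 'memory': 1024},
#    'osparams': {'config_url': ..., 'img_id': ..., 'img_format': ...}}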


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
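
# The returned dict is intentionally minimal; an illustrative value would be
# {'kind': 'vnc', 'host': 'node1.example.org', 'port': 11000}, where 'host' is
# the instance's primary node and 'port' its Ganeti network_port.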


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    network_type = network.public and 'public' or 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
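
# Each entry in the 'depends' list built above pairs a job id with the job
# statuses that satisfy the dependency, i.e. [[job_id, ["success", "error",
# "canceled"]], ...], so Ganeti should start the ConnectNetwork jobs only
# after every listed job has reached one of those terminal statuses.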


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, network, address=None):
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
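
# Note: the NIC is added via a ModifyInstance job with nics=[('add', nic)];
# the resulting NIC configuration is expected to come back asynchronously as
# a net-status notification and to be recorded by process_net_status() above.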


def disconnect_from_network(vm, nic):
    op = [('remove', nic.index, {})]

    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)


def set_firewall_profile(vm, profile):
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported firewall profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)


def get_ganeti_instances(backend=None, bulk=False):
    instances = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            instances.append(client.GetInstances(bulk=bulk))

    return reduce(list.__add__, instances, [])


def get_ganeti_nodes(backend=None, bulk=False):
    nodes = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            nodes.append(client.GetNodes(bulk=bulk))

    return reduce(list.__add__, nodes, [])


def get_ganeti_jobs(backend=None, bulk=False):
    jobs = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            jobs.append(client.GetJobs(bulk=bulk))
    return reduce(list.__add__, jobs, [])

##
##
##


def get_backends(backend=None):
    if backend:
        if backend.offline:
            return []
        return [backend]
    return Backend.objects.filter(offline=False)


def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_ganeti_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the VM allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res
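
# Illustrative return value (the numbers are made up), aggregated over all
# vm_capable, non-drained, non-offline nodes:
#   {'mfree': 65536, 'mtotal': 131072, 'dfree': 900000, 'dtotal': 1800000,
#    'pinst_cnt': 12, 'ctotal': 64}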


def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    """

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """ Get the memory that is used by instances.

    Get the used memory of a backend. Note: This may differ from the real
    memory used, due to KVM's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem

##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
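
# Typical (illustrative) use of the synchronous helpers above during
# reconciliation:
#   with pooled_rapi_client(backend) as client:
#       status, error = wait_for_job(client, job_id)
# where status is one of 'success', 'error' or 'cancel', and error carries the
# job's 'opresult' entry when the job did not succeed.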