Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ b578d9e7

History | View | Annotate | Download (24.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, VirtualMachineDiagnostic)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        vm.operstate = state_for_success
80

    
81

    
82
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
83
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
84
        vm.operstate = 'ERROR'
85
        vm.backendtime = etime
86

    
87
    if opcode == 'OP_INSTANCE_REMOVE':
88
        # Set the deleted flag explicitly, cater for admin-initiated removals
89
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
90
        # when no instance exists at the Ganeti backend.
91
        # See ticket #799 for all the details.
92
        #
93
        if status == 'success' or (status == 'error' and
94
                                   vm.operstate == 'ERROR'):
95
            release_instance_nics(vm)
96
            vm.nics.all().delete()
97
            vm.deleted = True
98
            vm.operstate = state_for_success
99
            vm.backendtime = etime
100

    
101
    # Update backendtime only for jobs that have been successfully completed,
102
    # since only these jobs update the state of the VM. Else a "race condition"
103
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
104
    # before an error job and messages arrive in reversed order.
105
    if status == 'success':
106
        vm.backendtime = etime
107

    
108
    vm.save()
109

    
110

    
111
@transaction.commit_on_success
112
def process_net_status(vm, etime, nics):
113
    """Process a net status notification from the backend
114

115
    Process an incoming message from the Ganeti backend,
116
    detailing the NIC configuration of a VM instance.
117

118
    Update the state of the VM in the DB accordingly.
119
    """
120

    
121
    ganeti_nics = process_ganeti_nics(nics)
122
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
123
        log.debug("NICs for VM %s have not changed", vm)
124

    
125
    release_instance_nics(vm)
126

    
127
    for nic in ganeti_nics:
128
        ipv4 = nic.get('ipv4', '')
129
        if ipv4:
130
            net = nic['network']
131
            net.reserve_address(ipv4)
132

    
133
        nic['dirty'] = False
134
        vm.nics.create(**nic)
135
        # Dummy save the network, because UI uses changed-since for VMs
136
        # and Networks in order to show the VM NICs
137
        net.save()
138

    
139
    vm.backendtime = etime
140
    vm.save()
141

    
142

    
143
def process_ganeti_nics(ganeti_nics):
144
    """Process NIC dict from ganeti hooks."""
145
    new_nics = []
146
    for i, new_nic in enumerate(ganeti_nics):
147
        network = new_nic.get('network', '')
148
        n = str(network)
149
        pk = utils.id_from_network_name(n)
150

    
151
        net = Network.objects.get(pk=pk)
152

    
153
        # Get the new nic info
154
        mac = new_nic.get('mac', '')
155
        ipv4 = new_nic.get('ip', '')
156
        ipv6 = new_nic.get('ipv6', '')
157

    
158
        firewall = new_nic.get('firewall', '')
159
        firewall_profile = _reverse_tags.get(firewall, '')
160
        if not firewall_profile and net.public:
161
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
162

    
163
        nic = {
164
               'index': i,
165
               'network': net,
166
               'mac': mac,
167
               'ipv4': ipv4,
168
               'ipv6': ipv6,
169
               'firewall_profile': firewall_profile}
170

    
171
        new_nics.append(nic)
172
    return new_nics
173

    
174

    
175
def nics_changed(old_nics, new_nics):
176
    """Return True if NICs have changed in any way."""
177
    if len(old_nics) != len(new_nics):
178
        return True
179
    for old_nic, new_nic in zip(old_nics, new_nics):
180
        if not (old_nic.ipv4 == new_nic['ipv4'] and\
181
                old_nic.ipv6 == new_nic['ipv6'] and\
182
                old_nic.mac == new_nic['mac'] and\
183
                old_nic.firewall_profile == new_nic['firewall_profile'] and\
184
                old_nic.index == new_nic['index'] and\
185
                old_nic.network == new_nic['network']):
186
            return True
187
    return False
188

    
189

    
190
def release_instance_nics(vm):
191
    for nic in vm.nics.all():
192
        net = nic.network
193
        if nic.ipv4:
194
            net.release_address(nic.ipv4)
195
        nic.delete()
196
        net.save()
197

    
198

    
199
@transaction.commit_on_success
200
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
201
    if status not in [x[0] for x in BACKEND_STATUSES]:
202
        raise Network.InvalidBackendMsgError(opcode, status)
203

    
204
    back_network.backendjobid = jobid
205
    back_network.backendjobstatus = status
206
    back_network.backendopcode = opcode
207
    back_network.backendlogmsg = logmsg
208

    
209
    # Notifications of success change the operating state
210
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
211
    if status == 'success' and state_for_success is not None:
212
        back_network.operstate = state_for_success
213

    
214
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
215
        utils.update_state(back_network, 'ERROR')
216
        back_network.backendtime = etime
217

    
218
    if opcode == 'OP_NETWORK_REMOVE':
219
        if status == 'success' or (status == 'error' and
220
                                   back_network.operstate == 'ERROR'):
221
            back_network.operstate = state_for_success
222
            back_network.deleted = True
223
            back_network.backendtime = etime
224

    
225
    if status == 'success':
226
        back_network.backendtime = etime
227
    back_network.save()
228

    
229

    
230
@transaction.commit_on_success
231
def process_network_modify(back_network, etime, jobid, opcode, status,
232
                           add_reserved_ips, remove_reserved_ips):
233
    assert (opcode == "OP_NETWORK_SET_PARAMS")
234
    if status not in [x[0] for x in BACKEND_STATUSES]:
235
        raise Network.InvalidBackendMsgError(opcode, status)
236

    
237
    back_network.backendjobid = jobid
238
    back_network.backendjobstatus = status
239
    back_network.opcode = opcode
240

    
241
    if add_reserved_ips or remove_reserved_ips:
242
        net = back_network.network
243
        pool = net.get_pool()
244
        if add_reserved_ips:
245
            for ip in add_reserved_ips:
246
                pool.reserve(ip, external=True)
247
        if remove_reserved_ips:
248
            for ip in remove_reserved_ips:
249
                pool.put(ip, external=True)
250
        pool.save()
251

    
252
    if status == 'success':
253
        back_network.backendtime = etime
254
    back_network.save()
255

    
256

    
257
@transaction.commit_on_success
258
def process_create_progress(vm, etime, progress):
259

    
260
    percentage = int(progress)
261

    
262
    # The percentage may exceed 100%, due to the way
263
    # snf-image:copy-progress tracks bytes read by image handling processes
264
    percentage = 100 if percentage > 100 else percentage
265
    if percentage < 0:
266
        raise ValueError("Percentage cannot be negative")
267

    
268
    # FIXME: log a warning here, see #1033
269
#   if last_update > percentage:
270
#       raise ValueError("Build percentage should increase monotonically " \
271
#                        "(old = %d, new = %d)" % (last_update, percentage))
272

    
273
    # This assumes that no message of type 'ganeti-create-progress' is going to
274
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
275
    # the instance is STARTED.  What if the two messages are processed by two
276
    # separate dispatcher threads, and the 'ganeti-op-status' message for
277
    # successful creation gets processed before the 'ganeti-create-progress'
278
    # message? [vkoukis]
279
    #
280
    #if not vm.operstate == 'BUILD':
281
    #    raise VirtualMachine.IllegalState("VM is not in building state")
282

    
283
    vm.buildpercentage = percentage
284
    vm.backendtime = etime
285
    vm.save()
286

    
287

    
288
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
289
    details=None):
290
    """
291
    Create virtual machine instance diagnostic entry.
292

293
    :param vm: VirtualMachine instance to create diagnostic for.
294
    :param message: Diagnostic message.
295
    :param source: Diagnostic source identifier (e.g. image-helper).
296
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
297
    :param etime: The time the message occured (if available).
298
    :param details: Additional details or debug information.
299
    """
300
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
301
            source_date=etime, message=message, details=details)
302

    
303

    
304
def create_instance(vm, public_nic, flavor, image, password, personality):
305
    """`image` is a dictionary which should contain the keys:
306
            'backend_id', 'format' and 'metadata'
307

308
        metadata value should be a dictionary.
309
    """
310

    
311
    if settings.IGNORE_FLAVOR_DISK_SIZES:
312
        if image['backend_id'].find("windows") >= 0:
313
            sz = 14000
314
        else:
315
            sz = 4000
316
    else:
317
        sz = flavor.disk * 1024
318

    
319
    # Handle arguments to CreateInstance() as a dictionary,
320
    # initialize it based on a deployment-specific value.
321
    # This enables the administrator to override deployment-specific
322
    # arguments, such as the disk template to use, name of os provider
323
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
324
    #
325
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
326
    kw['mode'] = 'create'
327
    kw['name'] = vm.backend_vm_id
328
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
329

    
330
    # Identify if provider parameter should be set in disk options.
331
    # Current implementation support providers only fo ext template.
332
    # To select specific provider for an ext template, template name
333
    # should be formated as `ext_<provider_name>`.
334
    provider = None
335
    disk_template = flavor.disk_template
336
    if flavor.disk_template.startswith("ext"):
337
        disk_template, provider = flavor.disk_template.split("_", 1)
338

    
339
    kw['disk_template'] = disk_template
340
    kw['disks'] = [{"size": sz}]
341
    if provider:
342
        kw['disks'][0]['provider'] = provider
343

    
344
        if provider == 'vlmc':
345
            kw['disks'][0]['origin'] = image['backend_id']
346

    
347
    kw['nics'] = [public_nic]
348
    if settings.GANETI_USE_HOTPLUG:
349
        kw['hotplug'] = True
350
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
351
    # kw['os'] = settings.GANETI_OS_PROVIDER
352
    kw['ip_check'] = False
353
    kw['name_check'] = False
354
    # Do not specific a node explicitly, have
355
    # Ganeti use an iallocator instead
356
    #
357
    #kw['pnode'] = rapi.GetNodes()[0]
358
    kw['dry_run'] = settings.TEST
359

    
360
    kw['beparams'] = {
361
        'auto_balance': True,
362
        'vcpus': flavor.cpu,
363
        'memory': flavor.ram}
364

    
365
    kw['osparams'] = {
366
        'img_id': image['backend_id'],
367
        'img_passwd': password,
368
        'img_format': image['format']}
369
    if personality:
370
        kw['osparams']['img_personality'] = json.dumps(personality)
371

    
372
    if provider != None and provider == 'vlmc':
373
        kw['osparams']['img_id'] = 'null'
374

    
375
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
376

    
377
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
378
    # kw['hvparams'] = dict(serial_console=False)
379
    log.debug("Creating instance %s", utils.hide_pass(kw))
380
    with pooled_rapi_client(vm) as client:
381
        return client.CreateInstance(**kw)
382

    
383

    
384
def delete_instance(vm):
385
    with pooled_rapi_client(vm) as client:
386
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
387

    
388

    
389
def reboot_instance(vm, reboot_type):
390
    assert reboot_type in ('soft', 'hard')
391
    with pooled_rapi_client(vm) as client:
392
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
393
                                     dry_run=settings.TEST)
394

    
395

    
396
def startup_instance(vm):
397
    with pooled_rapi_client(vm) as client:
398
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
399

    
400

    
401
def shutdown_instance(vm):
402
    with pooled_rapi_client(vm) as client:
403
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
404

    
405

    
406
def get_instance_console(vm):
407
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
408
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
409
    # useless (see #783).
410
    #
411
    # Until this is fixed on the Ganeti side, construct a console info reply
412
    # directly.
413
    #
414
    # WARNING: This assumes that VNC runs on port network_port on
415
    #          the instance's primary node, and is probably
416
    #          hypervisor-specific.
417
    #
418
    log.debug("Getting console for vm %s", vm)
419

    
420
    console = {}
421
    console['kind'] = 'vnc'
422

    
423
    with pooled_rapi_client(vm) as client:
424
        i = client.GetInstance(vm.backend_vm_id)
425

    
426
    if i['hvparams']['serial_console']:
427
        raise Exception("hv parameter serial_console cannot be true")
428
    console['host'] = i['pnode']
429
    console['port'] = i['network_port']
430

    
431
    return console
432

    
433

    
434
def get_instance_info(vm):
435
    with pooled_rapi_client(vm) as client:
436
        return client.GetInstanceInfo(vm.backend_vm_id)
437

    
438

    
439
def create_network(network, backends=None, connect=True):
440
    """Create and connect a network."""
441
    if not backends:
442
        backends = Backend.objects.exclude(offline=True)
443

    
444
    log.debug("Creating network %s in backends %s", network, backends)
445

    
446
    for backend in backends:
447
        create_jobID = _create_network(network, backend)
448
        if connect:
449
            connect_network(network, backend, create_jobID)
450

    
451

    
452
def _create_network(network, backend):
453
    """Create a network."""
454

    
455
    network_type = network.public and 'public' or 'private'
456

    
457
    tags = network.backend_tag
458
    if network.dhcp:
459
        tags.append('nfdhcpd')
460

    
461
    if network.public:
462
        conflicts_check = True
463
    else:
464
        conflicts_check = False
465

    
466
    try:
467
        bn = BackendNetwork.objects.get(network=network, backend=backend)
468
        mac_prefix = bn.mac_prefix
469
    except BackendNetwork.DoesNotExist:
470
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
471
                        " does not exist" % (network.id, backend.id))
472

    
473
    with pooled_rapi_client(backend) as client:
474
        return client.CreateNetwork(network_name=network.backend_id,
475
                                    network=network.subnet,
476
                                    network6=network.subnet6,
477
                                    gateway=network.gateway,
478
                                    gateway6=network.gateway6,
479
                                    network_type=network_type,
480
                                    mac_prefix=mac_prefix,
481
                                    conflicts_check=conflicts_check,
482
                                    tags=tags)
483

    
484

    
485
def connect_network(network, backend, depend_job=None, group=None):
486
    """Connect a network to nodegroups."""
487
    log.debug("Connecting network %s to backend %s", network, backend)
488

    
489
    mode = "routed" if "ROUTED" in network.type else "bridged"
490

    
491
    if network.public:
492
        conflicts_check = True
493
    else:
494
        conflicts_check = False
495

    
496
    depend_jobs = [depend_job] if depend_job else []
497
    with pooled_rapi_client(backend) as client:
498
        if group:
499
            client.ConnectNetwork(network.backend_id, group, mode,
500
                                  network.link, conflicts_check, depend_jobs)
501
        else:
502
            for group in client.GetGroups():
503
                client.ConnectNetwork(network.backend_id, group, mode,
504
                                      network.link, conflicts_check,
505
                                      depend_jobs)
506

    
507

    
508
def delete_network(network, backends=None, disconnect=True):
509
    if not backends:
510
        backends = Backend.objects.exclude(offline=True)
511

    
512
    log.debug("Deleting network %s from backends %s", network, backends)
513

    
514
    for backend in backends:
515
        disconnect_jobIDs = []
516
        if disconnect:
517
            disconnect_jobIDs = disconnect_network(network, backend)
518
        _delete_network(network, backend, disconnect_jobIDs)
519

    
520

    
521
def _delete_network(network, backend, depend_jobs=[]):
522
    with pooled_rapi_client(backend) as client:
523
        return client.DeleteNetwork(network.backend_id, depend_jobs)
524

    
525

    
526
def disconnect_network(network, backend, group=None):
527
    log.debug("Disconnecting network %s to backend %s", network, backend)
528

    
529
    with pooled_rapi_client(backend) as client:
530
        if group:
531
            return [client.DisconnectNetwork(network.backend_id, group)]
532
        else:
533
            jobs = []
534
            for group in client.GetGroups():
535
                job = client.DisconnectNetwork(network.backend_id, group)
536
                jobs.append(job)
537
            return jobs
538

    
539

    
540
def connect_to_network(vm, network, address=None):
541
    nic = {'ip': address, 'network': network.backend_id}
542

    
543
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
544

    
545
    with pooled_rapi_client(vm) as client:
546
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
547
                                     hotplug=settings.GANETI_USE_HOTPLUG,
548
                                     dry_run=settings.TEST)
549

    
550

    
551
def disconnect_from_network(vm, nic):
552
    op = [('remove', nic.index, {})]
553

    
554
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
555

    
556
    with pooled_rapi_client(vm) as client:
557
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
558
                                     hotplug=settings.GANETI_USE_HOTPLUG,
559
                                     dry_run=settings.TEST)
560

    
561

    
562
def set_firewall_profile(vm, profile):
563
    try:
564
        tag = _firewall_tags[profile]
565
    except KeyError:
566
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
567

    
568
    log.debug("Setting tag of VM %s to %s", vm, profile)
569

    
570
    with pooled_rapi_client(vm) as client:
571
        # Delete all firewall tags
572
        for t in _firewall_tags.values():
573
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
574
                                      dry_run=settings.TEST)
575

    
576
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
577

    
578
        # XXX NOP ModifyInstance call to force process_net_status to run
579
        # on the dispatcher
580
        client.ModifyInstance(vm.backend_vm_id,
581
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
582

    
583

    
584
def get_ganeti_instances(backend=None, bulk=False):
585
    instances = []
586
    for backend in get_backends(backend):
587
        with pooled_rapi_client(backend) as client:
588
            instances.append(client.GetInstances(bulk=bulk))
589

    
590
    return reduce(list.__add__, instances, [])
591

    
592

    
593
def get_ganeti_nodes(backend=None, bulk=False):
594
    nodes = []
595
    for backend in get_backends(backend):
596
        with pooled_rapi_client(backend) as client:
597
            nodes.append(client.GetNodes(bulk=bulk))
598

    
599
    return reduce(list.__add__, nodes, [])
600

    
601

    
602
def get_ganeti_jobs(backend=None, bulk=False):
603
    jobs = []
604
    for backend in get_backends(backend):
605
        with pooled_rapi_client(backend) as client:
606
            jobs.append(client.GetJobs(bulk=bulk))
607
    return reduce(list.__add__, jobs, [])
608

    
609
##
610
##
611
##
612

    
613

    
614
def get_backends(backend=None):
615
    if backend:
616
        return [backend]
617
    return Backend.objects.filter(offline=False)
618

    
619

    
620
def get_physical_resources(backend):
621
    """ Get the physical resources of a backend.
622

623
    Get the resources of a backend as reported by the backend (not the db).
624

625
    """
626
    nodes = get_ganeti_nodes(backend, bulk=True)
627
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
628
    res = {}
629
    for a in attr:
630
        res[a] = 0
631
    for n in nodes:
632
        # Filter out drained, offline and not vm_capable nodes since they will
633
        # not take part in the vm allocation process
634
        if n['vm_capable'] and not n['drained'] and not n['offline']\
635
           and n['cnodes']:
636
            for a in attr:
637
                res[a] += int(n[a])
638
    return res
639

    
640

    
641
def update_resources(backend, resources=None):
642
    """ Update the state of the backend resources in db.
643

644
    """
645

    
646
    if not resources:
647
        resources = get_physical_resources(backend)
648

    
649
    backend.mfree = resources['mfree']
650
    backend.mtotal = resources['mtotal']
651
    backend.dfree = resources['dfree']
652
    backend.dtotal = resources['dtotal']
653
    backend.pinst_cnt = resources['pinst_cnt']
654
    backend.ctotal = resources['ctotal']
655
    backend.updated = datetime.now()
656
    backend.save()
657

    
658

    
659
def get_memory_from_instances(backend):
660
    """ Get the memory that is used from instances.
661

662
    Get the used memory of a backend. Note: This is different for
663
    the real memory used, due to kvm's memory de-duplication.
664

665
    """
666
    with pooled_rapi_client(backend) as client:
667
        instances = client.GetInstances(bulk=True)
668
    mem = 0
669
    for i in instances:
670
        mem += i['oper_ram']
671
    return mem
672

    
673
##
674
## Synchronized operations for reconciliation
675
##
676

    
677

    
678
def create_network_synced(network, backend):
679
    result = _create_network_synced(network, backend)
680
    if result[0] != 'success':
681
        return result
682
    result = connect_network_synced(network, backend)
683
    return result
684

    
685

    
686
def _create_network_synced(network, backend):
687
    with pooled_rapi_client(backend) as client:
688
        job = _create_network(network, backend)
689
        result = wait_for_job(client, job)
690
    return result
691

    
692

    
693
def connect_network_synced(network, backend):
694
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
695
        mode = 'routed'
696
    else:
697
        mode = 'bridged'
698
    with pooled_rapi_client(backend) as client:
699
        for group in client.GetGroups():
700
            job = client.ConnectNetwork(network.backend_id, group, mode,
701
                                        network.link)
702
            result = wait_for_job(client, job)
703
            if result[0] != 'success':
704
                return result
705

    
706
    return result
707

    
708

    
709
def wait_for_job(client, jobid):
710
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
711
    status = result['job_info'][0]
712
    while status not in ['success', 'error', 'cancel']:
713
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
714
                                        [result], None)
715
        status = result['job_info'][0]
716

    
717
    if status == 'success':
718
        return (status, None)
719
    else:
720
        error = result['job_info'][1]
721
        return (status, error)