Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 4d5d0b9c

History | View | Annotate | Download (24.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45

    
46
from logging import getLogger
47
log = getLogger(__name__)
48

    
49

    
50
# Map of Synnefo firewall profile names to the corresponding Ganeti
# instance tags; the tag values come from the deployment settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse lookup: tag suffix (the 4th ':'-separated field of the tag)
# -> firewall profile name.  Assumes each configured tag has at least
# four ':'-separated fields — TODO confirm against the settings.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: time the event occurred at the backend.
    :param jobid: the Ganeti job id.
    :param opcode: the Ganeti opcode (e.g. 'OP_INSTANCE_CREATE').
    :param status: the job status; must be one of BACKEND_STATUSES.
    :param logmsg: the raw log message accompanying the notification.
    :raises VirtualMachine.InvalidBackendMsgError: for an unknown status.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping fields on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
        vm.operstate = 'ERROR'
        vm.backendtime = etime

    if opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            release_instance_nics(vm)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Else a "race condition"
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
    # before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()
110

    
111

    
112
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: time the event occurred at the backend.
    :param nics: list of NIC dicts as reported by the Ganeti hooks.
    """
    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        # Bugfix: previously the code fell through and re-created all
        # NICs even when nothing had changed.
        return

    release_instance_nics(vm)

    for nic in ganeti_nics:
        # Bugfix: 'net' used to be bound only inside the 'if ipv4'
        # branch, so a NIC without an IPv4 address raised NameError (or
        # saved a stale network) at the net.save() below.
        net = nic['network']
        ipv4 = nic.get('ipv4', '')
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
142

    
143

    
144
def process_ganeti_nics(ganeti_nics):
    """Translate NIC dicts from the Ganeti hooks into DB-ready dicts."""
    nics = []
    for index, raw in enumerate(ganeti_nics):
        # Resolve the Network object from its Ganeti-side name.
        network_name = str(raw.get('network', ''))
        network = Network.objects.get(
            pk=utils.id_from_network_name(network_name))

        # Map the firewall tag back to a profile; public networks fall
        # back to the deployment-wide default profile.
        profile = _reverse_tags.get(raw.get('firewall', ''), '')
        if not profile and network.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        nics.append({'index': index,
                     'network': network,
                     'mac': raw.get('mac', ''),
                     'ipv4': raw.get('ip', ''),
                     'ipv6': raw.get('ipv6', ''),
                     'firewall_profile': profile})
    return nics
174

    
175

    
176
def nics_changed(old_nics, new_nics):
    """Return True if the stored NICs differ from the reported ones."""
    if len(old_nics) != len(new_nics):
        return True
    # Compare every field that we persist for a NIC.
    fields = ('ipv4', 'ipv6', 'mac', 'firewall_profile', 'index', 'network')
    for old, new in zip(old_nics, new_nics):
        for field in fields:
            if getattr(old, field) != new[field]:
                return True
    return False
189

    
190

    
191
def release_instance_nics(vm):
    """Delete all NICs of a VM, returning their IPv4 addresses to the pool."""
    for nic in vm.nics.all():
        network = nic.network
        if nic.ipv4:
            network.release_address(nic.ipv4)
        nic.delete()
        # Save the network as well, so its updated timestamp changes.
        network.save()
198

    
199

    
200
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a BackendNetwork.

    Mirrors process_op_status: record the job details and, for
    terminating statuses, update the operating state of the
    BackendNetwork and of its parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # A successful job moves the network to the state its opcode implies.
    success_state = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and success_state is not None:
        back_network.operstate = success_state

    # A failed or canceled creation leaves the backend network in ERROR.
    if opcode == 'OP_NETWORK_CREATE' and status in ('canceled', 'error'):
        utils.update_state(back_network, 'ERROR')
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal of a network already in ERROR still counts as
        # deleted (no network exists at the Ganeti side).
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = success_state
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(back_network.network)
231

    
232

    
233
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Records the job bookkeeping fields on the BackendNetwork and applies
    any externally reserved/released IPs to the network's address pool.

    :raises Network.InvalidBackendMsgError: for an unknown job status.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Bugfix: this used to assign to 'opcode', a field no other handler
    # uses; every other handler records the opcode in 'backendopcode'.
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                # Mark as externally reserved (not handed out by us).
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
258

    
259

    
260
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record an image-copy progress notification on the VM."""
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by the image handling
    # processes, so the reported value may exceed 100%; clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
289

    
290

    
291
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Store a diagnostic entry for a virtual machine.

    :param vm: VirtualMachine instance the diagnostic refers to.
    :param message: the diagnostic message.
    :param source: identifier of the message source (e.g. image-helper).
    :param level: severity (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: time the message occurred, if known.
    :param details: optional extra details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level,
                                                   source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
305

    
306

    
307
def create_instance(vm, public_nic, flavor, image, password, personality):
    """Submit an instance-creation job to Ganeti for `vm`.

    `image` is a dictionary which should contain the keys:
    'backend_id', 'format' and 'metadata'; the metadata value should
    itself be a dictionary.

    Returns the Ganeti job id of the submitted creation job.
    """
    if settings.IGNORE_FLAVOR_DISK_SIZES:
        # Fixed sizes used when the flavor's disk size is ignored.
        if image['backend_id'].find("windows") >= 0:
            sz = 14000
        else:
            sz = 4000
    else:
        sz = flavor.disk * 1024

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    # Bugfix: take a copy instead of mutating the settings dict in
    # place; previously keys assigned below (e.g. 'hotplug', 'disks')
    # leaked into settings.GANETI_CREATEINSTANCE_KWARGS across calls.
    kw = dict(settings.GANETI_CREATEINSTANCE_KWARGS)
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Identify if a provider parameter should be set in the disk options.
    # The current implementation supports providers only for the ext
    # template.  To select a specific provider for an ext template, the
    # template name should be formatted as `ext_<provider_name>`.
    provider = None
    disk_template = flavor.disk_template
    if flavor.disk_template.startswith("ext"):
        disk_template, provider = flavor.disk_template.split("_", 1)

    kw['disk_template'] = disk_template
    kw['disks'] = [{"size": sz}]
    if provider:
        kw['disks'][0]['provider'] = provider

        if provider == 'vlmc':
            kw['disks'][0]['origin'] = image['backend_id']

    kw['nics'] = [public_nic]
    if settings.GANETI_USE_HOTPLUG:
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False
    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #
    #kw['pnode'] = rapi.GetNodes()[0]
    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'img_id': image['backend_id'],
        'img_passwd': password,
        'img_format': image['format']}
    if personality:
        kw['osparams']['img_personality'] = json.dumps(personality)

    # Idiom fix: 'provider != None and provider == x' was redundant.
    if provider == 'vlmc':
        kw['osparams']['img_id'] = 'null'

    kw['osparams']['img_properties'] = json.dumps(image['metadata'])

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
385

    
386

    
387
def delete_instance(vm):
    """Ask Ganeti to remove the instance backing `vm`; returns the job id."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=dry_run)
390

    
391

    
392
def reboot_instance(vm, reboot_type):
    """Reboot the Ganeti instance of `vm` and return the job id.

    `reboot_type` must be 'soft' or 'hard'.
    """
    assert reboot_type in ('soft', 'hard')
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=dry_run)
397

    
398

    
399
def startup_instance(vm):
    """Start the Ganeti instance backing `vm`; returns the job id."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=dry_run)
402

    
403

    
404
def shutdown_instance(vm):
    """Shut down the Ganeti instance backing `vm`; returns the job id."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=dry_run)
407

    
408

    
409
def get_instance_console(vm):
    """Build VNC console connection info for `vm`.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783).

    Until this is fixed on the Ganeti side, construct a console info
    reply directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
435

    
436

    
437
def get_instance_info(vm):
    """Fetch the full Ganeti instance description for `vm`."""
    backend_id = vm.backend_vm_id
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(backend_id)
440

    
441

    
442
def create_network(network, backends=None, connect=True):
    """Create a network on the given backends and optionally connect it.

    When `backends` is not given, all online backends are used.
    """
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Creating network %s in backends %s", network, backends)

    for backend in backends:
        job_id = _create_network(network, backend)
        if connect:
            connect_network(network, backend, job_id)
453

    
454

    
455
def _create_network(network, backend):
    """Submit a network-creation job for `network` to `backend`.

    Returns the Ganeti job id.

    :raises Exception: if no BackendNetwork row exists for the pair.
    """
    # Idiom fix: the old 'cond and x or y' trick is fragile (it breaks
    # whenever x is falsy); use a conditional expression instead.
    network_type = 'public' if network.public else 'private'

    # NOTE(review): assumes network.backend_tag yields a fresh list;
    # if it returned shared state, this append would mutate it — confirm.
    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Only public networks get Ganeti's conflicts check.
    conflicts_check = bool(network.public)

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
486

    
487

    
488
def connect_network(network, backend, depend_job=None, group=None):
    """Connect a network to node groups of a backend.

    If `group` is given, connect only to that node group; otherwise
    connect to every group of the backend.  `depend_job` optionally
    makes the connect jobs depend on a previous (creation) job.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    mode = "routed" if "ROUTED" in network.type else "bridged"
    # Only public networks get Ganeti's conflicts check.
    conflicts_check = bool(network.public)
    depend_jobs = [depend_job] if depend_job else []

    with pooled_rapi_client(backend) as client:
        groups = [group] if group else client.GetGroups()
        for g in groups:
            client.ConnectNetwork(network.backend_id, g, mode,
                                  network.link, conflicts_check,
                                  depend_jobs)
509

    
510

    
511
def delete_network(network, backends=None, disconnect=True):
    """Remove a network from the given (default: all online) backends."""
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Deleting network %s from backends %s", network, backends)

    for backend in backends:
        # Make the removal job depend on the disconnect jobs, if any.
        depends = disconnect_network(network, backend) if disconnect else []
        _delete_network(network, backend, depends)
522

    
523

    
524
def _delete_network(network, backend, depend_jobs=None):
    """Submit a network-removal job, optionally depending on other jobs.

    Bugfix: the dependency list used to be a mutable default argument
    (a single list shared across all calls); default to None instead.
    """
    if depend_jobs is None:
        depend_jobs = []
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depend_jobs)
527

    
528

    
529
def disconnect_network(network, backend, group=None):
    """Disconnect a network from node groups; return the list of job ids."""
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group else client.GetGroups()
        return [client.DisconnectNetwork(network.backend_id, g)
                for g in groups]
541

    
542

    
543
def connect_to_network(vm, network, address=None):
    """Add a NIC attached to `network` (optionally with `address`) to `vm`."""
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    nic_spec = ('add', {'ip': address, 'network': network.backend_id})
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=[nic_spec],
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)
552

    
553

    
554
def disconnect_from_network(vm, nic):
    """Remove the NIC at `nic.index` from `vm` via ModifyInstance."""
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=[('remove', nic.index, {})],
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)
563

    
564

    
565
def set_firewall_profile(vm, profile):
    """Apply a firewall profile to `vm` by retagging its Ganeti instance.

    :param profile: one of the keys of _firewall_tags
        ('ENABLED', 'DISABLED', 'PROTECTED').
    :raises ValueError: for an unknown profile name.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Bugfix: the error message used to read "Unsopported".
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        client.ModifyInstance(vm.backend_vm_id,
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
585

    
586

    
587
def get_ganeti_instances(backend=None, bulk=False):
    """Collect the instances of one backend, or of all online backends."""
    result = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            result.extend(client.GetInstances(bulk=bulk))
    return result
594

    
595

    
596
def get_ganeti_nodes(backend=None, bulk=False):
    """Collect the nodes of one backend, or of all online backends."""
    result = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            result.extend(client.GetNodes(bulk=bulk))
    return result
603

    
604

    
605
def get_ganeti_jobs(backend=None, bulk=False):
    """Collect the jobs of one backend, or of all online backends."""
    result = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            result.extend(client.GetJobs(bulk=bulk))
    return result
611

    
612
##
613
##
614
##
615

    
616

    
617
def get_backends(backend=None):
    """Return the backends to operate on.

    A specific backend, when given, is wrapped in a list; otherwise all
    non-offline backends are returned.
    """
    return [backend] if backend else Backend.objects.filter(offline=False)
621

    
622

    
623
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = dict((a, 0) for a in attrs)
    for node in get_ganeti_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = (node['vm_capable'] and not node['drained'] and
                  not node['offline'] and node['cnodes'])
        if usable:
            for a in attrs:
                res[a] += int(node[a])
    return res
642

    
643

    
644
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    When `resources` is not given, they are queried from the backend.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
660

    
661

    
662
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
675

    
676
##
677
## Synchronized operations for reconciliation
678
##
679

    
680

    
681
def create_network_synced(network, backend):
    """Create and connect a network, waiting for each job to terminate."""
    result = _create_network_synced(network, backend)
    if result[0] == 'success':
        result = connect_network_synced(network, backend)
    return result
687

    
688

    
689
def _create_network_synced(network, backend):
    """Submit a network creation and block until the job terminates."""
    with pooled_rapi_client(backend) as client:
        jobid = _create_network(network, backend)
        return wait_for_job(client, jobid)
694

    
695

    
696
def connect_network_synced(network, backend):
    """Connect a network to every node group of `backend`, waiting for
    each ConnectNetwork job; stop and return on the first failure.

    Returns a (status, error) tuple as produced by wait_for_job.
    """
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'
    # Robustness fix: 'result' was previously unbound (NameError on
    # return) when the backend reported no node groups.
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group, mode,
                                        network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
710

    
711

    
712
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a terminal status.

    Returns ('success', None) on success, or (status, opresult) for
    failed/canceled jobs.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Bugfix: Ganeti reports canceled jobs with status 'canceled' (the
    # spelling the rest of this module tests for); the old terminal list
    # only contained 'cancel', so canceled jobs spun here forever.
    # 'cancel' is kept for safety.
    while status not in ['success', 'error', 'canceled', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)