root / snf-cyclades-app / synnefo / logic / backend.py @ 3165f027

# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import json

from logging import getLogger
from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES)
from synnefo.logic import utils, ippool
from synnefo.api.faults import OverLimit
from synnefo.util.rapi import GanetiRapiClient

log = getLogger('synnefo.logic')


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
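# Note: the reverse map above assumes each GANETI_FIREWALL_*_TAG setting is a
# colon-separated string with the profile keyword as its fourth field (that is
# what split(':')[3] extracts). Illustrative example only, actual values come
# from the deployment settings: a tag such as 'synnefo:network:0:protected'
# would map the 'protected' value reported by Ganeti back to 'PROTECTED'.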


def create_client(hostname, port=5080, username=None, password=None):
    """Return a Ganeti RAPI client for the given cluster."""
    return GanetiRapiClient(hostname, port, username, password)


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 for why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        utils.update_state(vm, state_for_success)
        # Set the deleted flag explicitly, to cater for admin-initiated removals
        if opcode == 'OP_INSTANCE_REMOVE':
            release_instance_nics(vm)
            vm.deleted = True
            vm.nics.all().delete()

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
        utils.update_state(vm, 'ERROR')

    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
    # when no instance exists at the Ganeti backend.
    # See ticket #799 for all the details.
    #
    if status == 'error' and opcode == 'OP_INSTANCE_REMOVE':
        vm.deleted = True
        vm.nics.all().delete()

    vm.backendtime = etime
    # Any other notification of failure leaves the operating state unchanged

    vm.save()


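# Note: each entry of the `nics` list handled below is expected to be a dict
# carrying (at least) the keys 'network', 'mac', 'ip', 'ipv6' and 'firewall',
# as delivered by the backend notification. Illustrative example, with
# made-up field values:
#   {'network': 'snf-net-1', 'mac': 'aa:00:00:10:20:30',
#    'ip': '10.0.0.2', 'ipv6': '', 'firewall': ''}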
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    # Release the IPs of the old NICs. Get back the Network objects, since
    # multiple changes in the same network must happen on the same object,
    # because the transaction is committed only on exit of this function.
    networks = release_instance_nics(vm)

    new_nics = enumerate(nics)
    for i, new_nic in new_nics:
        network = new_nic.get('network', '')
        n = str(network)
        pk = utils.id_from_network_name(n)

        # Get the cached Network or fetch it from the DB
        if pk in networks:
            net = networks[pk]
        else:
            net = Network.objects.select_for_update().get(pk=pk)

        # Get the new NIC info
        mac = new_nic.get('mac', '')
        ipv4 = new_nic.get('ip', '')
        ipv6 = new_nic.get('ipv6', '')

        firewall = new_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        if ipv4:
            net.reserve_address(ipv4)

        vm.nics.create(
            network=net,
            index=i,
            mac=mac,
            ipv4=ipv4,
            ipv6=ipv6,
            firewall_profile=firewall_profile,
            dirty=False)

    vm.backendtime = etime
    vm.save()


def release_instance_nics(vm):
    """Release the NICs of a VM and return the affected Network objects."""
    networks = {}

    for nic in vm.nics.all():
        pk = nic.network.pk
        # Get the cached Network or get it from the DB
        if pk in networks:
            net = networks[pk]
        else:
            # Get the network object in exclusive mode in order
            # to guarantee consistency of the address pool
            net = Network.objects.select_for_update().get(pk=pk)
            # Cache it, so that all changes go through the same object
            networks[pk] = net
        if nic.ipv4:
            net.release_address(nic.ipv4)
        nic.delete()

    return networks


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification for a network at a backend."""
    if status not in [x[0] for x in BACKEND_STATUSES]:
        return
        #raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success
        if opcode == 'OP_NETWORK_REMOVE':
            back_network.deleted = True

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
        utils.update_state(back_network, 'ERROR')

    if status == 'error' and opcode == 'OP_NETWORK_REMOVE':
        back_network.deleted = True
        back_network.operstate = 'DELETED'

    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, rprogress, wprogress):

    # XXX: This only uses the read progress for now.
    #      Explore whether it would make sense to use the value of wprogress
    #      somewhere.
    percentage = int(rprogress)

    # The percentage may exceed 100%, due to the way
    # snf-progress-monitor tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


def start_action(vm, action):
    """Update the state of a VM when a new action is initiated."""
    if action not in [x[0] for x in VirtualMachine.ACTIONS]:
        raise VirtualMachine.InvalidActionError(action)

    # No actions are allowed on deleted VMs, and nothing besides destroy
    # is allowed on suspended VMs
    if vm.deleted:
        raise VirtualMachine.DeletedError

    # No actions on machines being built. They may be destroyed, however.
    if vm.operstate == 'BUILD' and action != 'DESTROY':
        raise VirtualMachine.BuildingError

    vm.action = action
    vm.backendjobid = None
    vm.backendopcode = None
    vm.backendjobstatus = None
    vm.backendlogmsg = None

    # Update the relevant flags if the VM is being suspended or destroyed.
    # Do not set the deleted flag here, see ticket #721.
    #
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
    # completes successfully. Hence, a server may be visible for some time
    # after a DELETE /servers/id returns HTTP 204.
    #
    if action == "DESTROY":
        # vm.deleted = True
        pass
    elif action == "SUSPEND":
        vm.suspended = True
    elif action == "START":
        vm.suspended = False
    vm.save()


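# Illustrative shape of the `image` argument expected by create_instance()
# below; the keys follow the docstring, the values here are made up for the
# example:
#   {'backend_id': 'debian_base-6.0-x86_64',
#    'format': 'extdump',
#    'metadata': {'OS': 'debian', 'users': 'root'}}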
@transaction.commit_on_success
def create_instance(vm, flavor, image, password, personality):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.
    """

    if settings.PUBLIC_ROUTED_USE_POOL:
        # Get the Network object in exclusive mode in order to
        # safely (isolated) reserve an IP address
        try:
            network = Network.objects.select_for_update().get(public=True)
        except Network.DoesNotExist:
            raise Exception('No public network available')
        pool = ippool.IPPool(network)
        try:
            address = pool.get_free_address()
        except ippool.IPPool.IPPoolExhausted:
            raise OverLimit("Cannot allocate IP for new machine."
                            " Public network is full.")
        pool.save()
        nic = {'ip': address, 'network': settings.GANETI_PUBLIC_NETWORK}
    else:
        nic = {'ip': 'pool', 'network': settings.GANETI_PUBLIC_NETWORK}

    if settings.IGNORE_FLAVOR_DISK_SIZES:
        if image['backend_id'].find("windows") >= 0:
            sz = 14000
        else:
            sz = 4000
    else:
        sz = flavor.disk * 1024

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": sz}]
    kw['nics'] = [nic]
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False
    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #
    # kw['pnode']=rapi.GetNodes()[0]
    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'img_id': image['backend_id'],
        'img_passwd': password,
        'img_format': image['format']}
    if personality:
        kw['osparams']['img_personality'] = json.dumps(personality)

    kw['osparams']['img_properties'] = json.dumps(image['metadata'])

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    return vm.client.CreateInstance(**kw)


def delete_instance(vm):
    start_action(vm, 'DESTROY')
    vm.client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    vm.client.RebootInstance(vm.backend_vm_id, reboot_type, dry_run=settings.TEST)
    log.info('Rebooting instance %s', vm.backend_vm_id)


def startup_instance(vm):
    start_action(vm, 'START')
    vm.client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    start_action(vm, 'STOP')
    vm.client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    console = {}
    console['kind'] = 'vnc'
    i = vm.client.GetInstance(vm.backend_vm_id)
    if i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
    # return rapi.GetInstanceConsole(vm.backend_vm_id)


def request_status_update(vm):
    return vm.client.GetInstanceInfo(vm.backend_vm_id)


def update_status(vm, status):
    utils.update_state(vm, status)


def create_network(network, backends=None):
    """Add and connect a network to backends.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    """
    backend_jobs = _create_network(network, backends)
    connect_network(network, backend_jobs)
    return network


def _create_network(network, backends=None):
    """Add a network to backends.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    """

    network_type = network.public and 'public' or 'private'
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')
    tags = ','.join(tags)

    backend_jobs = []
    for backend in backends:
        try:
            backend_network = BackendNetwork.objects.get(network=network,
                                                         backend=backend)
        except BackendNetwork.DoesNotExist:
            raise Exception("BackendNetwork for network '%s' in backend '%s'"
                            " does not exist" % (network.id, backend.id))
        job = backend.client.CreateNetwork(
                network_name=network.backend_id,
                network=network.subnet,
                gateway=network.gateway,
                network_type=network_type,
                mac_prefix=backend_network.mac_prefix,
                tags=tags)
        backend_jobs.append((backend, job))

    return backend_jobs


def connect_network(network, backend_jobs=None):
    """Connect a network to all nodegroups.

    @param network: Network object
    @param backend_jobs: List of tuples of the form (Backend, jobs) which are
                         the backends to connect the network and the jobs on
                         which the connect job depends.

    """

    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'

    if not backend_jobs:
        backend_jobs = [(backend, []) for backend in
                        Backend.objects.exclude(offline=True)]

    for backend, job in backend_jobs:
        client = backend.client
        for group in client.GetGroups():
            client.ConnectNetwork(network.backend_id, group, mode,
                                  network.link, [job])


def connect_network_group(backend, network, group):
    """Connect a network to a specific nodegroup of a backend."""
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'

    return backend.client.ConnectNetwork(network.backend_id, group, mode,
                                         network.link)


def delete_network(network, backends=None):
    """Disconnect and remove a network from backends.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    """
    backend_jobs = disconnect_network(network, backends)
    _delete_network(network, backend_jobs)


def disconnect_network(network, backends=None):
    """Disconnect a network from all nodegroups.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    """

    if not backends:
        backends = Backend.objects.exclude(offline=True)

    backend_jobs = []
    for backend in backends:
        client = backend.client
        jobs = []
        for group in client.GetGroups():
            job = client.DisconnectNetwork(network.backend_id, group)
            jobs.append(job)
        backend_jobs.append((backend, jobs))

    return backend_jobs


def disconnect_from_network(vm, nic):
    """Disconnect a virtual machine from a network by removing its NIC.

    @param vm: VirtualMachine object
    @param nic: NIC object to remove

    """

    op = [('remove', nic.index, {})]
    return vm.client.ModifyInstance(vm.backend_vm_id, nics=op,
                                    hotplug=True, dry_run=settings.TEST)


def _delete_network(network, backend_jobs=None):
    if not backend_jobs:
        backend_jobs = [(backend, []) for backend in
                        Backend.objects.exclude(offline=True)]
    for backend, jobs in backend_jobs:
        backend.client.DeleteNetwork(network.backend_id, jobs)


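# Illustrative use of connect_to_network() below (values are made up):
#   connect_to_network(vm, net, '10.0.1.5') hot-plugs a new NIC for `vm` on
#   the Ganeti network `net.backend_id`, requesting the given address.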
def connect_to_network(vm, network, address):
    """Connect a virtual machine to a network.

    @param vm: VirtualMachine object
    @param network: Network object

    """

    # ip = network.dhcp and 'pool' or None

    nic = {'ip': address, 'network': network.backend_id}
    vm.client.ModifyInstance(vm.backend_vm_id, nics=[('add', nic)],
                             hotplug=True, dry_run=settings.TEST)


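# set_firewall_profile() swaps the instance's firewall tag: it deletes every
# known firewall tag and then adds the one for the requested profile, e.g.
# set_firewall_profile(vm, 'PROTECTED'). The trailing no-op ModifyInstance
# call exists so that process_net_status is re-run on the dispatcher, as the
# comment inside the function notes.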
def set_firewall_profile(vm, profile):
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    client = vm.client
    # Delete all firewall tags
    for t in _firewall_tags.values():
        client.DeleteInstanceTags(vm.backend_vm_id, [t], dry_run=settings.TEST)

    client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

    # XXX NOP ModifyInstance call to force process_net_status to run
    # on the dispatcher
    vm.client.ModifyInstance(vm.backend_vm_id,
                        os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])


def get_ganeti_instances(backend=None, bulk=False):
    Instances = [c.client.GetInstances(bulk=bulk)
                 for c in get_backends(backend)]
    return reduce(list.__add__, Instances, [])


def get_ganeti_nodes(backend=None, bulk=False):
    Nodes = [c.client.GetNodes(bulk=bulk) for c in get_backends(backend)]
    return reduce(list.__add__, Nodes, [])


def get_ganeti_jobs(backend=None, bulk=False):
    Jobs = [c.client.GetJobs(bulk=bulk) for c in get_backends(backend)]
    return reduce(list.__add__, Jobs, [])

##
##
##


def get_backends(backend=None):
    if backend:
        return [backend]
    return Backend.objects.filter(offline=False)


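# get_physical_resources() below sums the listed node attributes over the
# nodes that can actually host VMs (online, not drained, vm_capable). The
# result is a dict of the form (illustrative):
#   {'mfree': ..., 'mtotal': ..., 'dfree': ..., 'dtotal': ...,
#    'pinst_cnt': ..., 'ctotal': ...}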
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_ganeti_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the vm allocation process
        if n['vm_capable'] and not n['drained'] and not n['offline']\
           and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res


def update_resources(backend, resources=None):
    """Update the state of the backend resources in the db."""

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: this is different from
    the real memory used, due to KVM's memory de-duplication.

    """
    instances = backend.client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    client = backend.client

    backend_jobs = _create_network(network, [backend])
    (_, job) = backend_jobs[0]
    return wait_for_job(client, job)


def connect_network_synced(network, backend):
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'
    client = backend.client

    for group in client.GetGroups():
        job = client.ConnectNetwork(network.backend_id, group, mode,
                                    network.link)
        result = wait_for_job(client, job)
        if result[0] != 'success':
            return result

    return result


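# wait_for_job() blocks on Ganeti's WaitForJobChange until the job reaches a
# terminal state and returns a (status, error) tuple: ('success', None) on
# success, otherwise the terminal status together with the job's opresult.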
def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)