Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ aeb523a4

History | View | Annotate | Download (22.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from logging import getLogger
37
from django.conf import settings
38
from django.db import transaction
39
from datetime import datetime
40

    
41
from synnefo.db.models import (Backend, VirtualMachine, Network,
42
                               BackendNetwork, BACKEND_STATUSES)
43
from synnefo.logic import utils, ippool
44
from synnefo.api.faults import OverLimit
45
from synnefo.util.rapi import GanetiRapiClient
46

    
47
log = getLogger('synnefo.logic')
48

    
49

    
50
_firewall_tags = {
51
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
52
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
53
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
54

    
55
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
def create_client(hostname, port=5080, username=None, password=None):
59
    return GanetiRapiClient(hostname, port, username, password)
60

    
61

    
62
@transaction.commit_on_success
63
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
64
    """Process a job progress notification from the backend
65

66
    Process an incoming message from the backend (currently Ganeti).
67
    Job notifications with a terminating status (sucess, error, or canceled),
68
    also update the operating state of the VM.
69

70
    """
71
    # See #1492, #1031, #1111 why this line has been removed
72
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
73
    if status not in [x[0] for x in BACKEND_STATUSES]:
74
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
75

    
76
    vm.backendjobid = jobid
77
    vm.backendjobstatus = status
78
    vm.backendopcode = opcode
79
    vm.backendlogmsg = logmsg
80

    
81
    # Notifications of success change the operating state
82
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
83
    if status == 'success' and state_for_success is not None:
84
        utils.update_state(vm, state_for_success)
85
        # Set the deleted flag explicitly, cater for admin-initiated removals
86
        if opcode == 'OP_INSTANCE_REMOVE':
87
            release_instance_nics(vm)
88
            vm.deleted = True
89
            vm.nics.all().delete()
90

    
91
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
92
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
93
        utils.update_state(vm, 'ERROR')
94

    
95
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
96
    # when no instance exists at the Ganeti backend.
97
    # See ticket #799 for all the details.
98
    #
99
    if (status == 'error' and opcode == 'OP_INSTANCE_REMOVE'):
100
        vm.deleted = True
101
        vm.nics.all().delete()
102

    
103
    vm.backendtime = etime
104
    # Any other notification of failure leaves the operating state unchanged
105

    
106
    vm.save()
107

    
108

    
109
@transaction.commit_on_success
110
def process_net_status(vm, etime, nics):
111
    """Process a net status notification from the backend
112

113
    Process an incoming message from the Ganeti backend,
114
    detailing the NIC configuration of a VM instance.
115

116
    Update the state of the VM in the DB accordingly.
117
    """
118

    
119
    # Release the ips of the old nics. Get back the networks as multiple
120
    # changes in the same network, must happen in the same Network object,
121
    # because transaction will be commited only on exit of the function.
122
    networks = release_instance_nics(vm)
123

    
124
    new_nics = enumerate(nics)
125
    for i, new_nic in new_nics:
126
        network = new_nic.get('network', '')
127
        n = str(network)
128
        pk = utils.id_from_network_name(n)
129

    
130
        # Get the cached Network or get it from DB
131
        if pk in networks:
132
            net = networks[pk]
133
        else:
134
            net = Network.objects.select_for_update().get(pk=pk)
135

    
136
        # Get the new nic info
137
        mac = new_nic.get('mac', '')
138
        ipv4 = new_nic.get('ip', '')
139
        ipv6 = new_nic.get('ipv6', '')
140

    
141
        firewall = new_nic.get('firewall', '')
142
        firewall_profile = _reverse_tags.get(firewall, '')
143
        if not firewall_profile and net.public:
144
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
145

    
146
        if ipv4:
147
            net.reserve_address(ipv4)
148

    
149
        vm.nics.create(
150
            network=net,
151
            index=i,
152
            mac=mac,
153
            ipv4=ipv4,
154
            ipv6=ipv6,
155
            firewall_profile=firewall_profile,
156
            dirty=False)
157

    
158
    vm.backendtime = etime
159
    vm.save()
160

    
161

    
162
def release_instance_nics(vm):
163
    networks = {}
164

    
165
    for nic in vm.nics.all():
166
        pk = nic.network.pk
167
        # Get the cached Network or get it from DB
168
        if pk in networks:
169
            net = networks[pk]
170
        else:
171
            # Get the network object in exclusive mode in order
172
            # to guarantee consistency of the address pool
173
            net = Network.objects.select_for_update().get(pk=pk)
174
        if nic.ipv4:
175
            net.release_address(nic.ipv4)
176
        nic.delete()
177

    
178
    return networks
179

    
180

    
181
@transaction.commit_on_success
182
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
183
    if status not in [x[0] for x in BACKEND_STATUSES]:
184
        return
185
        #raise Network.InvalidBackendMsgError(opcode, status)
186

    
187
    back_network.backendjobid = jobid
188
    back_network.backendjobstatus = status
189
    back_network.backendopcode = opcode
190
    back_network.backendlogmsg = logmsg
191

    
192
    # Notifications of success change the operating state
193
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
194
    if status == 'success' and state_for_success is not None:
195
        back_network.operstate = state_for_success
196
        if opcode == 'OP_NETWORK_REMOVE':
197
            back_network.deleted = True
198

    
199
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
200
        utils.update_state(back_network, 'ERROR')
201

    
202
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
203
        back_network.deleted = True
204
        back_network.operstate = 'DELETED'
205

    
206
    back_network.save()
207

    
208

    
209
@transaction.commit_on_success
210
def process_create_progress(vm, etime, rprogress, wprogress):
211

    
212
    # XXX: This only uses the read progress for now.
213
    #      Explore whether it would make sense to use the value of wprogress
214
    #      somewhere.
215
    percentage = int(rprogress)
216

    
217
    # The percentage may exceed 100%, due to the way
218
    # snf-progress-monitor tracks bytes read by image handling processes
219
    percentage = 100 if percentage > 100 else percentage
220
    if percentage < 0:
221
        raise ValueError("Percentage cannot be negative")
222

    
223
    # FIXME: log a warning here, see #1033
224
#   if last_update > percentage:
225
#       raise ValueError("Build percentage should increase monotonically " \
226
#                        "(old = %d, new = %d)" % (last_update, percentage))
227

    
228
    # This assumes that no message of type 'ganeti-create-progress' is going to
229
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
230
    # the instance is STARTED.  What if the two messages are processed by two
231
    # separate dispatcher threads, and the 'ganeti-op-status' message for
232
    # successful creation gets processed before the 'ganeti-create-progress'
233
    # message? [vkoukis]
234
    #
235
    #if not vm.operstate == 'BUILD':
236
    #    raise VirtualMachine.IllegalState("VM is not in building state")
237

    
238
    vm.buildpercentage = percentage
239
    vm.backendtime = etime
240
    vm.save()
241

    
242

    
243
def start_action(vm, action):
244
    """Update the state of a VM when a new action is initiated."""
245
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
246
        raise VirtualMachine.InvalidActionError(action)
247

    
248
    # No actions to deleted and no actions beside destroy to suspended VMs
249
    if vm.deleted:
250
        raise VirtualMachine.DeletedError
251

    
252
    # No actions to machines being built. They may be destroyed, however.
253
    if vm.operstate == 'BUILD' and action != 'DESTROY':
254
        raise VirtualMachine.BuildingError
255

    
256
    vm.action = action
257
    vm.backendjobid = None
258
    vm.backendopcode = None
259
    vm.backendjobstatus = None
260
    vm.backendlogmsg = None
261

    
262
    # Update the relevant flags if the VM is being suspended or destroyed.
263
    # Do not set the deleted flag here, see ticket #721.
264
    #
265
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
266
    # completes successfully. Hence, a server may be visible for some time
267
    # after a DELETE /servers/id returns HTTP 204.
268
    #
269
    if action == "DESTROY":
270
        # vm.deleted = True
271
        pass
272
    elif action == "SUSPEND":
273
        vm.suspended = True
274
    elif action == "START":
275
        vm.suspended = False
276
    vm.save()
277

    
278

    
279
@transaction.commit_on_success
280
def create_instance(vm, flavor, image, password, personality):
281
    """`image` is a dictionary which should contain the keys:
282
            'backend_id', 'format' and 'metadata'
283

284
        metadata value should be a dictionary.
285
    """
286

    
287
    if settings.PUBLIC_ROUTED_USE_POOL:
288
        # Get the Network object in exclusive mode in order to
289
        # safely (isolated) reserve an IP address
290
        try:
291
            network = Network.objects.select_for_update().get(public=True)
292
        except Network.DoesNotExist:
293
            raise Exception('No public network available')
294
        pool = ippool.IPPool(network)
295
        try:
296
            address = pool.get_free_address()
297
        except ippool.IPPool.IPPoolExhausted:
298
            raise OverLimit("Can not allocate IP for new machine."
299
                            " Public network is full.")
300
        pool.save()
301
        nic = {'ip': address, 'network': settings.GANETI_PUBLIC_NETWORK}
302
    else:
303
        nic = {'ip': 'pool', 'network': settings.GANETI_PUBLIC_NETWORK}
304

    
305
    if settings.IGNORE_FLAVOR_DISK_SIZES:
306
        if image['backend_id'].find("windows") >= 0:
307
            sz = 14000
308
        else:
309
            sz = 4000
310
    else:
311
        sz = flavor.disk * 1024
312

    
313
    # Handle arguments to CreateInstance() as a dictionary,
314
    # initialize it based on a deployment-specific value.
315
    # This enables the administrator to override deployment-specific
316
    # arguments, such as the disk template to use, name of os provider
317
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
318
    #
319
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
320
    kw['mode'] = 'create'
321
    kw['name'] = vm.backend_vm_id
322
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
323
    kw['disk_template'] = flavor.disk_template
324
    kw['disks'] = [{"size": sz}]
325
    kw['nics'] = [nic]
326
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
327
    # kw['os'] = settings.GANETI_OS_PROVIDER
328
    kw['ip_check'] = False
329
    kw['name_check'] = False
330
    # Do not specific a node explicitly, have
331
    # Ganeti use an iallocator instead
332
    #
333
    # kw['pnode']=rapi.GetNodes()[0]
334
    kw['dry_run'] = settings.TEST
335

    
336
    kw['beparams'] = {
337
        'auto_balance': True,
338
        'vcpus': flavor.cpu,
339
        'memory': flavor.ram}
340

    
341
    kw['osparams'] = {
342
        'img_id': image['backend_id'],
343
        'img_passwd': password,
344
        'img_format': image['format']}
345
    if personality:
346
        kw['osparams']['img_personality'] = json.dumps(personality)
347

    
348
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
349

    
350
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
351
    # kw['hvparams'] = dict(serial_console=False)
352

    
353
    return vm.client.CreateInstance(**kw)
354

    
355

    
356
def delete_instance(vm):
357
    start_action(vm, 'DESTROY')
358
    vm.client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
359

    
360

    
361
def reboot_instance(vm, reboot_type):
362
    assert reboot_type in ('soft', 'hard')
363
    vm.client.RebootInstance(vm.backend_vm_id, reboot_type, dry_run=settings.TEST)
364
    log.info('Rebooting instance %s', vm.backend_vm_id)
365

    
366

    
367
def startup_instance(vm):
368
    start_action(vm, 'START')
369
    vm.client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
370

    
371

    
372
def shutdown_instance(vm):
373
    start_action(vm, 'STOP')
374
    vm.client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
375

    
376

    
377
def get_instance_console(vm):
378
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
379
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
380
    # useless (see #783).
381
    #
382
    # Until this is fixed on the Ganeti side, construct a console info reply
383
    # directly.
384
    #
385
    # WARNING: This assumes that VNC runs on port network_port on
386
    #          the instance's primary node, and is probably
387
    #          hypervisor-specific.
388
    #
389
    console = {}
390
    console['kind'] = 'vnc'
391
    i = vm.client.GetInstance(vm.backend_vm_id)
392
    if i['hvparams']['serial_console']:
393
        raise Exception("hv parameter serial_console cannot be true")
394
    console['host'] = i['pnode']
395
    console['port'] = i['network_port']
396

    
397
    return console
398
    # return rapi.GetInstanceConsole(vm.backend_vm_id)
399

    
400

    
401
def request_status_update(vm):
402
    return vm.client.GetInstanceInfo(vm.backend_vm_id)
403

    
404

    
405
def update_status(vm, status):
406
    utils.update_state(vm, status)
407

    
408

    
409
def create_network(network, backends=None):
410
    """ Add and connect a network to backends.
411

412
    @param network: Network object
413
    @param backends: List of Backend objects. None defaults to all.
414

415
    """
416
    backend_jobs = _create_network(network, backends)
417
    connect_network(network, backend_jobs)
418
    return network
419

    
420

    
421
def _create_network(network, backends=None):
422
    """Add a network to backends.
423
    @param network: Network object
424
    @param backends: List of Backend objects. None defaults to all.
425

426
    """
427

    
428
    network_type = network.public and 'public' or 'private'
429

    
430
    if not backends:
431
        backends = Backend.objects.exclude(offline=True)
432

    
433
    tags = network.backend_tag
434
    if network.dhcp:
435
        tags.append('nfdhcpd')
436
    tags = ','.join(tags)
437

    
438
    backend_jobs = []
439
    for backend in backends:
440
        job = backend.client.CreateNetwork(
441
                network_name=network.backend_id,
442
                network=network.subnet,
443
                gateway=network.gateway,
444
                network_type=network_type,
445
                mac_prefix=network.mac_prefix,
446
                tags=tags)
447
        backend_jobs.append((backend, job))
448

    
449
    return backend_jobs
450

    
451

    
452
def connect_network(network, backend_jobs=None):
453
    """Connect a network to all nodegroups.
454

455
    @param network: Network object
456
    @param backend_jobs: List of tuples of the form (Backend, jobs) which are
457
                         the backends to connect the network and the jobs on
458
                         which the connect job depends.
459

460
    """
461

    
462
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
463
        mode = 'routed'
464
    else:
465
        mode = 'bridged'
466

    
467
    if not backend_jobs:
468
        backend_jobs = [(backend, []) for backend in
469
                        Backend.objects.exclude(offline=True)]
470

    
471
    for backend, job in backend_jobs:
472
        client = backend.client
473
        for group in client.GetGroups():
474
            client.ConnectNetwork(network.backend_id, group, mode,
475
                                  network.link, [job])
476

    
477

    
478
def connect_network_group(backend, network, group):
479
    """Connect a network to a specific nodegroup of a backend.
480

481
    """
482
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
483
        mode = 'routed'
484
    else:
485
        mode = 'bridged'
486

    
487
    return backend.client.ConnectNetwork(network.backend_id, group, mode,
488
                                         network.link)
489

    
490

    
491
def delete_network(network, backends=None):
492
    """ Disconnect and a remove a network from backends.
493

494
    @param network: Network object
495
    @param backends: List of Backend objects. None defaults to all.
496

497
    """
498
    backend_jobs = disconnect_network(network, backends)
499
    _delete_network(network, backend_jobs)
500

    
501

    
502
def disconnect_network(network, backends=None):
503
    """Disconnect a network from all nodegroups.
504

505
    @param network: Network object
506
    @param backends: List of Backend objects. None defaults to all.
507

508
    """
509

    
510
    if not backends:
511
        backends = Backend.objects.exclude(offline=True)
512

    
513
    backend_jobs = []
514
    for backend in backends:
515
        client = backend.client
516
        jobs = []
517
        for group in client.GetGroups():
518
            job = client.DisconnectNetwork(network.backend_id, group)
519
            jobs.append(job)
520
        backend_jobs.append((backend, jobs))
521

    
522
    return backend_jobs
523

    
524

    
525
def disconnect_from_network(vm, nic):
526
    """Disconnect a virtual machine from a network by removing it's nic.
527

528
    @param vm: VirtualMachine object
529
    @param network: Network object
530

531
    """
532

    
533
    op = [('remove', nic.index, {})]
534
    return vm.client.ModifyInstance(vm.backend_vm_id, nics=op,
535
                                   hotplug=True, dry_run=settings.TEST)
536

    
537

    
538
def _delete_network(network, backend_jobs=None):
539
    if not backend_jobs:
540
        backend_jobs = [(backend, []) for backend in
541
                Backend.objects.exclude(offline=True)]
542
    for backend, jobs in backend_jobs:
543
        backend.client.DeleteNetwork(network.backend_id, jobs)
544

    
545

    
546
def connect_to_network(vm, network, address):
547
    """Connect a virtual machine to a network.
548

549
    @param vm: VirtualMachine object
550
    @param network: Network object
551

552
    """
553

    
554
    # ip = network.dhcp and 'pool' or None
555

    
556
    nic = {'ip': address, 'network': network.backend_id}
557
    vm.client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
558
                             hotplug=True, dry_run=settings.TEST)
559

    
560

    
561
def set_firewall_profile(vm, profile):
562
    try:
563
        tag = _firewall_tags[profile]
564
    except KeyError:
565
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
566

    
567
    client = vm.client
568
    # Delete all firewall tags
569
    for t in _firewall_tags.values():
570
        client.DeleteInstanceTags(vm.backend_vm_id, [t], dry_run=settings.TEST)
571

    
572
    client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
573

    
574
    # XXX NOP ModifyInstance call to force process_net_status to run
575
    # on the dispatcher
576
    vm.client.ModifyInstance(vm.backend_vm_id,
577
                        os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
578

    
579

    
580
def get_ganeti_instances(backend=None, bulk=False):
581
    Instances = [c.client.GetInstances(bulk=bulk)\
582
                 for c in get_backends(backend)]
583
    return reduce(list.__add__, Instances, [])
584

    
585

    
586
def get_ganeti_nodes(backend=None, bulk=False):
587
    Nodes = [c.client.GetNodes(bulk=bulk) for c in get_backends(backend)]
588
    return reduce(list.__add__, Nodes, [])
589

    
590

    
591
def get_ganeti_jobs(backend=None, bulk=False):
592
    Jobs = [c.client.GetJobs(bulk=bulk) for c in get_backends(backend)]
593
    return reduce(list.__add__, Jobs, [])
594

    
595
##
596
##
597
##
598

    
599

    
600
def get_backends(backend=None):
601
    if backend:
602
        return [backend]
603
    return Backend.objects.filter(offline=False)
604

    
605

    
606
def get_physical_resources(backend):
607
    """ Get the physical resources of a backend.
608

609
    Get the resources of a backend as reported by the backend (not the db).
610

611
    """
612
    nodes = get_ganeti_nodes(backend, bulk=True)
613
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
614
    res = {}
615
    for a in attr:
616
        res[a] = 0
617
    for n in nodes:
618
        # Filter out drained, offline and not vm_capable nodes since they will
619
        # not take part in the vm allocation process
620
        if n['vm_capable'] and not n['drained'] and not n['offline']\
621
           and n['cnodes']:
622
            for a in attr:
623
                res[a] += int(n[a])
624
    return res
625

    
626

    
627
def update_resources(backend, resources=None):
628
    """ Update the state of the backend resources in db.
629

630
    """
631

    
632
    if not resources:
633
        resources = get_physical_resources(backend)
634

    
635
    backend.mfree = resources['mfree']
636
    backend.mtotal = resources['mtotal']
637
    backend.dfree = resources['dfree']
638
    backend.dtotal = resources['dtotal']
639
    backend.pinst_cnt = resources['pinst_cnt']
640
    backend.ctotal = resources['ctotal']
641
    backend.updated = datetime.now()
642
    backend.save()
643

    
644

    
645
def get_memory_from_instances(backend):
646
    """ Get the memory that is used from instances.
647

648
    Get the used memory of a backend. Note: This is different for
649
    the real memory used, due to kvm's memory de-duplication.
650

651
    """
652
    instances = backend.client.GetInstances(bulk=True)
653
    mem = 0
654
    for i in instances:
655
        mem += i['oper_ram']
656
    return mem
657

    
658
##
659
## Synchronized operations for reconciliation
660
##
661

    
662

    
663
def create_network_synced(network, backend):
664
    result = _create_network_synced(network, backend)
665
    if result[0] != 'success':
666
        return result
667
    result = connect_network_synced(network, backend)
668
    return result
669

    
670

    
671
def _create_network_synced(network, backend):
672
    client = backend.client
673

    
674
    backend_jobs = _create_network(network, [backend])
675
    (_, job) = backend_jobs[0]
676
    return wait_for_job(client, job)
677

    
678

    
679
def connect_network_synced(network, backend):
680
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
681
        mode = 'routed'
682
    else:
683
        mode = 'bridged'
684
    client = backend.client
685

    
686
    for group in client.GetGroups():
687
        job = client.ConnectNetwork(network.backend_id, group, mode,
688
                                    network.link)
689
        result = wait_for_job(client, job)
690
        if result[0] != 'success':
691
            return result
692

    
693
    return result
694

    
695

    
696
def wait_for_job(client, jobid):
697
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
698
    status = result['job_info'][0]
699
    while status not in ['success', 'error', 'cancel']:
700
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
701
                                        [result], None)
702
        status = result['job_info'][0]
703

    
704
    if status == 'success':
705
        return (status, None)
706
    else:
707
        error = result['job_info'][1]
708
        return (status, error)