Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ e221ade2

History | View | Annotate | Download (22.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, VirtualMachineDiagnostic)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        vm.operstate = state_for_success
80

    
81

    
82
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
83
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
84
        vm.operstate = 'ERROR'
85
        vm.backendtime = etime
86

    
87
    if opcode == 'OP_INSTANCE_REMOVE':
88
        # Set the deleted flag explicitly, cater for admin-initiated removals
89
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
90
        # when no instance exists at the Ganeti backend.
91
        # See ticket #799 for all the details.
92
        #
93
        if status == 'success' or (status == 'error' and
94
                                   vm.operstate == 'ERROR'):
95
            release_instance_nics(vm)
96
            vm.nics.all().delete()
97
            vm.deleted = True
98
            vm.operstate = state_for_success
99
            vm.backendtime = etime
100

    
101
    # Update backendtime only for jobs that have been successfully completed,
102
    # since only these jobs update the state of the VM. Else a "race condition"
103
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
104
    # before an error job and messages arrive in reversed order.
105
    if status == 'success':
106
        vm.backendtime = etime
107

    
108
    vm.save()
109

    
110

    
111
@transaction.commit_on_success
112
def process_net_status(vm, etime, nics):
113
    """Process a net status notification from the backend
114

115
    Process an incoming message from the Ganeti backend,
116
    detailing the NIC configuration of a VM instance.
117

118
    Update the state of the VM in the DB accordingly.
119
    """
120

    
121
    release_instance_nics(vm)
122

    
123
    new_nics = enumerate(nics)
124
    for i, new_nic in new_nics:
125
        network = new_nic.get('network', '')
126
        n = str(network)
127
        pk = utils.id_from_network_name(n)
128

    
129
        net = Network.objects.get(pk=pk)
130

    
131
        # Get the new nic info
132
        mac = new_nic.get('mac', '')
133
        ipv4 = new_nic.get('ip', '')
134
        ipv6 = new_nic.get('ipv6', '')
135

    
136
        firewall = new_nic.get('firewall', '')
137
        firewall_profile = _reverse_tags.get(firewall, '')
138
        if not firewall_profile and net.public:
139
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
140

    
141
        if ipv4:
142
            net.reserve_address(ipv4)
143

    
144
        vm.nics.create(
145
            network=net,
146
            index=i,
147
            mac=mac,
148
            ipv4=ipv4,
149
            ipv6=ipv6,
150
            firewall_profile=firewall_profile,
151
            dirty=False)
152
        # Dummy save the network, because UI uses changed-since for VMs
153
        # and Networks in order to show the VM NICs
154
        net.save()
155

    
156
    vm.backendtime = etime
157
    vm.save()
158

    
159

    
160
def release_instance_nics(vm):
161
    for nic in vm.nics.all():
162
        net = nic.network
163
        if nic.ipv4:
164
            net.release_address(nic.ipv4)
165
        nic.delete()
166
        net.save()
167

    
168

    
169
@transaction.commit_on_success
170
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
171
    if status not in [x[0] for x in BACKEND_STATUSES]:
172
        return
173
        #raise Network.InvalidBackendMsgError(opcode, status)
174

    
175
    back_network.backendjobid = jobid
176
    back_network.backendjobstatus = status
177
    back_network.backendopcode = opcode
178
    back_network.backendlogmsg = logmsg
179

    
180
    # Notifications of success change the operating state
181
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
182
    if status == 'success' and state_for_success is not None:
183
        back_network.operstate = state_for_success
184

    
185
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
186
        utils.update_state(back_network, 'ERROR')
187
        back_network.backendtime = etime
188

    
189
    if opcode == 'OP_NETWORK_REMOVE':
190
        if status == 'success' or (status == 'error' and
191
                                   back_network.operstate == 'ERROR'):
192
            back_network.operstate = state_for_success
193
            back_network.deleted = True
194
            back_network.backendtime = etime
195

    
196
    if status == 'success':
197
        back_network.backendtime = etime
198
    back_network.save()
199

    
200

    
201
@transaction.commit_on_success
202
def process_create_progress(vm, etime, progress):
203

    
204
    percentage = int(progress)
205

    
206
    # The percentage may exceed 100%, due to the way
207
    # snf-image:copy-progress tracks bytes read by image handling processes
208
    percentage = 100 if percentage > 100 else percentage
209
    if percentage < 0:
210
        raise ValueError("Percentage cannot be negative")
211

    
212
    # FIXME: log a warning here, see #1033
213
#   if last_update > percentage:
214
#       raise ValueError("Build percentage should increase monotonically " \
215
#                        "(old = %d, new = %d)" % (last_update, percentage))
216

    
217
    # This assumes that no message of type 'ganeti-create-progress' is going to
218
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
219
    # the instance is STARTED.  What if the two messages are processed by two
220
    # separate dispatcher threads, and the 'ganeti-op-status' message for
221
    # successful creation gets processed before the 'ganeti-create-progress'
222
    # message? [vkoukis]
223
    #
224
    #if not vm.operstate == 'BUILD':
225
    #    raise VirtualMachine.IllegalState("VM is not in building state")
226

    
227
    vm.buildpercentage = percentage
228
    vm.backendtime = etime
229
    vm.save()
230

    
231

    
232
def start_action(vm, action):
233
    """Update the state of a VM when a new action is initiated."""
234
    log.debug("Applying action %s to VM %s", action, vm)
235

    
236
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
237
        raise VirtualMachine.InvalidActionError(action)
238

    
239
    # No actions to deleted VMs
240
    if vm.deleted:
241
        raise VirtualMachine.DeletedError
242

    
243
    # No actions to machines being built. They may be destroyed, however.
244
    if vm.operstate == 'BUILD' and action != 'DESTROY':
245
        raise VirtualMachine.BuildingError
246

    
247
    vm.action = action
248
    vm.backendjobid = None
249
    vm.backendopcode = None
250
    vm.backendjobstatus = None
251
    vm.backendlogmsg = None
252

    
253
    vm.save()
254

    
255

    
256
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
257
    details=None):
258
    """
259
    Create virtual machine instance diagnostic entry.
260

261
    :param vm: VirtualMachine instance to create diagnostic for.
262
    :param message: Diagnostic message.
263
    :param source: Diagnostic source identifier (e.g. image-helper).
264
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
265
    :param etime: The time the message occured (if available).
266
    :param details: Additional details or debug information.
267
    """
268
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
269
            source_date=etime, message=message, details=details)
270

    
271

    
272
def create_instance(vm, public_nic, flavor, image, password, personality):
273
    """`image` is a dictionary which should contain the keys:
274
            'backend_id', 'format' and 'metadata'
275

276
        metadata value should be a dictionary.
277
    """
278

    
279
    if settings.IGNORE_FLAVOR_DISK_SIZES:
280
        if image['backend_id'].find("windows") >= 0:
281
            sz = 14000
282
        else:
283
            sz = 4000
284
    else:
285
        sz = flavor.disk * 1024
286

    
287
    # Handle arguments to CreateInstance() as a dictionary,
288
    # initialize it based on a deployment-specific value.
289
    # This enables the administrator to override deployment-specific
290
    # arguments, such as the disk template to use, name of os provider
291
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
292
    #
293
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
294
    kw['mode'] = 'create'
295
    kw['name'] = vm.backend_vm_id
296
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
297

    
298
    # Identify if provider parameter should be set in disk options.
299
    # Current implementation support providers only fo ext template.
300
    # To select specific provider for an ext template, template name
301
    # should be formated as `ext_<provider_name>`.
302
    provider = None
303
    disk_template = flavor.disk_template
304
    if flavor.disk_template.startswith("ext"):
305
        disk_template, provider = flavor.disk_template.split("_", 1)
306

    
307
    kw['disk_template'] = disk_template
308
    kw['disks'] = [{"size": sz}]
309
    if provider:
310
        kw['disks'][0]['provider'] = provider
311

    
312
        if provider == 'vlmc':
313
            kw['disks'][0]['origin'] = image['backend_id']
314

    
315
    kw['nics'] = [public_nic]
316
    if settings.GANETI_USE_HOTPLUG:
317
        kw['hotplug'] = True
318
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
319
    # kw['os'] = settings.GANETI_OS_PROVIDER
320
    kw['ip_check'] = False
321
    kw['name_check'] = False
322
    # Do not specific a node explicitly, have
323
    # Ganeti use an iallocator instead
324
    #
325
    #kw['pnode'] = rapi.GetNodes()[0]
326
    kw['dry_run'] = settings.TEST
327

    
328
    kw['beparams'] = {
329
        'auto_balance': True,
330
        'vcpus': flavor.cpu,
331
        'memory': flavor.ram}
332

    
333
    kw['osparams'] = {
334
        'img_id': image['backend_id'],
335
        'img_passwd': password,
336
        'img_format': image['format']}
337
    if personality:
338
        kw['osparams']['img_personality'] = json.dumps(personality)
339

    
340
    if provider != None and provider == 'vlmc':
341
        kw['osparams']['img_id'] = 'null'
342

    
343
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
344

    
345
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
346
    # kw['hvparams'] = dict(serial_console=False)
347
    log.debug("Creating instance %s", utils.hide_pass(kw))
348
    with pooled_rapi_client(vm) as client:
349
        return client.CreateInstance(**kw)
350

    
351

    
352
def delete_instance(vm):
353
    start_action(vm, 'DESTROY')
354
    with pooled_rapi_client(vm) as client:
355
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
356

    
357

    
358
def reboot_instance(vm, reboot_type):
359
    assert reboot_type in ('soft', 'hard')
360
    with pooled_rapi_client(vm) as client:
361
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
362
                                     dry_run=settings.TEST)
363

    
364

    
365
def startup_instance(vm):
366
    start_action(vm, 'START')
367
    with pooled_rapi_client(vm) as client:
368
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
369

    
370

    
371
def shutdown_instance(vm):
372
    start_action(vm, 'STOP')
373
    with pooled_rapi_client(vm) as client:
374
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
375

    
376

    
377
def get_instance_console(vm):
378
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
379
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
380
    # useless (see #783).
381
    #
382
    # Until this is fixed on the Ganeti side, construct a console info reply
383
    # directly.
384
    #
385
    # WARNING: This assumes that VNC runs on port network_port on
386
    #          the instance's primary node, and is probably
387
    #          hypervisor-specific.
388
    #
389
    log.debug("Getting console for vm %s", vm)
390

    
391
    console = {}
392
    console['kind'] = 'vnc'
393

    
394
    with pooled_rapi_client(vm) as client:
395
        i = client.GetInstance(vm.backend_vm_id)
396

    
397
    if i['hvparams']['serial_console']:
398
        raise Exception("hv parameter serial_console cannot be true")
399
    console['host'] = i['pnode']
400
    console['port'] = i['network_port']
401

    
402
    return console
403

    
404

    
405
def get_instance_info(vm):
406
    with pooled_rapi_client(vm) as client:
407
        return client.GetInstanceInfo(vm.backend_vm_id)
408

    
409

    
410
def create_network(network, backends=None, connect=True):
411
    """Create and connect a network."""
412
    if not backends:
413
        backends = Backend.objects.exclude(offline=True)
414

    
415
    log.debug("Creating network %s in backends %s", network, backends)
416

    
417
    for backend in backends:
418
        create_jobID = _create_network(network, backend)
419
        if connect:
420
            connect_network(network, backend, create_jobID)
421

    
422

    
423
def _create_network(network, backend):
424
    """Create a network."""
425

    
426
    network_type = network.public and 'public' or 'private'
427

    
428
    tags = network.backend_tag
429
    if network.dhcp:
430
        tags.append('nfdhcpd')
431
    tags = ','.join(tags)
432

    
433
    try:
434
        bn = BackendNetwork.objects.get(network=network, backend=backend)
435
        mac_prefix = bn.mac_prefix
436
    except BackendNetwork.DoesNotExist:
437
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
438
                        " does not exist" % (network.id, backend.id))
439

    
440
    with pooled_rapi_client(backend) as client:
441
        return client.CreateNetwork(network_name=network.backend_id,
442
                                    network=network.subnet,
443
                                    network6=network.subnet6,
444
                                    gateway=network.gateway,
445
                                    gateway6=network.gateway6,
446
                                    network_type=network_type,
447
                                    mac_prefix=mac_prefix,
448
                                    tags=tags)
449

    
450

    
451
def connect_network(network, backend, depend_job=None, group=None):
452
    """Connect a network to nodegroups."""
453
    log.debug("Connecting network %s to backend %s", network, backend)
454

    
455
    mode = "routed" if "ROUTED" in network.type else "bridged"
456

    
457
    depend_jobs = [depend_job] if depend_job else []
458
    with pooled_rapi_client(backend) as client:
459
        if group:
460
            client.ConnectNetwork(network.backend_id, group, mode,
461
                                  network.link, depend_jobs)
462
        else:
463
            for group in client.GetGroups():
464
                client.ConnectNetwork(network.backend_id, group, mode,
465
                                      network.link, depend_jobs)
466

    
467

    
468
def delete_network(network, backends=None, disconnect=True):
469
    if not backends:
470
        backends = Backend.objects.exclude(offline=True)
471

    
472
    log.debug("Deleting network %s from backends %s", network, backends)
473

    
474
    for backend in backends:
475
        disconnect_jobIDs = []
476
        if disconnect:
477
            disconnect_jobIDs = disconnect_network(network, backend)
478
        _delete_network(network, backend, disconnect_jobIDs)
479

    
480

    
481
def _delete_network(network, backend, depend_jobs=[]):
482
    with pooled_rapi_client(backend) as client:
483
        return client.DeleteNetwork(network.backend_id, depend_jobs)
484

    
485

    
486
def disconnect_network(network, backend, group=None):
487
    log.debug("Disconnecting network %s to backend %s", network, backend)
488

    
489
    with pooled_rapi_client(backend) as client:
490
        if group:
491
            return [client.DisconnectNetwork(network.backend_id, group)]
492
        else:
493
            jobs = []
494
            for group in client.GetGroups():
495
                job = client.DisconnectNetwork(network.backend_id, group)
496
                jobs.append(job)
497
            return jobs
498

    
499

    
500
def connect_to_network(vm, network, address=None):
501
    nic = {'ip': address, 'network': network.backend_id}
502

    
503
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
504

    
505
    with pooled_rapi_client(vm) as client:
506
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
507
                                     hotplug=settings.GANETI_USE_HOTPLUG,
508
                                     dry_run=settings.TEST)
509

    
510

    
511
def disconnect_from_network(vm, nic):
512
    op = [('remove', nic.index, {})]
513

    
514
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
515

    
516
    with pooled_rapi_client(vm) as client:
517
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
518
                                     hotplug=settings.GANETI_USE_HOTPLUG,
519
                                     dry_run=settings.TEST)
520

    
521

    
522
def set_firewall_profile(vm, profile):
523
    try:
524
        tag = _firewall_tags[profile]
525
    except KeyError:
526
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
527

    
528
    log.debug("Setting tag of VM %s to %s", vm, profile)
529

    
530
    with pooled_rapi_client(vm) as client:
531
        # Delete all firewall tags
532
        for t in _firewall_tags.values():
533
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
534
                                      dry_run=settings.TEST)
535

    
536
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
537

    
538
        # XXX NOP ModifyInstance call to force process_net_status to run
539
        # on the dispatcher
540
        client.ModifyInstance(vm.backend_vm_id,
541
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
542

    
543

    
544
def get_ganeti_instances(backend=None, bulk=False):
545
    instances = []
546
    for backend in get_backends(backend):
547
        with pooled_rapi_client(backend) as client:
548
            instances.append(client.GetInstances(bulk=bulk))
549

    
550
    return reduce(list.__add__, instances, [])
551

    
552

    
553
def get_ganeti_nodes(backend=None, bulk=False):
554
    nodes = []
555
    for backend in get_backends(backend):
556
        with pooled_rapi_client(backend) as client:
557
            nodes.append(client.GetNodes(bulk=bulk))
558

    
559
    return reduce(list.__add__, nodes, [])
560

    
561

    
562
def get_ganeti_jobs(backend=None, bulk=False):
563
    jobs = []
564
    for backend in get_backends(backend):
565
        with pooled_rapi_client(backend) as client:
566
            jobs.append(client.GetJobs(bulk=bulk))
567
    return reduce(list.__add__, jobs, [])
568

    
569
##
570
##
571
##
572

    
573

    
574
def get_backends(backend=None):
575
    if backend:
576
        return [backend]
577
    return Backend.objects.filter(offline=False)
578

    
579

    
580
def get_physical_resources(backend):
581
    """ Get the physical resources of a backend.
582

583
    Get the resources of a backend as reported by the backend (not the db).
584

585
    """
586
    nodes = get_ganeti_nodes(backend, bulk=True)
587
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
588
    res = {}
589
    for a in attr:
590
        res[a] = 0
591
    for n in nodes:
592
        # Filter out drained, offline and not vm_capable nodes since they will
593
        # not take part in the vm allocation process
594
        if n['vm_capable'] and not n['drained'] and not n['offline']\
595
           and n['cnodes']:
596
            for a in attr:
597
                res[a] += int(n[a])
598
    return res
599

    
600

    
601
def update_resources(backend, resources=None):
602
    """ Update the state of the backend resources in db.
603

604
    """
605

    
606
    if not resources:
607
        resources = get_physical_resources(backend)
608

    
609
    backend.mfree = resources['mfree']
610
    backend.mtotal = resources['mtotal']
611
    backend.dfree = resources['dfree']
612
    backend.dtotal = resources['dtotal']
613
    backend.pinst_cnt = resources['pinst_cnt']
614
    backend.ctotal = resources['ctotal']
615
    backend.updated = datetime.now()
616
    backend.save()
617

    
618

    
619
def get_memory_from_instances(backend):
620
    """ Get the memory that is used from instances.
621

622
    Get the used memory of a backend. Note: This is different for
623
    the real memory used, due to kvm's memory de-duplication.
624

625
    """
626
    with pooled_rapi_client(backend) as client:
627
        instances = client.GetInstances(bulk=True)
628
    mem = 0
629
    for i in instances:
630
        mem += i['oper_ram']
631
    return mem
632

    
633
##
634
## Synchronized operations for reconciliation
635
##
636

    
637

    
638
def create_network_synced(network, backend):
639
    result = _create_network_synced(network, backend)
640
    if result[0] != 'success':
641
        return result
642
    result = connect_network_synced(network, backend)
643
    return result
644

    
645

    
646
def _create_network_synced(network, backend):
647
    with pooled_rapi_client(backend) as client:
648
        job = _create_network(network, backend)
649
        result = wait_for_job(client, job)
650
    return result
651

    
652

    
653
def connect_network_synced(network, backend):
654
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
655
        mode = 'routed'
656
    else:
657
        mode = 'bridged'
658
    with pooled_rapi_client(backend) as client:
659
        for group in client.GetGroups():
660
            job = client.ConnectNetwork(network.backend_id, group, mode,
661
                                        network.link)
662
            result = wait_for_job(client, job)
663
            if result[0] != 'success':
664
                return result
665

    
666
    return result
667

    
668

    
669
def wait_for_job(client, jobid):
670
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
671
    status = result['job_info'][0]
672
    while status not in ['success', 'error', 'cancel']:
673
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
674
                                        [result], None)
675
        status = result['job_info'][0]
676

    
677
    if status == 'success':
678
        return (status, None)
679
    else:
680
        error = result['job_info'][1]
681
        return (status, error)