Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 6cc3a31c

History | View | Annotate | Download (22.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        vm.operstate = state_for_success
80
        # Set the deleted flag explicitly, cater for admin-initiated removals
81
        if opcode == 'OP_INSTANCE_REMOVE':
82
            release_instance_nics(vm)
83
            vm.deleted = True
84
            vm.nics.all().delete()
85

    
86
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
87
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
88
        vm.operstate = 'ERROR'
89
        vm.backendtime = etime
90

    
91
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
    # when no instance exists at the Ganeti backend.
93
    # See ticket #799 for all the details.
94
    #
95
    if (status == 'error' and vm.operstate == 'ERROR' and\
96
        opcode == 'OP_INSTANCE_REMOVE'):
97
        vm.deleted = True
98
        vm.nics.all().delete()
99
        vm.operstate = 'DESTROYED'
100
        vm.backendtime = etime
101

    
102
    # Update backendtime only for jobs that have been successfully completed,
103
    # since only these jobs update the state of the VM. Else a "race condition"
104
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
105
    # before an error job and messages arrive in reversed order.
106
    if status == 'success':
107
        vm.backendtime = etime
108

    
109
    vm.save()
110

    
111

    
112
@transaction.commit_on_success
113
def process_net_status(vm, etime, nics):
114
    """Process a net status notification from the backend
115

116
    Process an incoming message from the Ganeti backend,
117
    detailing the NIC configuration of a VM instance.
118

119
    Update the state of the VM in the DB accordingly.
120
    """
121

    
122
    release_instance_nics(vm)
123

    
124
    new_nics = enumerate(nics)
125
    for i, new_nic in new_nics:
126
        network = new_nic.get('network', '')
127
        n = str(network)
128
        pk = utils.id_from_network_name(n)
129

    
130
        net = Network.objects.get(pk=pk)
131

    
132
        # Get the new nic info
133
        mac = new_nic.get('mac', '')
134
        ipv4 = new_nic.get('ip', '')
135
        ipv6 = new_nic.get('ipv6', '')
136

    
137
        firewall = new_nic.get('firewall', '')
138
        firewall_profile = _reverse_tags.get(firewall, '')
139
        if not firewall_profile and net.public:
140
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
141

    
142
        if ipv4:
143
            net.reserve_address(ipv4)
144

    
145
        vm.nics.create(
146
            network=net,
147
            index=i,
148
            mac=mac,
149
            ipv4=ipv4,
150
            ipv6=ipv6,
151
            firewall_profile=firewall_profile,
152
            dirty=False)
153

    
154
    vm.backendtime = etime
155
    vm.save()
156

    
157

    
158
def release_instance_nics(vm):
159
    for nic in vm.nics.all():
160
        nic.network.release_address(nic.ipv4)
161
        nic.delete()
162

    
163

    
164
@transaction.commit_on_success
165
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
166
    if status not in [x[0] for x in BACKEND_STATUSES]:
167
        return
168
        #raise Network.InvalidBackendMsgError(opcode, status)
169

    
170
    back_network.backendjobid = jobid
171
    back_network.backendjobstatus = status
172
    back_network.backendopcode = opcode
173
    back_network.backendlogmsg = logmsg
174

    
175
    # Notifications of success change the operating state
176
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
177
    if status == 'success' and state_for_success is not None:
178
        back_network.operstate = state_for_success
179
        if opcode == 'OP_NETWORK_REMOVE':
180
            back_network.deleted = True
181

    
182
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
183
        utils.update_state(back_network, 'ERROR')
184

    
185
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
186
        back_network.deleted = True
187
        back_network.operstate = 'DELETED'
188

    
189
    back_network.save()
190

    
191

    
192
@transaction.commit_on_success
193
def process_create_progress(vm, etime, rprogress, wprogress):
194

    
195
    # XXX: This only uses the read progress for now.
196
    #      Explore whether it would make sense to use the value of wprogress
197
    #      somewhere.
198
    percentage = int(rprogress)
199

    
200
    # The percentage may exceed 100%, due to the way
201
    # snf-progress-monitor tracks bytes read by image handling processes
202
    percentage = 100 if percentage > 100 else percentage
203
    if percentage < 0:
204
        raise ValueError("Percentage cannot be negative")
205

    
206
    # FIXME: log a warning here, see #1033
207
#   if last_update > percentage:
208
#       raise ValueError("Build percentage should increase monotonically " \
209
#                        "(old = %d, new = %d)" % (last_update, percentage))
210

    
211
    # This assumes that no message of type 'ganeti-create-progress' is going to
212
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
213
    # the instance is STARTED.  What if the two messages are processed by two
214
    # separate dispatcher threads, and the 'ganeti-op-status' message for
215
    # successful creation gets processed before the 'ganeti-create-progress'
216
    # message? [vkoukis]
217
    #
218
    #if not vm.operstate == 'BUILD':
219
    #    raise VirtualMachine.IllegalState("VM is not in building state")
220

    
221
    vm.buildpercentage = percentage
222
    vm.backendtime = etime
223
    vm.save()
224

    
225

    
226
def start_action(vm, action):
227
    """Update the state of a VM when a new action is initiated."""
228
    log.debug("Applying action %s to VM %s", action, vm)
229

    
230
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
231
        raise VirtualMachine.InvalidActionError(action)
232

    
233
    # No actions to deleted and no actions beside destroy to suspended VMs
234
    if vm.deleted:
235
        raise VirtualMachine.DeletedError
236

    
237
    # No actions to machines being built. They may be destroyed, however.
238
    if vm.operstate == 'BUILD' and action != 'DESTROY':
239
        raise VirtualMachine.BuildingError
240

    
241
    vm.action = action
242
    vm.backendjobid = None
243
    vm.backendopcode = None
244
    vm.backendjobstatus = None
245
    vm.backendlogmsg = None
246

    
247
    # Update the relevant flags if the VM is being suspended or destroyed.
248
    # Do not set the deleted flag here, see ticket #721.
249
    #
250
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
251
    # completes successfully. Hence, a server may be visible for some time
252
    # after a DELETE /servers/id returns HTTP 204.
253
    #
254
    if action == "DESTROY":
255
        # vm.deleted = True
256
        pass
257
    elif action == "SUSPEND":
258
        vm.suspended = True
259
    elif action == "START":
260
        vm.suspended = False
261
    vm.save()
262

    
263

    
264
def create_instance(vm, public_nic, flavor, image, password, personality):
265
    """`image` is a dictionary which should contain the keys:
266
            'backend_id', 'format' and 'metadata'
267

268
        metadata value should be a dictionary.
269
    """
270

    
271
    if settings.IGNORE_FLAVOR_DISK_SIZES:
272
        if image['backend_id'].find("windows") >= 0:
273
            sz = 14000
274
        else:
275
            sz = 4000
276
    else:
277
        sz = flavor.disk * 1024
278

    
279
    # Handle arguments to CreateInstance() as a dictionary,
280
    # initialize it based on a deployment-specific value.
281
    # This enables the administrator to override deployment-specific
282
    # arguments, such as the disk template to use, name of os provider
283
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
284
    #
285
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
286
    kw['mode'] = 'create'
287
    kw['name'] = vm.backend_vm_id
288
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
289

    
290
    # Identify if provider parameter should be set in disk options.
291
    # Current implementation support providers only fo ext template.
292
    # To select specific provider for an ext template, template name
293
    # should be formated as `ext_<provider_name>`.
294
    provider = None
295
    disk_template = flavor.disk_template
296
    if flavor.disk_template.startswith("ext"):
297
        disk_template, provider = flavor.disk_template.split("_", 1)
298

    
299
    kw['disk_template'] = disk_template
300
    kw['disks'] = [{"size": sz}]
301
    if provider:
302
        kw['disks'][0]['provider'] = provider
303

    
304
        if provider == 'vlmc':
305
            kw['disks'][0]['origin'] = image['backend_id']
306

    
307
    kw['nics'] = [public_nic]
308
    if settings.GANETI_USE_HOTPLUG:
309
        kw['hotplug'] = True
310
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
311
    # kw['os'] = settings.GANETI_OS_PROVIDER
312
    kw['ip_check'] = False
313
    kw['name_check'] = False
314
    # Do not specific a node explicitly, have
315
    # Ganeti use an iallocator instead
316
    #
317
    # kw['pnode']=rapi.GetNodes()[0]
318
    kw['dry_run'] = settings.TEST
319

    
320
    kw['beparams'] = {
321
        'auto_balance': True,
322
        'vcpus': flavor.cpu,
323
        'memory': flavor.ram}
324

    
325
    kw['osparams'] = {
326
        'img_id': image['backend_id'],
327
        'img_passwd': password,
328
        'img_format': image['format']}
329
    if personality:
330
        kw['osparams']['img_personality'] = json.dumps(personality)
331

    
332
    if provider != None and provider == 'vlmc':
333
        kw['osparams']['img_id'] = 'null'
334

    
335
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
336

    
337
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
338
    # kw['hvparams'] = dict(serial_console=False)
339
    log.debug("Creating instance %s", utils.hide_pass(kw))
340
    with pooled_rapi_client(vm) as client:
341
        return client.CreateInstance(**kw)
342

    
343

    
344
def delete_instance(vm):
345
    start_action(vm, 'DESTROY')
346
    with pooled_rapi_client(vm) as client:
347
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
348

    
349

    
350
def reboot_instance(vm, reboot_type):
351
    assert reboot_type in ('soft', 'hard')
352
    with pooled_rapi_client(vm) as client:
353
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
354
                                     dry_run=settings.TEST)
355

    
356

    
357
def startup_instance(vm):
358
    start_action(vm, 'START')
359
    with pooled_rapi_client(vm) as client:
360
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
361

    
362

    
363
def shutdown_instance(vm):
364
    start_action(vm, 'STOP')
365
    with pooled_rapi_client(vm) as client:
366
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
367

    
368

    
369
def get_instance_console(vm):
370
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
371
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
372
    # useless (see #783).
373
    #
374
    # Until this is fixed on the Ganeti side, construct a console info reply
375
    # directly.
376
    #
377
    # WARNING: This assumes that VNC runs on port network_port on
378
    #          the instance's primary node, and is probably
379
    #          hypervisor-specific.
380
    #
381
    log.debug("Getting console for vm %s", vm)
382

    
383
    console = {}
384
    console['kind'] = 'vnc'
385

    
386
    with pooled_rapi_client(vm) as client:
387
        i = client.GetInstance(vm.backend_vm_id)
388

    
389
    if i['hvparams']['serial_console']:
390
        raise Exception("hv parameter serial_console cannot be true")
391
    console['host'] = i['pnode']
392
    console['port'] = i['network_port']
393

    
394
    return console
395

    
396

    
397
def get_instance_info(vm):
398
    with pooled_rapi_client(vm) as client:
399
        return client.GetInstanceInfo(vm.backend_vm_id)
400

    
401

    
402
def create_network(network, backends=None, connect=True):
403
    """Create and connect a network."""
404
    if not backends:
405
        backends = Backend.objects.exclude(offline=True)
406

    
407
    log.debug("Creating network %s in backends %s", network, backends)
408

    
409
    for backend in backends:
410
        create_jobID = _create_network(network, backend)
411
        if connect:
412
            connect_network(network, backend, create_jobID)
413

    
414

    
415
def _create_network(network, backend):
416
    """Create a network."""
417

    
418
    network_type = network.public and 'public' or 'private'
419

    
420
    tags = network.backend_tag
421
    if network.dhcp:
422
        tags.append('nfdhcpd')
423
    tags = ','.join(tags)
424

    
425
    try:
426
        bn = BackendNetwork.objects.get(network=network, backend=backend)
427
        mac_prefix = bn.mac_prefix
428
    except BackendNetwork.DoesNotExist:
429
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
430
                        " does not exist" % (network.id, backend.id))
431

    
432
    with pooled_rapi_client(backend) as client:
433
        return client.CreateNetwork(network_name=network.backend_id,
434
                                    network=network.subnet,
435
                                    network6=network.subnet6,
436
                                    gateway=network.gateway,
437
                                    gateway6=network.gateway6,
438
                                    network_type=network_type,
439
                                    mac_prefix=mac_prefix,
440
                                    tags=tags)
441

    
442

    
443
def connect_network(network, backend, depend_job=None, group=None):
444
    """Connect a network to nodegroups."""
445
    log.debug("Connecting network %s to backend %s", network, backend)
446

    
447
    mode = "routed" if "ROUTED" in network.type else "bridged"
448

    
449
    with pooled_rapi_client(backend) as client:
450
        if group:
451
            client.ConnectNetwork(network.backend_id, group, mode,
452
                                  network.link, [depend_job])
453
        else:
454
            for group in client.GetGroups():
455
                client.ConnectNetwork(network.backend_id, group, mode,
456
                                      network.link, [depend_job])
457

    
458

    
459
def delete_network(network, backends=None, disconnect=True):
460
    if not backends:
461
        backends = Backend.objects.exclude(offline=True)
462

    
463
    log.debug("Deleting network %s from backends %s", network, backends)
464

    
465
    for backend in backends:
466
        disconnect_jobIDs = []
467
        if disconnect:
468
            disconnect_jobIDs = disconnect_network(network, backend)
469
        _delete_network(network, backend, disconnect_jobIDs)
470

    
471

    
472
def _delete_network(network, backend, depend_jobs=[]):
473
    with pooled_rapi_client(backend) as client:
474
        return client.DeleteNetwork(network.backend_id, depend_jobs)
475

    
476

    
477
def disconnect_network(network, backend, group=None):
478
    log.debug("Disconnecting network %s to backend %s", network, backend)
479

    
480
    with pooled_rapi_client(backend) as client:
481
        if group:
482
            return [client.DisconnectNetwork(network.backend_id, group)]
483
        else:
484
            jobs = []
485
            for group in client.GetGroups():
486
                job = client.DisconnectNetwork(network.backend_id, group)
487
                jobs.append(job)
488
            return jobs
489

    
490

    
491
def connect_to_network(vm, network, address):
492
    nic = {'ip': address, 'network': network.backend_id}
493

    
494
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
495

    
496
    with pooled_rapi_client(vm) as client:
497
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
498
                                     hotplug=settings.GANETI_USE_HOTPLUG,
499
                                     dry_run=settings.TEST)
500

    
501

    
502
def disconnect_from_network(vm, nic):
503
    op = [('remove', nic.index, {})]
504

    
505
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
506

    
507
    with pooled_rapi_client(vm) as client:
508
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
509
                                     hotplug=settings.GANETI_USE_HOTPLUG,
510
                                     dry_run=settings.TEST)
511

    
512

    
513
def set_firewall_profile(vm, profile):
514
    try:
515
        tag = _firewall_tags[profile]
516
    except KeyError:
517
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
518

    
519
    log.debug("Setting tag of VM %s to %s", vm, profile)
520

    
521
    with pooled_rapi_client(vm) as client:
522
        # Delete all firewall tags
523
        for t in _firewall_tags.values():
524
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
525
                                      dry_run=settings.TEST)
526

    
527
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
528

    
529
        # XXX NOP ModifyInstance call to force process_net_status to run
530
        # on the dispatcher
531
        client.ModifyInstance(vm.backend_vm_id,
532
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
533

    
534

    
535
def get_ganeti_instances(backend=None, bulk=False):
536
    instances = []
537
    for backend in get_backends(backend):
538
        with pooled_rapi_client(backend) as client:
539
            instances.append(client.GetInstances(bulk=bulk))
540

    
541
    return reduce(list.__add__, instances, [])
542

    
543

    
544
def get_ganeti_nodes(backend=None, bulk=False):
545
    nodes = []
546
    for backend in get_backends(backend):
547
        with pooled_rapi_client(backend) as client:
548
            nodes.append(client.GetNodes(bulk=bulk))
549

    
550
    return reduce(list.__add__, nodes, [])
551

    
552

    
553
def get_ganeti_jobs(backend=None, bulk=False):
554
    jobs = []
555
    for backend in get_backends(backend):
556
        with pooled_rapi_client(backend) as client:
557
            jobs.append(client.GetJobs(bulk=bulk))
558
    return reduce(list.__add__, jobs, [])
559

    
560
##
561
##
562
##
563

    
564

    
565
def get_backends(backend=None):
566
    if backend:
567
        return [backend]
568
    return Backend.objects.filter(offline=False)
569

    
570

    
571
def get_physical_resources(backend):
572
    """ Get the physical resources of a backend.
573

574
    Get the resources of a backend as reported by the backend (not the db).
575

576
    """
577
    nodes = get_ganeti_nodes(backend, bulk=True)
578
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
579
    res = {}
580
    for a in attr:
581
        res[a] = 0
582
    for n in nodes:
583
        # Filter out drained, offline and not vm_capable nodes since they will
584
        # not take part in the vm allocation process
585
        if n['vm_capable'] and not n['drained'] and not n['offline']\
586
           and n['cnodes']:
587
            for a in attr:
588
                res[a] += int(n[a])
589
    return res
590

    
591

    
592
def update_resources(backend, resources=None):
593
    """ Update the state of the backend resources in db.
594

595
    """
596

    
597
    if not resources:
598
        resources = get_physical_resources(backend)
599

    
600
    backend.mfree = resources['mfree']
601
    backend.mtotal = resources['mtotal']
602
    backend.dfree = resources['dfree']
603
    backend.dtotal = resources['dtotal']
604
    backend.pinst_cnt = resources['pinst_cnt']
605
    backend.ctotal = resources['ctotal']
606
    backend.updated = datetime.now()
607
    backend.save()
608

    
609

    
610
def get_memory_from_instances(backend):
611
    """ Get the memory that is used from instances.
612

613
    Get the used memory of a backend. Note: This is different for
614
    the real memory used, due to kvm's memory de-duplication.
615

616
    """
617
    with pooled_rapi_client(backend) as client:
618
        instances = client.GetInstances(bulk=True)
619
    mem = 0
620
    for i in instances:
621
        mem += i['oper_ram']
622
    return mem
623

    
624
##
625
## Synchronized operations for reconciliation
626
##
627

    
628

    
629
def create_network_synced(network, backend):
630
    result = _create_network_synced(network, backend)
631
    if result[0] != 'success':
632
        return result
633
    result = connect_network_synced(network, backend)
634
    return result
635

    
636

    
637
def _create_network_synced(network, backend):
638
    with pooled_rapi_client(backend) as client:
639
        backend_jobs = _create_network(network, [backend])
640
        (_, job) = backend_jobs[0]
641
        result = wait_for_job(client, job)
642
    return result
643

    
644

    
645
def connect_network_synced(network, backend):
646
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
647
        mode = 'routed'
648
    else:
649
        mode = 'bridged'
650
    with pooled_rapi_client(backend) as client:
651
        for group in client.GetGroups():
652
            job = client.ConnectNetwork(network.backend_id, group, mode,
653
                                        network.link)
654
            result = wait_for_job(client, job)
655
            if result[0] != 'success':
656
                return result
657

    
658
    return result
659

    
660

    
661
def wait_for_job(client, jobid):
662
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
663
    status = result['job_info'][0]
664
    while status not in ['success', 'error', 'cancel']:
665
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
666
                                        [result], None)
667
        status = result['job_info'][0]
668

    
669
    if status == 'success':
670
        return (status, None)
671
    else:
672
        error = result['job_info'][1]
673
        return (status, error)