Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ c4ce868e

History | View | Annotate | Download (22.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        vm.operstate = state_for_success
80
        # Set the deleted flag explicitly, cater for admin-initiated removals
81
        if opcode == 'OP_INSTANCE_REMOVE':
82
            release_instance_nics(vm)
83
            vm.deleted = True
84
            vm.nics.all().delete()
85

    
86
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
87
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
88
        vm.operstate = 'ERROR'
89
        vm.backendtime = etime
90

    
91
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
    # when no instance exists at the Ganeti backend.
93
    # See ticket #799 for all the details.
94
    #
95
    if (status == 'error' and vm.operstate == 'ERROR' and\
96
        opcode == 'OP_INSTANCE_REMOVE'):
97
        vm.deleted = True
98
        vm.nics.all().delete()
99
        vm.operstate = 'DESTROYED'
100
        vm.backendtime = etime
101

    
102
    # Update backendtime only for jobs that have been successfully completed,
103
    # since only these jobs update the state of the VM. Else a "race condition"
104
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
105
    # before an error job and messages arrive in reversed order.
106
    if status == 'success':
107
        vm.backendtime = etime
108

    
109
    vm.save()
110

    
111

    
112
@transaction.commit_on_success
113
def process_net_status(vm, etime, nics):
114
    """Process a net status notification from the backend
115

116
    Process an incoming message from the Ganeti backend,
117
    detailing the NIC configuration of a VM instance.
118

119
    Update the state of the VM in the DB accordingly.
120
    """
121

    
122
    release_instance_nics(vm)
123

    
124
    new_nics = enumerate(nics)
125
    for i, new_nic in new_nics:
126
        network = new_nic.get('network', '')
127
        n = str(network)
128
        pk = utils.id_from_network_name(n)
129

    
130
        net = Network.objects.get(pk=pk)
131

    
132
        # Get the new nic info
133
        mac = new_nic.get('mac', '')
134
        ipv4 = new_nic.get('ip', '')
135
        ipv6 = new_nic.get('ipv6', '')
136

    
137
        firewall = new_nic.get('firewall', '')
138
        firewall_profile = _reverse_tags.get(firewall, '')
139
        if not firewall_profile and net.public:
140
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
141

    
142
        if ipv4:
143
            net.reserve_address(ipv4)
144

    
145
        vm.nics.create(
146
            network=net,
147
            index=i,
148
            mac=mac,
149
            ipv4=ipv4,
150
            ipv6=ipv6,
151
            firewall_profile=firewall_profile,
152
            dirty=False)
153

    
154
    vm.backendtime = etime
155
    vm.save()
156

    
157

    
158
def release_instance_nics(vm):
159
    for nic in vm.nics.all():
160
        nic.network.release_address(nic.ipv4)
161
        nic.delete()
162

    
163

    
164
@transaction.commit_on_success
165
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
166
    if status not in [x[0] for x in BACKEND_STATUSES]:
167
        return
168
        #raise Network.InvalidBackendMsgError(opcode, status)
169

    
170
    back_network.backendjobid = jobid
171
    back_network.backendjobstatus = status
172
    back_network.backendopcode = opcode
173
    back_network.backendlogmsg = logmsg
174

    
175
    # Notifications of success change the operating state
176
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
177
    if status == 'success' and state_for_success is not None:
178
        back_network.operstate = state_for_success
179
        if opcode == 'OP_NETWORK_REMOVE':
180
            back_network.deleted = True
181

    
182
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
183
        utils.update_state(back_network, 'ERROR')
184

    
185
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
186
        back_network.deleted = True
187
        back_network.operstate = 'DELETED'
188

    
189
    back_network.save()
190

    
191

    
192
@transaction.commit_on_success
193
def process_create_progress(vm, etime, rprogress, wprogress):
194

    
195
    # XXX: This only uses the read progress for now.
196
    #      Explore whether it would make sense to use the value of wprogress
197
    #      somewhere.
198
    percentage = int(rprogress)
199

    
200
    # The percentage may exceed 100%, due to the way
201
    # snf-progress-monitor tracks bytes read by image handling processes
202
    percentage = 100 if percentage > 100 else percentage
203
    if percentage < 0:
204
        raise ValueError("Percentage cannot be negative")
205

    
206
    # FIXME: log a warning here, see #1033
207
#   if last_update > percentage:
208
#       raise ValueError("Build percentage should increase monotonically " \
209
#                        "(old = %d, new = %d)" % (last_update, percentage))
210

    
211
    # This assumes that no message of type 'ganeti-create-progress' is going to
212
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
213
    # the instance is STARTED.  What if the two messages are processed by two
214
    # separate dispatcher threads, and the 'ganeti-op-status' message for
215
    # successful creation gets processed before the 'ganeti-create-progress'
216
    # message? [vkoukis]
217
    #
218
    #if not vm.operstate == 'BUILD':
219
    #    raise VirtualMachine.IllegalState("VM is not in building state")
220

    
221
    vm.buildpercentage = percentage
222
    vm.backendtime = etime
223
    vm.save()
224

    
225

    
226
def start_action(vm, action):
227
    """Update the state of a VM when a new action is initiated."""
228
    log.debug("Applying action %s to VM %s", action, vm)
229

    
230
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
231
        raise VirtualMachine.InvalidActionError(action)
232

    
233
    # No actions to deleted and no actions beside destroy to suspended VMs
234
    if vm.deleted:
235
        raise VirtualMachine.DeletedError
236

    
237
    # No actions to machines being built. They may be destroyed, however.
238
    if vm.operstate == 'BUILD' and action != 'DESTROY':
239
        raise VirtualMachine.BuildingError
240

    
241
    vm.action = action
242
    vm.backendjobid = None
243
    vm.backendopcode = None
244
    vm.backendjobstatus = None
245
    vm.backendlogmsg = None
246

    
247
    # Update the relevant flags if the VM is being suspended or destroyed.
248
    # Do not set the deleted flag here, see ticket #721.
249
    #
250
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
251
    # completes successfully. Hence, a server may be visible for some time
252
    # after a DELETE /servers/id returns HTTP 204.
253
    #
254
    if action == "DESTROY":
255
        # vm.deleted = True
256
        pass
257
    elif action == "SUSPEND":
258
        vm.suspended = True
259
    elif action == "START":
260
        vm.suspended = False
261
    vm.save()
262

    
263

    
264
def create_instance(vm, public_nic, flavor, image, password, personality):
265
    """`image` is a dictionary which should contain the keys:
266
            'backend_id', 'format' and 'metadata'
267

268
        metadata value should be a dictionary.
269
    """
270

    
271
    if settings.IGNORE_FLAVOR_DISK_SIZES:
272
        if image['backend_id'].find("windows") >= 0:
273
            sz = 14000
274
        else:
275
            sz = 4000
276
    else:
277
        sz = flavor.disk * 1024
278

    
279
    # Handle arguments to CreateInstance() as a dictionary,
280
    # initialize it based on a deployment-specific value.
281
    # This enables the administrator to override deployment-specific
282
    # arguments, such as the disk template to use, name of os provider
283
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
284
    #
285
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
286
    kw['mode'] = 'create'
287
    kw['name'] = vm.backend_vm_id
288
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
289

    
290
    # Identify if provider parameter should be set in disk options.
291
    # Current implementation support providers only fo ext template.
292
    # To select specific provider for an ext template, template name
293
    # should be formated as `ext_<provider_name>`.
294
    provider = None
295
    disk_template = flavor.disk_template
296
    if flavor.disk_template.startswith("ext"):
297
        disk_template, provider = flavor.disk_template.split("_", 1)
298

    
299
    kw['disk_template'] = disk_template
300
    kw['disks'] = [{"size": sz}]
301
    if provider:
302
        kw['disks'][0]['provider'] = provider
303

    
304
        if provider == 'vlmc':
305
            kw['disks'][0]['origin'] = image['backend_id']
306

    
307
    kw['nics'] = [public_nic]
308
    if settings.GANETI_USE_HOTPLUG:
309
        kw['hotplug'] = True
310
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
311
    # kw['os'] = settings.GANETI_OS_PROVIDER
312
    kw['ip_check'] = False
313
    kw['name_check'] = False
314
    # Do not specific a node explicitly, have
315
    # Ganeti use an iallocator instead
316
    #
317
    # kw['pnode']=rapi.GetNodes()[0]
318
    kw['dry_run'] = settings.TEST
319

    
320
    kw['beparams'] = {
321
        'auto_balance': True,
322
        'vcpus': flavor.cpu,
323
        'memory': flavor.ram}
324

    
325
    kw['osparams'] = {
326
        'img_id': image['backend_id'],
327
        'img_passwd': password,
328
        'img_format': image['format']}
329
    if personality:
330
        kw['osparams']['img_personality'] = json.dumps(personality)
331

    
332
    if provider != None and provider == 'vlmc':
333
        kw['osparams']['img_id'] = 'null'
334

    
335
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
336

    
337
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
338
    # kw['hvparams'] = dict(serial_console=False)
339
    log.debug("Creating instance %s", utils.hide_pass(kw))
340
    with pooled_rapi_client(vm) as client:
341
        return client.CreateInstance(**kw)
342

    
343

    
344
def delete_instance(vm):
345
    start_action(vm, 'DESTROY')
346
    with pooled_rapi_client(vm) as client:
347
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
348

    
349

    
350
def reboot_instance(vm, reboot_type):
351
    assert reboot_type in ('soft', 'hard')
352
    with pooled_rapi_client(vm) as client:
353
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
354
                                     dry_run=settings.TEST)
355

    
356

    
357
def startup_instance(vm):
358
    start_action(vm, 'START')
359
    with pooled_rapi_client(vm) as client:
360
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
361

    
362

    
363
def shutdown_instance(vm):
364
    start_action(vm, 'STOP')
365
    with pooled_rapi_client(vm) as client:
366
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
367

    
368

    
369
def get_instance_console(vm):
370
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
371
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
372
    # useless (see #783).
373
    #
374
    # Until this is fixed on the Ganeti side, construct a console info reply
375
    # directly.
376
    #
377
    # WARNING: This assumes that VNC runs on port network_port on
378
    #          the instance's primary node, and is probably
379
    #          hypervisor-specific.
380
    #
381
    log.debug("Getting console for vm %s", vm)
382

    
383
    console = {}
384
    console['kind'] = 'vnc'
385

    
386
    with pooled_rapi_client(vm) as client:
387
        i = client.GetInstance(vm.backend_vm_id)
388

    
389
    if i['hvparams']['serial_console']:
390
        raise Exception("hv parameter serial_console cannot be true")
391
    console['host'] = i['pnode']
392
    console['port'] = i['network_port']
393

    
394
    return console
395

    
396

    
397
def get_instance_info(vm):
398
    with pooled_rapi_client(vm) as client:
399
        return client.GetInstanceInfo(vm.backend_vm_id)
400

    
401

    
402
def create_network(network, backends=None, connect=True):
403
    """Create and connect a network."""
404
    if not backends:
405
        backends = Backend.objects.exclude(offline=True)
406

    
407
    log.debug("Creating network %s in backends %s", network, backends)
408

    
409
    for backend in backends:
410
        create_jobID = _create_network(network, backend)
411
        if connect:
412
            connect_network(network, backend, create_jobID)
413

    
414

    
415
def _create_network(network, backend):
416
    """Create a network."""
417

    
418
    network_type = network.public and 'public' or 'private'
419

    
420
    tags = network.backend_tag
421
    if network.dhcp:
422
        tags.append('nfdhcpd')
423
    tags = ','.join(tags)
424

    
425
    try:
426
        bn = BackendNetwork.objects.get(network=network, backend=backend)
427
        mac_prefix = bn.mac_prefix
428
    except BackendNetwork.DoesNotExist:
429
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
430
                        " does not exist" % (network.id, backend.id))
431

    
432
    with pooled_rapi_client(backend) as client:
433
        return client.CreateNetwork(network_name=network.backend_id,
434
                                    network=network.subnet,
435
                                    gateway=network.gateway,
436
                                    network_type=network_type,
437
                                    mac_prefix=mac_prefix,
438
                                    tags=tags)
439

    
440

    
441
def connect_network(network, backend, depend_job=None, group=None):
442
    """Connect a network to nodegroups."""
443
    log.debug("Connecting network %s to backend %s", network, backend)
444

    
445
    mode = "routed" if "ROUTED" in network.type else "bridged"
446

    
447
    with pooled_rapi_client(backend) as client:
448
        if group:
449
            client.ConnectNetwork(network.backend_id, group, mode,
450
                                  network.link, [depend_job])
451
        else:
452
            for group in client.GetGroups():
453
                client.ConnectNetwork(network.backend_id, group, mode,
454
                                      network.link, [depend_job])
455

    
456

    
457
def delete_network(network, backends=None, disconnect=True):
458
    if not backends:
459
        backends = Backend.objects.exclude(offline=True)
460

    
461
    log.debug("Deleting network %s from backends %s", network, backends)
462

    
463
    for backend in backends:
464
        disconnect_jobIDs = []
465
        if disconnect:
466
            disconnect_jobIDs = disconnect_network(network, backend)
467
        _delete_network(network, backend, disconnect_jobIDs)
468

    
469

    
470
def _delete_network(network, backend, depend_jobs=[]):
471
    with pooled_rapi_client(backend) as client:
472
        return client.DeleteNetwork(network.backend_id, depend_jobs)
473

    
474

    
475
def disconnect_network(network, backend, group=None):
476
    log.debug("Disconnecting network %s to backend %s", network, backend)
477

    
478
    with pooled_rapi_client(backend) as client:
479
        if group:
480
            return [client.DisconnectNetwork(network.backend_id, group)]
481
        else:
482
            jobs = []
483
            for group in client.GetGroups():
484
                job = client.DisconnectNetwork(network.backend_id, group)
485
                jobs.append(job)
486
            return jobs
487

    
488

    
489
def connect_to_network(vm, network, address):
490
    nic = {'ip': address, 'network': network.backend_id}
491

    
492
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
493

    
494
    with pooled_rapi_client(vm) as client:
495
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
496
                                     hotplug=settings.GANETI_USE_HOTPLUG,
497
                                     dry_run=settings.TEST)
498

    
499

    
500
def disconnect_from_network(vm, nic):
501
    op = [('remove', nic.index, {})]
502

    
503
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
504

    
505
    with pooled_rapi_client(vm) as client:
506
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
507
                                     hotplug=settings.GANETI_USE_HOTPLUG,
508
                                     dry_run=settings.TEST)
509

    
510

    
511
def set_firewall_profile(vm, profile):
512
    try:
513
        tag = _firewall_tags[profile]
514
    except KeyError:
515
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
516

    
517
    log.debug("Setting tag of VM %s to %s", vm, profile)
518

    
519
    with pooled_rapi_client(vm) as client:
520
        # Delete all firewall tags
521
        for t in _firewall_tags.values():
522
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
523
                                      dry_run=settings.TEST)
524

    
525
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
526

    
527
        # XXX NOP ModifyInstance call to force process_net_status to run
528
        # on the dispatcher
529
        client.ModifyInstance(vm.backend_vm_id,
530
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
531

    
532

    
533
def get_ganeti_instances(backend=None, bulk=False):
534
    instances = []
535
    for backend in get_backends(backend):
536
        with pooled_rapi_client(backend) as client:
537
            instances.append(client.GetInstances(bulk=bulk))
538

    
539
    return reduce(list.__add__, instances, [])
540

    
541

    
542
def get_ganeti_nodes(backend=None, bulk=False):
543
    nodes = []
544
    for backend in get_backends(backend):
545
        with pooled_rapi_client(backend) as client:
546
            nodes.append(client.GetNodes(bulk=bulk))
547

    
548
    return reduce(list.__add__, nodes, [])
549

    
550

    
551
def get_ganeti_jobs(backend=None, bulk=False):
552
    jobs = []
553
    for backend in get_backends(backend):
554
        with pooled_rapi_client(backend) as client:
555
            jobs.append(client.GetJobs(bulk=bulk))
556
    return reduce(list.__add__, jobs, [])
557

    
558
##
559
##
560
##
561

    
562

    
563
def get_backends(backend=None):
564
    if backend:
565
        return [backend]
566
    return Backend.objects.filter(offline=False)
567

    
568

    
569
def get_physical_resources(backend):
570
    """ Get the physical resources of a backend.
571

572
    Get the resources of a backend as reported by the backend (not the db).
573

574
    """
575
    nodes = get_ganeti_nodes(backend, bulk=True)
576
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
577
    res = {}
578
    for a in attr:
579
        res[a] = 0
580
    for n in nodes:
581
        # Filter out drained, offline and not vm_capable nodes since they will
582
        # not take part in the vm allocation process
583
        if n['vm_capable'] and not n['drained'] and not n['offline']\
584
           and n['cnodes']:
585
            for a in attr:
586
                res[a] += int(n[a])
587
    return res
588

    
589

    
590
def update_resources(backend, resources=None):
591
    """ Update the state of the backend resources in db.
592

593
    """
594

    
595
    if not resources:
596
        resources = get_physical_resources(backend)
597

    
598
    backend.mfree = resources['mfree']
599
    backend.mtotal = resources['mtotal']
600
    backend.dfree = resources['dfree']
601
    backend.dtotal = resources['dtotal']
602
    backend.pinst_cnt = resources['pinst_cnt']
603
    backend.ctotal = resources['ctotal']
604
    backend.updated = datetime.now()
605
    backend.save()
606

    
607

    
608
def get_memory_from_instances(backend):
609
    """ Get the memory that is used from instances.
610

611
    Get the used memory of a backend. Note: This is different for
612
    the real memory used, due to kvm's memory de-duplication.
613

614
    """
615
    with pooled_rapi_client(backend) as client:
616
        instances = client.GetInstances(bulk=True)
617
    mem = 0
618
    for i in instances:
619
        mem += i['oper_ram']
620
    return mem
621

    
622
##
623
## Synchronized operations for reconciliation
624
##
625

    
626

    
627
def create_network_synced(network, backend):
628
    result = _create_network_synced(network, backend)
629
    if result[0] != 'success':
630
        return result
631
    result = connect_network_synced(network, backend)
632
    return result
633

    
634

    
635
def _create_network_synced(network, backend):
636
    with pooled_rapi_client(backend) as client:
637
        backend_jobs = _create_network(network, [backend])
638
        (_, job) = backend_jobs[0]
639
        result = wait_for_job(client, job)
640
    return result
641

    
642

    
643
def connect_network_synced(network, backend):
644
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
645
        mode = 'routed'
646
    else:
647
        mode = 'bridged'
648
    with pooled_rapi_client(backend) as client:
649
        for group in client.GetGroups():
650
            job = client.ConnectNetwork(network.backend_id, group, mode,
651
                                        network.link)
652
            result = wait_for_job(client, job)
653
            if result[0] != 'success':
654
                return result
655

    
656
    return result
657

    
658

    
659
def wait_for_job(client, jobid):
660
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
661
    status = result['job_info'][0]
662
    while status not in ['success', 'error', 'cancel']:
663
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
664
                                        [result], None)
665
        status = result['job_info'][0]
666

    
667
    if status == 'success':
668
        return (status, None)
669
    else:
670
        error = result['job_info'][1]
671
        return (status, error)