Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ e60b4630

History | View | Annotate | Download (22.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        vm.operstate = state_for_success
80
        # Set the deleted flag explicitly, cater for admin-initiated removals
81
        if opcode == 'OP_INSTANCE_REMOVE':
82
            release_instance_nics(vm)
83
            vm.deleted = True
84
            vm.nics.all().delete()
85

    
86
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
87
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
88
        vm.operstate = 'ERROR'
89
        vm.backendtime = etime
90

    
91
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
    # when no instance exists at the Ganeti backend.
93
    # See ticket #799 for all the details.
94
    #
95
    if (status == 'error' and vm.operstate == 'ERROR' and\
96
        opcode == 'OP_INSTANCE_REMOVE'):
97
        vm.deleted = True
98
        vm.nics.all().delete()
99
        vm.operstate = 'DESTROYED'
100
        vm.backendtime = etime
101

    
102
    # Update backendtime only for jobs that have been successfully completed,
103
    # since only these jobs update the state of the VM. Else a "race condition"
104
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
105
    # before an error job and messages arrive in reversed order.
106
    if status == 'success':
107
        vm.backendtime = etime
108

    
109
    vm.save()
110

    
111

    
112
@transaction.commit_on_success
113
def process_net_status(vm, etime, nics):
114
    """Process a net status notification from the backend
115

116
    Process an incoming message from the Ganeti backend,
117
    detailing the NIC configuration of a VM instance.
118

119
    Update the state of the VM in the DB accordingly.
120
    """
121

    
122
    release_instance_nics(vm)
123

    
124
    new_nics = enumerate(nics)
125
    for i, new_nic in new_nics:
126
        network = new_nic.get('network', '')
127
        n = str(network)
128
        pk = utils.id_from_network_name(n)
129

    
130
        net = Network.objects.get(pk=pk)
131

    
132
        # Get the new nic info
133
        mac = new_nic.get('mac', '')
134
        ipv4 = new_nic.get('ip', '')
135
        ipv6 = new_nic.get('ipv6', '')
136

    
137
        firewall = new_nic.get('firewall', '')
138
        firewall_profile = _reverse_tags.get(firewall, '')
139
        if not firewall_profile and net.public:
140
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
141

    
142
        if ipv4:
143
            net.reserve_address(ipv4)
144

    
145
        vm.nics.create(
146
            network=net,
147
            index=i,
148
            mac=mac,
149
            ipv4=ipv4,
150
            ipv6=ipv6,
151
            firewall_profile=firewall_profile,
152
            dirty=False)
153

    
154
    vm.backendtime = etime
155
    vm.save()
156

    
157

    
158
def release_instance_nics(vm):
159
    for nic in vm.nics.all():
160
        if nic.ipv4:
161
            nic.network.release_address(nic.ipv4)
162
        nic.delete()
163

    
164

    
165
@transaction.commit_on_success
166
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
167
    if status not in [x[0] for x in BACKEND_STATUSES]:
168
        return
169
        #raise Network.InvalidBackendMsgError(opcode, status)
170

    
171
    back_network.backendjobid = jobid
172
    back_network.backendjobstatus = status
173
    back_network.backendopcode = opcode
174
    back_network.backendlogmsg = logmsg
175

    
176
    # Notifications of success change the operating state
177
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
178
    if status == 'success' and state_for_success is not None:
179
        back_network.operstate = state_for_success
180
        if opcode == 'OP_NETWORK_REMOVE':
181
            back_network.deleted = True
182

    
183
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
184
        utils.update_state(back_network, 'ERROR')
185

    
186
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
187
        back_network.deleted = True
188
        back_network.operstate = 'DELETED'
189

    
190
    back_network.save()
191

    
192

    
193
@transaction.commit_on_success
194
def process_create_progress(vm, etime, rprogress, wprogress):
195

    
196
    # XXX: This only uses the read progress for now.
197
    #      Explore whether it would make sense to use the value of wprogress
198
    #      somewhere.
199
    percentage = int(rprogress)
200

    
201
    # The percentage may exceed 100%, due to the way
202
    # snf-progress-monitor tracks bytes read by image handling processes
203
    percentage = 100 if percentage > 100 else percentage
204
    if percentage < 0:
205
        raise ValueError("Percentage cannot be negative")
206

    
207
    # FIXME: log a warning here, see #1033
208
#   if last_update > percentage:
209
#       raise ValueError("Build percentage should increase monotonically " \
210
#                        "(old = %d, new = %d)" % (last_update, percentage))
211

    
212
    # This assumes that no message of type 'ganeti-create-progress' is going to
213
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
214
    # the instance is STARTED.  What if the two messages are processed by two
215
    # separate dispatcher threads, and the 'ganeti-op-status' message for
216
    # successful creation gets processed before the 'ganeti-create-progress'
217
    # message? [vkoukis]
218
    #
219
    #if not vm.operstate == 'BUILD':
220
    #    raise VirtualMachine.IllegalState("VM is not in building state")
221

    
222
    vm.buildpercentage = percentage
223
    vm.backendtime = etime
224
    vm.save()
225

    
226

    
227
def start_action(vm, action):
228
    """Update the state of a VM when a new action is initiated."""
229
    log.debug("Applying action %s to VM %s", action, vm)
230

    
231
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
232
        raise VirtualMachine.InvalidActionError(action)
233

    
234
    # No actions to deleted and no actions beside destroy to suspended VMs
235
    if vm.deleted:
236
        raise VirtualMachine.DeletedError
237

    
238
    # No actions to machines being built. They may be destroyed, however.
239
    if vm.operstate == 'BUILD' and action != 'DESTROY':
240
        raise VirtualMachine.BuildingError
241

    
242
    vm.action = action
243
    vm.backendjobid = None
244
    vm.backendopcode = None
245
    vm.backendjobstatus = None
246
    vm.backendlogmsg = None
247

    
248
    # Update the relevant flags if the VM is being suspended or destroyed.
249
    # Do not set the deleted flag here, see ticket #721.
250
    #
251
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
252
    # completes successfully. Hence, a server may be visible for some time
253
    # after a DELETE /servers/id returns HTTP 204.
254
    #
255
    if action == "DESTROY":
256
        # vm.deleted = True
257
        pass
258
    elif action == "SUSPEND":
259
        vm.suspended = True
260
    elif action == "START":
261
        vm.suspended = False
262
    vm.save()
263

    
264

    
265
def create_instance(vm, public_nic, flavor, image, password, personality):
266
    """`image` is a dictionary which should contain the keys:
267
            'backend_id', 'format' and 'metadata'
268

269
        metadata value should be a dictionary.
270
    """
271

    
272
    if settings.IGNORE_FLAVOR_DISK_SIZES:
273
        if image['backend_id'].find("windows") >= 0:
274
            sz = 14000
275
        else:
276
            sz = 4000
277
    else:
278
        sz = flavor.disk * 1024
279

    
280
    # Handle arguments to CreateInstance() as a dictionary,
281
    # initialize it based on a deployment-specific value.
282
    # This enables the administrator to override deployment-specific
283
    # arguments, such as the disk template to use, name of os provider
284
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
285
    #
286
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
287
    kw['mode'] = 'create'
288
    kw['name'] = vm.backend_vm_id
289
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
290

    
291
    # Identify if provider parameter should be set in disk options.
292
    # Current implementation support providers only fo ext template.
293
    # To select specific provider for an ext template, template name
294
    # should be formated as `ext_<provider_name>`.
295
    provider = None
296
    disk_template = flavor.disk_template
297
    if flavor.disk_template.startswith("ext"):
298
        disk_template, provider = flavor.disk_template.split("_", 1)
299

    
300
    kw['disk_template'] = disk_template
301
    kw['disks'] = [{"size": sz}]
302
    if provider:
303
        kw['disks'][0]['provider'] = provider
304

    
305
        if provider == 'vlmc':
306
            kw['disks'][0]['origin'] = image['backend_id']
307

    
308
    kw['nics'] = [public_nic]
309
    if settings.GANETI_USE_HOTPLUG:
310
        kw['hotplug'] = True
311
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
312
    # kw['os'] = settings.GANETI_OS_PROVIDER
313
    kw['ip_check'] = False
314
    kw['name_check'] = False
315
    # Do not specific a node explicitly, have
316
    # Ganeti use an iallocator instead
317
    #
318
    # kw['pnode']=rapi.GetNodes()[0]
319
    kw['dry_run'] = settings.TEST
320

    
321
    kw['beparams'] = {
322
        'auto_balance': True,
323
        'vcpus': flavor.cpu,
324
        'memory': flavor.ram}
325

    
326
    kw['osparams'] = {
327
        'img_id': image['backend_id'],
328
        'img_passwd': password,
329
        'img_format': image['format']}
330
    if personality:
331
        kw['osparams']['img_personality'] = json.dumps(personality)
332

    
333
    if provider != None and provider == 'vlmc':
334
        kw['osparams']['img_id'] = 'null'
335

    
336
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
337

    
338
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
339
    # kw['hvparams'] = dict(serial_console=False)
340
    log.debug("Creating instance %s", utils.hide_pass(kw))
341
    with pooled_rapi_client(vm) as client:
342
        return client.CreateInstance(**kw)
343

    
344

    
345
def delete_instance(vm):
346
    start_action(vm, 'DESTROY')
347
    with pooled_rapi_client(vm) as client:
348
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
349

    
350

    
351
def reboot_instance(vm, reboot_type):
352
    assert reboot_type in ('soft', 'hard')
353
    with pooled_rapi_client(vm) as client:
354
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
355
                                     dry_run=settings.TEST)
356

    
357

    
358
def startup_instance(vm):
359
    start_action(vm, 'START')
360
    with pooled_rapi_client(vm) as client:
361
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
362

    
363

    
364
def shutdown_instance(vm):
365
    start_action(vm, 'STOP')
366
    with pooled_rapi_client(vm) as client:
367
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
368

    
369

    
370
def get_instance_console(vm):
371
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
372
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
373
    # useless (see #783).
374
    #
375
    # Until this is fixed on the Ganeti side, construct a console info reply
376
    # directly.
377
    #
378
    # WARNING: This assumes that VNC runs on port network_port on
379
    #          the instance's primary node, and is probably
380
    #          hypervisor-specific.
381
    #
382
    log.debug("Getting console for vm %s", vm)
383

    
384
    console = {}
385
    console['kind'] = 'vnc'
386

    
387
    with pooled_rapi_client(vm) as client:
388
        i = client.GetInstance(vm.backend_vm_id)
389

    
390
    if i['hvparams']['serial_console']:
391
        raise Exception("hv parameter serial_console cannot be true")
392
    console['host'] = i['pnode']
393
    console['port'] = i['network_port']
394

    
395
    return console
396

    
397

    
398
def get_instance_info(vm):
399
    with pooled_rapi_client(vm) as client:
400
        return client.GetInstanceInfo(vm.backend_vm_id)
401

    
402

    
403
def create_network(network, backends=None, connect=True):
404
    """Create and connect a network."""
405
    if not backends:
406
        backends = Backend.objects.exclude(offline=True)
407

    
408
    log.debug("Creating network %s in backends %s", network, backends)
409

    
410
    for backend in backends:
411
        create_jobID = _create_network(network, backend)
412
        if connect:
413
            connect_network(network, backend, create_jobID)
414

    
415

    
416
def _create_network(network, backend):
417
    """Create a network."""
418

    
419
    network_type = network.public and 'public' or 'private'
420

    
421
    tags = network.backend_tag
422
    if network.dhcp:
423
        tags.append('nfdhcpd')
424
    tags = ','.join(tags)
425

    
426
    try:
427
        bn = BackendNetwork.objects.get(network=network, backend=backend)
428
        mac_prefix = bn.mac_prefix
429
    except BackendNetwork.DoesNotExist:
430
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
431
                        " does not exist" % (network.id, backend.id))
432

    
433
    with pooled_rapi_client(backend) as client:
434
        return client.CreateNetwork(network_name=network.backend_id,
435
                                    network=network.subnet,
436
                                    network6=network.subnet6,
437
                                    gateway=network.gateway,
438
                                    gateway6=network.gateway6,
439
                                    network_type=network_type,
440
                                    mac_prefix=mac_prefix,
441
                                    tags=tags)
442

    
443

    
444
def connect_network(network, backend, depend_job=None, group=None):
445
    """Connect a network to nodegroups."""
446
    log.debug("Connecting network %s to backend %s", network, backend)
447

    
448
    mode = "routed" if "ROUTED" in network.type else "bridged"
449

    
450
    with pooled_rapi_client(backend) as client:
451
        if group:
452
            client.ConnectNetwork(network.backend_id, group, mode,
453
                                  network.link, [depend_job])
454
        else:
455
            for group in client.GetGroups():
456
                client.ConnectNetwork(network.backend_id, group, mode,
457
                                      network.link, [depend_job])
458

    
459

    
460
def delete_network(network, backends=None, disconnect=True):
461
    if not backends:
462
        backends = Backend.objects.exclude(offline=True)
463

    
464
    log.debug("Deleting network %s from backends %s", network, backends)
465

    
466
    for backend in backends:
467
        disconnect_jobIDs = []
468
        if disconnect:
469
            disconnect_jobIDs = disconnect_network(network, backend)
470
        _delete_network(network, backend, disconnect_jobIDs)
471

    
472

    
473
def _delete_network(network, backend, depend_jobs=[]):
474
    with pooled_rapi_client(backend) as client:
475
        return client.DeleteNetwork(network.backend_id, depend_jobs)
476

    
477

    
478
def disconnect_network(network, backend, group=None):
479
    log.debug("Disconnecting network %s to backend %s", network, backend)
480

    
481
    with pooled_rapi_client(backend) as client:
482
        if group:
483
            return [client.DisconnectNetwork(network.backend_id, group)]
484
        else:
485
            jobs = []
486
            for group in client.GetGroups():
487
                job = client.DisconnectNetwork(network.backend_id, group)
488
                jobs.append(job)
489
            return jobs
490

    
491

    
492
def connect_to_network(vm, network, address):
493
    nic = {'ip': address, 'network': network.backend_id}
494

    
495
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
496

    
497
    with pooled_rapi_client(vm) as client:
498
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
499
                                     hotplug=settings.GANETI_USE_HOTPLUG,
500
                                     dry_run=settings.TEST)
501

    
502

    
503
def disconnect_from_network(vm, nic):
504
    op = [('remove', nic.index, {})]
505

    
506
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
507

    
508
    with pooled_rapi_client(vm) as client:
509
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
510
                                     hotplug=settings.GANETI_USE_HOTPLUG,
511
                                     dry_run=settings.TEST)
512

    
513

    
514
def set_firewall_profile(vm, profile):
515
    try:
516
        tag = _firewall_tags[profile]
517
    except KeyError:
518
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
519

    
520
    log.debug("Setting tag of VM %s to %s", vm, profile)
521

    
522
    with pooled_rapi_client(vm) as client:
523
        # Delete all firewall tags
524
        for t in _firewall_tags.values():
525
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
526
                                      dry_run=settings.TEST)
527

    
528
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
529

    
530
        # XXX NOP ModifyInstance call to force process_net_status to run
531
        # on the dispatcher
532
        client.ModifyInstance(vm.backend_vm_id,
533
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
534

    
535

    
536
def get_ganeti_instances(backend=None, bulk=False):
537
    instances = []
538
    for backend in get_backends(backend):
539
        with pooled_rapi_client(backend) as client:
540
            instances.append(client.GetInstances(bulk=bulk))
541

    
542
    return reduce(list.__add__, instances, [])
543

    
544

    
545
def get_ganeti_nodes(backend=None, bulk=False):
546
    nodes = []
547
    for backend in get_backends(backend):
548
        with pooled_rapi_client(backend) as client:
549
            nodes.append(client.GetNodes(bulk=bulk))
550

    
551
    return reduce(list.__add__, nodes, [])
552

    
553

    
554
def get_ganeti_jobs(backend=None, bulk=False):
555
    jobs = []
556
    for backend in get_backends(backend):
557
        with pooled_rapi_client(backend) as client:
558
            jobs.append(client.GetJobs(bulk=bulk))
559
    return reduce(list.__add__, jobs, [])
560

    
561
##
562
##
563
##
564

    
565

    
566
def get_backends(backend=None):
567
    if backend:
568
        return [backend]
569
    return Backend.objects.filter(offline=False)
570

    
571

    
572
def get_physical_resources(backend):
573
    """ Get the physical resources of a backend.
574

575
    Get the resources of a backend as reported by the backend (not the db).
576

577
    """
578
    nodes = get_ganeti_nodes(backend, bulk=True)
579
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
580
    res = {}
581
    for a in attr:
582
        res[a] = 0
583
    for n in nodes:
584
        # Filter out drained, offline and not vm_capable nodes since they will
585
        # not take part in the vm allocation process
586
        if n['vm_capable'] and not n['drained'] and not n['offline']\
587
           and n['cnodes']:
588
            for a in attr:
589
                res[a] += int(n[a])
590
    return res
591

    
592

    
593
def update_resources(backend, resources=None):
594
    """ Update the state of the backend resources in db.
595

596
    """
597

    
598
    if not resources:
599
        resources = get_physical_resources(backend)
600

    
601
    backend.mfree = resources['mfree']
602
    backend.mtotal = resources['mtotal']
603
    backend.dfree = resources['dfree']
604
    backend.dtotal = resources['dtotal']
605
    backend.pinst_cnt = resources['pinst_cnt']
606
    backend.ctotal = resources['ctotal']
607
    backend.updated = datetime.now()
608
    backend.save()
609

    
610

    
611
def get_memory_from_instances(backend):
612
    """ Get the memory that is used from instances.
613

614
    Get the used memory of a backend. Note: This is different for
615
    the real memory used, due to kvm's memory de-duplication.
616

617
    """
618
    with pooled_rapi_client(backend) as client:
619
        instances = client.GetInstances(bulk=True)
620
    mem = 0
621
    for i in instances:
622
        mem += i['oper_ram']
623
    return mem
624

    
625
##
626
## Synchronized operations for reconciliation
627
##
628

    
629

    
630
def create_network_synced(network, backend):
631
    result = _create_network_synced(network, backend)
632
    if result[0] != 'success':
633
        return result
634
    result = connect_network_synced(network, backend)
635
    return result
636

    
637

    
638
def _create_network_synced(network, backend):
639
    with pooled_rapi_client(backend) as client:
640
        job = _create_network(network, backend)
641
        result = wait_for_job(client, job)
642
    return result
643

    
644

    
645
def connect_network_synced(network, backend):
646
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
647
        mode = 'routed'
648
    else:
649
        mode = 'bridged'
650
    with pooled_rapi_client(backend) as client:
651
        for group in client.GetGroups():
652
            job = client.ConnectNetwork(network.backend_id, group, mode,
653
                                        network.link)
654
            result = wait_for_job(client, job)
655
            if result[0] != 'success':
656
                return result
657

    
658
    return result
659

    
660

    
661
def wait_for_job(client, jobid):
662
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
663
    status = result['job_info'][0]
664
    while status not in ['success', 'error', 'cancel']:
665
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
666
                                        [result], None)
667
        status = result['job_info'][0]
668

    
669
    if status == 'success':
670
        return (status, None)
671
    else:
672
        error = result['job_info'][1]
673
        return (status, error)